provider/aws: Added support for redshift destination to firehose delivery streams (supersedes #5304) (#7375)

* Added support for redshift destination to firehose delivery streams

* Small documentation fix

* go fmt after rebase

* small fixes after rebase

* provider/aws: Firehose test cleanups

* provider/aws: Update docs

* Convert Redshift and S3 blocks to TypeList

* provider/aws: Add migration for S3 Configuration in Kinesis firehose

* providers/aws: Safety first when building Redshift config options

* restore commented out log statements in the migration

* provider/aws: use MaxItems in schema
Clint, 2016-06-30 15:03:31 -05:00, committed by GitHub
parent 2ba1b0fb01
commit 14fa3a88fd
5 changed files with 683 additions and 268 deletions
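In practice, the flat S3 attributes move into a nested `s3_configuration` block, and the new state migration rewrites existing state accordingly. A rough before/after sketch of the configuration change (resource names and references are illustrative):

```
# v0.6.x and earlier (flat attributes, removed in v0.7.0)
resource "aws_kinesis_firehose_delivery_stream" "example" {
  name                = "example-stream"
  destination         = "s3"
  role_arn            = "${aws_iam_role.firehose.arn}"
  s3_bucket_arn       = "${aws_s3_bucket.bucket.arn}"
  s3_buffer_size      = 10
  s3_data_compression = "GZIP"
}

# v0.7.0 (nested s3_configuration block)
resource "aws_kinesis_firehose_delivery_stream" "example" {
  name        = "example-stream"
  destination = "s3"
  s3_configuration {
    role_arn           = "${aws_iam_role.firehose.arn}"
    bucket_arn         = "${aws_s3_bucket.bucket.arn}"
    buffer_size        = 10
    compression_format = "GZIP"
  }
}
```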


@@ -20,6 +20,8 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Update: resourceAwsKinesisFirehoseDeliveryStreamUpdate,
Delete: resourceAwsKinesisFirehoseDeliveryStreamDelete,
SchemaVersion: 1,
MigrateState: resourceAwsKinesisFirehoseMigrateState,
Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
@@ -37,37 +39,132 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},
// elements removed in v0.7.0
"role_arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Removed: "role_arn has been removed. Use a s3_configuration block instead. See https://terraform.io/docs/providers/aws/r/kinesis_firehose_delivery_stream.html",
},
"s3_bucket_arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Removed: "s3_bucket_arn has been removed. Use a s3_configuration block instead. See https://terraform.io/docs/providers/aws/r/kinesis_firehose_delivery_stream.html",
},
"s3_prefix": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Removed: "s3_prefix has been removed. Use a s3_configuration block instead. See https://terraform.io/docs/providers/aws/r/kinesis_firehose_delivery_stream.html",
},
"s3_buffer_size": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 5,
Removed: "s3_buffer_size has been removed. Use a s3_configuration block instead. See https://terraform.io/docs/providers/aws/r/kinesis_firehose_delivery_stream.html",
},
"s3_buffer_interval": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 300,
Removed: "s3_buffer_interval has been removed. Use a s3_configuration block instead. See https://terraform.io/docs/providers/aws/r/kinesis_firehose_delivery_stream.html",
},
"s3_data_compression": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "UNCOMPRESSED",
Removed: "s3_data_compression has been removed. Use a s3_configuration block instead. See https://terraform.io/docs/providers/aws/r/kinesis_firehose_delivery_stream.html",
},
"s3_configuration": &schema.Schema{
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket_arn": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"buffer_size": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 5,
},
"buffer_interval": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 300,
},
"compression_format": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "UNCOMPRESSED",
},
"kms_key_arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"role_arn": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"prefix": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
},
},
},
"redshift_configuration": &schema.Schema{
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"cluster_jdbcurl": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"username": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"password": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"role_arn": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"copy_options": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"data_table_columns": &schema.Schema{
Type: schema.TypeString,
Optional: true,
},
"data_table_name": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
},
},
},
"arn": &schema.Schema{
@@ -91,49 +188,168 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
}
}
func validateConfiguration(d *schema.ResourceData) error {
destination := d.Get("destination").(string)
if destination != "s3" && destination != "redshift" {
return fmt.Errorf("[ERROR] Destination must be s3 or redshift")
}
return nil
}
func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{})
return &firehose.S3DestinationConfiguration{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}
}
func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{})
return &firehose.S3DestinationUpdate{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}
}
func extractEncryptionConfiguration(s3 map[string]interface{}) *firehose.EncryptionConfiguration {
if key, ok := s3["kms_key_arn"]; ok && len(key.(string)) > 0 {
return &firehose.EncryptionConfiguration{
KMSEncryptionConfig: &firehose.KMSEncryptionConfig{
AWSKMSKeyARN: aws.String(key.(string)),
},
}
}
return &firehose.EncryptionConfiguration{
NoEncryptionConfig: aws.String("NoEncryption"),
}
}
func extractPrefixConfiguration(s3 map[string]interface{}) *string {
if v, ok := s3["prefix"]; ok {
return aws.String(v.(string))
}
return nil
}
func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.RedshiftDestinationConfiguration, error) {
redshiftRaw, ok := d.GetOk("redshift_configuration")
if !ok {
return nil, fmt.Errorf("[ERR] Error loading Redshift Configuration for Kinesis Firehose: redshift_configuration not found")
}
rl := redshiftRaw.([]interface{})
redshift := rl[0].(map[string]interface{})
return &firehose.RedshiftDestinationConfiguration{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Configuration: s3Config,
}, nil
}
func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.RedshiftDestinationUpdate, error) {
redshiftRaw, ok := d.GetOk("redshift_configuration")
if !ok {
return nil, fmt.Errorf("[ERR] Error loading Redshift Configuration for Kinesis Firehose: redshift_configuration not found")
}
rl := redshiftRaw.([]interface{})
redshift := rl[0].(map[string]interface{})
return &firehose.RedshiftDestinationUpdate{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Update: s3Update,
}, nil
}
func extractCopyCommandConfiguration(redshift map[string]interface{}) *firehose.CopyCommand {
cmd := &firehose.CopyCommand{
DataTableName: aws.String(redshift["data_table_name"].(string)),
}
if copyOptions, ok := redshift["copy_options"]; ok {
cmd.CopyOptions = aws.String(copyOptions.(string))
}
if columns, ok := redshift["data_table_columns"]; ok {
cmd.DataTableColumns = aws.String(columns.(string))
}
return cmd
}
func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn
if err := validateConfiguration(d); err != nil {
return err
}
sn := d.Get("name").(string)
s3Config := createS3Config(d)
createInput := &firehose.CreateDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
}
if d.Get("destination").(string) == "s3" {
createInput.S3DestinationConfiguration = s3Config
} else {
rc, err := createRedshiftConfig(d, s3Config)
if err != nil {
return err
}
createInput.RedshiftDestinationConfiguration = rc
}
var lastError error
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
_, err := conn.CreateDeliveryStream(createInput)
if err != nil {
log.Printf("[DEBUG] Error creating Firehose Delivery Stream: %s", err)
lastError = err
if awsErr, ok := err.(awserr.Error); ok {
// IAM roles can take ~10 seconds to propagate in AWS:
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#launch-instance-with-role-console
if awsErr.Code() == "InvalidArgumentException" && strings.Contains(awsErr.Message(), "Firehose is unable to assume role") {
log.Printf("[DEBUG] Firehose could not assume role referenced, retrying...")
return resource.RetryableError(awsErr)
}
}
// Not retryable
return resource.NonRetryableError(err)
}
return nil
})
if err != nil {
if awsErr, ok := lastError.(awserr.Error); ok {
return fmt.Errorf("[WARN] Error creating Kinesis Firehose Delivery Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code())
}
return err
@@ -165,45 +381,30 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn
if err := validateConfiguration(d); err != nil {
return err
}
sn := d.Get("name").(string)
s3Config := updateS3Config(d)
updateInput := &firehose.UpdateDestinationInput{
DeliveryStreamName: aws.String(sn),
CurrentDeliveryStreamVersionId: aws.String(d.Get("version_id").(string)),
DestinationId: aws.String(d.Get("destination_id").(string)),
}
if d.Get("destination").(string) == "s3" {
updateInput.S3DestinationUpdate = s3Config
} else {
rc, err := updateRedshiftConfig(d, s3Config)
if err != nil {
return err
}
updateInput.RedshiftDestinationUpdate = rc
}
_, err := conn.UpdateDestination(updateInput)
if err != nil {
return fmt.Errorf(
"Error Updating Kinesis Firehose Delivery Stream: \"%s\"\n%s",
@@ -215,11 +416,11 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn
resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(d.Get("name").(string)),
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" {


@@ -0,0 +1,59 @@
package aws
import (
"fmt"
"log"
"github.com/hashicorp/terraform/terraform"
)
func resourceAwsKinesisFirehoseMigrateState(
v int, is *terraform.InstanceState, meta interface{}) (*terraform.InstanceState, error) {
switch v {
case 0:
log.Println("[INFO] Found AWS Kinesis Firehose Delivery Stream State v0; migrating to v1")
return migrateKinesisFirehoseV0toV1(is)
default:
return is, fmt.Errorf("Unexpected schema version: %d", v)
}
}
func migrateKinesisFirehoseV0toV1(is *terraform.InstanceState) (*terraform.InstanceState, error) {
if is.Empty() {
log.Println("[DEBUG] Empty Kinesis Firehose Delivery State; nothing to migrate.")
return is, nil
}
log.Printf("[DEBUG] Attributes before migration: %#v", is.Attributes)
// migrate the flat S3 configuration to an s3_configuration block
// grab initial values
is.Attributes["s3_configuration.#"] = "1"
// Required parameters
is.Attributes["s3_configuration.0.role_arn"] = is.Attributes["role_arn"]
is.Attributes["s3_configuration.0.bucket_arn"] = is.Attributes["s3_bucket_arn"]
// Optional parameters
if is.Attributes["s3_buffer_size"] != "" {
is.Attributes["s3_configuration.0.buffer_size"] = is.Attributes["s3_buffer_size"]
}
if is.Attributes["s3_data_compression"] != "" {
is.Attributes["s3_configuration.0.compression_format"] = is.Attributes["s3_data_compression"]
}
if is.Attributes["s3_buffer_interval"] != "" {
is.Attributes["s3_configuration.0.buffer_interval"] = is.Attributes["s3_buffer_interval"]
}
if is.Attributes["s3_prefix"] != "" {
is.Attributes["s3_configuration.0.prefix"] = is.Attributes["s3_prefix"]
}
delete(is.Attributes, "role_arn")
delete(is.Attributes, "s3_bucket_arn")
delete(is.Attributes, "s3_buffer_size")
delete(is.Attributes, "s3_data_compression")
delete(is.Attributes, "s3_buffer_interval")
delete(is.Attributes, "s3_prefix")
log.Printf("[DEBUG] Attributes after migration: %#v", is.Attributes)
return is, nil
}


@@ -0,0 +1,93 @@
package aws
import (
"testing"
"github.com/hashicorp/terraform/terraform"
)
func TestAWSKinesisFirehoseMigrateState(t *testing.T) {
cases := map[string]struct {
StateVersion int
Attributes map[string]string
Expected map[string]string
Meta interface{}
}{
"v0.6.16 and earlier": {
StateVersion: 0,
Attributes: map[string]string{
// Flat S3 attributes from v0.6.16 and earlier
"role_arn": "arn:aws:iam::somenumber:role/tf_acctest_4271506651559170635",
"s3_bucket_arn": "arn:aws:s3:::tf-test-bucket",
"s3_buffer_interval": "400",
"s3_buffer_size": "10",
"s3_data_compression": "GZIP",
},
Expected: map[string]string{
"s3_configuration.#": "1",
"s3_configuration.0.bucket_arn": "arn:aws:s3:::tf-test-bucket",
"s3_configuration.0.buffer_interval": "400",
"s3_configuration.0.buffer_size": "10",
"s3_configuration.0.compression_format": "GZIP",
"s3_configuration.0.role_arn": "arn:aws:iam::somenumber:role/tf_acctest_4271506651559170635",
},
},
"v0.6.16 and earlier, sparse": {
StateVersion: 0,
Attributes: map[string]string{
// Flat S3 attributes from v0.6.16 and earlier
"role_arn": "arn:aws:iam::somenumber:role/tf_acctest_4271506651559170635",
"s3_bucket_arn": "arn:aws:s3:::tf-test-bucket",
},
Expected: map[string]string{
"s3_configuration.#": "1",
"s3_configuration.0.bucket_arn": "arn:aws:s3:::tf-test-bucket",
"s3_configuration.0.role_arn": "arn:aws:iam::somenumber:role/tf_acctest_4271506651559170635",
},
},
}
for tn, tc := range cases {
is := &terraform.InstanceState{
ID: "i-abc123",
Attributes: tc.Attributes,
}
is, err := resourceAwsKinesisFirehoseMigrateState(
tc.StateVersion, is, tc.Meta)
if err != nil {
t.Fatalf("bad: %s, err: %#v", tn, err)
}
for k, v := range tc.Expected {
if is.Attributes[k] != v {
t.Fatalf(
"bad: %s\n\n expected: %#v -> %#v\n got: %#v -> %#v\n in: %#v",
tn, k, v, k, is.Attributes[k], is.Attributes)
}
}
}
}
func TestAWSKinesisFirehoseMigrateState_empty(t *testing.T) {
var is *terraform.InstanceState
var meta interface{}
// should handle nil
is, err := resourceAwsKinesisFirehoseMigrateState(0, is, meta)
if err != nil {
t.Fatalf("err: %#v", err)
}
if is != nil {
t.Fatalf("expected nil instancestate, got: %#v", is)
}
// should handle non-nil but empty
is = &terraform.InstanceState{}
is, err = resourceAwsKinesisFirehoseMigrateState(0, is, meta)
if err != nil {
t.Fatalf("err: %#v", err)
}
}


@@ -3,31 +3,25 @@ package aws
import (
"fmt"
"log"
"math/rand"
"os"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)
func TestAccAWSKinesisFirehoseDeliveryStream_s3basic(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
config := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3basic,
ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri)
resource.Test(t, resource.TestCase{
PreCheck: testAccKinesisFirehosePreCheck(t),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
@@ -35,7 +29,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_basic(t *testing.T) {
Config: config,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil),
),
},
},
@@ -45,33 +39,29 @@ func TestAccAWSKinesisFirehoseDeliveryStream_basic(t *testing.T) {
func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
preConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3basic,
ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri)
postConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3Updates,
ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri)
updatedS3DestinationConfig := &firehose.S3DestinationDescription{
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(400),
SizeInMBs: aws.Int64(10),
},
}
resource.Test(t, resource.TestCase{
PreCheck: testAccKinesisFirehosePreCheck(t),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: preConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil),
),
},
@@ -79,13 +69,46 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) {
Config: postConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, updatedS3DestinationConfig, nil),
),
},
},
})
}
func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
preConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_RedshiftBasic,
ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri, ri)
postConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_RedshiftUpdates,
ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri, ri)
updatedRedshiftConfig := &firehose.RedshiftDestinationDescription{
CopyCommand: &firehose.CopyCommand{
CopyOptions: aws.String("GZIP"),
},
}
resource.Test(t, resource.TestCase{
PreCheck: testAccKinesisFirehosePreCheck(t),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: preConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil),
),
},
resource.TestStep{
Config: postConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, updatedRedshiftConfig),
),
},
},
@@ -119,7 +142,9 @@ func testAccCheckKinesisFirehoseDeliveryStreamExists(n string, stream *firehose.
}
}
func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.DeliveryStreamDescription, s3config interface{}, redshiftConfig interface{}) resource.TestCheckFunc {
// *firehose.RedshiftDestinationDescription
// *firehose.S3DestinationDescription
return func(s *terraform.State) error {
if !strings.HasPrefix(*stream.DeliveryStreamName, "terraform-kinesis-firehose") {
return fmt.Errorf("Bad Stream name: %s", *stream.DeliveryStreamName)
@@ -131,6 +156,43 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
if *stream.DeliveryStreamARN != rs.Primary.Attributes["arn"] {
return fmt.Errorf("Bad Delivery Stream ARN\n\t expected: %s\n\tgot: %s\n", rs.Primary.Attributes["arn"], *stream.DeliveryStreamARN)
}
if s3config != nil {
s := s3config.(*firehose.S3DestinationDescription)
// Range over the Stream Destinations, looking for the matching S3
// destination. For simplicity, our tests only have a single S3 or
// Redshift destination, so at this time it's safe to match on the first
// one
var match bool
for _, d := range stream.Destinations {
if d.S3DestinationDescription != nil {
if *d.S3DestinationDescription.BufferingHints.SizeInMBs == *s.BufferingHints.SizeInMBs {
match = true
}
}
}
if !match {
return fmt.Errorf("Mismatch s3 buffer size, expected: %s, got: %s", s, stream.Destinations)
}
}
if redshiftConfig != nil {
r := redshiftConfig.(*firehose.RedshiftDestinationDescription)
// Range over the Stream Destinations, looking for the matching Redshift
// destination
var match bool
for _, d := range stream.Destinations {
if d.RedshiftDestinationDescription != nil {
if *d.RedshiftDestinationDescription.CopyCommand.CopyOptions == *r.CopyCommand.CopyOptions {
match = true
}
}
}
if !match {
return fmt.Errorf("Mismatch Redshift CopyOptions, expected: %s, got: %s", r, stream.Destinations)
}
}
return nil
}
}
@@ -159,10 +221,19 @@ func testAccCheckKinesisFirehoseDeliveryStreamDestroy(s *terraform.State) error
return nil
}
func testAccKinesisFirehosePreCheck(t *testing.T) func() {
return func() {
testAccPreCheck(t)
if os.Getenv("AWS_ACCOUNT_ID") == "" {
t.Fatal("AWS_ACCOUNT_ID must be set")
}
}
}
const testAccKinesisFirehoseDeliveryStreamBaseConfig = `
resource "aws_iam_role" "firehose" {
name = "terraform_acctest_firehose_delivery_role"
assume_role_policy = <<EOF
name = "tf_acctest_firehose_delivery_role_%d"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
@@ -185,14 +256,14 @@ EOF
}
resource "aws_s3_bucket" "bucket" {
bucket = "tf-test-bucket-%d"
acl = "private"
bucket = "tf-test-bucket-%d"
acl = "private"
}
resource "aws_iam_role_policy" "firehose" {
name = "terraform_acctest_firehose_delivery_policy"
role = "${aws_iam_role.firehose.id}"
policy = <<EOF
name = "tf_acctest_firehose_delivery_policy_%d"
role = "${aws_iam_role.firehose.id}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
@@ -217,145 +288,80 @@ resource "aws_iam_role_policy" "firehose" {
EOF
}
`
var testAccKinesisFirehoseDeliveryStreamConfig_s3basic = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
name = "terraform-kinesis-firehose-basictest-%d"
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
}`
var testAccKinesisFirehoseDeliveryStreamConfig_s3Updates = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
name = "terraform-kinesis-firehose-s3test-%d"
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
buffer_size = 10
buffer_interval = 400
compression_format = "GZIP"
}
}`
var testAccKinesisFirehoseDeliveryStreamBaseRedshiftConfig = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
resource "aws_redshift_cluster" "test_cluster" {
cluster_identifier = "tf-redshift-cluster-%d"
database_name = "test"
master_username = "testuser"
master_password = "T3stPass"
node_type = "dc1.large"
cluster_type = "single-node"
}`
var testAccKinesisFirehoseDeliveryStreamConfig_RedshiftBasic = testAccKinesisFirehoseDeliveryStreamBaseRedshiftConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose", "aws_redshift_cluster.test_cluster"]
name = "terraform-kinesis-firehose-basicredshifttest-%d"
destination = "redshift"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
redshift_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
username = "testuser"
password = "T3stPass"
data_table_name = "test-table"
}
}`
var testAccKinesisFirehoseDeliveryStreamConfig_RedshiftUpdates = testAccKinesisFirehoseDeliveryStreamBaseRedshiftConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose", "aws_redshift_cluster.test_cluster"]
name = "terraform-kinesis-firehose-basicredshifttest-%d"
destination = "redshift"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
buffer_size = 10
buffer_interval = 400
compression_format = "GZIP"
}
redshift_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
username = "testuser"
password = "T3stPass"
data_table_name = "test-table"
copy_options = "GZIP"
data_table_columns = "test-col"
}
}`


@@ -14,10 +14,11 @@ For more details, see the [Amazon Kinesis Firehose Documentation][1].
## Example Usage
### S3 Destination
```
resource "aws_s3_bucket" "bucket" {
bucket = "tf-test-bucket"
acl = "private"
bucket = "tf-test-bucket"
acl = "private"
}
resource "aws_iam_role" "firehose_role" {
@@ -40,14 +41,50 @@ EOF
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-test-stream"
destination = "s3"
role_arn = "${aws_iam_role.firehose_role.arn}"
s3_bucket_arn = "${aws_s3_bucket.bucket.arn}"
name = "terraform-kinesis-firehose-test-stream"
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
}
```
### Redshift Destination
```
resource "aws_redshift_cluster" "test_cluster" {
cluster_identifier = "tf-redshift-cluster-%d"
database_name = "test"
master_username = "testuser"
master_password = "T3stPass"
node_type = "dc1.large"
cluster_type = "single-node"
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-test-stream"
destination = "redshift"
s3_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
buffer_size = 10
buffer_interval = 400
compression_format = "GZIP"
}
redshift_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
username = "testuser"
password = "T3stPass"
data_table_name = "test-table"
copy_options = "GZIP"
data_table_columns = "test-col"
}
}
```
~> **NOTE:** Kinesis Firehose is currently only supported in us-east-1, us-west-2 and eu-west-1.
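For instance, a minimal sketch pinning the provider to one of the supported regions:

```
provider "aws" {
  region = "us-west-2"
}
```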
## Argument Reference
@@ -55,15 +92,34 @@ The following arguments are supported:
* `name` - (Required) A name to identify the stream. This is unique to the
AWS account and region the Stream is created in.
* `destination` - (Required) The destination where the data is delivered. The only options are `s3` & `redshift`.
* `s3_configuration` - (Required) Configuration options for the s3 destination (or the intermediate bucket if the destination
is redshift). More details are given below.
* `redshift_configuration` - (Optional) Configuration options if redshift is the destination.
Using `redshift_configuration` requires the user to also specify an
`s3_configuration` block. More details are given below.
The `s3_configuration` object supports the following:
* `role_arn` - (Required) The ARN of the IAM role Firehose assumes when writing to the bucket.
* `bucket_arn` - (Required) The ARN of the S3 bucket.
* `prefix` - (Optional) The "YYYY/MM/DD/HH" time format prefix is automatically used for delivered S3 files. You can specify an extra prefix to be added in front of the time format prefix. Note that if the prefix ends with a slash, it appears as a folder in the S3 bucket.
* `buffer_size` - (Optional) Buffer incoming data to the specified size, in MBs, before delivering it to the destination. The default value is 5.
We recommend setting `buffer_size` to a value greater than the amount of data you typically ingest into the delivery stream in 10 seconds. For example, if you typically ingest data at 1 MB/sec, set `buffer_size` to 10 or higher.
* `buffer_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds, before delivering it to the destination. The default value is 300.
* `compression_format` - (Optional) The compression format. If no value is specified, the default is `UNCOMPRESSED`. Other supported values are `GZIP`, `ZIP` & `Snappy`. If the destination is redshift, you cannot use `ZIP` or `Snappy`.
* `kms_key_arn` - (Optional) If set, the stream will encrypt data using the specified KMS key; otherwise, no encryption is used (see the example after this list).
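A minimal sketch of using `kms_key_arn` (the `aws_kms_key` resource and the role/bucket references are illustrative, not part of this change):

```
resource "aws_kms_key" "firehose" {
  description = "Key used to encrypt Firehose S3 deliveries"
}

resource "aws_kinesis_firehose_delivery_stream" "encrypted_stream" {
  name        = "terraform-kinesis-firehose-encrypted-stream"
  destination = "s3"
  s3_configuration {
    role_arn    = "${aws_iam_role.firehose_role.arn}"
    bucket_arn  = "${aws_s3_bucket.bucket.arn}"
    kms_key_arn = "${aws_kms_key.firehose.arn}"
  }
}
```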
The `redshift_configuration` object supports the following:
* `cluster_jdbcurl` - (Required) The JDBC URL of the Redshift cluster.
* `username` - (Required) The username that the firehose delivery stream will assume. It is strongly recommended that the username and password provided are used exclusively for Amazon Kinesis Firehose purposes, and that the account's permissions are restricted to Amazon Redshift `INSERT` permissions.
* `password` - (Required) The password for the username above.
* `role_arn` - (Required) The ARN of the role the stream assumes.
* `data_table_name` - (Required) The name of the table in the Redshift cluster that the data is copied into.
* `copy_options` - (Optional) Copy options for copying the data from the S3 intermediate bucket into Redshift.
* `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.
## Attributes Reference