diff --git a/builtin/providers/aws/resource_aws_kinesis_stream.go b/builtin/providers/aws/resource_aws_kinesis_stream.go index 100fd08da..5fe18b7cc 100644 --- a/builtin/providers/aws/resource_aws_kinesis_stream.go +++ b/builtin/providers/aws/resource_aws_kinesis_stream.go @@ -32,6 +32,20 @@ func resourceAwsKinesisStream() *schema.Resource { ForceNew: true, }, + "retention_period": &schema.Schema{ + Type: schema.TypeInt, + Optional: true, + Default: 24, + ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { + value := v.(int) + if value < 24 || value > 168 { + errors = append(errors, fmt.Errorf( + "%q must be between 24 and 168 hours", k)) + } + return + }, + }, + "arn": &schema.Schema{ Type: schema.TypeString, Optional: true, @@ -93,6 +107,10 @@ func resourceAwsKinesisStreamUpdate(d *schema.ResourceData, meta interface{}) er d.SetPartial("tags") d.Partial(false) + if err := setKinesisRetentionPeriod(conn, d); err != nil { + return err + } + return resourceAwsKinesisStreamRead(d, meta) } @@ -114,6 +132,7 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro } d.Set("arn", state.arn) d.Set("shard_count", state.shardCount) + d.Set("retention_period", state.retentionPeriod) // set tags describeTagsOpts := &kinesis.ListTagsForStreamInput{ @@ -160,10 +179,63 @@ func resourceAwsKinesisStreamDelete(d *schema.ResourceData, meta interface{}) er return nil } +func setKinesisRetentionPeriod(conn *kinesis.Kinesis, d *schema.ResourceData) error { + sn := d.Get("name").(string) + + oraw, nraw := d.GetChange("retention_period") + o := oraw.(int) + n := nraw.(int) + + if n == 0 { + log.Printf("[DEBUG] Kinesis Stream (%q) Retention Period Not Changed", sn) + return nil + } + + if n > o { + log.Printf("[DEBUG] Increasing %s Stream Retention Period to %d", sn, n) + _, err := conn.IncreaseStreamRetentionPeriod(&kinesis.IncreaseStreamRetentionPeriodInput{ + StreamName: aws.String(sn), + RetentionPeriodHours: aws.Int64(int64(n)), + }) + if err != nil { + return err + } + + } else { + log.Printf("[DEBUG] Decreasing %s Stream Retention Period to %d", sn, n) + _, err := conn.DecreaseStreamRetentionPeriod(&kinesis.DecreaseStreamRetentionPeriodInput{ + StreamName: aws.String(sn), + RetentionPeriodHours: aws.Int64(int64(n)), + }) + if err != nil { + return err + } + } + + stateConf := &resource.StateChangeConf{ + Pending: []string{"UPDATING"}, + Target: []string{"ACTIVE"}, + Refresh: streamStateRefreshFunc(conn, sn), + Timeout: 5 * time.Minute, + Delay: 10 * time.Second, + MinTimeout: 3 * time.Second, + } + + _, err := stateConf.WaitForState() + if err != nil { + return fmt.Errorf( + "Error waiting for Kinesis Stream (%s) to become active: %s", + sn, err) + } + + return nil +} + type kinesisStreamState struct { - arn string - status string - shardCount int + arn string + status string + shardCount int + retentionPeriod int64 } func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) { @@ -176,6 +248,7 @@ func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamStat state.arn = aws.StringValue(page.StreamDescription.StreamARN) state.status = aws.StringValue(page.StreamDescription.StreamStatus) state.shardCount += len(page.StreamDescription.Shards) + state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours) return !last }) return state, err diff --git a/builtin/providers/aws/resource_aws_kinesis_stream_test.go b/builtin/providers/aws/resource_aws_kinesis_stream_test.go index 82c0b64fa..d5cc8c8d9 100644 --- a/builtin/providers/aws/resource_aws_kinesis_stream_test.go +++ b/builtin/providers/aws/resource_aws_kinesis_stream_test.go @@ -17,13 +17,15 @@ import ( func TestAccAWSKinesisStream_basic(t *testing.T) { var stream kinesis.StreamDescription + config := fmt.Sprintf(testAccKinesisStreamConfig, rand.New(rand.NewSource(time.Now().UnixNano())).Int()) + resource.Test(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, Providers: testAccProviders, CheckDestroy: testAccCheckKinesisStreamDestroy, Steps: []resource.TestStep{ resource.TestStep{ - Config: testAccKinesisStreamConfig, + Config: config, Check: resource.ComposeTestCheckFunc( testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), testAccCheckAWSKinesisStreamAttributes(&stream), @@ -33,6 +35,52 @@ func TestAccAWSKinesisStream_basic(t *testing.T) { }) } +func TestAccAWSKinesisStream_retentionPeriod(t *testing.T) { + var stream kinesis.StreamDescription + + ri := rand.New(rand.NewSource(time.Now().UnixNano())).Int() + config := fmt.Sprintf(testAccKinesisStreamConfig, ri) + updateConfig := fmt.Sprintf(testAccKinesisStreamConfigUpdateRetentionPeriod, ri) + decreaseConfig := fmt.Sprintf(testAccKinesisStreamConfigDecreaseRetentionPeriod, ri) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisStreamDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: config, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), + testAccCheckAWSKinesisStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_stream.test_stream", "retention_period", "24"), + ), + }, + + resource.TestStep{ + Config: updateConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), + testAccCheckAWSKinesisStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_stream.test_stream", "retention_period", "100"), + ), + }, + + resource.TestStep{ + Config: decreaseConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), + testAccCheckAWSKinesisStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_stream.test_stream", "retention_period", "28"), + ), + }, + }, + }) +} + func testAccCheckKinesisStreamExists(n string, stream *kinesis.StreamDescription) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[n] @@ -103,7 +151,7 @@ func testAccCheckKinesisStreamDestroy(s *terraform.State) error { return nil } -var testAccKinesisStreamConfig = fmt.Sprintf(` +var testAccKinesisStreamConfig = ` resource "aws_kinesis_stream" "test_stream" { name = "terraform-kinesis-test-%d" shard_count = 2 @@ -111,4 +159,26 @@ resource "aws_kinesis_stream" "test_stream" { Name = "tf-test" } } -`, rand.New(rand.NewSource(time.Now().UnixNano())).Int()) +` + +var testAccKinesisStreamConfigUpdateRetentionPeriod = ` +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test-%d" + shard_count = 2 + retention_period = 100 + tags { + Name = "tf-test" + } +} +` + +var testAccKinesisStreamConfigDecreaseRetentionPeriod = ` +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test-%d" + shard_count = 2 + retention_period = 28 + tags { + Name = "tf-test" + } +} +` diff --git a/website/source/docs/providers/aws/r/kinesis_stream.html.markdown b/website/source/docs/providers/aws/r/kinesis_stream.html.markdown index b46752a00..90220bffb 100644 --- a/website/source/docs/providers/aws/r/kinesis_stream.html.markdown +++ b/website/source/docs/providers/aws/r/kinesis_stream.html.markdown @@ -19,6 +19,7 @@ For more details, see the [Amazon Kinesis Documentation][1]. resource "aws_kinesis_stream" "test_stream" { name = "terraform-kinesis-test" shard_count = 1 + retention_period = 48 tags { Environment = "test" } @@ -34,6 +35,7 @@ AWS account and region the Stream is created in. * `shard_count` – (Required) The number of shards that the stream will use. Amazon has guidlines for specifying the Stream size that should be referenced when creating a Kinesis stream. See [Amazon Kinesis Streams][2] for more. +* `retention_period` - (Optional) Length of time data records are accessible after they are added to the stream. The maximum value of a stream's retention period is 168 hours. Minimum value is 24. Default is 24. * `tags` - (Optional) A mapping of tags to assign to the resource. ## Attributes Reference