Merge pull request #5223 from stack72/f-aws-kinesis-retention-period

provider/aws: Implement RetentionPeriod Changes for Kinesis Stream
This commit is contained in:
Paul Stack 2016-02-23 17:26:12 +00:00
commit 039065f63a
3 changed files with 151 additions and 6 deletions

View File

@ -32,6 +32,20 @@ func resourceAwsKinesisStream() *schema.Resource {
ForceNew: true, ForceNew: true,
}, },
"retention_period": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 24,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(int)
if value < 24 || value > 168 {
errors = append(errors, fmt.Errorf(
"%q must be between 24 and 168 hours", k))
}
return
},
},
"arn": &schema.Schema{ "arn": &schema.Schema{
Type: schema.TypeString, Type: schema.TypeString,
Optional: true, Optional: true,
@ -93,6 +107,10 @@ func resourceAwsKinesisStreamUpdate(d *schema.ResourceData, meta interface{}) er
d.SetPartial("tags") d.SetPartial("tags")
d.Partial(false) d.Partial(false)
if err := setKinesisRetentionPeriod(conn, d); err != nil {
return err
}
return resourceAwsKinesisStreamRead(d, meta) return resourceAwsKinesisStreamRead(d, meta)
} }
@ -114,6 +132,7 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro
} }
d.Set("arn", state.arn) d.Set("arn", state.arn)
d.Set("shard_count", state.shardCount) d.Set("shard_count", state.shardCount)
d.Set("retention_period", state.retentionPeriod)
// set tags // set tags
describeTagsOpts := &kinesis.ListTagsForStreamInput{ describeTagsOpts := &kinesis.ListTagsForStreamInput{
@ -160,10 +179,63 @@ func resourceAwsKinesisStreamDelete(d *schema.ResourceData, meta interface{}) er
return nil return nil
} }
func setKinesisRetentionPeriod(conn *kinesis.Kinesis, d *schema.ResourceData) error {
sn := d.Get("name").(string)
oraw, nraw := d.GetChange("retention_period")
o := oraw.(int)
n := nraw.(int)
if n == 0 {
log.Printf("[DEBUG] Kinesis Stream (%q) Retention Period Not Changed", sn)
return nil
}
if n > o {
log.Printf("[DEBUG] Increasing %s Stream Retention Period to %d", sn, n)
_, err := conn.IncreaseStreamRetentionPeriod(&kinesis.IncreaseStreamRetentionPeriodInput{
StreamName: aws.String(sn),
RetentionPeriodHours: aws.Int64(int64(n)),
})
if err != nil {
return err
}
} else {
log.Printf("[DEBUG] Decreasing %s Stream Retention Period to %d", sn, n)
_, err := conn.DecreaseStreamRetentionPeriod(&kinesis.DecreaseStreamRetentionPeriodInput{
StreamName: aws.String(sn),
RetentionPeriodHours: aws.Int64(int64(n)),
})
if err != nil {
return err
}
}
stateConf := &resource.StateChangeConf{
Pending: []string{"UPDATING"},
Target: []string{"ACTIVE"},
Refresh: streamStateRefreshFunc(conn, sn),
Timeout: 5 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}
_, err := stateConf.WaitForState()
if err != nil {
return fmt.Errorf(
"Error waiting for Kinesis Stream (%s) to become active: %s",
sn, err)
}
return nil
}
type kinesisStreamState struct { type kinesisStreamState struct {
arn string arn string
status string status string
shardCount int shardCount int
retentionPeriod int64
} }
func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) { func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) {
@ -176,6 +248,7 @@ func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamStat
state.arn = aws.StringValue(page.StreamDescription.StreamARN) state.arn = aws.StringValue(page.StreamDescription.StreamARN)
state.status = aws.StringValue(page.StreamDescription.StreamStatus) state.status = aws.StringValue(page.StreamDescription.StreamStatus)
state.shardCount += len(page.StreamDescription.Shards) state.shardCount += len(page.StreamDescription.Shards)
state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours)
return !last return !last
}) })
return state, err return state, err

View File

@ -17,13 +17,15 @@ import (
func TestAccAWSKinesisStream_basic(t *testing.T) { func TestAccAWSKinesisStream_basic(t *testing.T) {
var stream kinesis.StreamDescription var stream kinesis.StreamDescription
config := fmt.Sprintf(testAccKinesisStreamConfig, rand.New(rand.NewSource(time.Now().UnixNano())).Int())
resource.Test(t, resource.TestCase{ resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) }, PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders, Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisStreamDestroy, CheckDestroy: testAccCheckKinesisStreamDestroy,
Steps: []resource.TestStep{ Steps: []resource.TestStep{
resource.TestStep{ resource.TestStep{
Config: testAccKinesisStreamConfig, Config: config,
Check: resource.ComposeTestCheckFunc( Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
testAccCheckAWSKinesisStreamAttributes(&stream), testAccCheckAWSKinesisStreamAttributes(&stream),
@ -33,6 +35,52 @@ func TestAccAWSKinesisStream_basic(t *testing.T) {
}) })
} }
func TestAccAWSKinesisStream_retentionPeriod(t *testing.T) {
var stream kinesis.StreamDescription
ri := rand.New(rand.NewSource(time.Now().UnixNano())).Int()
config := fmt.Sprintf(testAccKinesisStreamConfig, ri)
updateConfig := fmt.Sprintf(testAccKinesisStreamConfigUpdateRetentionPeriod, ri)
decreaseConfig := fmt.Sprintf(testAccKinesisStreamConfigDecreaseRetentionPeriod, ri)
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: config,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
testAccCheckAWSKinesisStreamAttributes(&stream),
resource.TestCheckResourceAttr(
"aws_kinesis_stream.test_stream", "retention_period", "24"),
),
},
resource.TestStep{
Config: updateConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
testAccCheckAWSKinesisStreamAttributes(&stream),
resource.TestCheckResourceAttr(
"aws_kinesis_stream.test_stream", "retention_period", "100"),
),
},
resource.TestStep{
Config: decreaseConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
testAccCheckAWSKinesisStreamAttributes(&stream),
resource.TestCheckResourceAttr(
"aws_kinesis_stream.test_stream", "retention_period", "28"),
),
},
},
})
}
func testAccCheckKinesisStreamExists(n string, stream *kinesis.StreamDescription) resource.TestCheckFunc { func testAccCheckKinesisStreamExists(n string, stream *kinesis.StreamDescription) resource.TestCheckFunc {
return func(s *terraform.State) error { return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n] rs, ok := s.RootModule().Resources[n]
@ -103,7 +151,7 @@ func testAccCheckKinesisStreamDestroy(s *terraform.State) error {
return nil return nil
} }
var testAccKinesisStreamConfig = fmt.Sprintf(` var testAccKinesisStreamConfig = `
resource "aws_kinesis_stream" "test_stream" { resource "aws_kinesis_stream" "test_stream" {
name = "terraform-kinesis-test-%d" name = "terraform-kinesis-test-%d"
shard_count = 2 shard_count = 2
@ -111,4 +159,26 @@ resource "aws_kinesis_stream" "test_stream" {
Name = "tf-test" Name = "tf-test"
} }
} }
`, rand.New(rand.NewSource(time.Now().UnixNano())).Int()) `
var testAccKinesisStreamConfigUpdateRetentionPeriod = `
resource "aws_kinesis_stream" "test_stream" {
name = "terraform-kinesis-test-%d"
shard_count = 2
retention_period = 100
tags {
Name = "tf-test"
}
}
`
var testAccKinesisStreamConfigDecreaseRetentionPeriod = `
resource "aws_kinesis_stream" "test_stream" {
name = "terraform-kinesis-test-%d"
shard_count = 2
retention_period = 28
tags {
Name = "tf-test"
}
}
`

View File

@ -19,6 +19,7 @@ For more details, see the [Amazon Kinesis Documentation][1].
resource "aws_kinesis_stream" "test_stream" { resource "aws_kinesis_stream" "test_stream" {
name = "terraform-kinesis-test" name = "terraform-kinesis-test"
shard_count = 1 shard_count = 1
retention_period = 48
tags { tags {
Environment = "test" Environment = "test"
} }
@ -34,6 +35,7 @@ AWS account and region the Stream is created in.
* `shard_count`  (Required) The number of shards that the stream will use. * `shard_count`  (Required) The number of shards that the stream will use.
Amazon has guidlines for specifying the Stream size that should be referenced Amazon has guidlines for specifying the Stream size that should be referenced
when creating a Kinesis stream. See [Amazon Kinesis Streams][2] for more. when creating a Kinesis stream. See [Amazon Kinesis Streams][2] for more.
* `retention_period` - (Optional) Length of time data records are accessible after they are added to the stream. The maximum value of a stream's retention period is 168 hours. Minimum value is 24. Default is 24.
* `tags` - (Optional) A mapping of tags to assign to the resource. * `tags` - (Optional) A mapping of tags to assign to the resource.
## Attributes Reference ## Attributes Reference