From 063d770e517b2273c2766f7135613f9adf814b0e Mon Sep 17 00:00:00 2001 From: Spencer Nelson Date: Mon, 14 Dec 2015 11:26:44 -0500 Subject: [PATCH] provider/aws: Kinesis DescribeStream pagination Each call to the Kinesis DescribeStream API returns a limited number of shards. When interrogating AWS for the state of a Kinesis stream, the client needs to page through the API's responses to get the true number of shards. --- .../aws/resource_aws_kinesis_stream.go | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/builtin/providers/aws/resource_aws_kinesis_stream.go b/builtin/providers/aws/resource_aws_kinesis_stream.go index 1abb9dbc3..76beb3ac4 100644 --- a/builtin/providers/aws/resource_aws_kinesis_stream.go +++ b/builtin/providers/aws/resource_aws_kinesis_stream.go @@ -74,9 +74,10 @@ func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) er sn, err) } - s := streamRaw.(*kinesis.StreamDescription) - d.SetId(*s.StreamARN) - d.Set("arn", s.StreamARN) + s := streamRaw.(kinesisStreamState) + d.SetId(s.arn) + d.Set("arn", s.arn) + d.Set("shard_count", s.shardCount) return resourceAwsKinesisStreamUpdate(d, meta) } @@ -98,10 +99,8 @@ func resourceAwsKinesisStreamUpdate(d *schema.ResourceData, meta interface{}) er func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).kinesisconn sn := d.Get("name").(string) - describeOpts := &kinesis.DescribeStreamInput{ - StreamName: aws.String(sn), - } - resp, err := conn.DescribeStream(describeOpts) + + state, err := readKinesisStreamState(conn, sn) if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == "ResourceNotFoundException" { @@ -111,11 +110,10 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro return fmt.Errorf("[WARN] Error reading Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code()) } return err - } - s := resp.StreamDescription - d.Set("arn", *s.StreamARN) - d.Set("shard_count", len(s.Shards)) + } + d.Set("arn", state.arn) + d.Set("shard_count", state.shardCount) // set tags describeTagsOpts := &kinesis.ListTagsForStreamInput{ @@ -162,12 +160,30 @@ func resourceAwsKinesisStreamDelete(d *schema.ResourceData, meta interface{}) er return nil } +type kinesisStreamState struct { + arn string + status string + shardCount int +} + +func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) { + describeOpts := &kinesis.DescribeStreamInput{ + StreamName: aws.String(sn), + } + + var state kinesisStreamState + err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, last bool) (shouldContinue bool) { + state.arn = aws.StringValue(page.StreamDescription.StreamARN) + state.status = aws.StringValue(page.StreamDescription.StreamStatus) + state.shardCount += len(page.StreamDescription.Shards) + return !last + }) + return state, err +} + func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefreshFunc { return func() (interface{}, string, error) { - describeOpts := &kinesis.DescribeStreamInput{ - StreamName: aws.String(sn), - } - resp, err := conn.DescribeStream(describeOpts) + state, err := readKinesisStreamState(conn, sn) if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == "ResourceNotFoundException" { @@ -178,6 +194,6 @@ func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefr return nil, "failed", err } - return resp.StreamDescription, *resp.StreamDescription.StreamStatus, nil + return state, state.status, nil } }