provider/aws: Add aws_kinesis_stream data source (#13562)
This fixes #13521
This commit is contained in:
parent
bf8374c048
commit
3f0934bd4e
|
@ -0,0 +1,95 @@
|
|||
package aws
|
||||
|
||||
import (
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
"github.com/hashicorp/terraform/helper/schema"
|
||||
)
|
||||
|
||||
func dataSourceAwsKinesisStream() *schema.Resource {
|
||||
return &schema.Resource{
|
||||
Read: dataSourceAwsKinesisStreamRead,
|
||||
|
||||
Schema: map[string]*schema.Schema{
|
||||
"name": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Required: true,
|
||||
},
|
||||
|
||||
"arn": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Computed: true,
|
||||
},
|
||||
|
||||
"creation_timestamp": &schema.Schema{
|
||||
Type: schema.TypeInt,
|
||||
Computed: true,
|
||||
},
|
||||
|
||||
"status": &schema.Schema{
|
||||
Type: schema.TypeString,
|
||||
Computed: true,
|
||||
},
|
||||
|
||||
"retention_period": &schema.Schema{
|
||||
Type: schema.TypeInt,
|
||||
Computed: true,
|
||||
},
|
||||
|
||||
"open_shards": &schema.Schema{
|
||||
Type: schema.TypeSet,
|
||||
Computed: true,
|
||||
Elem: &schema.Schema{Type: schema.TypeString},
|
||||
Set: schema.HashString,
|
||||
},
|
||||
|
||||
"closed_shards": &schema.Schema{
|
||||
Type: schema.TypeSet,
|
||||
Computed: true,
|
||||
Elem: &schema.Schema{Type: schema.TypeString},
|
||||
Set: schema.HashString,
|
||||
},
|
||||
|
||||
"shard_level_metrics": &schema.Schema{
|
||||
Type: schema.TypeSet,
|
||||
Computed: true,
|
||||
Elem: &schema.Schema{Type: schema.TypeString},
|
||||
Set: schema.HashString,
|
||||
},
|
||||
|
||||
"tags": &schema.Schema{
|
||||
Type: schema.TypeMap,
|
||||
Computed: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func dataSourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error {
|
||||
conn := meta.(*AWSClient).kinesisconn
|
||||
sn := d.Get("name").(string)
|
||||
|
||||
state, err := readKinesisStreamState(conn, sn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.SetId(state.arn)
|
||||
d.Set("arn", state.arn)
|
||||
d.Set("name", sn)
|
||||
d.Set("open_shards", state.openShards)
|
||||
d.Set("closed_shards", state.closedShards)
|
||||
d.Set("status", state.status)
|
||||
d.Set("creation_timestamp", state.creationTimestamp)
|
||||
d.Set("retention_period", state.retentionPeriod)
|
||||
d.Set("shard_level_metrics", state.shardLevelMetrics)
|
||||
|
||||
tags, err := conn.ListTagsForStream(&kinesis.ListTagsForStreamInput{
|
||||
StreamName: aws.String(sn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Set("tags", tagsToMapKinesis(tags.Tags))
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package aws
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
"github.com/hashicorp/terraform/helper/acctest"
|
||||
"github.com/hashicorp/terraform/helper/resource"
|
||||
)
|
||||
|
||||
func TestAccAWSKinesisStreamDataSource(t *testing.T) {
|
||||
var stream kinesis.StreamDescription
|
||||
|
||||
sn := fmt.Sprintf("terraform-kinesis-test-%d", acctest.RandInt())
|
||||
config := fmt.Sprintf(testAccCheckAwsKinesisStreamDataSourceConfig, sn)
|
||||
|
||||
updateShardCount := func() {
|
||||
conn := testAccProvider.Meta().(*AWSClient).kinesisconn
|
||||
_, err := conn.UpdateShardCount(&kinesis.UpdateShardCountInput{
|
||||
ScalingType: aws.String(kinesis.ScalingTypeUniformScaling),
|
||||
StreamName: aws.String(sn),
|
||||
TargetShardCount: aws.Int64(3),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error calling UpdateShardCount: %s", err)
|
||||
}
|
||||
if err := waitForKinesisToBeActive(conn, sn); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
resource.Test(t, resource.TestCase{
|
||||
PreCheck: func() { testAccPreCheck(t) },
|
||||
Providers: testAccProviders,
|
||||
CheckDestroy: testAccCheckKinesisStreamDestroy,
|
||||
Steps: []resource.TestStep{
|
||||
{
|
||||
Config: config,
|
||||
Check: resource.ComposeTestCheckFunc(
|
||||
testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
|
||||
resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "arn"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "name", sn),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "status", "ACTIVE"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "open_shards.#", "2"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "closed_shards.#", "0"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "shard_level_metrics.#", "2"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"),
|
||||
resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"),
|
||||
),
|
||||
},
|
||||
{
|
||||
Config: config,
|
||||
PreConfig: updateShardCount,
|
||||
Check: resource.ComposeTestCheckFunc(
|
||||
testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
|
||||
resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "arn"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "name", sn),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "status", "ACTIVE"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "open_shards.#", "3"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "closed_shards.#", "4"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "shard_level_metrics.#", "2"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"),
|
||||
resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"),
|
||||
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"),
|
||||
),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
var testAccCheckAwsKinesisStreamDataSourceConfig = `
|
||||
resource "aws_kinesis_stream" "test_stream" {
|
||||
name = "%s"
|
||||
shard_count = 2
|
||||
retention_period = 72
|
||||
tags {
|
||||
Name = "tf-test"
|
||||
}
|
||||
shard_level_metrics = [
|
||||
"IncomingBytes",
|
||||
"OutgoingBytes"
|
||||
]
|
||||
lifecycle {
|
||||
ignore_changes = ["shard_count"]
|
||||
}
|
||||
}
|
||||
|
||||
data "aws_kinesis_stream" "test_stream" {
|
||||
name = "${aws_kinesis_stream.test_stream.name}"
|
||||
}
|
||||
`
|
|
@ -179,6 +179,7 @@ func Provider() terraform.ResourceProvider {
|
|||
"aws_eip": dataSourceAwsEip(),
|
||||
"aws_elb_hosted_zone_id": dataSourceAwsElbHostedZoneId(),
|
||||
"aws_elb_service_account": dataSourceAwsElbServiceAccount(),
|
||||
"aws_kinesis_stream": dataSourceAwsKinesisStream(),
|
||||
"aws_iam_account_alias": dataSourceAwsIamAccountAlias(),
|
||||
"aws_iam_policy_document": dataSourceAwsIamPolicyDocument(),
|
||||
"aws_iam_role": dataSourceAwsIAMRole(),
|
||||
|
|
|
@ -95,10 +95,10 @@ func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) er
|
|||
sn, err)
|
||||
}
|
||||
|
||||
s := streamRaw.(kinesisStreamState)
|
||||
s := streamRaw.(*kinesisStreamState)
|
||||
d.SetId(s.arn)
|
||||
d.Set("arn", s.arn)
|
||||
d.Set("shard_count", s.shardCount)
|
||||
d.Set("shard_count", len(s.openShards))
|
||||
|
||||
return resourceAwsKinesisStreamUpdate(d, meta)
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro
|
|||
|
||||
}
|
||||
d.Set("arn", state.arn)
|
||||
d.Set("shard_count", state.shardCount)
|
||||
d.Set("shard_count", len(state.openShards))
|
||||
d.Set("retention_period", state.retentionPeriod)
|
||||
|
||||
if len(state.shardLevelMetrics) > 0 {
|
||||
|
@ -290,23 +290,27 @@ func updateKinesisShardLevelMetrics(conn *kinesis.Kinesis, d *schema.ResourceDat
|
|||
|
||||
type kinesisStreamState struct {
|
||||
arn string
|
||||
creationTimestamp int64
|
||||
status string
|
||||
shardCount int
|
||||
retentionPeriod int64
|
||||
openShards []string
|
||||
closedShards []string
|
||||
shardLevelMetrics []string
|
||||
}
|
||||
|
||||
func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) {
|
||||
func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (*kinesisStreamState, error) {
|
||||
describeOpts := &kinesis.DescribeStreamInput{
|
||||
StreamName: aws.String(sn),
|
||||
}
|
||||
|
||||
var state kinesisStreamState
|
||||
state := &kinesisStreamState{}
|
||||
err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, last bool) (shouldContinue bool) {
|
||||
state.arn = aws.StringValue(page.StreamDescription.StreamARN)
|
||||
state.creationTimestamp = aws.TimeValue(page.StreamDescription.StreamCreationTimestamp).Unix()
|
||||
state.status = aws.StringValue(page.StreamDescription.StreamStatus)
|
||||
state.shardCount += len(openShards(page.StreamDescription.Shards))
|
||||
state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours)
|
||||
state.openShards = append(state.openShards, flattenShards(openShards(page.StreamDescription.Shards))...)
|
||||
state.closedShards = append(state.closedShards, flattenShards(closedShards(page.StreamDescription.Shards))...)
|
||||
state.shardLevelMetrics = flattenKinesisShardLevelMetrics(page.StreamDescription.EnhancedMonitoring)
|
||||
return !last
|
||||
})
|
||||
|
@ -349,14 +353,31 @@ func waitForKinesisToBeActive(conn *kinesis.Kinesis, sn string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
|
||||
func openShards(shards []*kinesis.Shard) []*kinesis.Shard {
|
||||
var open []*kinesis.Shard
|
||||
return filterShards(shards, true)
|
||||
}
|
||||
|
||||
func closedShards(shards []*kinesis.Shard) []*kinesis.Shard {
|
||||
return filterShards(shards, false)
|
||||
}
|
||||
|
||||
// See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
|
||||
func filterShards(shards []*kinesis.Shard, open bool) []*kinesis.Shard {
|
||||
res := make([]*kinesis.Shard, 0, len(shards))
|
||||
for _, s := range shards {
|
||||
if s.SequenceNumberRange.EndingSequenceNumber == nil {
|
||||
open = append(open, s)
|
||||
if open && s.SequenceNumberRange.EndingSequenceNumber == nil {
|
||||
res = append(res, s)
|
||||
} else if !open && s.SequenceNumberRange.EndingSequenceNumber != nil {
|
||||
res = append(res, s)
|
||||
}
|
||||
}
|
||||
|
||||
return open
|
||||
return res
|
||||
}
|
||||
|
||||
func flattenShards(shards []*kinesis.Shard) []string {
|
||||
res := make([]string, len(shards))
|
||||
for i, s := range shards {
|
||||
res[i] = aws.StringValue(s.ShardId)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
---
|
||||
layout: "aws"
|
||||
page_title: "AWS: aws_kinesis_stream"
|
||||
sidebar_current: "docs-aws-datasource-kinesis-stream"
|
||||
description: |-
|
||||
Provides a Kinesis Stream data source.
|
||||
---
|
||||
|
||||
# aws\_kinesis\_stream
|
||||
|
||||
Use this data source to get information about a Kinesis Stream for use in other
|
||||
resources.
|
||||
|
||||
For more details, see the [Amazon Kinesis Documentation][1].
|
||||
|
||||
## Example Usage
|
||||
|
||||
```
|
||||
data "aws_kinesis_stream" "stream" {
|
||||
name = "stream-name"
|
||||
}
|
||||
```
|
||||
|
||||
## Argument Reference
|
||||
|
||||
* `name` - (Required) The name of the Kinesis Stream.
|
||||
|
||||
## Attributes Reference
|
||||
|
||||
`id` is set to the Amazon Resource Name (ARN) of the Kinesis Stream. In addition, the following attributes
|
||||
are exported:
|
||||
|
||||
* `arn` - The Amazon Resource Name (ARN) of the Kinesis Stream (same as id).
|
||||
* `name` - The name of the Kinesis Stream.
|
||||
* `creation_timestamp` - The approximate UNIX timestamp that the stream was created.
|
||||
* `status` - The current status of the stream. The stream status is one of CREATING, DELETING, ACTIVE, or UPDATING.
|
||||
* `retention_period` - Length of time (in hours) data records are accessible after they are added to the stream.
|
||||
* `open_shards` - The list of shard ids in the OPEN state. See [Shard State][2] for more.
|
||||
* `closed_shards` - The list of shard ids in the CLOSED state. See [Shard State][2] for more.
|
||||
* `shard_level_metrics` - A list of shard-level CloudWatch metrics which are enabled for the stream. See [Monitoring with CloudWatch][3] for more.
|
||||
* `tags` - A mapping of tags to assigned to the stream.
|
||||
|
||||
[1]: https://aws.amazon.com/documentation/kinesis/
|
||||
[2]: https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing
|
||||
[3]: https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html
|
|
@ -68,6 +68,9 @@
|
|||
<li<%= sidebar_current("docs-aws-datasource-elb-service-account") %>>
|
||||
<a href="/docs/providers/aws/d/elb_service_account.html">aws_elb_service_account</a>
|
||||
</li>
|
||||
<li<%= sidebar_current("docs-aws-datasource-kinesis-stream") %>>
|
||||
<a href="/docs/providers/aws/d/kinesis_stream.html">kinesis_stream</a>
|
||||
</li>
|
||||
<li<%= sidebar_current("docs-aws-datasource-iam-account-alias") %>>
|
||||
<a href="/docs/providers/aws/d/iam_account_alias.html">aws_iam_account_alias</a>
|
||||
</li>
|
||||
|
|
Loading…
Reference in New Issue