From 214ed23974130c411d23eb05423973dbda44059d Mon Sep 17 00:00:00 2001 From: Clint Shryock Date: Wed, 27 May 2015 14:17:26 -0500 Subject: [PATCH] provider/aws: AWS Kinesis Stream support Adds a Kinesis Stream resource --- builtin/providers/aws/config.go | 7 +- builtin/providers/aws/provider.go | 1 + .../aws/resource_aws_kinesis_stream.go | 157 ++++++++++++++++++ .../aws/resource_aws_kinesis_stream_test.go | 108 ++++++++++++ .../aws/r/kinesis_stream.html.markdown | 44 +++++ website/source/layouts/aws.erb | 4 + 6 files changed, 320 insertions(+), 1 deletion(-) create mode 100644 builtin/providers/aws/resource_aws_kinesis_stream.go create mode 100644 builtin/providers/aws/resource_aws_kinesis_stream_test.go create mode 100644 website/source/docs/providers/aws/r/kinesis_stream.html.markdown diff --git a/builtin/providers/aws/config.go b/builtin/providers/aws/config.go index 92a69d6bb..9d8397aef 100644 --- a/builtin/providers/aws/config.go +++ b/builtin/providers/aws/config.go @@ -14,11 +14,12 @@ import ( "github.com/awslabs/aws-sdk-go/service/elasticache" "github.com/awslabs/aws-sdk-go/service/elb" "github.com/awslabs/aws-sdk-go/service/iam" + "github.com/awslabs/aws-sdk-go/service/kinesis" "github.com/awslabs/aws-sdk-go/service/rds" "github.com/awslabs/aws-sdk-go/service/route53" "github.com/awslabs/aws-sdk-go/service/s3" - "github.com/awslabs/aws-sdk-go/service/sqs" "github.com/awslabs/aws-sdk-go/service/sns" + "github.com/awslabs/aws-sdk-go/service/sqs" ) type Config struct { @@ -43,6 +44,7 @@ type AWSClient struct { region string rdsconn *rds.RDS iamconn *iam.IAM + kinesisconn *kinesis.Kinesis elasticacheconn *elasticache.ElastiCache } @@ -100,6 +102,9 @@ func (c *Config) Client() (interface{}, error) { log.Println("[INFO] Initializing IAM Connection") client.iamconn = iam.New(awsConfig) + log.Println("[INFO] Initializing Kinesis Connection") + client.kinesisconn = kinesis.New(awsConfig) + err := c.ValidateAccountId(client.iamconn) if err != nil { errs = append(errs, err) diff --git a/builtin/providers/aws/provider.go b/builtin/providers/aws/provider.go index 8b04be2d3..e8eb481b1 100644 --- a/builtin/providers/aws/provider.go +++ b/builtin/providers/aws/provider.go @@ -109,6 +109,7 @@ func Provider() terraform.ResourceProvider { "aws_instance": resourceAwsInstance(), "aws_internet_gateway": resourceAwsInternetGateway(), "aws_key_pair": resourceAwsKeyPair(), + "aws_kinesis_stream": resourceAwsKinesisStream(), "aws_launch_configuration": resourceAwsLaunchConfiguration(), "aws_lb_cookie_stickiness_policy": resourceAwsLBCookieStickinessPolicy(), "aws_main_route_table_association": resourceAwsMainRouteTableAssociation(), diff --git a/builtin/providers/aws/resource_aws_kinesis_stream.go b/builtin/providers/aws/resource_aws_kinesis_stream.go new file mode 100644 index 000000000..2485edb28 --- /dev/null +++ b/builtin/providers/aws/resource_aws_kinesis_stream.go @@ -0,0 +1,157 @@ +package aws + +import ( + "fmt" + "time" + + "github.com/awslabs/aws-sdk-go/aws" + "github.com/awslabs/aws-sdk-go/aws/awserr" + "github.com/awslabs/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/helper/schema" +) + +func resourceAwsKinesisStream() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsKinesisStreamCreate, + Read: resourceAwsKinesisStreamRead, + Delete: resourceAwsKinesisStreamDelete, + + Schema: map[string]*schema.Schema{ + "name": &schema.Schema{ + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + + "shard_count": &schema.Schema{ + Type: schema.TypeInt, + Required: true, + ForceNew: true, + }, + + "arn": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + }, + } +} + +func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + sn := d.Get("name").(string) + createOpts := &kinesis.CreateStreamInput{ + ShardCount: aws.Long(int64(d.Get("shard_count").(int))), + StreamName: aws.String(sn), + } + + _, err := conn.CreateStream(createOpts) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + return fmt.Errorf("[WARN] Error creating Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code()) + } + return err + } + + // set name as ID for now. Kinesis stream names are unique per account+region + d.SetId(sn) + + stateConf := &resource.StateChangeConf{ + Pending: []string{"CREATING"}, + Target: "ACTIVE", + Refresh: streamStateRefreshFunc(conn, sn), + Timeout: 5 * time.Minute, + Delay: 10 * time.Second, + MinTimeout: 3 * time.Second, + } + + streamRaw, err := stateConf.WaitForState() + if err != nil { + return fmt.Errorf( + "Error waiting for Kinesis Stream (%s) to become active: %s", + sn, err) + } + + s := streamRaw.(*kinesis.StreamDescription) + d.Set("arn", *s.StreamARN) + + return nil +} + +func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + describeOpts := &kinesis.DescribeStreamInput{ + StreamName: aws.String(d.Id()), + Limit: aws.Long(1), + } + resp, err := conn.DescribeStream(describeOpts) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ResourceNotFoundException" { + d.SetId("") + return nil + } + return fmt.Errorf("[WARN] Error reading Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code()) + } + return err + } + + s := resp.StreamDescription + d.Set("arn", *s.StreamARN) + d.Set("shard_count", len(s.Shards)) + + return nil +} + +func resourceAwsKinesisStreamDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + _, err := conn.DeleteStream(&kinesis.DeleteStreamInput{ + StreamName: aws.String(d.Id()), + }) + + if err != nil { + return err + } + + stateConf := &resource.StateChangeConf{ + Pending: []string{"DELETING"}, + Target: "DESTROYED", + Refresh: streamStateRefreshFunc(conn, d.Id()), + Timeout: 5 * time.Minute, + Delay: 10 * time.Second, + MinTimeout: 3 * time.Second, + } + + _, err = stateConf.WaitForState() + if err != nil { + return fmt.Errorf( + "Error waiting for Stream (%s) to be destroyed: %s", + d.Id(), err) + } + + d.SetId("") + return nil +} + +func streamStateRefreshFunc(conn *kinesis.Kinesis, streamName string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + describeOpts := &kinesis.DescribeStreamInput{ + StreamName: aws.String(streamName), + Limit: aws.Long(1), + } + resp, err := conn.DescribeStream(describeOpts) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ResourceNotFoundException" { + return 42, "DESTROYED", nil + } + return nil, awsErr.Code(), err + } + return nil, "failed", err + } + + return resp.StreamDescription, *resp.StreamDescription.StreamStatus, nil + } +} diff --git a/builtin/providers/aws/resource_aws_kinesis_stream_test.go b/builtin/providers/aws/resource_aws_kinesis_stream_test.go new file mode 100644 index 000000000..7de318f27 --- /dev/null +++ b/builtin/providers/aws/resource_aws_kinesis_stream_test.go @@ -0,0 +1,108 @@ +package aws + +import ( + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/awslabs/aws-sdk-go/aws" + "github.com/awslabs/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/terraform" +) + +func TestAccKinesisStream_basic(t *testing.T) { + var stream kinesis.StreamDescription + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisStreamDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: testAccKinesisStreamConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), + testAccCheckAWSKinesisStreamAttributes(&stream), + ), + }, + }, + }) +} + +func testAccCheckKinesisStreamExists(n string, stream *kinesis.StreamDescription) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No Kinesis ID is set") + } + + conn := testAccProvider.Meta().(*AWSClient).kinesisconn + describeOpts := &kinesis.DescribeStreamInput{ + StreamName: aws.String(rs.Primary.ID), + Limit: aws.Long(1), + } + resp, err := conn.DescribeStream(describeOpts) + if err != nil { + return err + } + + *stream = *resp.StreamDescription + + return nil + } +} + +func testAccCheckAWSKinesisStreamAttributes(stream *kinesis.StreamDescription) resource.TestCheckFunc { + return func(s *terraform.State) error { + if !strings.HasPrefix(*stream.StreamName, "terraform-kinesis-test") { + return fmt.Errorf("Bad Stream name: %s", *stream.StreamName) + } + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_kinesis_stream" { + continue + } + if *stream.StreamARN != rs.Primary.Attributes["arn"] { + return fmt.Errorf("Bad Stream ARN\n\t expected: %s\n\tgot: %s\n", rs.Primary.Attributes["arn"], *stream.StreamARN) + } + } + return nil + } +} + +func testAccCheckKinesisStreamDestroy(s *terraform.State) error { + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_kinesis_stream" { + continue + } + conn := testAccProvider.Meta().(*AWSClient).kinesisconn + describeOpts := &kinesis.DescribeStreamInput{ + StreamName: aws.String(rs.Primary.ID), + Limit: aws.Long(1), + } + resp, err := conn.DescribeStream(describeOpts) + if err == nil { + if resp.StreamDescription != nil && *resp.StreamDescription.StreamStatus != "DELETING" { + return fmt.Errorf("Error: Stream still exists") + } + } + + return nil + + } + + return nil +} + +var testAccKinesisStreamConfig = fmt.Sprintf(` +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test-%d" + shard_count = 1 +} +`, rand.New(rand.NewSource(time.Now().UnixNano())).Int()) diff --git a/website/source/docs/providers/aws/r/kinesis_stream.html.markdown b/website/source/docs/providers/aws/r/kinesis_stream.html.markdown new file mode 100644 index 000000000..a667bb427 --- /dev/null +++ b/website/source/docs/providers/aws/r/kinesis_stream.html.markdown @@ -0,0 +1,44 @@ +--- +layout: "aws" +page_title: "AWS: aws_kinesis_stream" +sidebar_current: "docs-aws-resource-kinesis-stream" +description: |- + Provides a AWS Kinesis Stream +--- + +# aws\_kinesis\_stream + +Provides a Kinesis Stream resource. Amazon Kinesis is a managed service that +scales elastically for real-time processing of streaming big data. + +For more details, see the [Amazon Kinesis Documentation][1]. + +## Example Usage + +``` +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test" + shard_count = 1 +} +``` + +## Argument Reference + +The following arguments are supported: + +* `name` - (Required) A name to identify the stream. This is unique to the +AWS account and region the Stream is created in. +* `shard_count` – (Required) The number of shards that the stream will use. +Amazon has guidlines for specifying the Stream size that should be referenced +when creating a Kinesis stream. See [Amazon Kinesis Streams][2] for more. + +## Attributes Reference + +* `id` - The unique Stream id +* `name` - The unique Stream name (same as `id`) +* `shard_count` - The count of Shards for this Stream +* `arn` - The Amazon Resource Name (ARN) specifying the Stream + + +[1]: http://aws.amazon.com/documentation/kinesis/ +[2]: http://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html diff --git a/website/source/layouts/aws.erb b/website/source/layouts/aws.erb index d2ccd1cf9..b7199c1c8 100644 --- a/website/source/layouts/aws.erb +++ b/website/source/layouts/aws.erb @@ -101,6 +101,10 @@ aws_iam_user_policy + > + aws_kinesis_stream + + > aws_instance