From 10f6d7f30f875020905d117dee3ba94b179daa70 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Thu, 12 Jan 2017 16:55:42 -0500 Subject: [PATCH] Add locking for s3 state Use a DynamoDB table to coodinate state locking in S3. We use a simple strategy here, defining a key containing the value of the bucket/key of the state file as the lock. If the keys exists, the locks fails. TODO: decide if locks should automatically be expired, or require manual intervention. --- state/local.go | 8 +-- state/remote/remote_test.go | 32 +++++++++++ state/remote/s3.go | 77 +++++++++++++++++++++++++ state/remote/s3_test.go | 108 +++++++++++++++++++++++++++++++++++- 4 files changed, 220 insertions(+), 5 deletions(-) diff --git a/state/local.go b/state/local.go index a25526732..7ecb73b01 100644 --- a/state/local.go +++ b/state/local.go @@ -16,7 +16,7 @@ type lockInfo struct { // Path to the state file Path string // The time the lock was taken - Time time.Time + Created time.Time // The time this lock expires Expires time.Time // The lock reason passed to State.Lock @@ -26,7 +26,7 @@ type lockInfo struct { // return the lock info formatted in an error func (l *lockInfo) Err() error { return fmt.Errorf("state file %q locked. created:%s, expires:%s, reason:%s", - l.Path, l.Time, l.Expires, l.Reason) + l.Path, l.Created, l.Expires, l.Reason) } // LocalState manages a state storage that is local to the filesystem. @@ -227,8 +227,8 @@ func (s *LocalState) writeLockInfo(reason string) error { lockInfo := &lockInfo{ Path: s.Path, - Time: time.Now(), - Expires: time.Now().Add(time.Hour), + Created: time.Now().UTC(), + Expires: time.Now().Add(time.Hour).UTC(), Reason: reason, } diff --git a/state/remote/remote_test.go b/state/remote/remote_test.go index db3c795c8..5fa20f4eb 100644 --- a/state/remote/remote_test.go +++ b/state/remote/remote_test.go @@ -44,6 +44,38 @@ func testClient(t *testing.T, c Client) { } } +func testClientLocks(t *testing.T, c Client) { + s3Client := c.(*S3Client) + + // initial lock + if err := s3Client.Lock("test"); err != nil { + t.Fatal(err) + } + + // second lock should fail + if err := s3Client.Lock("test"); err == nil { + t.Fatal("expected error, got nil") + } + + // unlock should work + if err := s3Client.Unlock(); err != nil { + t.Fatal(err) + } + + // now we should be able to lock again + if err := s3Client.Lock("test"); err != nil { + t.Fatal(err) + } + + // unlock should be idempotent + if err := s3Client.Unlock(); err != nil { + t.Fatal(err) + } + if err := s3Client.Unlock(); err != nil { + t.Fatal(err) + } +} + func TestRemoteClient_noPayload(t *testing.T) { s := &State{ Client: nilClient{}, diff --git a/state/remote/s3.go b/state/remote/s3.go index eb810eceb..772737e11 100644 --- a/state/remote/s3.go +++ b/state/remote/s3.go @@ -7,10 +7,12 @@ import ( "log" "os" "strconv" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/s3" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-multierror" @@ -89,6 +91,7 @@ providing credentials for the AWS S3 remote`)) } sess := session.New(awsConfig) nativeClient := s3.New(sess) + dynClient := dynamodb.New(sess) return &S3Client{ nativeClient: nativeClient, @@ -97,6 +100,8 @@ providing credentials for the AWS S3 remote`)) serverSideEncryption: serverSideEncryption, acl: acl, kmsKeyID: kmsKeyID, + dynClient: dynClient, + lockTable: conf["lock_table"], }, nil } @@ -107,6 +112,8 @@ type S3Client struct { serverSideEncryption bool acl string kmsKeyID string + dynClient *dynamodb.DynamoDB + lockTable string } func (c *S3Client) Get() (*Payload, error) { @@ -188,3 +195,73 @@ func (c *S3Client) Delete() error { return err } + +func (c *S3Client) Lock(reason string) error { + if c.lockTable == "" { + return nil + } + + stateName := fmt.Sprintf("%s/%s", c.bucketName, c.keyName) + + putParams := &dynamodb.PutItemInput{ + Item: map[string]*dynamodb.AttributeValue{ + "LockID": {S: aws.String(stateName)}, + "Created": {S: aws.String(time.Now().UTC().Format(time.RFC3339))}, + "Expires": {S: aws.String(time.Now().Add(time.Hour).UTC().Format(time.RFC3339))}, + "Info": {S: aws.String(reason)}, + }, + TableName: aws.String(c.lockTable), + ConditionExpression: aws.String("attribute_not_exists(LockID)"), + } + _, err := c.dynClient.PutItem(putParams) + + if err != nil { + getParams := &dynamodb.GetItemInput{ + Key: map[string]*dynamodb.AttributeValue{ + "LockID": {S: aws.String(fmt.Sprintf("%s/%s", c.bucketName, c.keyName))}, + }, + ProjectionExpression: aws.String("LockID, Created, Expires, Info"), + TableName: aws.String(c.lockTable), + } + + resp, err := c.dynClient.GetItem(getParams) + if err != nil { + return fmt.Errorf("s3 state file %q locked, cfailed to retrive info: %s", stateName, err) + } + + var created, expires, info string + if v, ok := resp.Item["Created"]; ok && v.S != nil { + created = *v.S + } + if v, ok := resp.Item["Expires"]; ok && v.S != nil { + expires = *v.S + } + if v, ok := resp.Item["Info"]; ok && v.S != nil { + info = *v.S + } + + return fmt.Errorf("state file %q locked. created:%s, expires:%s, reason:%s", + stateName, created, expires, info) + + } + return nil +} + +func (c *S3Client) Unlock() error { + if c.lockTable == "" { + return nil + } + + params := &dynamodb.DeleteItemInput{ + Key: map[string]*dynamodb.AttributeValue{ + "LockID": {S: aws.String(fmt.Sprintf("%s/%s", c.bucketName, c.keyName))}, + }, + TableName: aws.String(c.lockTable), + } + _, err := c.dynClient.DeleteItem(params) + + if err != nil { + return err + } + return nil +} diff --git a/state/remote/s3_test.go b/state/remote/s3_test.go index e61d44b9e..f29426fa7 100644 --- a/state/remote/s3_test.go +++ b/state/remote/s3_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/s3" ) @@ -123,9 +125,113 @@ func TestS3Client(t *testing.T) { _, err := nativeClient.DeleteBucket(deleteBucketReq) if err != nil { - t.Logf("WARNING: Failed to delete the test S3 bucket. It has been left in your AWS account and may incur storage charges. (error was %s)", err) + t.Logf("WARNING: Failed to delete the test S3 bucket. It may have been left in your AWS account and may incur storage charges. (error was %s)", err) } }() testClient(t, client) } + +func TestS3ClientLocks(t *testing.T) { + // This test creates a DynamoDB table. + // It may incur costs, so it will only run if AWS credential environment + // variables are present. + + accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID") + if accessKeyId == "" { + t.Skipf("skipping; AWS_ACCESS_KEY_ID must be set") + } + + regionName := os.Getenv("AWS_DEFAULT_REGION") + if regionName == "" { + regionName = "us-west-2" + } + + bucketName := fmt.Sprintf("terraform-remote-s3-lock-%x", time.Now().Unix()) + keyName := "testState" + + config := make(map[string]string) + config["region"] = regionName + config["bucket"] = bucketName + config["key"] = keyName + config["encrypt"] = "1" + config["lock_table"] = bucketName + + client, err := s3Factory(config) + if err != nil { + t.Fatalf("Error for valid config") + } + + s3Client := client.(*S3Client) + + // set this up before we try to crate the table, in case we timeout creating it. + defer deleteDynaboDBTable(t, s3Client, bucketName) + + createDynamoDBTable(t, s3Client, bucketName) + + testClientLocks(t, client) +} + +// create the dynamoDB table, and wait until we can query it. +func createDynamoDBTable(t *testing.T, c *S3Client, tableName string) { + createInput := &dynamodb.CreateTableInput{ + AttributeDefinitions: []*dynamodb.AttributeDefinition{ + { + AttributeName: aws.String("LockID"), + AttributeType: aws.String("S"), + }, + }, + KeySchema: []*dynamodb.KeySchemaElement{ + { + AttributeName: aws.String("LockID"), + KeyType: aws.String("HASH"), + }, + }, + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(5), + WriteCapacityUnits: aws.Int64(5), + }, + TableName: aws.String(tableName), + } + + _, err := c.dynClient.CreateTable(createInput) + if err != nil { + t.Fatal(err) + } + + // now wait until it's ACTIVE + start := time.Now() + time.Sleep(time.Second) + + describeInput := &dynamodb.DescribeTableInput{ + TableName: aws.String(tableName), + } + + for { + resp, err := c.dynClient.DescribeTable(describeInput) + if err != nil { + t.Fatal(err) + } + + if *resp.Table.TableStatus == "ACTIVE" { + return + } + + if time.Since(start) > time.Minute { + t.Fatalf("timed out creating DynamoDB table %s", tableName) + } + + time.Sleep(3 * time.Second) + } + +} + +func deleteDynaboDBTable(t *testing.T, c *S3Client, tableName string) { + params := &dynamodb.DeleteTableInput{ + TableName: aws.String(tableName), + } + _, err := c.dynClient.DeleteTable(params) + if err != nil { + t.Logf("WARNING: Failed to delete the test DynamoDB table %q. It has been left in your AWS account and may incur charges. (error was %s)", tableName, err) + } +}