Add locking for s3 state
Use a DynamoDB table to coodinate state locking in S3. We use a simple strategy here, defining a key containing the value of the bucket/key of the state file as the lock. If the keys exists, the locks fails. TODO: decide if locks should automatically be expired, or require manual intervention.
This commit is contained in:
parent
35307d5a60
commit
10f6d7f30f
|
@ -16,7 +16,7 @@ type lockInfo struct {
|
|||
// Path to the state file
|
||||
Path string
|
||||
// The time the lock was taken
|
||||
Time time.Time
|
||||
Created time.Time
|
||||
// The time this lock expires
|
||||
Expires time.Time
|
||||
// The lock reason passed to State.Lock
|
||||
|
@ -26,7 +26,7 @@ type lockInfo struct {
|
|||
// return the lock info formatted in an error
|
||||
func (l *lockInfo) Err() error {
|
||||
return fmt.Errorf("state file %q locked. created:%s, expires:%s, reason:%s",
|
||||
l.Path, l.Time, l.Expires, l.Reason)
|
||||
l.Path, l.Created, l.Expires, l.Reason)
|
||||
}
|
||||
|
||||
// LocalState manages a state storage that is local to the filesystem.
|
||||
|
@ -227,8 +227,8 @@ func (s *LocalState) writeLockInfo(reason string) error {
|
|||
|
||||
lockInfo := &lockInfo{
|
||||
Path: s.Path,
|
||||
Time: time.Now(),
|
||||
Expires: time.Now().Add(time.Hour),
|
||||
Created: time.Now().UTC(),
|
||||
Expires: time.Now().Add(time.Hour).UTC(),
|
||||
Reason: reason,
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,38 @@ func testClient(t *testing.T, c Client) {
|
|||
}
|
||||
}
|
||||
|
||||
func testClientLocks(t *testing.T, c Client) {
|
||||
s3Client := c.(*S3Client)
|
||||
|
||||
// initial lock
|
||||
if err := s3Client.Lock("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// second lock should fail
|
||||
if err := s3Client.Lock("test"); err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
|
||||
// unlock should work
|
||||
if err := s3Client.Unlock(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// now we should be able to lock again
|
||||
if err := s3Client.Lock("test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// unlock should be idempotent
|
||||
if err := s3Client.Unlock(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s3Client.Unlock(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteClient_noPayload(t *testing.T) {
|
||||
s := &State{
|
||||
Client: nilClient{},
|
||||
|
|
|
@ -7,10 +7,12 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
|
@ -89,6 +91,7 @@ providing credentials for the AWS S3 remote`))
|
|||
}
|
||||
sess := session.New(awsConfig)
|
||||
nativeClient := s3.New(sess)
|
||||
dynClient := dynamodb.New(sess)
|
||||
|
||||
return &S3Client{
|
||||
nativeClient: nativeClient,
|
||||
|
@ -97,6 +100,8 @@ providing credentials for the AWS S3 remote`))
|
|||
serverSideEncryption: serverSideEncryption,
|
||||
acl: acl,
|
||||
kmsKeyID: kmsKeyID,
|
||||
dynClient: dynClient,
|
||||
lockTable: conf["lock_table"],
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -107,6 +112,8 @@ type S3Client struct {
|
|||
serverSideEncryption bool
|
||||
acl string
|
||||
kmsKeyID string
|
||||
dynClient *dynamodb.DynamoDB
|
||||
lockTable string
|
||||
}
|
||||
|
||||
func (c *S3Client) Get() (*Payload, error) {
|
||||
|
@ -188,3 +195,73 @@ func (c *S3Client) Delete() error {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *S3Client) Lock(reason string) error {
|
||||
if c.lockTable == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
stateName := fmt.Sprintf("%s/%s", c.bucketName, c.keyName)
|
||||
|
||||
putParams := &dynamodb.PutItemInput{
|
||||
Item: map[string]*dynamodb.AttributeValue{
|
||||
"LockID": {S: aws.String(stateName)},
|
||||
"Created": {S: aws.String(time.Now().UTC().Format(time.RFC3339))},
|
||||
"Expires": {S: aws.String(time.Now().Add(time.Hour).UTC().Format(time.RFC3339))},
|
||||
"Info": {S: aws.String(reason)},
|
||||
},
|
||||
TableName: aws.String(c.lockTable),
|
||||
ConditionExpression: aws.String("attribute_not_exists(LockID)"),
|
||||
}
|
||||
_, err := c.dynClient.PutItem(putParams)
|
||||
|
||||
if err != nil {
|
||||
getParams := &dynamodb.GetItemInput{
|
||||
Key: map[string]*dynamodb.AttributeValue{
|
||||
"LockID": {S: aws.String(fmt.Sprintf("%s/%s", c.bucketName, c.keyName))},
|
||||
},
|
||||
ProjectionExpression: aws.String("LockID, Created, Expires, Info"),
|
||||
TableName: aws.String(c.lockTable),
|
||||
}
|
||||
|
||||
resp, err := c.dynClient.GetItem(getParams)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3 state file %q locked, cfailed to retrive info: %s", stateName, err)
|
||||
}
|
||||
|
||||
var created, expires, info string
|
||||
if v, ok := resp.Item["Created"]; ok && v.S != nil {
|
||||
created = *v.S
|
||||
}
|
||||
if v, ok := resp.Item["Expires"]; ok && v.S != nil {
|
||||
expires = *v.S
|
||||
}
|
||||
if v, ok := resp.Item["Info"]; ok && v.S != nil {
|
||||
info = *v.S
|
||||
}
|
||||
|
||||
return fmt.Errorf("state file %q locked. created:%s, expires:%s, reason:%s",
|
||||
stateName, created, expires, info)
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *S3Client) Unlock() error {
|
||||
if c.lockTable == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
params := &dynamodb.DeleteItemInput{
|
||||
Key: map[string]*dynamodb.AttributeValue{
|
||||
"LockID": {S: aws.String(fmt.Sprintf("%s/%s", c.bucketName, c.keyName))},
|
||||
},
|
||||
TableName: aws.String(c.lockTable),
|
||||
}
|
||||
_, err := c.dynClient.DeleteItem(params)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
)
|
||||
|
||||
|
@ -123,9 +125,113 @@ func TestS3Client(t *testing.T) {
|
|||
|
||||
_, err := nativeClient.DeleteBucket(deleteBucketReq)
|
||||
if err != nil {
|
||||
t.Logf("WARNING: Failed to delete the test S3 bucket. It has been left in your AWS account and may incur storage charges. (error was %s)", err)
|
||||
t.Logf("WARNING: Failed to delete the test S3 bucket. It may have been left in your AWS account and may incur storage charges. (error was %s)", err)
|
||||
}
|
||||
}()
|
||||
|
||||
testClient(t, client)
|
||||
}
|
||||
|
||||
func TestS3ClientLocks(t *testing.T) {
|
||||
// This test creates a DynamoDB table.
|
||||
// It may incur costs, so it will only run if AWS credential environment
|
||||
// variables are present.
|
||||
|
||||
accessKeyId := os.Getenv("AWS_ACCESS_KEY_ID")
|
||||
if accessKeyId == "" {
|
||||
t.Skipf("skipping; AWS_ACCESS_KEY_ID must be set")
|
||||
}
|
||||
|
||||
regionName := os.Getenv("AWS_DEFAULT_REGION")
|
||||
if regionName == "" {
|
||||
regionName = "us-west-2"
|
||||
}
|
||||
|
||||
bucketName := fmt.Sprintf("terraform-remote-s3-lock-%x", time.Now().Unix())
|
||||
keyName := "testState"
|
||||
|
||||
config := make(map[string]string)
|
||||
config["region"] = regionName
|
||||
config["bucket"] = bucketName
|
||||
config["key"] = keyName
|
||||
config["encrypt"] = "1"
|
||||
config["lock_table"] = bucketName
|
||||
|
||||
client, err := s3Factory(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error for valid config")
|
||||
}
|
||||
|
||||
s3Client := client.(*S3Client)
|
||||
|
||||
// set this up before we try to crate the table, in case we timeout creating it.
|
||||
defer deleteDynaboDBTable(t, s3Client, bucketName)
|
||||
|
||||
createDynamoDBTable(t, s3Client, bucketName)
|
||||
|
||||
testClientLocks(t, client)
|
||||
}
|
||||
|
||||
// create the dynamoDB table, and wait until we can query it.
|
||||
func createDynamoDBTable(t *testing.T, c *S3Client, tableName string) {
|
||||
createInput := &dynamodb.CreateTableInput{
|
||||
AttributeDefinitions: []*dynamodb.AttributeDefinition{
|
||||
{
|
||||
AttributeName: aws.String("LockID"),
|
||||
AttributeType: aws.String("S"),
|
||||
},
|
||||
},
|
||||
KeySchema: []*dynamodb.KeySchemaElement{
|
||||
{
|
||||
AttributeName: aws.String("LockID"),
|
||||
KeyType: aws.String("HASH"),
|
||||
},
|
||||
},
|
||||
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
|
||||
ReadCapacityUnits: aws.Int64(5),
|
||||
WriteCapacityUnits: aws.Int64(5),
|
||||
},
|
||||
TableName: aws.String(tableName),
|
||||
}
|
||||
|
||||
_, err := c.dynClient.CreateTable(createInput)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// now wait until it's ACTIVE
|
||||
start := time.Now()
|
||||
time.Sleep(time.Second)
|
||||
|
||||
describeInput := &dynamodb.DescribeTableInput{
|
||||
TableName: aws.String(tableName),
|
||||
}
|
||||
|
||||
for {
|
||||
resp, err := c.dynClient.DescribeTable(describeInput)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if *resp.Table.TableStatus == "ACTIVE" {
|
||||
return
|
||||
}
|
||||
|
||||
if time.Since(start) > time.Minute {
|
||||
t.Fatalf("timed out creating DynamoDB table %s", tableName)
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func deleteDynaboDBTable(t *testing.T, c *S3Client, tableName string) {
|
||||
params := &dynamodb.DeleteTableInput{
|
||||
TableName: aws.String(tableName),
|
||||
}
|
||||
_, err := c.dynClient.DeleteTable(params)
|
||||
if err != nil {
|
||||
t.Logf("WARNING: Failed to delete the test DynamoDB table %q. It has been left in your AWS account and may incur charges. (error was %s)", tableName, err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue