Merge pull request #11787 from hashicorp/jbardin/state-locking
Add consul state locking
This commit is contained in:
commit
1448cb66fb
|
@ -2,15 +2,29 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/errwrap"
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
|
"github.com/hashicorp/terraform/state"
|
||||||
"github.com/hashicorp/terraform/state/remote"
|
"github.com/hashicorp/terraform/state/remote"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
lockSuffix = "/.lock"
|
||||||
|
lockInfoSuffix = "/.lockinfo"
|
||||||
|
)
|
||||||
|
|
||||||
// RemoteClient is a remote client that stores data in Consul.
|
// RemoteClient is a remote client that stores data in Consul.
|
||||||
type RemoteClient struct {
|
type RemoteClient struct {
|
||||||
Client *consulapi.Client
|
Client *consulapi.Client
|
||||||
Path string
|
Path string
|
||||||
|
|
||||||
|
consulLock *consulapi.Lock
|
||||||
|
lockCh <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||||
|
@ -43,3 +57,121 @@ func (c *RemoteClient) Delete() error {
|
||||||
_, err := kv.Delete(c.Path, nil)
|
_, err := kv.Delete(c.Path, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) putLockInfo(info string) error {
|
||||||
|
li := &state.LockInfo{
|
||||||
|
Path: c.Path,
|
||||||
|
Created: time.Now().UTC(),
|
||||||
|
Info: info,
|
||||||
|
}
|
||||||
|
|
||||||
|
js, err := json.Marshal(li)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
kv := c.Client.KV()
|
||||||
|
_, err = kv.Put(&consulapi.KVPair{
|
||||||
|
Key: c.Path + lockInfoSuffix,
|
||||||
|
Value: js,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
|
||||||
|
path := c.Path + lockInfoSuffix
|
||||||
|
pair, _, err := c.Client.KV().Get(path, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pair == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
li := &state.LockInfo{}
|
||||||
|
err = json.Unmarshal(pair.Value, li)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errwrap.Wrapf("error unmarshaling lock info: {{err}}", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return li, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) Lock(info string) error {
|
||||||
|
select {
|
||||||
|
case <-c.lockCh:
|
||||||
|
// We had a lock, but lost it.
|
||||||
|
// Since we typically only call lock once, we shouldn't ever see this.
|
||||||
|
return errors.New("lost consul lock")
|
||||||
|
default:
|
||||||
|
if c.lockCh != nil {
|
||||||
|
// we have an active lock already
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.consulLock == nil {
|
||||||
|
opts := &consulapi.LockOptions{
|
||||||
|
Key: c.Path + lockSuffix,
|
||||||
|
// We currently don't procide any options to block terraform and
|
||||||
|
// retry lock acquisition, but we can wait briefly in case the
|
||||||
|
// lock is about to be freed.
|
||||||
|
LockWaitTime: time.Second,
|
||||||
|
LockTryOnce: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
lock, err := c.Client.LockOpts(opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c.consulLock = lock
|
||||||
|
}
|
||||||
|
|
||||||
|
lockCh, err := c.consulLock.Lock(make(chan struct{}))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if lockCh == nil {
|
||||||
|
lockInfo, e := c.getLockInfo()
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
return lockInfo.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
c.lockCh = lockCh
|
||||||
|
|
||||||
|
err = c.putLockInfo(info)
|
||||||
|
if err != nil {
|
||||||
|
err = multierror.Append(err, c.Unlock())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) Unlock() error {
|
||||||
|
if c.consulLock == nil || c.lockCh == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.lockCh:
|
||||||
|
return errors.New("consul lock was lost")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.consulLock.Unlock()
|
||||||
|
c.lockCh = nil
|
||||||
|
|
||||||
|
kv := c.Client.KV()
|
||||||
|
_, delErr := kv.Delete(c.Path+lockInfoSuffix, nil)
|
||||||
|
if delErr != nil {
|
||||||
|
err = multierror.Append(err, delErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -2,12 +2,12 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/terraform/backend"
|
"github.com/hashicorp/terraform/backend"
|
||||||
"github.com/hashicorp/terraform/backend/remote-state"
|
"github.com/hashicorp/terraform/backend/remote-state"
|
||||||
"github.com/hashicorp/terraform/helper/acctest"
|
|
||||||
"github.com/hashicorp/terraform/state/remote"
|
"github.com/hashicorp/terraform/state/remote"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,14 +16,47 @@ func TestRemoteClient_impl(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoteClient(t *testing.T) {
|
func TestRemoteClient(t *testing.T) {
|
||||||
acctest.RemoteTestPrecheck(t)
|
addr := os.Getenv("CONSUL_HTTP_ADDR")
|
||||||
|
if addr == "" {
|
||||||
|
t.Log("consul tests require CONSUL_HTTP_ADDR")
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
|
||||||
// Get the backend
|
// Get the backend
|
||||||
b := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
b := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
"address": "demo.consul.io:80",
|
"address": addr,
|
||||||
"path": fmt.Sprintf("tf-unit/%s", time.Now().String()),
|
"path": fmt.Sprintf("tf-unit/%s", time.Now().String()),
|
||||||
})
|
})
|
||||||
|
|
||||||
// Test
|
// Test
|
||||||
remotestate.TestClient(t, b)
|
remotestate.TestClient(t, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsul_stateLock(t *testing.T) {
|
||||||
|
addr := os.Getenv("CONSUL_HTTP_ADDR")
|
||||||
|
if addr == "" {
|
||||||
|
t.Log("consul lock tests require CONSUL_HTTP_ADDR")
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
|
||||||
|
path := fmt.Sprintf("tf-unit/%s", time.Now().String())
|
||||||
|
|
||||||
|
// create 2 instances to get 2 remote.Clients
|
||||||
|
sA, err := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
|
"address": addr,
|
||||||
|
"path": path,
|
||||||
|
}).State()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sB, err := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
|
"address": addr,
|
||||||
|
"path": path,
|
||||||
|
}).State()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
remote.TestRemoteLocks(t, sA.(*remote.State).Client, sB.(*remote.State).Client)
|
||||||
|
}
|
||||||
|
|
|
@ -14,19 +14,27 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// lock metadata structure for local locks
|
// lock metadata structure for local locks
|
||||||
type lockInfo struct {
|
type LockInfo struct {
|
||||||
// Path to the state file
|
// Path to the state file
|
||||||
Path string
|
Path string
|
||||||
// The time the lock was taken
|
// The time the lock was taken
|
||||||
Created time.Time
|
Created time.Time
|
||||||
// Extra info passed to State.Lock
|
// Extra info passed to State.Lock
|
||||||
Reason string
|
Info string
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the lock info formatted in an error
|
// return the lock info formatted in an error
|
||||||
func (l *lockInfo) Err() error {
|
func (l *LockInfo) Err() error {
|
||||||
return fmt.Errorf("state file %q locked. created:%s, reason:%s",
|
return fmt.Errorf("state locked. path:%q, created:%s, info:%q",
|
||||||
l.Path, l.Created, l.Reason)
|
l.Path, l.Created, l.Info)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *LockInfo) String() string {
|
||||||
|
js, err := json.Marshal(l)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return string(js)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LocalState manages a state storage that is local to the filesystem.
|
// LocalState manages a state storage that is local to the filesystem.
|
||||||
|
@ -224,14 +232,14 @@ func (s *LocalState) lockInfoPath() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// lockInfo returns the data in a lock info file
|
// lockInfo returns the data in a lock info file
|
||||||
func (s *LocalState) lockInfo() (*lockInfo, error) {
|
func (s *LocalState) lockInfo() (*LockInfo, error) {
|
||||||
path := s.lockInfoPath()
|
path := s.lockInfoPath()
|
||||||
infoData, err := ioutil.ReadFile(path)
|
infoData, err := ioutil.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
info := lockInfo{}
|
info := LockInfo{}
|
||||||
err = json.Unmarshal(infoData, &info)
|
err = json.Unmarshal(infoData, &info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("state file %q locked, but could not unmarshal lock info: %s", s.Path, err)
|
return nil, fmt.Errorf("state file %q locked, but could not unmarshal lock info: %s", s.Path, err)
|
||||||
|
@ -240,13 +248,13 @@ func (s *LocalState) lockInfo() (*lockInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write a new lock info file
|
// write a new lock info file
|
||||||
func (s *LocalState) writeLockInfo(reason string) error {
|
func (s *LocalState) writeLockInfo(info string) error {
|
||||||
path := s.lockInfoPath()
|
path := s.lockInfoPath()
|
||||||
|
|
||||||
lockInfo := &lockInfo{
|
lockInfo := &LockInfo{
|
||||||
Path: s.Path,
|
Path: s.Path,
|
||||||
Created: time.Now().UTC(),
|
Created: time.Now().UTC(),
|
||||||
Reason: reason,
|
Info: info,
|
||||||
}
|
}
|
||||||
|
|
||||||
infoData, err := json.Marshal(lockInfo)
|
infoData, err := json.Marshal(lockInfo)
|
||||||
|
|
|
@ -40,7 +40,7 @@ func TestLocalStateLocks(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if lockInfo.Reason != "test" {
|
if lockInfo.Info != "test" {
|
||||||
t.Fatalf("invalid lock info %#v\n", lockInfo)
|
t.Fatalf("invalid lock info %#v\n", lockInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,38 +44,6 @@ func testClient(t *testing.T, c Client) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testClientLocks(t *testing.T, c Client) {
|
|
||||||
s3Client := c.(*S3Client)
|
|
||||||
|
|
||||||
// initial lock
|
|
||||||
if err := s3Client.Lock("test"); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// second lock should fail
|
|
||||||
if err := s3Client.Lock("test"); err == nil {
|
|
||||||
t.Fatal("expected error, got nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
// unlock should work
|
|
||||||
if err := s3Client.Unlock(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// now we should be able to lock again
|
|
||||||
if err := s3Client.Lock("test"); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// unlock should be idempotent
|
|
||||||
if err := s3Client.Unlock(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if err := s3Client.Unlock(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRemoteClient_noPayload(t *testing.T) {
|
func TestRemoteClient_noPayload(t *testing.T) {
|
||||||
s := &State{
|
s := &State{
|
||||||
Client: nilClient{},
|
Client: nilClient{},
|
||||||
|
|
|
@ -2,6 +2,7 @@ package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
@ -17,6 +18,7 @@ import (
|
||||||
"github.com/hashicorp/go-cleanhttp"
|
"github.com/hashicorp/go-cleanhttp"
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
terraformAws "github.com/hashicorp/terraform/builtin/providers/aws"
|
terraformAws "github.com/hashicorp/terraform/builtin/providers/aws"
|
||||||
|
"github.com/hashicorp/terraform/state"
|
||||||
)
|
)
|
||||||
|
|
||||||
func s3Factory(conf map[string]string) (Client, error) {
|
func s3Factory(conf map[string]string) (Client, error) {
|
||||||
|
@ -196,18 +198,22 @@ func (c *S3Client) Delete() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3Client) Lock(reason string) error {
|
func (c *S3Client) Lock(info string) error {
|
||||||
if c.lockTable == "" {
|
if c.lockTable == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
stateName := fmt.Sprintf("%s/%s", c.bucketName, c.keyName)
|
stateName := fmt.Sprintf("%s/%s", c.bucketName, c.keyName)
|
||||||
|
lockInfo := &state.LockInfo{
|
||||||
|
Path: stateName,
|
||||||
|
Created: time.Now().UTC(),
|
||||||
|
Info: info,
|
||||||
|
}
|
||||||
|
|
||||||
putParams := &dynamodb.PutItemInput{
|
putParams := &dynamodb.PutItemInput{
|
||||||
Item: map[string]*dynamodb.AttributeValue{
|
Item: map[string]*dynamodb.AttributeValue{
|
||||||
"LockID": {S: aws.String(stateName)},
|
"LockID": {S: aws.String(stateName)},
|
||||||
"Created": {S: aws.String(time.Now().UTC().Format(time.RFC3339))},
|
"Info": {S: aws.String(lockInfo.String())},
|
||||||
"Info": {S: aws.String(reason)},
|
|
||||||
},
|
},
|
||||||
TableName: aws.String(c.lockTable),
|
TableName: aws.String(c.lockTable),
|
||||||
ConditionExpression: aws.String("attribute_not_exists(LockID)"),
|
ConditionExpression: aws.String("attribute_not_exists(LockID)"),
|
||||||
|
@ -225,20 +231,21 @@ func (c *S3Client) Lock(reason string) error {
|
||||||
|
|
||||||
resp, err := c.dynClient.GetItem(getParams)
|
resp, err := c.dynClient.GetItem(getParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("s3 state file %q locked, cfailed to retrive info: %s", stateName, err)
|
return fmt.Errorf("s3 state file %q locked, failed to retrive info: %s", stateName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var created, info string
|
var infoData string
|
||||||
if v, ok := resp.Item["Created"]; ok && v.S != nil {
|
|
||||||
created = *v.S
|
|
||||||
}
|
|
||||||
if v, ok := resp.Item["Info"]; ok && v.S != nil {
|
if v, ok := resp.Item["Info"]; ok && v.S != nil {
|
||||||
info = *v.S
|
infoData = *v.S
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("state file %q locked. created:%s, reason:%s",
|
lockInfo = &state.LockInfo{}
|
||||||
stateName, created, info)
|
err = json.Unmarshal([]byte(infoData), lockInfo)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("s3 state file %q locked, failed get lock info: %s", stateName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return lockInfo.Err()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,7 +169,7 @@ func TestS3ClientLocks(t *testing.T) {
|
||||||
|
|
||||||
createDynamoDBTable(t, s3Client, bucketName)
|
createDynamoDBTable(t, s3Client, bucketName)
|
||||||
|
|
||||||
testClientLocks(t, client)
|
TestRemoteLocks(t, client, client)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the dynamoDB table, and wait until we can query it.
|
// create the dynamoDB table, and wait until we can query it.
|
||||||
|
|
|
@ -41,3 +41,47 @@ func TestClient(t *testing.T, c Client) {
|
||||||
t.Fatalf("bad: %#v", p)
|
t.Fatalf("bad: %#v", p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test the lock implementation for a remote.Client.
|
||||||
|
// This test requires 2 client instances, in oder to have multiple remote
|
||||||
|
// clients since some implementations may tie the client to the lock, or may
|
||||||
|
// have reentrant locks.
|
||||||
|
func TestRemoteLocks(t *testing.T, a, b Client) {
|
||||||
|
lockerA, ok := a.(state.Locker)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("client A not a state.Locker")
|
||||||
|
}
|
||||||
|
|
||||||
|
lockerB, ok := b.(state.Locker)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("client B not a state.Locker")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerA.Lock("test client A"); err != nil {
|
||||||
|
t.Fatal("unable to get initial lock:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerB.Lock("test client B"); err == nil {
|
||||||
|
lockerA.Unlock()
|
||||||
|
t.Fatal("client B obtained lock while held by client A")
|
||||||
|
} else {
|
||||||
|
t.Log("lock info error:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerA.Unlock(); err != nil {
|
||||||
|
t.Fatal("error unlocking client A", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerB.Lock("test client B"); err != nil {
|
||||||
|
t.Fatal("unable to obtain lock from client B")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerB.Unlock(); err != nil {
|
||||||
|
t.Fatal("error unlocking client B:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// unlock should be repeatable
|
||||||
|
if err := lockerA.Unlock(); err != nil {
|
||||||
|
t.Fatal("Unlock error from client A when state was not locked:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue