diff --git a/backend/remote-state/consul/client.go b/backend/remote-state/consul/client.go index b11f31ba1..be9873417 100644 --- a/backend/remote-state/consul/client.go +++ b/backend/remote-state/consul/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" consulapi "github.com/hashicorp/consul/api" @@ -26,11 +27,25 @@ type RemoteClient struct { Path string GZip bool + mu sync.Mutex + + // The index of the last state we wrote. + // If this is > 0, Put will perform a CAS to ensure that the state wasn't + // changed during the operation. This is important even with locks, because + // if the client loses the lock for some reason, then reacquires it, we + // need to make sure that the state was not modified. + modifyIndex uint64 + consulLock *consulapi.Lock lockCh <-chan struct{} + + info *state.LockInfo } func (c *RemoteClient) Get() (*remote.Payload, error) { + c.mu.Lock() + defer c.mu.Unlock() + pair, _, err := c.Client.KV().Get(c.Path, nil) if err != nil { return nil, err @@ -39,6 +54,8 @@ func (c *RemoteClient) Get() (*remote.Payload, error) { return nil, nil } + c.modifyIndex = pair.ModifyIndex + payload := pair.Value // If the payload starts with 0x1f, it's gzip, not json if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' { @@ -57,6 +74,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) { } func (c *RemoteClient) Put(data []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + payload := data if c.GZip { if compressedState, err := compressState(data); err == nil { @@ -67,14 +87,50 @@ func (c *RemoteClient) Put(data []byte) error { } kv := c.Client.KV() - _, err := kv.Put(&consulapi.KVPair{ - Key: c.Path, - Value: payload, - }, nil) - return err + + verb := consulapi.KVCAS + + // Assume a 0 index doesn't need a CAS for now, since we are either + // creating a new state or purposely overwriting one. + if c.modifyIndex == 0 { + verb = consulapi.KVSet + } + + // KV.Put doesn't return the new index, so we use a single operation + // transaction to get the new index with a single request. + txOps := consulapi.KVTxnOps{ + &consulapi.KVTxnOp{ + Verb: verb, + Key: c.Path, + Value: payload, + Index: c.modifyIndex, + }, + } + + ok, resp, _, err := kv.Txn(txOps, nil) + if err != nil { + return err + } + + // transaction was rolled back + if !ok { + return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors) + } + + if len(resp.Results) != 1 { + // this probably shouldn't happen + return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results)) + } + + c.modifyIndex = resp.Results[0].ModifyIndex + + return nil } func (c *RemoteClient) Delete() error { + c.mu.Lock() + defer c.mu.Unlock() + kv := c.Client.KV() _, err := kv.Delete(c.Path, nil) return err @@ -113,6 +169,9 @@ func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) { } func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { + c.mu.Lock() + defer c.mu.Unlock() + select { case <-c.lockCh: // We had a lock, but lost it. @@ -125,6 +184,10 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { } } + return c.lock(info) +} + +func (c *RemoteClient) lock(info *state.LockInfo) (string, error) { if c.consulLock == nil { opts := &consulapi.LockOptions{ Key: c.Path + lockSuffix, @@ -176,6 +239,9 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { } func (c *RemoteClient) Unlock(id string) error { + c.mu.Lock() + defer c.mu.Unlock() + // this doesn't use the lock id, because the lock is tied to the consul client. if c.consulLock == nil || c.lockCh == nil { return nil