Merge pull request #15557 from hashicorp/jbardin/consul-lock-sessions

have the consul client manage the lock session
commit 50f412bff4
James Bardin, 2017-07-17 10:56:30 -04:00, committed by GitHub
1 changed file with 122 additions and 37 deletions
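In outline, the change has the consul client own a session for each lock: create a session with a TTL, keep it renewed in the background, acquire the lock under that session, and tear both down on unlock. A rough sketch of that flow against the real github.com/hashicorp/consul/api package (the KV key is illustrative and error handling is trimmed; this is not the backend code itself):

package main

import (
    "log"
    "time"

    consulapi "github.com/hashicorp/consul/api"
)

func main() {
    client, err := consulapi.NewClient(consulapi.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }

    // 1. Create a session with a TTL, so the server eventually releases
    //    the lock if this client disappears.
    session := client.Session()
    id, _, err := session.Create(&consulapi.SessionEntry{
        TTL:       "15s",
        LockDelay: 5 * time.Second,
    }, nil)
    if err != nil {
        log.Fatal(err)
    }

    // 2. Keep the session renewed in the background until done is closed.
    done := make(chan struct{})
    go session.RenewPeriodic("15s", id, nil, done)

    // 3. Acquire the lock under that session, failing fast if it's held.
    lock, err := client.LockOpts(&consulapi.LockOptions{
        Key:          "example/state/.lock", // illustrative key
        Session:      id,
        LockWaitTime: time.Second,
        LockTryOnce:  true,
    })
    if err != nil {
        log.Fatal(err)
    }
    lostCh, err := lock.Lock(nil)
    if err != nil || lostCh == nil {
        log.Fatal("could not acquire lock")
    }

    // ... do work while holding the lock; lostCh closes if it's lost ...

    // 4. Unlock, stop renewal, and clean up the session.
    lock.Unlock()
    close(done)
    session.Destroy(id, nil)
}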

backend/remote-state/consul/client.go

@@ -3,6 +3,7 @@ package consul
 import (
     "bytes"
     "compress/gzip"
+    "context"
     "crypto/md5"
     "encoding/json"
     "errors"
@@ -20,6 +21,15 @@ import (
 const (
     lockSuffix     = "/.lock"
     lockInfoSuffix = "/.lockinfo"
+
+    // The Session TTL associated with this lock.
+    lockSessionTTL = "15s"
+
+    // the delay time from when a session is lost to when the
+    // lock is released by the server
+    lockDelay = 5 * time.Second
+    // interval between attempts to reacquire a lost lock
+    lockReacquireInterval = 2 * time.Second
 )
 
 // RemoteClient is a remote client that stores data in Consul.
@@ -44,9 +54,15 @@ type RemoteClient struct {
     info *state.LockInfo
 
-    // cancel the goroutine which is monitoring the lock.
-    monitorCancel chan struct{}
-    monitorDone   chan struct{}
+    // cancel our goroutine which is monitoring the lock to automatically
+    // reacquire it when possible.
+    monitorCancel context.CancelFunc
+    monitorWG     sync.WaitGroup
+
+    // sessionCancel cancels the Context used for session.RenewPeriodic, and is
+    // called when unlocking, or before creating a new lock if the lock is
+    // lost.
+    sessionCancel context.CancelFunc
 }
 
 func (c *RemoteClient) Get() (*remote.Payload, error) {
@@ -202,25 +218,41 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
     return c.lock()
 }
 
-// called after a lock is acquired
-var testLockHook func()
-
 // the lock implementation.
 // Only to be called while holding Client.mu
 func (c *RemoteClient) lock() (string, error) {
-    if c.consulLock == nil {
-        opts := &consulapi.LockOptions{
-            Key: c.Path + lockSuffix,
-            // only wait briefly, so terraform has the choice to fail fast or
-            // retry as needed.
-            LockWaitTime: time.Second,
-            LockTryOnce:  true,
-        }
+    // We create a new session here, so it can be canceled when the lock is
+    // lost or unlocked.
+    lockSession, err := c.createSession()
+    if err != nil {
+        return "", err
+    }
 
-        lock, err := c.Client.LockOpts(opts)
-        if err != nil {
-            return "", err
-        }
+    opts := &consulapi.LockOptions{
+        Key:     c.Path + lockSuffix,
+        Session: lockSession,
 
-        c.consulLock = lock
-    }
+        // only wait briefly, so terraform has the choice to fail fast or
+        // retry as needed.
+        LockWaitTime: time.Second,
+        LockTryOnce:  true,
+
+        // Don't let the lock monitor give up right away, as it's possible the
+        // session is still OK. While the session is refreshed at a rate of
+        // TTL/2, the lock monitor is an idle blocking request and is more
+        // susceptible to being closed by a lower network layer.
+        MonitorRetries: 5,
+        //
+        // The delay between lock monitor retries.
+        // While the session has a 15s TTL plus a 5s wait period on a lost
+        // lock, if we can't get our lock back in 10+ seconds something is
+        // wrong so we're going to drop the session and start over.
+        MonitorRetryTime: 2 * time.Second,
+    }
+
+    c.consulLock, err = c.Client.LockOpts(opts)
+    if err != nil {
+        return "", err
+    }
 
     lockErr := &state.LockError{}
@@ -239,6 +271,7 @@ func (c *RemoteClient) lock() (string, error) {
         }
         lockErr.Info = lockInfo
+
         return "", lockErr
     }
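For reference, the fail-fast behavior that the LockWaitTime/LockTryOnce options above rely on looks roughly like this in isolation (a sketch; tryLock and its arguments are illustrative names, not backend code):

package example

import (
    "errors"
    "time"

    consulapi "github.com/hashicorp/consul/api"
)

// tryLock mirrors the settings above: wait at most roughly LockWaitTime
// and give up, rather than blocking until the lock frees up.
func tryLock(client *consulapi.Client, key, sessionID string) (*consulapi.Lock, error) {
    lock, err := client.LockOpts(&consulapi.LockOptions{
        Key:          key,
        Session:      sessionID,
        LockWaitTime: time.Second,
        LockTryOnce:  true,
    })
    if err != nil {
        return nil, err
    }

    // With LockTryOnce set, Lock returns a nil channel and a nil error when
    // the lock is already held elsewhere, instead of retrying forever.
    lostCh, err := lock.Lock(nil)
    if err != nil {
        return nil, err
    }
    if lostCh == nil {
        return nil, errors.New("lock already held") // caller can report the holder
    }
    return lock, nil
}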
@@ -257,16 +290,22 @@ func (c *RemoteClient) lock() (string, error) {
     // If we lose the lock due to communication issues with the consul agent,
     // attempt to immediately reacquire the lock. Put will verify the integrity
     // of the state by using a CAS operation.
-    c.monitorCancel = make(chan struct{})
-    c.monitorDone = make(chan struct{})
-    go func(cancel, done chan struct{}) {
-        defer func() {
-            close(done)
-        }()
+    ctx, cancel := context.WithCancel(context.Background())
+    c.monitorCancel = cancel
+    c.monitorWG.Add(1)
+    go func() {
+        defer c.monitorWG.Done()
         select {
         case <-c.lockCh:
             log.Println("[ERROR] lost consul lock")
             for {
                 c.mu.Lock()
+                // We lost our lock, so we need to cancel the session too.
+                // The CancelFunc is only replaced while holding Client.mu, so
+                // this is safe to call here. This will be replaced by the
+                // lock() call below.
+                c.sessionCancel()
+
                 c.consulLock = nil
                 _, err := c.lock()
                 c.mu.Unlock()
@@ -276,11 +315,11 @@ func (c *RemoteClient) lock() (string, error) {
                     // terraform is running. There may be changes in progress,
                     // so there's no use in aborting. Either we eventually
                     // reacquire the lock, or a Put will fail on a CAS.
-                    log.Printf("[ERROR] attempting to reacquire lock: %s", err)
-                    time.Sleep(time.Second)
+                    log.Printf("[ERROR] could not reacquire lock: %s", err)
+                    time.Sleep(lockReacquireInterval)
 
                     select {
-                    case <-cancel:
+                    case <-ctx.Done():
                         return
                     default:
                     }
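The comment above leans on Consul's Check-And-Set semantics: even if the lock is silently lost and reacquired, a stale writer gets rejected rather than clobbering newer state. Roughly how a CAS write behaves (a sketch; casPut and modifyIndex are illustrative, not the backend's actual Put implementation):

package example

import (
    "errors"

    consulapi "github.com/hashicorp/consul/api"
)

// casPut writes data only if the key's ModifyIndex still matches what the
// caller last read, so a concurrent modification makes the write fail.
func casPut(kv *consulapi.KV, key string, data []byte, modifyIndex uint64) error {
    ok, _, err := kv.CAS(&consulapi.KVPair{
        Key:         key,
        Value:       data,
        ModifyIndex: modifyIndex, // write succeeds only if the index still matches
    }, nil)
    if err != nil {
        return err
    }
    if !ok {
        return errors.New("state changed underneath us; CAS write rejected")
    }
    return nil
}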
@@ -292,10 +331,10 @@ func (c *RemoteClient) lock() (string, error) {
                 return
             }
 
-        case <-cancel:
+        case <-ctx.Done():
             return
         }
-    }(c.monitorCancel, c.monitorDone)
+    }()
 
     if testLockHook != nil {
         testLockHook()
@@ -304,6 +343,42 @@ func (c *RemoteClient) lock() (string, error) {
     return c.info.ID, nil
 }
 
+// called after a lock is acquired
+var testLockHook func()
+
+func (c *RemoteClient) createSession() (string, error) {
+    // create the context first. Even if the session creation fails, we assume
+    // that the CancelFunc is always callable.
+    ctx, cancel := context.WithCancel(context.Background())
+    c.sessionCancel = cancel
+
+    session := c.Client.Session()
+    se := &consulapi.SessionEntry{
+        Name:      consulapi.DefaultLockSessionName,
+        TTL:       lockSessionTTL,
+        LockDelay: lockDelay,
+    }
+
+    id, _, err := session.Create(se, nil)
+    if err != nil {
+        return "", err
+    }
+
+    log.Println("[INFO] created consul lock session", id)
+
+    // keep the session renewed.
+    // we need an adapter to convert the context Done() channel to a
+    // non-directional channel to satisfy the RenewPeriodic signature.
+    done := make(chan struct{})
+    go func() {
+        <-ctx.Done()
+        close(done)
+    }()
+    go session.RenewPeriodic(lockSessionTTL, id, nil, done)
+
+    return id, nil
+}
+
 func (c *RemoteClient) Unlock(id string) error {
     c.mu.Lock()
     defer c.mu.Unlock()
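The done-channel adapter in createSession is a small reusable pattern: at the time, RenewPeriodic's signature took a plain chan struct{}, while ctx.Done() returns a receive-only channel. Extracted as a standalone sketch (doneAdapter is an illustrative name):

package example

import "context"

// doneAdapter converts a context's receive-only Done() channel into a
// non-directional chan struct{}, mirroring the adapter in createSession.
func doneAdapter(ctx context.Context) chan struct{} {
    done := make(chan struct{})
    go func() {
        <-ctx.Done() // wait for cancellation
        close(done)  // then close the plain channel
    }()
    return done
}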
@@ -315,17 +390,27 @@ func (c *RemoteClient) Unlock(id string) error {
     return c.unlock(id)
 }
 
 // the unlock implementation.
 // Only to be called while holding Client.mu
 func (c *RemoteClient) unlock(id string) error {
-    // cancel our monitoring goroutine
-    if c.monitorCancel != nil {
-        close(c.monitorCancel)
-    }
-
     // this doesn't use the lock id, because the lock is tied to the consul client.
     if c.consulLock == nil || c.lockCh == nil {
         return nil
     }
 
+    // cancel our monitoring goroutine
+    c.monitorCancel()
+
+    defer func() {
+        c.consulLock = nil
+
+        // The consul session is only used for this single lock, so cancel it
+        // after we unlock.
+        // The session is only created and replaced holding Client.mu, so the
+        // CancelFunc must be non-nil.
+        c.sessionCancel()
+    }()
+
     select {
     case <-c.lockCh:
         return errors.New("consul lock was lost")
@@ -344,9 +429,9 @@ func (c *RemoteClient) unlock(id string) error {
         errs = multierror.Append(errs, err)
     }
 
-    // the monitoring goroutine may be in a select on this chan, so we need to
+    // the monitoring goroutine may be in a select on the lockCh, so we need to
     // wait for it to return before changing the value.
-    <-c.monitorDone
+    c.monitorWG.Wait()
     c.lockCh = nil
 
     // This is only cleanup, and will fail if the lock was immediately taken by