Merge pull request #13778 from hashicorp/jbardin/GH-13617

improve resource.WaitForState and add refreshGracePeriod
This commit is contained in:
James Bardin 2017-04-19 18:23:00 -04:00 committed by GitHub
commit f5cda342f7
3 changed files with 211 additions and 45 deletions

View File

@ -2,10 +2,11 @@ package resource
import ( import (
"log" "log"
"sync/atomic"
"time" "time"
) )
var refreshGracePeriod = 30 * time.Second
// StateRefreshFunc is a function type used for StateChangeConf that is // StateRefreshFunc is a function type used for StateChangeConf that is
// responsible for refreshing the item being watched for a state change. // responsible for refreshing the item being watched for a state change.
// //
@ -62,58 +63,76 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
conf.ContinuousTargetOccurence = 1 conf.ContinuousTargetOccurence = 1
} }
// We can't safely read the result values if we timeout, so store them in
// an atomic.Value
type Result struct { type Result struct {
Result interface{} Result interface{}
State string State string
Error error Error error
Done bool
} }
var lastResult atomic.Value
lastResult.Store(Result{})
doneCh := make(chan struct{}) // Read every result from the refresh loop, waiting for a positive result.Done.
resCh := make(chan Result, 1)
// cancellation channel for the refresh loop
cancelCh := make(chan struct{})
result := Result{}
go func() { go func() {
defer close(doneCh) defer close(resCh)
// Wait for the delay
time.Sleep(conf.Delay) time.Sleep(conf.Delay)
wait := 100 * time.Millisecond // start with 0 delay for the first loop
var wait time.Duration
for { for {
// store the last result
resCh <- result
// wait and watch for cancellation
select {
case <-cancelCh:
return
case <-time.After(wait):
// first round had no wait
if wait == 0 {
wait = 100 * time.Millisecond
}
}
res, currentState, err := conf.Refresh() res, currentState, err := conf.Refresh()
result := Result{ result = Result{
Result: res, Result: res,
State: currentState, State: currentState,
Error: err, Error: err,
} }
lastResult.Store(result)
if err != nil { if err != nil {
resCh <- result
return return
} }
// If we're waiting for the absence of a thing, then return // If we're waiting for the absence of a thing, then return
if res == nil && len(conf.Target) == 0 { if res == nil && len(conf.Target) == 0 {
targetOccurence += 1 targetOccurence++
if conf.ContinuousTargetOccurence == targetOccurence { if conf.ContinuousTargetOccurence == targetOccurence {
result.Done = true
resCh <- result
return return
} else {
continue
} }
continue
} }
if res == nil { if res == nil {
// If we didn't find the resource, check if we have been // If we didn't find the resource, check if we have been
// not finding it for awhile, and if so, report an error. // not finding it for awhile, and if so, report an error.
notfoundTick += 1 notfoundTick++
if notfoundTick > conf.NotFoundChecks { if notfoundTick > conf.NotFoundChecks {
result.Error = &NotFoundError{ result.Error = &NotFoundError{
LastError: err, LastError: err,
Retries: notfoundTick, Retries: notfoundTick,
} }
lastResult.Store(result) resCh <- result
return return
} }
} else { } else {
@ -124,12 +143,13 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
for _, allowed := range conf.Target { for _, allowed := range conf.Target {
if currentState == allowed { if currentState == allowed {
found = true found = true
targetOccurence += 1 targetOccurence++
if conf.ContinuousTargetOccurence == targetOccurence { if conf.ContinuousTargetOccurence == targetOccurence {
result.Done = true
resCh <- result
return return
} else {
continue
} }
continue
} }
} }
@ -147,11 +167,17 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
State: result.State, State: result.State,
ExpectedState: conf.Target, ExpectedState: conf.Target,
} }
lastResult.Store(result) resCh <- result
return return
} }
} }
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
// If a poll interval has been specified, choose that interval. // If a poll interval has been specified, choose that interval.
// Otherwise bound the default value. // Otherwise bound the default value.
if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second { if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second {
@ -165,27 +191,69 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
} }
log.Printf("[TRACE] Waiting %s before next try", wait) log.Printf("[TRACE] Waiting %s before next try", wait)
time.Sleep(wait)
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
} }
}() }()
select { // store the last value result from the refresh loop
case <-doneCh: lastResult := Result{}
r := lastResult.Load().(Result)
return r.Result, r.Error timeout := time.After(conf.Timeout)
case <-time.After(conf.Timeout): for {
r := lastResult.Load().(Result) select {
return nil, &TimeoutError{ case r, ok := <-resCh:
LastError: r.Error, // channel closed, so return the last result
LastState: r.State, if !ok {
Timeout: conf.Timeout, return lastResult.Result, lastResult.Error
ExpectedState: conf.Target, }
// we reached the intended state
if r.Done {
return r.Result, r.Error
}
// still waiting, store the last result
lastResult = r
case <-timeout:
log.Printf("[WARN] WaitForState timeout after %s", conf.Timeout)
log.Printf("[WARN] WaitForState starting %s refresh grace period", refreshGracePeriod)
// cancel the goroutine and start our grace period timer
close(cancelCh)
timeout := time.After(refreshGracePeriod)
// we need a for loop and a label to break on, because we may have
// an extra response value to read, but still want to wait for the
// channel to close.
forSelect:
for {
select {
case r, ok := <-resCh:
if r.Done {
// the last refresh loop reached the desired state
return r.Result, r.Error
}
if !ok {
// the goroutine returned
break forSelect
}
// target state not reached, save the result for the
// TimeoutError and wait for the channel to close
lastResult = r
case <-timeout:
log.Println("[ERROR] WaitForState exceeded refresh grace period")
break forSelect
}
}
return nil, &TimeoutError{
LastError: lastResult.Error,
LastState: lastResult.State,
Timeout: conf.Timeout,
ExpectedState: conf.Target,
}
} }
} }
} }

View File

@ -2,6 +2,8 @@ package resource
import ( import (
"errors" "errors"
"strings"
"sync/atomic"
"testing" "testing"
"time" "time"
) )
@ -109,11 +111,18 @@ func TestWaitForState_inconsistent_positive(t *testing.T) {
} }
func TestWaitForState_inconsistent_negative(t *testing.T) { func TestWaitForState_inconsistent_negative(t *testing.T) {
refreshCount := int64(0)
f := InconsistentStateRefreshFunc()
refresh := func() (interface{}, string, error) {
atomic.AddInt64(&refreshCount, 1)
return f()
}
conf := &StateChangeConf{ conf := &StateChangeConf{
Pending: []string{"replicating"}, Pending: []string{"replicating"},
Target: []string{"done"}, Target: []string{"done"},
Refresh: InconsistentStateRefreshFunc(), Refresh: refresh,
Timeout: 90 * time.Millisecond, Timeout: 85 * time.Millisecond,
PollInterval: 10 * time.Millisecond, PollInterval: 10 * time.Millisecond,
ContinuousTargetOccurence: 4, ContinuousTargetOccurence: 4,
} }
@ -123,13 +132,27 @@ func TestWaitForState_inconsistent_negative(t *testing.T) {
if err == nil { if err == nil {
t.Fatal("Expected timeout error. No error returned.") t.Fatal("Expected timeout error. No error returned.")
} }
expectedErr := "timeout while waiting for state to become 'done' (last state: 'done', timeout: 90ms)"
if err.Error() != expectedErr { // we can't guarantee the exact number of refresh calls in the tests by
t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) // timing them, but we want to make sure the test at least went through th
// required states.
if atomic.LoadInt64(&refreshCount) < 6 {
t.Fatal("refreshed called too few times")
}
expectedErr := "timeout while waiting for state to become 'done'"
if !strings.HasPrefix(err.Error(), expectedErr) {
t.Fatalf("error prefix doesn't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error())
} }
} }
func TestWaitForState_timeout(t *testing.T) { func TestWaitForState_timeout(t *testing.T) {
old := refreshGracePeriod
refreshGracePeriod = 5 * time.Millisecond
defer func() {
refreshGracePeriod = old
}()
conf := &StateChangeConf{ conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"}, Pending: []string{"pending", "incomplete"},
Target: []string{"running"}, Target: []string{"running"},
@ -148,6 +171,62 @@ func TestWaitForState_timeout(t *testing.T) {
t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error())
} }
if obj != nil {
t.Fatalf("should not return obj")
}
}
// Make sure a timeout actually cancels the refresh goroutine and waits for its
// return.
func TestWaitForState_cancel(t *testing.T) {
// make this refresh func block until we cancel it
cancel := make(chan struct{})
refresh := func() (interface{}, string, error) {
<-cancel
return nil, "pending", nil
}
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: []string{"running"},
Refresh: refresh,
Timeout: 10 * time.Millisecond,
PollInterval: 10 * time.Second,
}
var obj interface{}
var err error
waitDone := make(chan struct{})
go func() {
defer close(waitDone)
obj, err = conf.WaitForState()
}()
// make sure WaitForState is blocked
select {
case <-waitDone:
t.Fatal("WaitForState returned too early")
case <-time.After(10 * time.Millisecond):
}
// unlock the refresh function
close(cancel)
// make sure WaitForState returns
select {
case <-waitDone:
case <-time.After(time.Second):
t.Fatal("WaitForState didn't return after refresh finished")
}
if err == nil {
t.Fatal("Expected timeout error. No error returned.")
}
expectedErr := "timeout while waiting for state to become 'running'"
if !strings.HasPrefix(err.Error(), expectedErr) {
t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error())
}
if obj != nil { if obj != nil {
t.Fatalf("should not return obj") t.Fatalf("should not return obj")
} }

View File

@ -25,6 +25,21 @@ func TestRetry(t *testing.T) {
} }
} }
// make sure a slow StateRefreshFunc is allowed to complete after timeout
func TestRetry_grace(t *testing.T) {
t.Parallel()
f := func() *RetryError {
time.Sleep(1 * time.Second)
return nil
}
err := Retry(10*time.Millisecond, f)
if err != nil {
t.Fatalf("err: %s", err)
}
}
func TestRetry_timeout(t *testing.T) { func TestRetry_timeout(t *testing.T) {
t.Parallel() t.Parallel()
@ -39,14 +54,18 @@ func TestRetry_timeout(t *testing.T) {
} }
func TestRetry_hang(t *testing.T) { func TestRetry_hang(t *testing.T) {
t.Parallel() old := refreshGracePeriod
refreshGracePeriod = 50 * time.Millisecond
defer func() {
refreshGracePeriod = old
}()
f := func() *RetryError { f := func() *RetryError {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
return nil return nil
} }
err := Retry(1*time.Second, f) err := Retry(50*time.Millisecond, f)
if err == nil { if err == nil {
t.Fatal("should error") t.Fatal("should error")
} }