add refreshGracePeriod

Refresh calls may have side effects that need to be recorded if it
succeeds, especially common when when WaitForState is called from
resource.Retry.

If the WaitForState timeout is reached and there is a Refresh call
in-flight, wait up to refreshGracePeriod (set to 30s) for it to
complete.
This commit is contained in:
James Bardin 2017-04-19 12:14:23 -04:00
parent 6601b9b8dd
commit af1628eaa4
2 changed files with 63 additions and 9 deletions

View File

@ -5,6 +5,8 @@ import (
"time"
)
var refreshGracePeriod = 30 * time.Second
// StateRefreshFunc is a function type used for StateChangeConf that is
// responsible for refreshing the item being watched for a state change.
//
@ -68,11 +70,13 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
Done bool
}
// read ever result from the refresh loop, waiting for a positive result.Done
// Read every result from the refresh loop, waiting for a positive result.Done.
resCh := make(chan Result, 1)
// cancellation channel for the refresh loop
cancelCh := make(chan struct{})
result := Result{}
go func() {
defer close(resCh)
@ -82,6 +86,9 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
var wait time.Duration
for {
// store the last result
resCh <- result
// wait and watch for cancellation
select {
case <-cancelCh:
@ -94,14 +101,14 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
}
res, currentState, err := conf.Refresh()
result := Result{
result = Result{
Result: res,
State: currentState,
Error: err,
}
resCh <- result
if err != nil {
resCh <- result
return
}
@ -167,6 +174,12 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
}
}
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
// If a poll interval has been specified, choose that interval.
// Otherwise bound the default value.
if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second {
@ -177,12 +190,6 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
} else if wait > 10*time.Second {
wait = 10 * time.Second
}
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
}
log.Printf("[TRACE] Waiting %s before next try", wait)
@ -210,7 +217,39 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
lastResult = r
case <-timeout:
log.Printf("[WARN] WaitForState timeout after %s", conf.Timeout)
log.Printf("[WARN] WaitForState starting %s refresh grace period", refreshGracePeriod)
// cancel the goroutine and start our grace period timer
close(cancelCh)
timeout := time.After(refreshGracePeriod)
// we need a for loop and a label to break on, because we may have
// an extra response value to read, but still want to wait for the
// channel to close.
forSelect:
for {
select {
case r, ok := <-resCh:
if r.Done {
// the last refresh loop reached the desired state
return r.Result, r.Error
}
if !ok {
// the the goroutine returned
break forSelect
}
// target state not reached, save the result for the
// TimeoutError and wait for the channel to close
lastResult = r
case <-timeout:
log.Println("[ERROR] WaitForState exceeded refresh grace period")
break forSelect
}
}
return nil, &TimeoutError{
LastError: lastResult.Error,
LastState: lastResult.State,

View File

@ -25,6 +25,21 @@ func TestRetry(t *testing.T) {
}
}
// make sure a slow StateRefreshFunc is allowed to complete after timeout
func TestRetry_grace(t *testing.T) {
t.Parallel()
f := func() *RetryError {
time.Sleep(1 * time.Second)
return nil
}
err := Retry(10*time.Millisecond, f)
if err != nil {
t.Fatalf("err: %s", err)
}
}
func TestRetry_timeout(t *testing.T) {
t.Parallel()