diff --git a/helper/resource/state.go b/helper/resource/state.go index 7473a105e..37c586a11 100644 --- a/helper/resource/state.go +++ b/helper/resource/state.go @@ -2,10 +2,11 @@ package resource import ( "log" - "sync/atomic" "time" ) +var refreshGracePeriod = 30 * time.Second + // StateRefreshFunc is a function type used for StateChangeConf that is // responsible for refreshing the item being watched for a state change. // @@ -62,58 +63,76 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { conf.ContinuousTargetOccurence = 1 } - // We can't safely read the result values if we timeout, so store them in - // an atomic.Value type Result struct { Result interface{} State string Error error + Done bool } - var lastResult atomic.Value - lastResult.Store(Result{}) - doneCh := make(chan struct{}) + // Read every result from the refresh loop, waiting for a positive result.Done. + resCh := make(chan Result, 1) + // cancellation channel for the refresh loop + cancelCh := make(chan struct{}) + + result := Result{} + go func() { - defer close(doneCh) + defer close(resCh) - // Wait for the delay time.Sleep(conf.Delay) - wait := 100 * time.Millisecond + // start with 0 delay for the first loop + var wait time.Duration for { + // store the last result + resCh <- result + + // wait and watch for cancellation + select { + case <-cancelCh: + return + case <-time.After(wait): + // first round had no wait + if wait == 0 { + wait = 100 * time.Millisecond + } + } + res, currentState, err := conf.Refresh() - result := Result{ + result = Result{ Result: res, State: currentState, Error: err, } - lastResult.Store(result) if err != nil { + resCh <- result return } // If we're waiting for the absence of a thing, then return if res == nil && len(conf.Target) == 0 { - targetOccurence += 1 + targetOccurence++ if conf.ContinuousTargetOccurence == targetOccurence { + result.Done = true + resCh <- result return - } else { - continue } + continue } if res == nil { // If we didn't find the resource, check if we have been // not finding it for awhile, and if so, report an error. - notfoundTick += 1 + notfoundTick++ if notfoundTick > conf.NotFoundChecks { result.Error = &NotFoundError{ LastError: err, Retries: notfoundTick, } - lastResult.Store(result) + resCh <- result return } } else { @@ -124,12 +143,13 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { for _, allowed := range conf.Target { if currentState == allowed { found = true - targetOccurence += 1 + targetOccurence++ if conf.ContinuousTargetOccurence == targetOccurence { + result.Done = true + resCh <- result return - } else { - continue } + continue } } @@ -147,11 +167,17 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { State: result.State, ExpectedState: conf.Target, } - lastResult.Store(result) + resCh <- result return } } + // Wait between refreshes using exponential backoff, except when + // waiting for the target state to reoccur. + if targetOccurence == 0 { + wait *= 2 + } + // If a poll interval has been specified, choose that interval. // Otherwise bound the default value. if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second { @@ -165,27 +191,69 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { } log.Printf("[TRACE] Waiting %s before next try", wait) - time.Sleep(wait) - - // Wait between refreshes using exponential backoff, except when - // waiting for the target state to reoccur. - if targetOccurence == 0 { - wait *= 2 - } } }() - select { - case <-doneCh: - r := lastResult.Load().(Result) - return r.Result, r.Error - case <-time.After(conf.Timeout): - r := lastResult.Load().(Result) - return nil, &TimeoutError{ - LastError: r.Error, - LastState: r.State, - Timeout: conf.Timeout, - ExpectedState: conf.Target, + // store the last value result from the refresh loop + lastResult := Result{} + + timeout := time.After(conf.Timeout) + for { + select { + case r, ok := <-resCh: + // channel closed, so return the last result + if !ok { + return lastResult.Result, lastResult.Error + } + + // we reached the intended state + if r.Done { + return r.Result, r.Error + } + + // still waiting, store the last result + lastResult = r + + case <-timeout: + log.Printf("[WARN] WaitForState timeout after %s", conf.Timeout) + log.Printf("[WARN] WaitForState starting %s refresh grace period", refreshGracePeriod) + + // cancel the goroutine and start our grace period timer + close(cancelCh) + timeout := time.After(refreshGracePeriod) + + // we need a for loop and a label to break on, because we may have + // an extra response value to read, but still want to wait for the + // channel to close. + forSelect: + for { + select { + case r, ok := <-resCh: + if r.Done { + // the last refresh loop reached the desired state + return r.Result, r.Error + } + + if !ok { + // the goroutine returned + break forSelect + } + + // target state not reached, save the result for the + // TimeoutError and wait for the channel to close + lastResult = r + case <-timeout: + log.Println("[ERROR] WaitForState exceeded refresh grace period") + break forSelect + } + } + + return nil, &TimeoutError{ + LastError: lastResult.Error, + LastState: lastResult.State, + Timeout: conf.Timeout, + ExpectedState: conf.Target, + } } } } diff --git a/helper/resource/state_test.go b/helper/resource/state_test.go index 4b4731351..6d6b329a1 100644 --- a/helper/resource/state_test.go +++ b/helper/resource/state_test.go @@ -2,6 +2,8 @@ package resource import ( "errors" + "strings" + "sync/atomic" "testing" "time" ) @@ -109,11 +111,18 @@ func TestWaitForState_inconsistent_positive(t *testing.T) { } func TestWaitForState_inconsistent_negative(t *testing.T) { + refreshCount := int64(0) + f := InconsistentStateRefreshFunc() + refresh := func() (interface{}, string, error) { + atomic.AddInt64(&refreshCount, 1) + return f() + } + conf := &StateChangeConf{ Pending: []string{"replicating"}, Target: []string{"done"}, - Refresh: InconsistentStateRefreshFunc(), - Timeout: 90 * time.Millisecond, + Refresh: refresh, + Timeout: 85 * time.Millisecond, PollInterval: 10 * time.Millisecond, ContinuousTargetOccurence: 4, } @@ -123,13 +132,27 @@ func TestWaitForState_inconsistent_negative(t *testing.T) { if err == nil { t.Fatal("Expected timeout error. No error returned.") } - expectedErr := "timeout while waiting for state to become 'done' (last state: 'done', timeout: 90ms)" - if err.Error() != expectedErr { - t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) + + // we can't guarantee the exact number of refresh calls in the tests by + // timing them, but we want to make sure the test at least went through th + // required states. + if atomic.LoadInt64(&refreshCount) < 6 { + t.Fatal("refreshed called too few times") + } + + expectedErr := "timeout while waiting for state to become 'done'" + if !strings.HasPrefix(err.Error(), expectedErr) { + t.Fatalf("error prefix doesn't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) } } func TestWaitForState_timeout(t *testing.T) { + old := refreshGracePeriod + refreshGracePeriod = 5 * time.Millisecond + defer func() { + refreshGracePeriod = old + }() + conf := &StateChangeConf{ Pending: []string{"pending", "incomplete"}, Target: []string{"running"}, @@ -148,6 +171,62 @@ func TestWaitForState_timeout(t *testing.T) { t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) } + if obj != nil { + t.Fatalf("should not return obj") + } +} + +// Make sure a timeout actually cancels the refresh goroutine and waits for its +// return. +func TestWaitForState_cancel(t *testing.T) { + // make this refresh func block until we cancel it + cancel := make(chan struct{}) + refresh := func() (interface{}, string, error) { + <-cancel + return nil, "pending", nil + } + conf := &StateChangeConf{ + Pending: []string{"pending", "incomplete"}, + Target: []string{"running"}, + Refresh: refresh, + Timeout: 10 * time.Millisecond, + PollInterval: 10 * time.Second, + } + + var obj interface{} + var err error + + waitDone := make(chan struct{}) + go func() { + defer close(waitDone) + obj, err = conf.WaitForState() + }() + + // make sure WaitForState is blocked + select { + case <-waitDone: + t.Fatal("WaitForState returned too early") + case <-time.After(10 * time.Millisecond): + } + + // unlock the refresh function + close(cancel) + // make sure WaitForState returns + select { + case <-waitDone: + case <-time.After(time.Second): + t.Fatal("WaitForState didn't return after refresh finished") + } + + if err == nil { + t.Fatal("Expected timeout error. No error returned.") + } + + expectedErr := "timeout while waiting for state to become 'running'" + if !strings.HasPrefix(err.Error(), expectedErr) { + t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) + } + if obj != nil { t.Fatalf("should not return obj") } diff --git a/helper/resource/wait_test.go b/helper/resource/wait_test.go index bb17d9fe4..526b21ae3 100644 --- a/helper/resource/wait_test.go +++ b/helper/resource/wait_test.go @@ -25,6 +25,21 @@ func TestRetry(t *testing.T) { } } +// make sure a slow StateRefreshFunc is allowed to complete after timeout +func TestRetry_grace(t *testing.T) { + t.Parallel() + + f := func() *RetryError { + time.Sleep(1 * time.Second) + return nil + } + + err := Retry(10*time.Millisecond, f) + if err != nil { + t.Fatalf("err: %s", err) + } +} + func TestRetry_timeout(t *testing.T) { t.Parallel() @@ -39,14 +54,18 @@ func TestRetry_timeout(t *testing.T) { } func TestRetry_hang(t *testing.T) { - t.Parallel() + old := refreshGracePeriod + refreshGracePeriod = 50 * time.Millisecond + defer func() { + refreshGracePeriod = old + }() f := func() *RetryError { time.Sleep(2 * time.Second) return nil } - err := Retry(1*time.Second, f) + err := Retry(50*time.Millisecond, f) if err == nil { t.Fatal("should error") }