From af5e22cf94d4dc92559895ea78fe07c829d196df Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 19 Apr 2017 10:10:07 -0400 Subject: [PATCH 1/6] don't leave WaitForState goroutine running Make sure that we can cancel the WaitForState refresh loop when reaching a timeout, otherwise it may run indefinitely. There's no need to try and store and read the Result concurrently, just pass the value over a channel. --- helper/resource/state.go | 91 +++++++++++++++++++++++++++------------- 1 file changed, 61 insertions(+), 30 deletions(-) diff --git a/helper/resource/state.go b/helper/resource/state.go index 7473a105e..dd5bfd4b6 100644 --- a/helper/resource/state.go +++ b/helper/resource/state.go @@ -2,7 +2,6 @@ package resource import ( "log" - "sync/atomic" "time" ) @@ -62,33 +61,45 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { conf.ContinuousTargetOccurence = 1 } - // We can't safely read the result values if we timeout, so store them in - // an atomic.Value type Result struct { Result interface{} State string Error error + Done bool } - var lastResult atomic.Value - lastResult.Store(Result{}) - doneCh := make(chan struct{}) + // read ever result from the refresh loop, waiting for a positive result.Done + resCh := make(chan Result, 1) + // cancellation channel for the refresh loop + cancelCh := make(chan struct{}) + go func() { - defer close(doneCh) + defer close(resCh) - // Wait for the delay time.Sleep(conf.Delay) - wait := 100 * time.Millisecond + // start with 0 delay for the first loop + var wait time.Duration for { + // wait and watch for cancellation + select { + case <-cancelCh: + return + case <-time.After(wait): + // first round had no wait + if wait == 0 { + wait = 100 * time.Millisecond + } + } + res, currentState, err := conf.Refresh() result := Result{ Result: res, State: currentState, Error: err, } - lastResult.Store(result) + resCh <- result if err != nil { return @@ -98,6 +109,8 @@ func (conf *StateChangeConf) WaitForState() 
(interface{}, error) { if res == nil && len(conf.Target) == 0 { targetOccurence += 1 if conf.ContinuousTargetOccurence == targetOccurence { + result.Done = true + resCh <- result return } else { continue @@ -113,7 +126,7 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { LastError: err, Retries: notfoundTick, } - lastResult.Store(result) + resCh <- result return } } else { @@ -126,6 +139,8 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { found = true targetOccurence += 1 if conf.ContinuousTargetOccurence == targetOccurence { + result.Done = true + resCh <- result return } else { continue @@ -147,7 +162,7 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { State: result.State, ExpectedState: conf.Target, } - lastResult.Store(result) + resCh <- result return } } @@ -162,30 +177,46 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { } else if wait > 10*time.Second { wait = 10 * time.Second } + + // Wait between refreshes using exponential backoff, except when + // waiting for the target state to reoccur. + if targetOccurence == 0 { + wait *= 2 + } } log.Printf("[TRACE] Waiting %s before next try", wait) - time.Sleep(wait) - - // Wait between refreshes using exponential backoff, except when - // waiting for the target state to reoccur. 
- if targetOccurence == 0 { - wait *= 2 - } } }() - select { - case <-doneCh: - r := lastResult.Load().(Result) - return r.Result, r.Error - case <-time.After(conf.Timeout): - r := lastResult.Load().(Result) - return nil, &TimeoutError{ - LastError: r.Error, - LastState: r.State, - Timeout: conf.Timeout, - ExpectedState: conf.Target, + // store the last value result from the refresh loop + lastResult := Result{} + + timeout := time.After(conf.Timeout) + for { + select { + case r, ok := <-resCh: + // channel closed, so return the last result + if !ok { + return lastResult.Result, lastResult.Error + } + + // we reached the intended state + if r.Done { + return r.Result, r.Error + } + + // still waiting, store the last result + lastResult = r + + case <-timeout: + close(cancelCh) + return nil, &TimeoutError{ + LastError: lastResult.Error, + LastState: lastResult.State, + Timeout: conf.Timeout, + ExpectedState: conf.Target, + } } } } From 6601b9b8dd2f7d2fb8474eaadc46a5850f489189 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 19 Apr 2017 10:10:54 -0400 Subject: [PATCH 2/6] adjust the inconsistent_negative test to match This test unfortunately relies on the timing of the loops in WaitForState, and the text of the error message. Adjust the timing so the timeout isn't an even multiple of the poll interval, and make sure we reach a minimum number of retries. 
--- helper/resource/state_test.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/helper/resource/state_test.go b/helper/resource/state_test.go index 4b4731351..5e0cbe2dc 100644 --- a/helper/resource/state_test.go +++ b/helper/resource/state_test.go @@ -2,6 +2,8 @@ package resource import ( "errors" + "strings" + "sync/atomic" "testing" "time" ) @@ -109,11 +111,18 @@ func TestWaitForState_inconsistent_positive(t *testing.T) { } func TestWaitForState_inconsistent_negative(t *testing.T) { + refreshCount := int64(0) + f := InconsistentStateRefreshFunc() + refresh := func() (interface{}, string, error) { + atomic.AddInt64(&refreshCount, 1) + return f() + } + conf := &StateChangeConf{ Pending: []string{"replicating"}, Target: []string{"done"}, - Refresh: InconsistentStateRefreshFunc(), - Timeout: 90 * time.Millisecond, + Refresh: refresh, + Timeout: 85 * time.Millisecond, PollInterval: 10 * time.Millisecond, ContinuousTargetOccurence: 4, } @@ -123,9 +132,17 @@ func TestWaitForState_inconsistent_negative(t *testing.T) { if err == nil { t.Fatal("Expected timeout error. No error returned.") } - expectedErr := "timeout while waiting for state to become 'done' (last state: 'done', timeout: 90ms)" - if err.Error() != expectedErr { - t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) + + // we can't guarantee the exact number of refresh calls in the tests by + // timing them, but we want to make sure the test at least went through th + // required states. 
+ if atomic.LoadInt64(&refreshCount) < 6 { + t.Fatal("refreshed called too few times") + } + + expectedErr := "timeout while waiting for state to become 'done'" + if !strings.HasPrefix(err.Error(), expectedErr) { + t.Fatalf("error prefix doesn't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) } } From af1628eaa4c109653b440b99be27b8fe053c83a8 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 19 Apr 2017 12:14:23 -0400 Subject: [PATCH 3/6] add refreshGracePeriod Refresh calls may have side effects that need to be recorded if they succeed, especially common when WaitForState is called from resource.Retry. If the WaitForState timeout is reached and there is a Refresh call in-flight, wait up to refreshGracePeriod (set to 30s) for it to complete. --- helper/resource/state.go | 57 ++++++++++++++++++++++++++++++------ helper/resource/wait_test.go | 15 ++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/helper/resource/state.go b/helper/resource/state.go index dd5bfd4b6..285926b11 100644 --- a/helper/resource/state.go +++ b/helper/resource/state.go @@ -5,6 +5,8 @@ import ( "time" ) +var refreshGracePeriod = 30 * time.Second + // StateRefreshFunc is a function type used for StateChangeConf that is // responsible for refreshing the item being watched for a state change. // @@ -68,11 +70,13 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { Done bool } - // read ever result from the refresh loop, waiting for a positive result.Done + // Read every result from the refresh loop, waiting for a positive result.Done. 
resCh := make(chan Result, 1) // cancellation channel for the refresh loop cancelCh := make(chan struct{}) + result := Result{} + go func() { defer close(resCh) @@ -82,6 +86,9 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { var wait time.Duration for { + // store the last result + resCh <- result + // wait and watch for cancellation select { case <-cancelCh: @@ -94,14 +101,14 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { } res, currentState, err := conf.Refresh() - result := Result{ + result = Result{ Result: res, State: currentState, Error: err, } - resCh <- result if err != nil { + resCh <- result return } @@ -167,6 +174,12 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { } } + // Wait between refreshes using exponential backoff, except when + // waiting for the target state to reoccur. + if targetOccurence == 0 { + wait *= 2 + } + // If a poll interval has been specified, choose that interval. // Otherwise bound the default value. if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second { @@ -177,12 +190,6 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { } else if wait > 10*time.Second { wait = 10 * time.Second } - - // Wait between refreshes using exponential backoff, except when - // waiting for the target state to reoccur. 
- if targetOccurence == 0 { - wait *= 2 - } } log.Printf("[TRACE] Waiting %s before next try", wait) @@ -210,7 +217,39 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { lastResult = r case <-timeout: + log.Printf("[WARN] WaitForState timeout after %s", conf.Timeout) + log.Printf("[WARN] WaitForState starting %s refresh grace period", refreshGracePeriod) + + // cancel the goroutine and start our grace period timer close(cancelCh) + timeout := time.After(refreshGracePeriod) + + // we need a for loop and a label to break on, because we may have + // an extra response value to read, but still want to wait for the + // channel to close. + forSelect: + for { + select { + case r, ok := <-resCh: + if r.Done { + // the last refresh loop reached the desired state + return r.Result, r.Error + } + + if !ok { + // the the goroutine returned + break forSelect + } + + // target state not reached, save the result for the + // TimeoutError and wait for the channel to close + lastResult = r + case <-timeout: + log.Println("[ERROR] WaitForState exceeded refresh grace period") + break forSelect + } + } + return nil, &TimeoutError{ LastError: lastResult.Error, LastState: lastResult.State, diff --git a/helper/resource/wait_test.go b/helper/resource/wait_test.go index bb17d9fe4..957bd1842 100644 --- a/helper/resource/wait_test.go +++ b/helper/resource/wait_test.go @@ -25,6 +25,21 @@ func TestRetry(t *testing.T) { } } +// make sure a slow StateRefreshFunc is allowed to complete after timeout +func TestRetry_grace(t *testing.T) { + t.Parallel() + + f := func() *RetryError { + time.Sleep(1 * time.Second) + return nil + } + + err := Retry(10*time.Millisecond, f) + if err != nil { + t.Fatalf("err: %s", err) + } +} + func TestRetry_timeout(t *testing.T) { t.Parallel() From eb4b45941c4c81180f42e30889e6037f05a0d2f4 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 19 Apr 2017 14:19:08 -0400 Subject: [PATCH 4/6] fix tests affected by refreshGracePeriod A couple tests 
require lowering the grace period to keep the test from taking the full 30s timeout. The Retry_hang test also needed to be removed from the Parallel group, because it modifies the global refreshGracePeriod variable. --- helper/resource/state_test.go | 6 ++++++ helper/resource/wait_test.go | 8 ++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/helper/resource/state_test.go b/helper/resource/state_test.go index 5e0cbe2dc..ca534589c 100644 --- a/helper/resource/state_test.go +++ b/helper/resource/state_test.go @@ -147,6 +147,12 @@ func TestWaitForState_inconsistent_negative(t *testing.T) { } func TestWaitForState_timeout(t *testing.T) { + old := refreshGracePeriod + refreshGracePeriod = 5 * time.Millisecond + defer func() { + refreshGracePeriod = old + }() + conf := &StateChangeConf{ Pending: []string{"pending", "incomplete"}, Target: []string{"running"}, diff --git a/helper/resource/wait_test.go b/helper/resource/wait_test.go index 957bd1842..526b21ae3 100644 --- a/helper/resource/wait_test.go +++ b/helper/resource/wait_test.go @@ -54,14 +54,18 @@ func TestRetry_timeout(t *testing.T) { } func TestRetry_hang(t *testing.T) { - t.Parallel() + old := refreshGracePeriod + refreshGracePeriod = 50 * time.Millisecond + defer func() { + refreshGracePeriod = old + }() f := func() *RetryError { time.Sleep(2 * time.Second) return nil } - err := Retry(1*time.Second, f) + err := Retry(50*time.Millisecond, f) if err == nil { t.Fatal("should error") } From 14bea66f4ee3c09dff772beb407343f4fa11f5db Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 19 Apr 2017 18:06:41 -0400 Subject: [PATCH 5/6] add test for proper cancelation --- helper/resource/state_test.go | 56 +++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/helper/resource/state_test.go b/helper/resource/state_test.go index ca534589c..6d6b329a1 100644 --- a/helper/resource/state_test.go +++ b/helper/resource/state_test.go @@ -171,6 +171,62 @@ func 
TestWaitForState_timeout(t *testing.T) { t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) } + if obj != nil { + t.Fatalf("should not return obj") + } +} + +// Make sure a timeout actually cancels the refresh goroutine and waits for its +// return. +func TestWaitForState_cancel(t *testing.T) { + // make this refresh func block until we cancel it + cancel := make(chan struct{}) + refresh := func() (interface{}, string, error) { + <-cancel + return nil, "pending", nil + } + conf := &StateChangeConf{ + Pending: []string{"pending", "incomplete"}, + Target: []string{"running"}, + Refresh: refresh, + Timeout: 10 * time.Millisecond, + PollInterval: 10 * time.Second, + } + + var obj interface{} + var err error + + waitDone := make(chan struct{}) + go func() { + defer close(waitDone) + obj, err = conf.WaitForState() + }() + + // make sure WaitForState is blocked + select { + case <-waitDone: + t.Fatal("WaitForState returned too early") + case <-time.After(10 * time.Millisecond): + } + + // unlock the refresh function + close(cancel) + // make sure WaitForState returns + select { + case <-waitDone: + case <-time.After(time.Second): + t.Fatal("WaitForState didn't return after refresh finished") + } + + if err == nil { + t.Fatal("Expected timeout error. 
No error returned.") + } + + expectedErr := "timeout while waiting for state to become 'running'" + if !strings.HasPrefix(err.Error(), expectedErr) { + t.Fatalf("Errors don't match.\nExpected: %q\nGiven: %q\n", expectedErr, err.Error()) + } + if obj != nil { t.Fatalf("should not return obj") } From 4c3a053f0cf59ac9d7ab15cbcfbee50f788fde55 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 19 Apr 2017 18:19:48 -0400 Subject: [PATCH 6/6] lint errors --- helper/resource/state.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/helper/resource/state.go b/helper/resource/state.go index 285926b11..37c586a11 100644 --- a/helper/resource/state.go +++ b/helper/resource/state.go @@ -114,20 +114,19 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { // If we're waiting for the absence of a thing, then return if res == nil && len(conf.Target) == 0 { - targetOccurence += 1 + targetOccurence++ if conf.ContinuousTargetOccurence == targetOccurence { result.Done = true resCh <- result return - } else { - continue } + continue } if res == nil { // If we didn't find the resource, check if we have been // not finding it for awhile, and if so, report an error. - notfoundTick += 1 + notfoundTick++ if notfoundTick > conf.NotFoundChecks { result.Error = &NotFoundError{ LastError: err, @@ -144,14 +143,13 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { for _, allowed := range conf.Target { if currentState == allowed { found = true - targetOccurence += 1 + targetOccurence++ if conf.ContinuousTargetOccurence == targetOccurence { result.Done = true resCh <- result return - } else { - continue } + continue } } @@ -237,7 +235,7 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) { } if !ok { - // the the goroutine returned + // the goroutine returned break forSelect }