diff --git a/helper/resource/state.go b/helper/resource/state.go new file mode 100644 index 000000000..8c5ec4b27 --- /dev/null +++ b/helper/resource/state.go @@ -0,0 +1,120 @@ +package resource + +import ( + "errors" + "fmt" + "log" + "math" + "time" +) + +// StateRefreshFunc is a function type used for StateChangeConf that is +// responsible for refreshing the item being watched for a state change. +// +// It returns three results. `result` is any object that will be returned +// as the final object after waiting for state change. This allows you to +// return the final updated object, for example an EC2 instance after refreshing +// it. +// +// `state` is the latest state of that object. And `err` is any error that +// may have happened while refreshing the state. +type StateRefreshFunc func() (result interface{}, state string, err error) + +// StateChangeConf is the configuration struct used for `WaitForState`. +type StateChangeConf struct { + Delay time.Duration // Wait this time before starting checks + Pending []string // States that are "allowed" and will continue trying + Refresh StateRefreshFunc // Refreshes the current state + Target string // Target state + Timeout time.Duration // The amount of time to wait before timeout + MinTimeout time.Duration // Smallest time to wait before refreshes +} + +// WaitForState watches an object and waits for it to achieve the state +// specified in the configuration using the specified Refresh() func, +// waiting the number of seconds specified in the timeout configuration. +func (conf *StateChangeConf) WaitForState() (interface{}, error) { + log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target) + + notfoundTick := 0 + + var result interface{} + var resulterr error + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + + // Wait for the delay + time.Sleep(conf.Delay) + + var err error + for tries := 0; ; tries++ { + // Wait between refreshes using an exponential backoff + wait := time.Duration(math.Pow(2, float64(tries))) * + 100 * time.Millisecond + if wait < conf.MinTimeout { + wait = conf.MinTimeout + } else if wait > 10*time.Second { + wait = 10 * time.Second + } + + log.Printf("[TRACE] Waiting %s before next try", wait) + time.Sleep(wait) + + var currentState string + result, currentState, err = conf.Refresh() + if err != nil { + resulterr = err + return + } + + // If we're waiting for the absence of a thing, then return + if result == nil && conf.Target == "" { + return + } + + if result == nil { + // If we didn't find the resource, check if we have been + // not finding it for awhile, and if so, report an error. + notfoundTick += 1 + if notfoundTick > 20 { + resulterr = errors.New("couldn't find resource") + return + } + } else { + // Reset the counter for when a resource isn't found + notfoundTick = 0 + + if currentState == conf.Target { + return + } + + found := false + for _, allowed := range conf.Pending { + if currentState == allowed { + found = true + break + } + } + + if !found { + resulterr = fmt.Errorf( + "unexpected state '%s', wanted target '%s'", + currentState, + conf.Target) + return + } + } + } + }() + + select { + case <-doneCh: + return result, resulterr + case <-time.After(conf.Timeout): + return nil, fmt.Errorf( + "timeout while waiting for state to become '%s'", + conf.Target) + } +} diff --git a/helper/resource/state_test.go b/helper/resource/state_test.go new file mode 100644 index 000000000..5a611e50f --- /dev/null +++ b/helper/resource/state_test.go @@ -0,0 +1,99 @@ +package resource + +import ( + "errors" + "testing" + "time" +) + +func FailedStateRefreshFunc() StateRefreshFunc { + return func() (interface{}, string, error) { + return nil, "", errors.New("failed") + } +} + +func TimeoutStateRefreshFunc() StateRefreshFunc { + return func() (interface{}, string, error) { + time.Sleep(100 * time.Second) + return nil, "", errors.New("failed") + } +} + +func SuccessfulStateRefreshFunc() StateRefreshFunc { + return func() (interface{}, string, error) { + return struct{}{}, "running", nil + } +} + +func TestWaitForState_timeout(t *testing.T) { + conf := &StateChangeConf{ + Pending: []string{"pending", "incomplete"}, + Target: "running", + Refresh: TimeoutStateRefreshFunc(), + Timeout: 1 * time.Millisecond, + } + + obj, err := conf.WaitForState() + + if err == nil && err.Error() != "timeout while waiting for state to become 'running'" { + t.Fatalf("err: %s", err) + } + + if obj != nil { + t.Fatalf("should not return obj") + } + +} + +func TestWaitForState_success(t *testing.T) { + conf := &StateChangeConf{ + Pending: []string{"pending", "incomplete"}, + Target: "running", + Refresh: SuccessfulStateRefreshFunc(), + Timeout: 200 * time.Second, + } + + obj, err := conf.WaitForState() + if err != nil { + t.Fatalf("err: %s", err) + } + if obj == nil { + t.Fatalf("should return obj") + } +} + +func TestWaitForState_successEmpty(t *testing.T) { + conf := &StateChangeConf{ + Pending: []string{"pending", "incomplete"}, + Target: "", + Refresh: func() (interface{}, string, error) { + return nil, "", nil + }, + Timeout: 200 * time.Second, + } + + obj, err := conf.WaitForState() + if err != nil { + t.Fatalf("err: %s", err) + } + if obj != nil { + t.Fatalf("obj should be nil") + } +} + +func TestWaitForState_failure(t *testing.T) { + conf := &StateChangeConf{ + Pending: []string{"pending", "incomplete"}, + Target: "running", + Refresh: FailedStateRefreshFunc(), + Timeout: 200 * time.Second, + } + + obj, err := conf.WaitForState() + if err == nil && err.Error() != "failed" { + t.Fatalf("err: %s", err) + } + if obj != nil { + t.Fatalf("should not return obj") + } +} diff --git a/helper/resource/wait.go b/helper/resource/wait.go index 8c5ec4b27..5fcd59c26 100644 --- a/helper/resource/wait.go +++ b/helper/resource/wait.go @@ -1,120 +1,31 @@ package resource import ( - "errors" - "fmt" - "log" - "math" "time" ) -// StateRefreshFunc is a function type used for StateChangeConf that is -// responsible for refreshing the item being watched for a state change. -// -// It returns three results. `result` is any object that will be returned -// as the final object after waiting for state change. This allows you to -// return the final updated object, for example an EC2 instance after refreshing -// it. -// -// `state` is the latest state of that object. And `err` is any error that -// may have happened while refreshing the state. -type StateRefreshFunc func() (result interface{}, state string, err error) +// RetryFunc is the function retried until it succeeds. +type RetryFunc func() error -// StateChangeConf is the configuration struct used for `WaitForState`. -type StateChangeConf struct { - Delay time.Duration // Wait this time before starting checks - Pending []string // States that are "allowed" and will continue trying - Refresh StateRefreshFunc // Refreshes the current state - Target string // Target state - Timeout time.Duration // The amount of time to wait before timeout - MinTimeout time.Duration // Smallest time to wait before refreshes -} - -// WaitForState watches an object and waits for it to achieve the state -// specified in the configuration using the specified Refresh() func, -// waiting the number of seconds specified in the timeout configuration. -func (conf *StateChangeConf) WaitForState() (interface{}, error) { - log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target) - - notfoundTick := 0 - - var result interface{} - var resulterr error - - doneCh := make(chan struct{}) - go func() { - defer close(doneCh) - - // Wait for the delay - time.Sleep(conf.Delay) - - var err error - for tries := 0; ; tries++ { - // Wait between refreshes using an exponential backoff - wait := time.Duration(math.Pow(2, float64(tries))) * - 100 * time.Millisecond - if wait < conf.MinTimeout { - wait = conf.MinTimeout - } else if wait > 10*time.Second { - wait = 10 * time.Second - } - - log.Printf("[TRACE] Waiting %s before next try", wait) - time.Sleep(wait) - - var currentState string - result, currentState, err = conf.Refresh() +// Retry is a basic wrapper around StateChangeConf that will just retry +// a function until it no longer returns an error. +func Retry(timeout time.Duration, f RetryFunc) error { + var err error + c := &StateChangeConf{ + Pending: []string{"error"}, + Target: "success", + Timeout: timeout, + MinTimeout: 500 * time.Millisecond, + Refresh: func() (interface{}, string, error) { + err = f() if err != nil { - resulterr = err - return + return 42, "error", nil } - // If we're waiting for the absence of a thing, then return - if result == nil && conf.Target == "" { - return - } - - if result == nil { - // If we didn't find the resource, check if we have been - // not finding it for awhile, and if so, report an error. - notfoundTick += 1 - if notfoundTick > 20 { - resulterr = errors.New("couldn't find resource") - return - } - } else { - // Reset the counter for when a resource isn't found - notfoundTick = 0 - - if currentState == conf.Target { - return - } - - found := false - for _, allowed := range conf.Pending { - if currentState == allowed { - found = true - break - } - } - - if !found { - resulterr = fmt.Errorf( - "unexpected state '%s', wanted target '%s'", - currentState, - conf.Target) - return - } - } - } - }() - - select { - case <-doneCh: - return result, resulterr - case <-time.After(conf.Timeout): - return nil, fmt.Errorf( - "timeout while waiting for state to become '%s'", - conf.Target) + return 42, "success", nil + }, } + + c.WaitForState() + return err } diff --git a/helper/resource/wait_test.go b/helper/resource/wait_test.go index 5a611e50f..768ed88a6 100644 --- a/helper/resource/wait_test.go +++ b/helper/resource/wait_test.go @@ -1,99 +1,39 @@ package resource import ( - "errors" + "fmt" "testing" "time" ) -func FailedStateRefreshFunc() StateRefreshFunc { - return func() (interface{}, string, error) { - return nil, "", errors.New("failed") - } -} +func TestRetry(t *testing.T) { + t.Parallel() -func TimeoutStateRefreshFunc() StateRefreshFunc { - return func() (interface{}, string, error) { - time.Sleep(100 * time.Second) - return nil, "", errors.New("failed") - } -} + tries := 0 + f := func() error { + tries++ + if tries == 1 { + return nil + } -func SuccessfulStateRefreshFunc() StateRefreshFunc { - return func() (interface{}, string, error) { - return struct{}{}, "running", nil - } -} - -func TestWaitForState_timeout(t *testing.T) { - conf := &StateChangeConf{ - Pending: []string{"pending", "incomplete"}, - Target: "running", - Refresh: TimeoutStateRefreshFunc(), - Timeout: 1 * time.Millisecond, + return fmt.Errorf("error") } - obj, err := conf.WaitForState() - - if err == nil && err.Error() != "timeout while waiting for state to become 'running'" { - t.Fatalf("err: %s", err) - } - - if obj != nil { - t.Fatalf("should not return obj") - } - -} - -func TestWaitForState_success(t *testing.T) { - conf := &StateChangeConf{ - Pending: []string{"pending", "incomplete"}, - Target: "running", - Refresh: SuccessfulStateRefreshFunc(), - Timeout: 200 * time.Second, - } - - obj, err := conf.WaitForState() + err := Retry(2 * time.Second, f) if err != nil { t.Fatalf("err: %s", err) } - if obj == nil { - t.Fatalf("should return obj") - } } -func TestWaitForState_successEmpty(t *testing.T) { - conf := &StateChangeConf{ - Pending: []string{"pending", "incomplete"}, - Target: "", - Refresh: func() (interface{}, string, error) { - return nil, "", nil - }, - Timeout: 200 * time.Second, +func TestRetry_timeout(t *testing.T) { + t.Parallel() + + f := func() error { + return fmt.Errorf("always") } - obj, err := conf.WaitForState() - if err != nil { - t.Fatalf("err: %s", err) - } - if obj != nil { - t.Fatalf("obj should be nil") - } -} - -func TestWaitForState_failure(t *testing.T) { - conf := &StateChangeConf{ - Pending: []string{"pending", "incomplete"}, - Target: "running", - Refresh: FailedStateRefreshFunc(), - Timeout: 200 * time.Second, - } - - obj, err := conf.WaitForState() - if err == nil && err.Error() != "failed" { - t.Fatalf("err: %s", err) - } - if obj != nil { - t.Fatalf("should not return obj") + err := Retry(1 * time.Second, f) + if err == nil { + t.Fatal("should error") } }