helper/resource: add Retry function

This commit is contained in:
Mitchell Hashimoto 2014-10-07 21:44:51 -07:00
parent 65f5069616
commit ef62fa80db
4 changed files with 257 additions and 187 deletions

120
helper/resource/state.go Normal file
View File

@ -0,0 +1,120 @@
package resource
import (
"errors"
"fmt"
"log"
"math"
"time"
)
// StateRefreshFunc is a function type used for StateChangeConf that is
// responsible for refreshing the item being watched for a state change.
//
// It returns three results. `result` is any object that will be returned
// as the final object after waiting for state change. This allows you to
// return the final updated object, for example an EC2 instance after refreshing
// it.
//
// `state` is the latest state of that object. And `err` is any error that
// may have happened while refreshing the state.
type StateRefreshFunc func() (result interface{}, state string, err error)
// StateChangeConf is the configuration struct used for `WaitForState`.
type StateChangeConf struct {
Delay time.Duration // Wait this time before starting checks
Pending []string // States that are "allowed" and will continue trying
Refresh StateRefreshFunc // Refreshes the current state
Target string // Target state
Timeout time.Duration // The amount of time to wait before timeout
MinTimeout time.Duration // Smallest time to wait before refreshes
}
// WaitForState watches an object and waits for it to achieve the state
// specified in the configuration using the specified Refresh() func,
// waiting the number of seconds specified in the timeout configuration.
func (conf *StateChangeConf) WaitForState() (interface{}, error) {
log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target)
notfoundTick := 0
var result interface{}
var resulterr error
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
// Wait for the delay
time.Sleep(conf.Delay)
var err error
for tries := 0; ; tries++ {
// Wait between refreshes using an exponential backoff
wait := time.Duration(math.Pow(2, float64(tries))) *
100 * time.Millisecond
if wait < conf.MinTimeout {
wait = conf.MinTimeout
} else if wait > 10*time.Second {
wait = 10 * time.Second
}
log.Printf("[TRACE] Waiting %s before next try", wait)
time.Sleep(wait)
var currentState string
result, currentState, err = conf.Refresh()
if err != nil {
resulterr = err
return
}
// If we're waiting for the absence of a thing, then return
if result == nil && conf.Target == "" {
return
}
if result == nil {
// If we didn't find the resource, check if we have been
// not finding it for awhile, and if so, report an error.
notfoundTick += 1
if notfoundTick > 20 {
resulterr = errors.New("couldn't find resource")
return
}
} else {
// Reset the counter for when a resource isn't found
notfoundTick = 0
if currentState == conf.Target {
return
}
found := false
for _, allowed := range conf.Pending {
if currentState == allowed {
found = true
break
}
}
if !found {
resulterr = fmt.Errorf(
"unexpected state '%s', wanted target '%s'",
currentState,
conf.Target)
return
}
}
}
}()
select {
case <-doneCh:
return result, resulterr
case <-time.After(conf.Timeout):
return nil, fmt.Errorf(
"timeout while waiting for state to become '%s'",
conf.Target)
}
}

View File

@ -0,0 +1,99 @@
package resource
import (
"errors"
"testing"
"time"
)
func FailedStateRefreshFunc() StateRefreshFunc {
return func() (interface{}, string, error) {
return nil, "", errors.New("failed")
}
}
func TimeoutStateRefreshFunc() StateRefreshFunc {
return func() (interface{}, string, error) {
time.Sleep(100 * time.Second)
return nil, "", errors.New("failed")
}
}
func SuccessfulStateRefreshFunc() StateRefreshFunc {
return func() (interface{}, string, error) {
return struct{}{}, "running", nil
}
}
func TestWaitForState_timeout(t *testing.T) {
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: "running",
Refresh: TimeoutStateRefreshFunc(),
Timeout: 1 * time.Millisecond,
}
obj, err := conf.WaitForState()
if err == nil && err.Error() != "timeout while waiting for state to become 'running'" {
t.Fatalf("err: %s", err)
}
if obj != nil {
t.Fatalf("should not return obj")
}
}
func TestWaitForState_success(t *testing.T) {
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: "running",
Refresh: SuccessfulStateRefreshFunc(),
Timeout: 200 * time.Second,
}
obj, err := conf.WaitForState()
if err != nil {
t.Fatalf("err: %s", err)
}
if obj == nil {
t.Fatalf("should return obj")
}
}
func TestWaitForState_successEmpty(t *testing.T) {
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: "",
Refresh: func() (interface{}, string, error) {
return nil, "", nil
},
Timeout: 200 * time.Second,
}
obj, err := conf.WaitForState()
if err != nil {
t.Fatalf("err: %s", err)
}
if obj != nil {
t.Fatalf("obj should be nil")
}
}
func TestWaitForState_failure(t *testing.T) {
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: "running",
Refresh: FailedStateRefreshFunc(),
Timeout: 200 * time.Second,
}
obj, err := conf.WaitForState()
if err == nil && err.Error() != "failed" {
t.Fatalf("err: %s", err)
}
if obj != nil {
t.Fatalf("should not return obj")
}
}

View File

@ -1,120 +1,31 @@
package resource package resource
import ( import (
"errors"
"fmt"
"log"
"math"
"time" "time"
) )
// StateRefreshFunc is a function type used for StateChangeConf that is // RetryFunc is the function retried until it succeeds.
// responsible for refreshing the item being watched for a state change. type RetryFunc func() error
//
// It returns three results. `result` is any object that will be returned
// as the final object after waiting for state change. This allows you to
// return the final updated object, for example an EC2 instance after refreshing
// it.
//
// `state` is the latest state of that object. And `err` is any error that
// may have happened while refreshing the state.
type StateRefreshFunc func() (result interface{}, state string, err error)
// StateChangeConf is the configuration struct used for `WaitForState`. // Retry is a basic wrapper around StateChangeConf that will just retry
type StateChangeConf struct { // a function until it no longer returns an error.
Delay time.Duration // Wait this time before starting checks func Retry(timeout time.Duration, f RetryFunc) error {
Pending []string // States that are "allowed" and will continue trying var err error
Refresh StateRefreshFunc // Refreshes the current state c := &StateChangeConf{
Target string // Target state Pending: []string{"error"},
Timeout time.Duration // The amount of time to wait before timeout Target: "success",
MinTimeout time.Duration // Smallest time to wait before refreshes Timeout: timeout,
} MinTimeout: 500 * time.Millisecond,
Refresh: func() (interface{}, string, error) {
// WaitForState watches an object and waits for it to achieve the state err = f()
// specified in the configuration using the specified Refresh() func,
// waiting the number of seconds specified in the timeout configuration.
func (conf *StateChangeConf) WaitForState() (interface{}, error) {
log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target)
notfoundTick := 0
var result interface{}
var resulterr error
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
// Wait for the delay
time.Sleep(conf.Delay)
var err error
for tries := 0; ; tries++ {
// Wait between refreshes using an exponential backoff
wait := time.Duration(math.Pow(2, float64(tries))) *
100 * time.Millisecond
if wait < conf.MinTimeout {
wait = conf.MinTimeout
} else if wait > 10*time.Second {
wait = 10 * time.Second
}
log.Printf("[TRACE] Waiting %s before next try", wait)
time.Sleep(wait)
var currentState string
result, currentState, err = conf.Refresh()
if err != nil { if err != nil {
resulterr = err return 42, "error", nil
return
} }
// If we're waiting for the absence of a thing, then return return 42, "success", nil
if result == nil && conf.Target == "" { },
return
}
if result == nil {
// If we didn't find the resource, check if we have been
// not finding it for awhile, and if so, report an error.
notfoundTick += 1
if notfoundTick > 20 {
resulterr = errors.New("couldn't find resource")
return
}
} else {
// Reset the counter for when a resource isn't found
notfoundTick = 0
if currentState == conf.Target {
return
}
found := false
for _, allowed := range conf.Pending {
if currentState == allowed {
found = true
break
}
}
if !found {
resulterr = fmt.Errorf(
"unexpected state '%s', wanted target '%s'",
currentState,
conf.Target)
return
}
}
}
}()
select {
case <-doneCh:
return result, resulterr
case <-time.After(conf.Timeout):
return nil, fmt.Errorf(
"timeout while waiting for state to become '%s'",
conf.Target)
} }
c.WaitForState()
return err
} }

View File

@ -1,99 +1,39 @@
package resource package resource
import ( import (
"errors" "fmt"
"testing" "testing"
"time" "time"
) )
func FailedStateRefreshFunc() StateRefreshFunc { func TestRetry(t *testing.T) {
return func() (interface{}, string, error) { t.Parallel()
return nil, "", errors.New("failed")
}
}
func TimeoutStateRefreshFunc() StateRefreshFunc { tries := 0
return func() (interface{}, string, error) { f := func() error {
time.Sleep(100 * time.Second) tries++
return nil, "", errors.New("failed") if tries == 1 {
} return nil
} }
func SuccessfulStateRefreshFunc() StateRefreshFunc { return fmt.Errorf("error")
return func() (interface{}, string, error) {
return struct{}{}, "running", nil
}
}
func TestWaitForState_timeout(t *testing.T) {
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: "running",
Refresh: TimeoutStateRefreshFunc(),
Timeout: 1 * time.Millisecond,
} }
obj, err := conf.WaitForState() err := Retry(2 * time.Second, f)
if err == nil && err.Error() != "timeout while waiting for state to become 'running'" {
t.Fatalf("err: %s", err)
}
if obj != nil {
t.Fatalf("should not return obj")
}
}
func TestWaitForState_success(t *testing.T) {
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: "running",
Refresh: SuccessfulStateRefreshFunc(),
Timeout: 200 * time.Second,
}
obj, err := conf.WaitForState()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if obj == nil {
t.Fatalf("should return obj")
}
} }
func TestWaitForState_successEmpty(t *testing.T) { func TestRetry_timeout(t *testing.T) {
conf := &StateChangeConf{ t.Parallel()
Pending: []string{"pending", "incomplete"},
Target: "", f := func() error {
Refresh: func() (interface{}, string, error) { return fmt.Errorf("always")
return nil, "", nil
},
Timeout: 200 * time.Second,
} }
obj, err := conf.WaitForState() err := Retry(1 * time.Second, f)
if err != nil { if err == nil {
t.Fatalf("err: %s", err) t.Fatal("should error")
}
if obj != nil {
t.Fatalf("obj should be nil")
}
}
func TestWaitForState_failure(t *testing.T) {
conf := &StateChangeConf{
Pending: []string{"pending", "incomplete"},
Target: "running",
Refresh: FailedStateRefreshFunc(),
Timeout: 200 * time.Second,
}
obj, err := conf.WaitForState()
if err == nil && err.Error() != "failed" {
t.Fatalf("err: %s", err)
}
if obj != nil {
t.Fatalf("should not return obj")
} }
} }