diff --git a/Makefile b/Makefile index 7f02603ec..a4170404d 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ plugin-dev: generate # we run this one package at a time here because running the entire suite in # one command creates memory usage issues when running in Travis-CI. test: fmtcheck generate - go list $(TEST) | xargs -t -n4 go test $(TESTARGS) -timeout=60s -parallel=4 + go list $(TEST) | xargs -t -n4 go test $(TESTARGS) -timeout=2m -parallel=4 # testacc runs acceptance tests testacc: fmtcheck generate diff --git a/backend/remote/backend_apply.go b/backend/remote/backend_apply.go index f8ad4c5e4..5aae9050c 100644 --- a/backend/remote/backend_apply.go +++ b/backend/remote/backend_apply.go @@ -148,19 +148,23 @@ func (b *Remote) opApply(stopCtx, cancelCtx context.Context, op *backend.Operati } } + r, err = b.waitForRun(stopCtx, cancelCtx, op, "apply", r, w) + if err != nil { + return r, err + } + + if b.CLI != nil { + // Insert a blank line to separate the ouputs. + b.CLI.Output("") + } + logs, err := b.client.Applies.Logs(stopCtx, r.Apply.ID) if err != nil { return r, generalError("error retrieving logs", err) } scanner := bufio.NewScanner(logs) - skip := 0 for scanner.Scan() { - // Skip the first 3 lines to prevent duplicate output. - if skip < 3 { - skip++ - continue - } if b.CLI != nil { b.CLI.Output(b.Colorize().Color(scanner.Text())) } @@ -369,6 +373,4 @@ will cancel the remote apply if its still pending. If the apply started it will stop streaming the logs, but will not stop the apply running remotely. To view this run in a browser, visit: https://%s/app/%s/%s/runs/%s[reset] - -Waiting for the apply to start... ` diff --git a/backend/remote/backend_mock.go b/backend/remote/backend_mock.go index a72ee1ce6..eac6b6839 100644 --- a/backend/remote/backend_mock.go +++ b/backend/remote/backend_mock.go @@ -310,6 +310,36 @@ func (m *mockOrganizations) Delete(ctx context.Context, name string) error { return nil } +func (m *mockOrganizations) Capacity(ctx context.Context, name string) (*tfe.Capacity, error) { + var pending, running int + for _, r := range m.client.Runs.runs { + if r.Status == tfe.RunPending { + pending++ + continue + } + running++ + } + return &tfe.Capacity{Pending: pending, Running: running}, nil +} + +func (m *mockOrganizations) RunQueue(ctx context.Context, name string, options tfe.RunQueueOptions) (*tfe.RunQueue, error) { + rq := &tfe.RunQueue{} + + for _, r := range m.client.Runs.runs { + rq.Items = append(rq.Items, r) + } + + rq.Pagination = &tfe.Pagination{ + CurrentPage: 1, + NextPage: 1, + PreviousPage: 1, + TotalPages: 1, + TotalCount: len(rq.Items), + } + + return rq, nil +} + type mockPlans struct { client *mockClient logs map[string]string @@ -629,6 +659,14 @@ func (m *mockRuns) Create(ctx context.Context, options tfe.RunCreateOptions) (*t r.IsDestroy = *options.IsDestroy } + w, ok := m.client.Workspaces.workspaceIDs[options.Workspace.ID] + if !ok { + return nil, tfe.ErrResourceNotFound + } + if w.CurrentRun == nil { + w.CurrentRun = r + } + m.runs[r.ID] = r m.workspaces[options.Workspace.ID] = append(m.workspaces[options.Workspace.ID], r) @@ -691,6 +729,10 @@ func (m *mockRuns) Cancel(ctx context.Context, runID string, options tfe.RunCanc panic("not implemented") } +func (m *mockRuns) ForceCancel(ctx context.Context, runID string, options tfe.RunForceCancelOptions) error { + panic("not implemented") +} + func (m *mockRuns) Discard(ctx context.Context, runID string, options tfe.RunDiscardOptions) error { panic("not implemented") } diff --git a/backend/remote/backend_plan.go b/backend/remote/backend_plan.go index 10c5121d0..381309fb3 100644 --- a/backend/remote/backend_plan.go +++ b/backend/remote/backend_plan.go @@ -7,6 +7,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "os" "path/filepath" "strings" @@ -181,11 +182,6 @@ func (b *Remote) plan(stopCtx, cancelCtx context.Context, op *backend.Operation, }() } - r, err = b.client.Runs.Read(stopCtx, r.ID) - if err != nil { - return r, generalError("error retrieving run", err) - } - if b.CLI != nil { header := planDefaultHeader if op.Type == backend.OperationTypeApply { @@ -195,6 +191,16 @@ func (b *Remote) plan(stopCtx, cancelCtx context.Context, op *backend.Operation, header, b.hostname, b.organization, op.Workspace, r.ID)) + "\n")) } + r, err = b.waitForRun(stopCtx, cancelCtx, op, "plan", r, w) + if err != nil { + return r, err + } + + if b.CLI != nil { + // Insert a blank line to separate the ouputs. + b.CLI.Output("") + } + logs, err := b.client.Plans.Logs(stopCtx, r.Plan.ID) if err != nil { return r, generalError("error retrieving logs", err) @@ -213,6 +219,178 @@ func (b *Remote) plan(stopCtx, cancelCtx context.Context, op *backend.Operation, return r, nil } +// backoff will perform exponential backoff based on the iteration and +// limited by the provided min and max (in milliseconds) durations. +func backoff(min, max float64, iter int) time.Duration { + backoff := math.Pow(2, float64(iter)/5) * min + if backoff > max { + backoff = max + } + return time.Duration(backoff) * time.Millisecond +} + +func (b *Remote) waitForRun(stopCtx, cancelCtx context.Context, op *backend.Operation, opType string, r *tfe.Run, w *tfe.Workspace) (*tfe.Run, error) { + started := time.Now() + updated := started + for i := 0; ; i++ { + select { + case <-stopCtx.Done(): + return r, stopCtx.Err() + case <-cancelCtx.Done(): + return r, cancelCtx.Err() + case <-time.After(backoff(1000, 3000, i)): + // Timer up, show status + } + + // Retrieve the run to get its current status. + r, err := b.client.Runs.Read(stopCtx, r.ID) + if err != nil { + return r, generalError("error retrieving run", err) + } + + // Return if the run is no longer pending. + if r.Status != tfe.RunPending && r.Status != tfe.RunConfirmed { + if i == 0 && b.CLI != nil { + b.CLI.Output(b.Colorize().Color(fmt.Sprintf("Waiting for the %s to start...", opType))) + } + return r, nil + } + + // Check if 30 seconds have passed since the last update. + current := time.Now() + if b.CLI != nil && (i == 0 || current.Sub(updated).Seconds() > 30) { + updated = current + position := 0 + elapsed := "" + + // Calculate and set the elapsed time. + if i > 0 { + elapsed = fmt.Sprintf( + " (%s elapsed)", current.Sub(started).Truncate(30*time.Second)) + } + + // Retrieve the workspace used to run this operation in. + w, err = b.client.Workspaces.Read(stopCtx, b.organization, w.Name) + if err != nil { + return nil, generalError("error retrieving workspace", err) + } + + // If the workspace is locked the run will not be queued and we can + // update the status without making any expensive calls. + if w.Locked && w.CurrentRun != nil { + cr, err := b.client.Runs.Read(stopCtx, w.CurrentRun.ID) + if err != nil { + return r, generalError("error retrieving current run", err) + } + if cr.Status == tfe.RunPending { + b.CLI.Output(b.Colorize().Color( + "Waiting for the manually locked workspace to be unlocked..." + elapsed)) + continue + } + } + + // Skip checking the workspace queue when we are the current run. + if w.CurrentRun == nil || w.CurrentRun.ID != r.ID { + found := false + options := tfe.RunListOptions{} + runlist: + for { + rl, err := b.client.Runs.List(stopCtx, w.ID, options) + if err != nil { + return r, generalError("error retrieving run list", err) + } + + // Loop through all runs to calculate the workspace queue position. + for _, item := range rl.Items { + if !found { + if r.ID == item.ID { + found = true + } + continue + } + + // If the run is in a final state, ignore it and continue. + switch item.Status { + case tfe.RunApplied, tfe.RunCanceled, tfe.RunDiscarded, tfe.RunErrored: + continue + case tfe.RunPlanned: + if op.Type == backend.OperationTypePlan { + continue + } + } + + // Increase the workspace queue position. + position++ + + // Stop searching when we reached the current run. + if w.CurrentRun != nil && w.CurrentRun.ID == item.ID { + break runlist + } + } + + // Exit the loop when we've seen all pages. + if rl.CurrentPage >= rl.TotalPages { + break + } + + // Update the page number to get the next page. + options.PageNumber = rl.NextPage + } + + if position > 0 { + b.CLI.Output(b.Colorize().Color(fmt.Sprintf( + "Waiting for %d run(s) to finish before being queued...%s", + position, + elapsed, + ))) + continue + } + } + + options := tfe.RunQueueOptions{} + search: + for { + rq, err := b.client.Organizations.RunQueue(stopCtx, b.organization, options) + if err != nil { + return r, generalError("error retrieving queue", err) + } + + // Search through all queued items to find our run. + for _, item := range rq.Items { + if r.ID == item.ID { + position = item.PositionInQueue + break search + } + } + + // Exit the loop when we've seen all pages. + if rq.CurrentPage >= rq.TotalPages { + break + } + + // Update the page number to get the next page. + options.PageNumber = rq.NextPage + } + + if position > 0 { + c, err := b.client.Organizations.Capacity(stopCtx, b.organization) + if err != nil { + return r, generalError("error retrieving capacity", err) + } + b.CLI.Output(b.Colorize().Color(fmt.Sprintf( + "Waiting for %d queued run(s) to finish before starting...%s", + position-c.Running, + elapsed, + ))) + continue + } + + b.CLI.Output(b.Colorize().Color(fmt.Sprintf( + "Waiting for the %s to start...%s", opType, elapsed))) + } + } +} + const planErrNoQueueRunRights = ` Insufficient rights to generate a plan! @@ -289,8 +467,6 @@ const planDefaultHeader = ` will stop streaming the logs, but will not stop the plan running remotely. To view this run in a browser, visit: https://%s/app/%s/%s/runs/%s[reset] - -Waiting for the plan to start... ` // The newline in this error is to make it look good in the CLI!