From 2c70d884d6a3de8ad6aaf8230a37c0c1b17ba9cb Mon Sep 17 00:00:00 2001 From: Martin Atkins Date: Fri, 6 Apr 2018 16:25:35 -0700 Subject: [PATCH] dag: node visits return diagnostics rather than errors This allows node visits to also return warnings. --- dag/dag.go | 10 +++--- dag/dag_test.go | 24 ++++++------- dag/marshal_test.go | 4 ++- dag/walk.go | 88 ++++++++++++++++++++++++++------------------- dag/walk_test.go | 49 +++++++++++++++---------- 5 files changed, 102 insertions(+), 73 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index b7eb10c33..77c67eff9 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -5,6 +5,8 @@ import ( "sort" "strings" + "github.com/hashicorp/terraform/tfdiags" + "github.com/hashicorp/go-multierror" ) @@ -15,7 +17,7 @@ type AcyclicGraph struct { } // WalkFunc is the callback used for walking the graph. -type WalkFunc func(Vertex) error +type WalkFunc func(Vertex) tfdiags.Diagnostics // DepthWalkFunc is a walk function that also receives the current depth of the // walk as an argument @@ -161,9 +163,9 @@ func (g *AcyclicGraph) Cycles() [][]Vertex { } // Walk walks the graph, calling your callback as each node is visited. -// This will walk nodes in parallel if it can. Because the walk is done -// in parallel, the error returned will be a multierror. -func (g *AcyclicGraph) Walk(cb WalkFunc) error { +// This will walk nodes in parallel if it can. The resulting diagnostics +// contains problems from all graphs visited, in no particular order. +func (g *AcyclicGraph) Walk(cb WalkFunc) tfdiags.Diagnostics { defer g.debug.BeginOperation(typeWalk, "").End("") w := &Walker{Callback: cb, Reverse: true} diff --git a/dag/dag_test.go b/dag/dag_test.go index ae9d87244..222df257e 100644 --- a/dag/dag_test.go +++ b/dag/dag_test.go @@ -12,6 +12,8 @@ import ( "sync" "testing" + "github.com/hashicorp/terraform/tfdiags" + "github.com/hashicorp/terraform/helper/logging" ) @@ -273,7 +275,7 @@ func TestAcyclicGraphWalk(t *testing.T) { var visits []Vertex var lock sync.Mutex - err := g.Walk(func(v Vertex) error { + err := g.Walk(func(v Vertex) tfdiags.Diagnostics { lock.Lock() defer lock.Unlock() visits = append(visits, v) @@ -308,31 +310,29 @@ func TestAcyclicGraphWalk_error(t *testing.T) { var visits []Vertex var lock sync.Mutex - err := g.Walk(func(v Vertex) error { + err := g.Walk(func(v Vertex) tfdiags.Diagnostics { lock.Lock() defer lock.Unlock() + var diags tfdiags.Diagnostics + if v == 2 { - return fmt.Errorf("error") + diags = diags.Append(fmt.Errorf("error")) + return diags } visits = append(visits, v) - return nil + return diags }) if err == nil { t.Fatal("should error") } - expected := [][]Vertex{ - {1}, - } - for _, e := range expected { - if reflect.DeepEqual(visits, e) { - return - } + expected := []Vertex{1} + if !reflect.DeepEqual(visits, expected) { + t.Errorf("wrong visits\ngot: %#v\nwant: %#v", visits, expected) } - t.Fatalf("bad: %#v", visits) } func TestAcyclicGraph_ReverseDepthFirstWalk_WithRemoval(t *testing.T) { diff --git a/dag/marshal_test.go b/dag/marshal_test.go index 9201e0208..c2f52a936 100644 --- a/dag/marshal_test.go +++ b/dag/marshal_test.go @@ -5,6 +5,8 @@ import ( "encoding/json" "strings" "testing" + + "github.com/hashicorp/terraform/tfdiags" ) func TestGraphDot_empty(t *testing.T) { @@ -322,7 +324,7 @@ func TestGraphJSON_debugVisits(t *testing.T) { g.Connect(BasicEdge(4, 2)) g.Connect(BasicEdge(3, 4)) - err := (&AcyclicGraph{g}).Walk(func(v Vertex) error { + err := (&AcyclicGraph{g}).Walk(func(v Vertex) tfdiags.Diagnostics { g.DebugVisitInfo(v, "basic walk") return nil }) diff --git a/dag/walk.go b/dag/walk.go index f03b10030..4dd5615b4 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -2,12 +2,11 @@ package dag import ( "errors" - "fmt" "log" "sync" "time" - "github.com/hashicorp/go-multierror" + "github.com/hashicorp/terraform/tfdiags" ) // Walker is used to walk every vertex of a graph in parallel. @@ -54,10 +53,15 @@ type Walker struct { // if new vertices are added. wait sync.WaitGroup - // errMap contains the errors recorded so far for execution. Reading - // and writing should hold errLock. - errMap map[Vertex]error - errLock sync.Mutex + // diagsMap contains the diagnostics recorded so far for execution, + // and upstreamFailed contains all the vertices whose problems were + // caused by upstream failures, and thus whose diagnostics should be + // excluded from the final set. + // + // Readers and writers of either map must hold diagsLock. + diagsMap map[Vertex]tfdiags.Diagnostics + upstreamFailed map[Vertex]struct{} + diagsLock sync.Mutex } type walkerVertex struct { @@ -98,31 +102,30 @@ type walkerVertex struct { // user-returned error. var errWalkUpstream = errors.New("upstream dependency failed") -// Wait waits for the completion of the walk and returns any errors ( -// in the form of a multierror) that occurred. Update should be called -// to populate the walk with vertices and edges prior to calling this. +// Wait waits for the completion of the walk and returns diagnostics describing +// any problems that arose. Update should be called to populate the walk with +// vertices and edges prior to calling this. // // Wait will return as soon as all currently known vertices are complete. // If you plan on calling Update with more vertices in the future, you // should not call Wait until after this is done. -func (w *Walker) Wait() error { +func (w *Walker) Wait() tfdiags.Diagnostics { // Wait for completion w.wait.Wait() - // Grab the error lock - w.errLock.Lock() - defer w.errLock.Unlock() - - // Build the error - var result error - for v, err := range w.errMap { - if err != nil && err != errWalkUpstream { - result = multierror.Append(result, fmt.Errorf( - "%s: %s", VertexName(v), err)) + var diags tfdiags.Diagnostics + w.diagsLock.Lock() + for v, vDiags := range w.diagsMap { + if _, upstream := w.upstreamFailed[v]; upstream { + // Ignore diagnostics for nodes that had failed upstreams, since + // the downstream diagnostics are likely to be redundant. + continue } + diags = diags.Append(vDiags) } + w.diagsLock.Unlock() - return result + return diags } // Update updates the currently executing walk with the given graph. @@ -136,6 +139,7 @@ func (w *Walker) Wait() error { // Multiple Updates can be called in parallel. Update can be called at any // time during a walk. func (w *Walker) Update(g *AcyclicGraph) { + log.Print("[TRACE] dag/walk: updating graph") var v, e *Set if g != nil { v, e = g.vertices, g.edges @@ -381,25 +385,34 @@ func (w *Walker) walkVertex(v Vertex, info *walkerVertex) { } // Run our callback or note that our upstream failed - var err error + var diags tfdiags.Diagnostics + var upstreamFailed bool if depsSuccess { log.Printf("[TRACE] dag/walk: walking %q", VertexName(v)) - err = w.Callback(v) + diags = w.Callback(v) } else { log.Printf("[TRACE] dag/walk: upstream errored, not walking %q", VertexName(v)) - err = errWalkUpstream + // This won't be displayed to the user because we'll set upstreamFailed, + // but we need to ensure there's at least one error in here so that + // the failures will cascade downstream. + diags = diags.Append(errors.New("upstream dependencies failed")) + upstreamFailed = true } - // Record the error - if err != nil { - w.errLock.Lock() - defer w.errLock.Unlock() - - if w.errMap == nil { - w.errMap = make(map[Vertex]error) - } - w.errMap[v] = err + // Record the result (we must do this after execution because we mustn't + // hold diagsLock while visiting a vertex.) + w.diagsLock.Lock() + if w.diagsMap == nil { + w.diagsMap = make(map[Vertex]tfdiags.Diagnostics) } + w.diagsMap[v] = diags + if w.upstreamFailed == nil { + w.upstreamFailed = make(map[Vertex]struct{}) + } + if upstreamFailed { + w.upstreamFailed[v] = struct{}{} + } + w.diagsLock.Unlock() } func (w *Walker) waitDeps( @@ -407,6 +420,7 @@ func (w *Walker) waitDeps( deps map[Vertex]<-chan struct{}, doneCh chan<- bool, cancelCh <-chan struct{}) { + // For each dependency given to us, wait for it to complete for dep, depCh := range deps { DepSatisfied: @@ -430,10 +444,10 @@ func (w *Walker) waitDeps( } // Dependencies satisfied! We need to check if any errored - w.errLock.Lock() - defer w.errLock.Unlock() - for dep, _ := range deps { - if w.errMap[dep] != nil { + w.diagsLock.Lock() + defer w.diagsLock.Unlock() + for dep := range deps { + if w.diagsMap[dep].HasErrors() { // One of our dependencies failed, so return false doneCh <- false return diff --git a/dag/walk_test.go b/dag/walk_test.go index 9095d7189..5464248b1 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -6,6 +6,8 @@ import ( "sync" "testing" "time" + + "github.com/hashicorp/terraform/tfdiags" ) func TestWalker_basic(t *testing.T) { @@ -28,7 +30,7 @@ func TestWalker_basic(t *testing.T) { // Check expected := []interface{}{1, 2} if !reflect.DeepEqual(order, expected) { - t.Fatalf("bad: %#v", order) + t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected) } } } @@ -68,9 +70,11 @@ func TestWalker_error(t *testing.T) { recordF := walkCbRecord(&order) // Build a callback that delays until we close a channel - cb := func(v Vertex) error { + cb := func(v Vertex) tfdiags.Diagnostics { if v == 2 { - return fmt.Errorf("error!") + var diags tfdiags.Diagnostics + diags = diags.Append(fmt.Errorf("error")) + return diags } return recordF(v) @@ -87,7 +91,7 @@ func TestWalker_error(t *testing.T) { // Check expected := []interface{}{1} if !reflect.DeepEqual(order, expected) { - t.Fatalf("bad: %#v", order) + t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected) } } @@ -104,7 +108,7 @@ func TestWalker_newVertex(t *testing.T) { // Build a callback that notifies us when 2 has been walked var w *Walker - cb := func(v Vertex) error { + cb := func(v Vertex) tfdiags.Diagnostics { if v == 2 { defer close(done2) } @@ -134,7 +138,7 @@ func TestWalker_newVertex(t *testing.T) { // Check expected := []interface{}{1, 2, 3} if !reflect.DeepEqual(order, expected) { - t.Fatalf("bad: %#v", order) + t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected) } } @@ -149,7 +153,7 @@ func TestWalker_removeVertex(t *testing.T) { recordF := walkCbRecord(&order) var w *Walker - cb := func(v Vertex) error { + cb := func(v Vertex) tfdiags.Diagnostics { if v == 1 { g.Remove(2) w.Update(&g) @@ -170,7 +174,7 @@ func TestWalker_removeVertex(t *testing.T) { // Check expected := []interface{}{1} if !reflect.DeepEqual(order, expected) { - t.Fatalf("bad: %#v", order) + t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected) } } @@ -185,17 +189,17 @@ func TestWalker_newEdge(t *testing.T) { recordF := walkCbRecord(&order) var w *Walker - cb := func(v Vertex) error { + cb := func(v Vertex) tfdiags.Diagnostics { // record where we are first, otherwise the Updated vertex may get // walked before the first visit. - err := recordF(v) + diags := recordF(v) if v == 1 { g.Add(3) g.Connect(BasicEdge(3, 2)) w.Update(&g) } - return err + return diags } // Add the initial vertices @@ -210,7 +214,7 @@ func TestWalker_newEdge(t *testing.T) { // Check expected := []interface{}{1, 3, 2} if !reflect.DeepEqual(order, expected) { - t.Fatalf("bad: %#v", order) + t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected) } } @@ -236,23 +240,30 @@ func TestWalker_removeEdge(t *testing.T) { // this test will timeout. var w *Walker gateCh := make(chan struct{}) - cb := func(v Vertex) error { + cb := func(v Vertex) tfdiags.Diagnostics { + t.Logf("visit vertex %#v", v) switch v { case 1: g.RemoveEdge(BasicEdge(3, 2)) w.Update(&g) + t.Logf("removed edge from 3 to 2") case 2: // this visit isn't completed until we've recorded it // Once the visit is official, we can then close the gate to // let 3 continue. defer close(gateCh) + defer t.Logf("2 unblocked 3") case 3: select { case <-gateCh: - case <-time.After(50 * time.Millisecond): - return fmt.Errorf("timeout 3 waiting for 2") + t.Logf("vertex 3 gate channel is now closed") + case <-time.After(500 * time.Millisecond): + t.Logf("vertex 3 timed out waiting for the gate channel to close") + var diags tfdiags.Diagnostics + diags = diags.Append(fmt.Errorf("timeout 3 waiting for 2")) + return diags } } @@ -264,21 +275,21 @@ func TestWalker_removeEdge(t *testing.T) { w.Update(&g) // Wait - if err := w.Wait(); err != nil { - t.Fatalf("err: %s", err) + if diags := w.Wait(); diags.HasErrors() { + t.Fatalf("unexpected errors: %s", diags.Err()) } // Check expected := []interface{}{1, 2, 3} if !reflect.DeepEqual(order, expected) { - t.Fatalf("bad: %#v", order) + t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected) } } // walkCbRecord is a test helper callback that just records the order called. func walkCbRecord(order *[]interface{}) WalkFunc { var l sync.Mutex - return func(v Vertex) error { + return func(v Vertex) tfdiags.Diagnostics { l.Lock() defer l.Unlock() *order = append(*order, v)