dag: node visits return diagnostics rather than errors

This allows node visits to also return warnings.
This commit is contained in:
Martin Atkins 2018-04-06 16:25:35 -07:00
parent 24dce0c624
commit 2c70d884d6
5 changed files with 102 additions and 73 deletions
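
For illustration, a minimal sketch of a callback under the new WalkFunc signature; visitNode and its condition are hypothetical, and tfdiags.SimpleWarning is the package's helper for a warning-severity diagnostic:

```go
package dag

import "github.com/hashicorp/terraform/tfdiags"

// visitNode is a hypothetical callback under the new WalkFunc signature.
// Unlike a plain error return, a Diagnostics result can carry warnings
// that do not abort the walk.
func visitNode(v Vertex) tfdiags.Diagnostics {
	var diags tfdiags.Diagnostics
	if VertexName(v) == "deprecated" { // stand-in condition for this sketch
		diags = diags.Append(tfdiags.SimpleWarning("node is deprecated"))
	}
	return diags // an empty Diagnostics means success with nothing to report
}
```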

View File

@@ -5,6 +5,8 @@ import (
"sort"
"strings"
"github.com/hashicorp/terraform/tfdiags"
"github.com/hashicorp/go-multierror"
)
@@ -15,7 +17,7 @@ type AcyclicGraph struct {
}
// WalkFunc is the callback used for walking the graph.
type WalkFunc func(Vertex) error
type WalkFunc func(Vertex) tfdiags.Diagnostics
// DepthWalkFunc is a walk function that also receives the current depth of the
// walk as an argument
@@ -161,9 +163,9 @@ func (g *AcyclicGraph) Cycles() [][]Vertex {
}
// Walk walks the graph, calling your callback as each node is visited.
// This will walk nodes in parallel if it can. Because the walk is done
// in parallel, the error returned will be a multierror.
func (g *AcyclicGraph) Walk(cb WalkFunc) error {
// This will walk nodes in parallel if it can. The resulting diagnostics
// contain problems from all nodes visited, in no particular order.
func (g *AcyclicGraph) Walk(cb WalkFunc) tfdiags.Diagnostics {
defer g.debug.BeginOperation(typeWalk, "").End("")
w := &Walker{Callback: cb, Reverse: true}
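
For callers, the change means inspecting aggregated diagnostics instead of unwrapping a multierror. A minimal sketch, assuming only the existing tfdiags.Diagnostics methods HasErrors and Err (walkExample itself is hypothetical):

```go
package dag

import "github.com/hashicorp/terraform/tfdiags"

// walkExample is an illustrative caller of the new Walk signature: the
// per-node diagnostics are aggregated, and only error-severity entries
// make the walk as a whole fail.
func walkExample(g *AcyclicGraph) error {
	diags := g.Walk(func(v Vertex) tfdiags.Diagnostics {
		return nil // a nil Diagnostics is a valid "no problems" result
	})
	if diags.HasErrors() {
		return diags.Err() // flattens the diagnostics into a single error
	}
	return nil // any warnings remain available on diags for logging
}
```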

View File

@@ -12,6 +12,8 @@ import (
"sync"
"testing"
"github.com/hashicorp/terraform/tfdiags"
"github.com/hashicorp/terraform/helper/logging"
)
@@ -273,7 +275,7 @@ func TestAcyclicGraphWalk(t *testing.T) {
var visits []Vertex
var lock sync.Mutex
err := g.Walk(func(v Vertex) error {
err := g.Walk(func(v Vertex) tfdiags.Diagnostics {
lock.Lock()
defer lock.Unlock()
visits = append(visits, v)
@@ -308,31 +310,29 @@ func TestAcyclicGraphWalk_error(t *testing.T) {
var visits []Vertex
var lock sync.Mutex
err := g.Walk(func(v Vertex) error {
err := g.Walk(func(v Vertex) tfdiags.Diagnostics {
lock.Lock()
defer lock.Unlock()
var diags tfdiags.Diagnostics
if v == 2 {
return fmt.Errorf("error")
diags = diags.Append(fmt.Errorf("error"))
return diags
}
visits = append(visits, v)
return nil
return diags
})
if err == nil {
t.Fatal("should error")
}
expected := [][]Vertex{
{1},
}
for _, e := range expected {
if reflect.DeepEqual(visits, e) {
return
}
expected := []Vertex{1}
if !reflect.DeepEqual(visits, expected) {
t.Errorf("wrong visits\ngot: %#v\nwant: %#v", visits, expected)
}
t.Fatalf("bad: %#v", visits)
}
func TestAcyclicGraph_ReverseDepthFirstWalk_WithRemoval(t *testing.T) {

View File

@@ -5,6 +5,8 @@ import (
"encoding/json"
"strings"
"testing"
"github.com/hashicorp/terraform/tfdiags"
)
func TestGraphDot_empty(t *testing.T) {
@@ -322,7 +324,7 @@ func TestGraphJSON_debugVisits(t *testing.T) {
g.Connect(BasicEdge(4, 2))
g.Connect(BasicEdge(3, 4))
err := (&AcyclicGraph{g}).Walk(func(v Vertex) error {
err := (&AcyclicGraph{g}).Walk(func(v Vertex) tfdiags.Diagnostics {
g.DebugVisitInfo(v, "basic walk")
return nil
})

View File

@@ -2,12 +2,11 @@ package dag
import (
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/terraform/tfdiags"
)
// Walker is used to walk every vertex of a graph in parallel.
@@ -54,10 +53,15 @@ type Walker struct {
// if new vertices are added.
wait sync.WaitGroup
// errMap contains the errors recorded so far for execution. Reading
// and writing should hold errLock.
errMap map[Vertex]error
errLock sync.Mutex
// diagsMap contains the diagnostics recorded so far for execution,
// and upstreamFailed contains all the vertices whose problems were
// caused by upstream failures, and thus whose diagnostics should be
// excluded from the final set.
//
// Readers and writers of either map must hold diagsLock.
diagsMap map[Vertex]tfdiags.Diagnostics
upstreamFailed map[Vertex]struct{}
diagsLock sync.Mutex
}
type walkerVertex struct {
@@ -98,31 +102,30 @@ type walkerVertex struct {
// user-returned error.
var errWalkUpstream = errors.New("upstream dependency failed")
// Wait waits for the completion of the walk and returns any errors (
// in the form of a multierror) that occurred. Update should be called
// to populate the walk with vertices and edges prior to calling this.
// Wait waits for the completion of the walk and returns diagnostics describing
// any problems that arose. Update should be called to populate the walk with
// vertices and edges prior to calling this.
//
// Wait will return as soon as all currently known vertices are complete.
// If you plan on calling Update with more vertices in the future, you
// should not call Wait until after this is done.
func (w *Walker) Wait() error {
func (w *Walker) Wait() tfdiags.Diagnostics {
// Wait for completion
w.wait.Wait()
// Grab the error lock
w.errLock.Lock()
defer w.errLock.Unlock()
// Build the error
var result error
for v, err := range w.errMap {
if err != nil && err != errWalkUpstream {
result = multierror.Append(result, fmt.Errorf(
"%s: %s", VertexName(v), err))
var diags tfdiags.Diagnostics
w.diagsLock.Lock()
for v, vDiags := range w.diagsMap {
if _, upstream := w.upstreamFailed[v]; upstream {
// Ignore diagnostics for nodes that had failed upstreams, since
// the downstream diagnostics are likely to be redundant.
continue
}
diags = diags.Append(vDiags)
}
w.diagsLock.Unlock()
return result
return diags
}
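
Taken together with Update below, a direct use of Walker might look like the following sketch (walkerExample is hypothetical; Reverse: true mirrors how AcyclicGraph.Walk constructs its walker):

```go
package dag

import "github.com/hashicorp/terraform/tfdiags"

// walkerExample sketches the contract described above: seed the walk
// with Update first, then Wait for the aggregated diagnostics of all
// currently known vertices.
func walkerExample(g *AcyclicGraph, cb WalkFunc) tfdiags.Diagnostics {
	w := &Walker{Callback: cb, Reverse: true}
	w.Update(g) // populate the walk with the graph's vertices and edges
	return w.Wait()
}
```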
// Update updates the currently executing walk with the given graph.
@@ -136,6 +139,7 @@ func (w *Walker) Wait() error {
// Multiple Updates can be called in parallel. Update can be called at any
// time during a walk.
func (w *Walker) Update(g *AcyclicGraph) {
log.Print("[TRACE] dag/walk: updating graph")
var v, e *Set
if g != nil {
v, e = g.vertices, g.edges
@@ -381,25 +385,34 @@ func (w *Walker) walkVertex(v Vertex, info *walkerVertex) {
}
// Run our callback or note that our upstream failed
var err error
var diags tfdiags.Diagnostics
var upstreamFailed bool
if depsSuccess {
log.Printf("[TRACE] dag/walk: walking %q", VertexName(v))
err = w.Callback(v)
diags = w.Callback(v)
} else {
log.Printf("[TRACE] dag/walk: upstream errored, not walking %q", VertexName(v))
err = errWalkUpstream
// This won't be displayed to the user because we'll set upstreamFailed,
// but we need to ensure there's at least one error in here so that
// the failures will cascade downstream.
diags = diags.Append(errors.New("upstream dependencies failed"))
upstreamFailed = true
}
// Record the error
if err != nil {
w.errLock.Lock()
defer w.errLock.Unlock()
if w.errMap == nil {
w.errMap = make(map[Vertex]error)
}
w.errMap[v] = err
// Record the result (we must do this after execution because we mustn't
// hold diagsLock while visiting a vertex).
w.diagsLock.Lock()
if w.diagsMap == nil {
w.diagsMap = make(map[Vertex]tfdiags.Diagnostics)
}
w.diagsMap[v] = diags
if w.upstreamFailed == nil {
w.upstreamFailed = make(map[Vertex]struct{})
}
if upstreamFailed {
w.upstreamFailed[v] = struct{}{}
}
w.diagsLock.Unlock()
}
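
The cascade and the upstreamFailed filter combine so that only the root cause surfaces. A sketch mirroring TestAcyclicGraphWalk_error above (cascadeExample is hypothetical):

```go
package dag

import (
	"fmt"

	"github.com/hashicorp/terraform/tfdiags"
)

// cascadeExample: vertex 1 fails, so vertex 2 cascades and is never
// visited, yet Wait reports only vertex 1's diagnostics because 2 is
// recorded in upstreamFailed.
func cascadeExample() {
	var g AcyclicGraph
	g.Add(1)
	g.Add(2)
	g.Connect(BasicEdge(2, 1)) // 2 depends on 1, as in the tests above

	diags := g.Walk(func(v Vertex) tfdiags.Diagnostics {
		var diags tfdiags.Diagnostics
		if v == 1 {
			diags = diags.Append(fmt.Errorf("failure in vertex 1"))
		}
		return diags
	})
	fmt.Println(diags.Err()) // only vertex 1's error; no upstream noise
}
```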
func (w *Walker) waitDeps(
@@ -407,6 +420,7 @@ func (w *Walker) waitDeps(
deps map[Vertex]<-chan struct{},
doneCh chan<- bool,
cancelCh <-chan struct{}) {
// For each dependency given to us, wait for it to complete
for dep, depCh := range deps {
DepSatisfied:
@@ -430,10 +444,10 @@
}
// Dependencies satisfied! We need to check if any errored
w.errLock.Lock()
defer w.errLock.Unlock()
for dep, _ := range deps {
if w.errMap[dep] != nil {
w.diagsLock.Lock()
defer w.diagsLock.Unlock()
for dep := range deps {
if w.diagsMap[dep].HasErrors() {
// One of our dependencies failed, so return false
doneCh <- false
return

View File

@@ -6,6 +6,8 @@ import (
"sync"
"testing"
"time"
"github.com/hashicorp/terraform/tfdiags"
)
func TestWalker_basic(t *testing.T) {
@@ -28,7 +30,7 @@ func TestWalker_basic(t *testing.T) {
// Check
expected := []interface{}{1, 2}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected)
}
}
}
@@ -68,9 +70,11 @@ func TestWalker_error(t *testing.T) {
recordF := walkCbRecord(&order)
// Build a callback that delays until we close a channel
cb := func(v Vertex) error {
cb := func(v Vertex) tfdiags.Diagnostics {
if v == 2 {
return fmt.Errorf("error!")
var diags tfdiags.Diagnostics
diags = diags.Append(fmt.Errorf("error"))
return diags
}
return recordF(v)
@@ -87,7 +91,7 @@ func TestWalker_error(t *testing.T) {
// Check
expected := []interface{}{1}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected)
}
}
@@ -104,7 +108,7 @@ func TestWalker_newVertex(t *testing.T) {
// Build a callback that notifies us when 2 has been walked
var w *Walker
cb := func(v Vertex) error {
cb := func(v Vertex) tfdiags.Diagnostics {
if v == 2 {
defer close(done2)
}
@@ -134,7 +138,7 @@ func TestWalker_newVertex(t *testing.T) {
// Check
expected := []interface{}{1, 2, 3}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected)
}
}
@@ -149,7 +153,7 @@ func TestWalker_removeVertex(t *testing.T) {
recordF := walkCbRecord(&order)
var w *Walker
cb := func(v Vertex) error {
cb := func(v Vertex) tfdiags.Diagnostics {
if v == 1 {
g.Remove(2)
w.Update(&g)
@@ -170,7 +174,7 @@ func TestWalker_removeVertex(t *testing.T) {
// Check
expected := []interface{}{1}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected)
}
}
@@ -185,17 +189,17 @@ func TestWalker_newEdge(t *testing.T) {
recordF := walkCbRecord(&order)
var w *Walker
cb := func(v Vertex) error {
cb := func(v Vertex) tfdiags.Diagnostics {
// record where we are first, otherwise the Updated vertex may get
// walked before the first visit.
err := recordF(v)
diags := recordF(v)
if v == 1 {
g.Add(3)
g.Connect(BasicEdge(3, 2))
w.Update(&g)
}
return err
return diags
}
// Add the initial vertices
@@ -210,7 +214,7 @@ func TestWalker_newEdge(t *testing.T) {
// Check
expected := []interface{}{1, 3, 2}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected)
}
}
@@ -236,23 +240,30 @@ func TestWalker_removeEdge(t *testing.T) {
// this test will timeout.
var w *Walker
gateCh := make(chan struct{})
cb := func(v Vertex) error {
cb := func(v Vertex) tfdiags.Diagnostics {
t.Logf("visit vertex %#v", v)
switch v {
case 1:
g.RemoveEdge(BasicEdge(3, 2))
w.Update(&g)
t.Logf("removed edge from 3 to 2")
case 2:
// this visit isn't completed until we've recorded it
// Once the visit is official, we can then close the gate to
// let 3 continue.
defer close(gateCh)
defer t.Logf("2 unblocked 3")
case 3:
select {
case <-gateCh:
case <-time.After(50 * time.Millisecond):
return fmt.Errorf("timeout 3 waiting for 2")
t.Logf("vertex 3 gate channel is now closed")
case <-time.After(500 * time.Millisecond):
t.Logf("vertex 3 timed out waiting for the gate channel to close")
var diags tfdiags.Diagnostics
diags = diags.Append(fmt.Errorf("timeout 3 waiting for 2"))
return diags
}
}
@@ -264,21 +275,21 @@ func TestWalker_removeEdge(t *testing.T) {
w.Update(&g)
// Wait
if err := w.Wait(); err != nil {
t.Fatalf("err: %s", err)
if diags := w.Wait(); diags.HasErrors() {
t.Fatalf("unexpected errors: %s", diags.Err())
}
// Check
expected := []interface{}{1, 2, 3}
if !reflect.DeepEqual(order, expected) {
t.Fatalf("bad: %#v", order)
t.Errorf("wrong order\ngot: %#v\nwant: %#v", order, expected)
}
}
// walkCbRecord is a test helper callback that just records the order called.
func walkCbRecord(order *[]interface{}) WalkFunc {
var l sync.Mutex
return func(v Vertex) error {
return func(v Vertex) tfdiags.Diagnostics {
l.Lock()
defer l.Unlock()
*order = append(*order, v)