diff --git a/dag/walk.go b/dag/walk.go index 6339e1736..12c421681 100644 --- a/dag/walk.go +++ b/dag/walk.go @@ -80,6 +80,7 @@ func (w *walker) Update(v, e *Set) { // Calculate all our sets newEdges := e.Difference(&w.edges) + oldEdges := w.edges.Difference(e) newVerts := v.Difference(&w.vertices) oldVerts := w.vertices.Difference(v) @@ -161,6 +162,40 @@ func (w *walker) Update(v, e *Set) { // Record that the deps changed for this waiter changedDeps.Add(waiter) + + log.Printf( + "[DEBUG] dag/walk: added edge: %q waiting on %q", + VertexName(waiter), VertexName(dep)) + w.edges.Add(raw) + } + + // Process reoved edges + for _, raw := range oldEdges.List() { + edge := raw.(Edge) + + // waiter is the vertex that is "waiting" on this edge + waiter := edge.Target() + + // dep is the dependency we're waiting on + dep := edge.Source() + + // Get the info for the waiter + waiterInfo, ok := w.vertexMap[waiter] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Delete the dependency from the waiter + delete(waiterInfo.deps, dep) + + // Record that the deps changed for this waiter + changedDeps.Add(waiter) + + log.Printf( + "[DEBUG] dag/walk: removed edge: %q waiting on %q", + VertexName(waiter), VertexName(dep)) + w.edges.Delete(raw) } // For each vertex with changed dependencies, we need to kick off diff --git a/dag/walk_test.go b/dag/walk_test.go index 5051b9a9b..c720e8a54 100644 --- a/dag/walk_test.go +++ b/dag/walk_test.go @@ -1,6 +1,7 @@ package dag import ( + "fmt" "reflect" "sync" "testing" @@ -155,6 +156,67 @@ func TestWalker_newEdge(t *testing.T) { } } +func TestWalker_removeEdge(t *testing.T) { + // Run it a bunch of times since it is timing dependent + for i := 0; i < 50; i++ { + var g Graph + g.Add(1) + g.Add(2) + g.Add(3) + g.Connect(BasicEdge(1, 2)) + g.Connect(BasicEdge(3, 2)) + + // Record function + var order []interface{} + recordF := walkCbRecord(&order) + + // The way this works is that our original graph forces + // the order of 1 => 3 => 2. During the execution of 1, we + // remove the edge forcing 3 before 2. Then, during the execution + // of 3, we wait on a channel that is only closed by 2, implicitly + // forcing 2 before 3 via the callback (and not the graph). If + // 2 cannot execute before 3 (edge removal is non-functional), then + // this test will timeout. + var w *walker + gateCh := make(chan struct{}) + cb := func(v Vertex) error { + if v == 1 { + g.RemoveEdge(BasicEdge(3, 2)) + w.Update(g.vertices, g.edges) + } + + if v == 2 { + close(gateCh) + } + + if v == 3 { + select { + case <-gateCh: + case <-time.After(50 * time.Millisecond): + return fmt.Errorf("timeout 3 waiting for 2") + } + } + + return recordF(v) + } + + // Add the initial vertices + w = &walker{Callback: cb} + w.Update(g.vertices, g.edges) + + // Wait + if err := w.Wait(); err != nil { + t.Fatalf("err: %s", err) + } + + // Check + expected := []interface{}{1, 2, 3} + if !reflect.DeepEqual(order, expected) { + t.Fatalf("bad: %#v", order) + } + } +} + // walkCbRecord is a test helper callback that just records the order called. func walkCbRecord(order *[]interface{}) WalkFunc { var l sync.Mutex