From 1e5a8e4daefc4d8df8657f9f3ef719164b6f505d Mon Sep 17 00:00:00 2001 From: James Bardin Date: Tue, 2 Feb 2021 15:38:06 -0500 Subject: [PATCH] fix concurrent test relying on sleep Make an old concurrent test deterministic, and not rely on sleep for any synchronization. --- command/apply_test.go | 111 ++++++++++++++---------------------------- 1 file changed, 37 insertions(+), 74 deletions(-) diff --git a/command/apply_test.go b/command/apply_test.go index 209f6b557..dbfad3ab0 100644 --- a/command/apply_test.go +++ b/command/apply_test.go @@ -2,6 +2,7 @@ package command import ( "bytes" + "context" "fmt" "io/ioutil" "os" @@ -143,34 +144,8 @@ func TestApply_lockedStateWait(t *testing.T) { } } -// high water mark counter -type hwm struct { - sync.Mutex - val int - max int -} - -func (t *hwm) Inc() { - t.Lock() - defer t.Unlock() - t.val++ - if t.val > t.max { - t.max = t.val - } -} - -func (t *hwm) Dec() { - t.Lock() - defer t.Unlock() - t.val-- -} - -func (t *hwm) Max() int { - t.Lock() - defer t.Unlock() - return t.max -} - +// Verify that the parallelism flag allows no more than the desired number of +// concurrent calls to ApplyResourceChange. func TestApply_parallelism(t *testing.T) { // Create a temporary working directory that is empty td := tempDir(t) @@ -182,13 +157,15 @@ func TestApply_parallelism(t *testing.T) { par := 4 - // This blocks all the apply functions. We close it when we exit so - // they end quickly after this test finishes. - block := make(chan struct{}) - // signal how many goroutines have started - started := make(chan int, 100) + // started is a semaphore that we use to ensure that we never have more + // than "par" apply operations happening concurrently + started := make(chan struct{}, par) - runCount := &hwm{} + // beginCtx is used as a starting gate to hold back ApplyResourceChange + // calls until we reach the desired concurrency. The cancel func "begin" is + // called once we reach the desired concurrency, allowing all apply calls + // to proceed in unison. + beginCtx, begin := context.WithCancel(context.Background()) // Since our mock provider has its own mutex preventing concurrent calls // to ApplyResourceChange, we need to use a number of separate providers @@ -209,12 +186,29 @@ func TestApply_parallelism(t *testing.T) { } } provider.ApplyResourceChangeFn = func(req providers.ApplyResourceChangeRequest) providers.ApplyResourceChangeResponse { - // Increment so we're counting parallelism - started <- 1 - runCount.Inc() - defer runCount.Dec() - // Block here to stage up our max number of parallel instances - <-block + + // If we ever have more than our intended parallelism number of + // apply operations running concurrently, the semaphore will fail. + select { + case started <- struct{}{}: + defer func() { + <-started + }() + default: + t.Fatal("too many concurrent apply operations") + } + + // If we never reach our intended parallelism, the context will + // never be canceled and the test will time out. + if len(started) >= par { + begin() + } + <-beginCtx.Done() + + // do some "work" + // Not required for correctness, but makes it easier to spot a + // failure when there is more overlap. + time.Sleep(10 * time.Millisecond) return providers.ApplyResourceChangeResponse{ NewState: cty.EmptyObjectVal, @@ -240,40 +234,9 @@ func TestApply_parallelism(t *testing.T) { fmt.Sprintf("-parallelism=%d", par), } - // Run in a goroutine. We can get any errors from the ui.OutputWriter - doneCh := make(chan int, 1) - go func() { - doneCh <- c.Run(args) - }() - - timeout := time.After(5 * time.Second) - - // ensure things are running - for i := 0; i < par; i++ { - select { - case <-timeout: - t.Fatal("timeout waiting for all goroutines to start") - case <-started: - } - } - - // a little extra sleep, since we can't ensure all goroutines from the walk have - // really started - time.Sleep(100 * time.Millisecond) - close(block) - - select { - case res := <-doneCh: - if res != 0 { - t.Fatal(ui.OutputWriter.String()) - } - case <-timeout: - t.Fatal("timeout waiting from Run()") - } - - // The total in flight should equal the parallelism - if runCount.Max() != par { - t.Fatalf("Expected parallelism: %d, got: %d", par, runCount.Max()) + res := c.Run(args) + if res != 0 { + t.Fatal(ui.OutputWriter.String()) } }