Merge pull request #27668 from hashicorp/jbardin/parallelism-test

fix concurrent test relying on sleep
James Bardin 2021-02-03 09:39:42 -05:00 committed by GitHub
commit 030632e87e
1 changed file with 37 additions and 74 deletions


@@ -2,6 +2,7 @@ package command
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -143,34 +144,8 @@ func TestApply_lockedStateWait(t *testing.T) {
 	}
 }

-// high water mark counter
-type hwm struct {
-	sync.Mutex
-	val int
-	max int
-}
-
-func (t *hwm) Inc() {
-	t.Lock()
-	defer t.Unlock()
-	t.val++
-	if t.val > t.max {
-		t.max = t.val
-	}
-}
-
-func (t *hwm) Dec() {
-	t.Lock()
-	defer t.Unlock()
-	t.val--
-}
-
-func (t *hwm) Max() int {
-	t.Lock()
-	defer t.Unlock()
-	return t.max
-}
-
+// Verify that the parallelism flag allows no more than the desired number of
+// concurrent calls to ApplyResourceChange.
 func TestApply_parallelism(t *testing.T) {
 	// Create a temporary working directory that is empty
 	td := tempDir(t)
@@ -182,13 +157,15 @@ func TestApply_parallelism(t *testing.T) {
 	par := 4

-	// This blocks all the apply functions. We close it when we exit so
-	// they end quickly after this test finishes.
-	block := make(chan struct{})
-	// signal how many goroutines have started
-	started := make(chan int, 100)
-
-	runCount := &hwm{}
+	// started is a semaphore that we use to ensure that we never have more
+	// than "par" apply operations happening concurrently
+	started := make(chan struct{}, par)
+
+	// beginCtx is used as a starting gate to hold back ApplyResourceChange
+	// calls until we reach the desired concurrency. The cancel func "begin" is
+	// called once we reach the desired concurrency, allowing all apply calls
+	// to proceed in unison.
+	beginCtx, begin := context.WithCancel(context.Background())

 	// Since our mock provider has its own mutex preventing concurrent calls
 	// to ApplyResourceChange, we need to use a number of separate providers
@@ -209,12 +186,29 @@ func TestApply_parallelism(t *testing.T) {
 			}
 		}
 		provider.ApplyResourceChangeFn = func(req providers.ApplyResourceChangeRequest) providers.ApplyResourceChangeResponse {
-			// Increment so we're counting parallelism
-			started <- 1
-			runCount.Inc()
-			defer runCount.Dec()
-
-			// Block here to stage up our max number of parallel instances
-			<-block
+
+			// If we ever have more than our intended parallelism number of
+			// apply operations running concurrently, the semaphore will fail.
+			select {
+			case started <- struct{}{}:
+				defer func() {
+					<-started
+				}()
+			default:
+				t.Fatal("too many concurrent apply operations")
+			}
+
+			// If we never reach our intended parallelism, the context will
+			// never be canceled and the test will time out.
+			if len(started) >= par {
+				begin()
+			}
+			<-beginCtx.Done()
+
+			// do some "work"
+			// Not required for correctness, but makes it easier to spot a
+			// failure when there is more overlap.
+			time.Sleep(10 * time.Millisecond)

 			return providers.ApplyResourceChangeResponse{
 				NewState: cty.EmptyObjectVal,
@@ -240,40 +234,9 @@ func TestApply_parallelism(t *testing.T) {
 		fmt.Sprintf("-parallelism=%d", par),
 	}

-	// Run in a goroutine. We can get any errors from the ui.OutputWriter
-	doneCh := make(chan int, 1)
-	go func() {
-		doneCh <- c.Run(args)
-	}()
-
-	timeout := time.After(5 * time.Second)
-
-	// ensure things are running
-	for i := 0; i < par; i++ {
-		select {
-		case <-timeout:
-			t.Fatal("timeout waiting for all goroutines to start")
-		case <-started:
-		}
-	}
-
-	// a little extra sleep, since we can't ensure all goroutines from the walk have
-	// really started
-	time.Sleep(100 * time.Millisecond)
-	close(block)
-
-	select {
-	case res := <-doneCh:
-		if res != 0 {
-			t.Fatal(ui.OutputWriter.String())
-		}
-	case <-timeout:
-		t.Fatal("timeout waiting from Run()")
-	}
-
-	// The total in flight should equal the parallelism
-	if runCount.Max() != par {
-		t.Fatalf("Expected parallelism: %d, got: %d", par, runCount.Max())
+	res := c.Run(args)
+	if res != 0 {
+		t.Fatal(ui.OutputWriter.String())
 	}
 }
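
For illustration only, here is a minimal standalone sketch of the synchronization pattern the new test code relies on: a buffered channel used as a semaphore that bounds concurrency, plus a cancellable context used as a starting gate that is released once the desired concurrency is reached. The names (par, worker, begin, and so on) are illustrative and not part of the Terraform codebase; this is not the test itself, just the shape of the technique.

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	const par = 4

	// Semaphore: a send succeeds only while fewer than par workers hold a slot.
	started := make(chan struct{}, par)

	// Starting gate: begin() is called once all par workers have arrived,
	// letting them proceed in unison.
	beginCtx, begin := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for i := 0; i < par; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// Take a semaphore slot, or report over-parallelism immediately.
			select {
			case started <- struct{}{}:
				defer func() { <-started }()
			default:
				fmt.Printf("worker %d: too many concurrent workers\n", id)
				return
			}

			// The last worker to arrive opens the gate for everyone.
			if len(started) >= par {
				begin()
			}
			<-beginCtx.Done()

			fmt.Printf("worker %d: running\n", id)
		}(i)
	}
	wg.Wait()
}

Unlike the removed sleep-based version, nothing in this pattern depends on timing: the gate opens only once all par calls are provably in flight at the same moment, and the semaphore fails fast if an extra call ever starts concurrently.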