pulumi/pkg/backend/diy/pool_test.go

204 lines
4.1 KiB
Go
Raw Normal View History

filestate: Add re-usable worker pool The filestate backend's Upgrade method currently does some manual goroutine management to ensure that it uses a fixed number of goroutines as it attempts to upgrade all stacks in parallel. This was fine while the upgrade step was just one phase: Roughly: for _, stack := range stacks { go upgrade(stack) } // Using a pool instead of a new goroutine for each upgrade. However, this will not suffice with upcoming changes to address #12600 because the upgrade process will now have multiple phases: gather information, fill missing information with a prompt, upgrade. Only the first and last phases of this are parallelizable. Attempting to do that using the existing pattern in Upgrade will lead to code that is quite difficult to read, so this change introduces a simple shared worker pool abstraction. It operates like a mix of `sync.WaitGroup` and [errgroup][1]. Namely: - supports multiple `Wait` and `Enqueue` phases (like WaitGroup) - supports functions that return errors (like errgroup) [1]: https://pkg.go.dev/golang.org/x/sync@v0.1.0/errgroup This makes it very easy to adapt code that looks like the following: wg := &errgroup.Group{} for _, x := range xs { x := x wg.Go(func() error { return f(x) }) } if err := wg.Wait(); err != nil { return err } wg = &errgroup.Group{} for _, y := range ys { y := y wg.Go(func() error { return f(y) }) } if err := wg.Wait(); err != nil { return err } Into the following: pool := newWorkerPool(..) defer pool.Close() for _, x := range xs { x := x pool.Enqueue(func() error { return f(x) }) } if err := pool.Wait(); err != nil { return err } for _, y := range ys { y := y pool.Enqueue(func() error { return f(y) }) } if err := pool.Wait(); err != nil { return err } The workerPool-based version looks similar, but it'll spawn a fixed number of goroutines once at the start and re-use them for all tasks.
2023-04-12 19:13:47 +00:00
// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package diy
filestate: Add re-usable worker pool The filestate backend's Upgrade method currently does some manual goroutine management to ensure that it uses a fixed number of goroutines as it attempts to upgrade all stacks in parallel. This was fine while the upgrade step was just one phase: Roughly: for _, stack := range stacks { go upgrade(stack) } // Using a pool instead of a new goroutine for each upgrade. However, this will not suffice with upcoming changes to address #12600 because the upgrade process will now have multiple phases: gather information, fill missing information with a prompt, upgrade. Only the first and last phases of this are parallelizable. Attempting to do that using the existing pattern in Upgrade will lead to code that is quite difficult to read, so this change introduces a simple shared worker pool abstraction. It operates like a mix of `sync.WaitGroup` and [errgroup][1]. Namely: - supports multiple `Wait` and `Enqueue` phases (like WaitGroup) - supports functions that return errors (like errgroup) [1]: https://pkg.go.dev/golang.org/x/sync@v0.1.0/errgroup This makes it very easy to adapt code that looks like the following: wg := &errgroup.Group{} for _, x := range xs { x := x wg.Go(func() error { return f(x) }) } if err := wg.Wait(); err != nil { return err } wg = &errgroup.Group{} for _, y := range ys { y := y wg.Go(func() error { return f(y) }) } if err := wg.Wait(); err != nil { return err } Into the following: pool := newWorkerPool(..) defer pool.Close() for _, x := range xs { x := x pool.Enqueue(func() error { return f(x) }) } if err := pool.Wait(); err != nil { return err } for _, y := range ys { y := y pool.Enqueue(func() error { return f(y) }) } if err := pool.Wait(); err != nil { return err } The workerPool-based version looks similar, but it'll spawn a fixed number of goroutines once at the start and re-use them for all tasks.
2023-04-12 19:13:47 +00:00
import (
"errors"
"fmt"
"runtime"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"pgregory.net/rapid"
)
func TestWorkerPool_reusable(t *testing.T) {
t.Parallel()
pool := newWorkerPool(0, 0)
defer pool.Close()
const (
numPhases = 10
numTasks = 100
)
// Verifies that a worker pool is re-usable
// by enqueuing a bunch of tasks to it,
// waiting for them, and then enqueuing more.
//
// Each enqueue-wait cycle is called a "phase".
// We run multiple phases to verify that the pool
// is re-usable and does not get stuck after the first phase.
for phase := 0; phase < numPhases; phase++ {
var count atomic.Int64
for task := 0; task < numTasks; task++ {
pool.Enqueue(func() error {
count.Add(1)
return nil
})
}
require.NoError(t, pool.Wait())
assert.Equal(t, int64(numTasks), count.Load())
}
}
func TestWorkerPool_error(t *testing.T) {
t.Parallel()
pool := newWorkerPool(0, 0)
defer pool.Close()
const numTasks = 100
// Create N unique errors to return from the tasks.
errors := make([]error, numTasks)
for i := range errors {
errors[i] = fmt.Errorf("error %d", i)
}
for _, err := range errors {
err := err
pool.Enqueue(func() error {
return err
})
}
err := pool.Wait()
require.Error(t, err)
// Validate that the returned error matches
// the errors returned by the tasks.
for i, err := range errors {
assert.ErrorIs(t, err, errors[i])
}
}
func TestWorkerPool_oneError(t *testing.T) {
t.Parallel()
pool := newWorkerPool(0, 0)
defer pool.Close()
const numTasks = 10
giveErr := errors.New("great sadness")
for i := 0; i < numTasks; i++ {
i := i
pool.Enqueue(func() error {
if i == 7 {
return giveErr
}
return nil
})
}
err := pool.Wait()
require.Error(t, err)
assert.ErrorIs(t, err, giveErr)
}
func TestWorkerPool_workerCount(t *testing.T) {
t.Parallel()
gomaxprocs := runtime.GOMAXPROCS(0)
tests := []struct {
desc string
numWorkers int
numTasksHint int
wantWorkers int
}{
{
desc: "default",
wantWorkers: gomaxprocs,
},
{
desc: "negative",
numWorkers: -1,
wantWorkers: gomaxprocs,
},
{
desc: "explicit",
numWorkers: 2,
wantWorkers: 2,
},
{
desc: "hint/too small",
numWorkers: 4,
numTasksHint: 2,
wantWorkers: 2,
},
{
desc: "hint/large",
numWorkers: 1,
numTasksHint: 42,
wantWorkers: 1,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.desc, func(t *testing.T) {
t.Parallel()
pool := newWorkerPool(tt.numWorkers, tt.numTasksHint)
defer pool.Close()
assert.Equal(t, tt.wantWorkers, pool.numWorkers)
})
}
}
// Verifies that no combination of core actions on a pool
// can cause it to deadlock or panic.
func TestWorkerPool_randomActions(t *testing.T) {
t.Parallel()
rapid.Check(t, func(t *rapid.T) {
pool := newWorkerPool(0, 0)
// Number of tasks epqueued but not yet running.
var pending atomic.Int64
// Runs a random sequence of actions from the
// map of actions.
t.Run(map[string]func(*rapid.T){
"enqueue": func(t *rapid.T) {
pending.Add(1)
pool.Enqueue(func() error {
defer pending.Add(-1)
// Yield to other goroutines
// so the enqueue doesn't resolve
// immediately.
runtime.Gosched()
return nil
})
},
"wait": func(t *rapid.T) {
assert.NoError(t, pool.Wait())
assert.Zero(t, pending.Load())
},
})
pool.Close()
assert.Zero(t, pending.Load())
})
}