pulumi/sdk/go/common/promise/promise.go

159 lines
4.4 KiB
Go
Raw Normal View History

Add a strict opinionated promise library (#14552) <!--- Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation. --> # Description <!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. --> Prompted by having another case in https://github.com/pulumi/pulumi/pull/14548 that looked like "this is just a promise". We already had a mini-promise library for provider config in sdk/go/common/resource/plugin/provider_plugin.go. Plus a couple of places where we did a poor mans Go promise by having a WaitGroup plus a variable to set (see the two changes in /pkg). This adds a little promise library to safely cover all three of these cases plus the case I'll be adding in #14548. It is _minimal_ adding just the features of promises needed for these initial cases. We can add to it as needed. I suspect we've got a number of places in test code that could probably use this as well, but I haven't gone through that. I also suspect that having this type available will result in more places in the future being simpler because "its just a promise" is a fairly common scenario in async systems. In fact Output's internally _are just a promise_ so we could probably rewrite their internals using this, add a `Then()` like method for apply and all the async complexity gets handled in the promise layer while the output layer just cares about unknowns/secrets/etc. ## Checklist - [x] I have run `make tidy` to update any new dependencies - [x] I have run `make lint` to verify my code passes the lint check - [ ] I have formatted my code using `gofumpt` <!--- Please provide details if the checkbox below is to be left unchecked. --> - [x] I have added tests that prove my fix is effective or that my feature works <!--- User-facing changes require a CHANGELOG entry. --> - [ ] I have run `make changelog` and committed the `changelog/pending/<file>` documenting my change - This is in sdk/go/common and I'm considering it an non-public api for now. <!-- If the change(s) in this PR is a modification of an existing call to the Pulumi Cloud, then the service should honor older versions of the CLI where this change would not exist. You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add it to the service. --> - [ ] Yes, there are changes in this PR that warrants bumping the Pulumi Cloud API version <!-- @Pulumi employees: If yes, you must submit corresponding changes in the service repo. -->
2023-11-15 14:53:12 +00:00
// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promise
import (
"context"
"sync"
"sync/atomic"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
)
const (
statusUninitialized int32 = iota
statusPending
statusFulfilled
statusRejected
)
// Promise is a promise that can be resolved with a value of type T or rejected with an error. It is safe to call Result
// on it multiple times from multiple goroutines. This is much more permissive than channels.
type Promise[T any] struct {
done chan struct{}
mutex sync.Mutex
status atomic.Int32
result T
err error
}
// Result waits for the promise to be resolved and returns the result.
func (p *Promise[T]) Result(ctx context.Context) (T, error) {
if p.status.Load() == statusUninitialized {
panic("Promise must be initialized")
}
// Wait for either the promise or context to be done, if the context is done just exit with it's error
select {
case <-p.done:
break
case <-ctx.Done():
var t T
return t, ctx.Err()
}
contract.Assertf(p.status.Load() != statusPending, "Promise must be resolved")
// Only one of result or err will be set, the other will be the zero value so we can just return both.
return p.result, p.err
}
Use promise rather than `atomic.Value` to record step errors. (#14612) <!--- Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation. --> # Description <!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. --> Fixes https://github.com/pulumi/pulumi/issues/14611. `atomic.Value` panics if a second value is written to it which isn't _exactly_ the same type. We were using this to atomically record any errors that happened, although where only interested in saving the first error. This was fine if no or one error happened, or if multiple errors happened that all were the same error type. But if two errors if different error types happened `sync.Value` would panic. Switched to use a promise completion source instead. If any error happens we use it to reject the completion source, we also log if this isn't the first error we've seen (a capabilitiy that `sync.Value` didn't give us). At the end of the step executor being used when `Errored()` is called we try to get the result of the completion source. ## Checklist - [x] I have run `make tidy` to update any new dependencies - [x] I have run `make lint` to verify my code passes the lint check - [ ] I have formatted my code using `gofumpt` <!--- Please provide details if the checkbox below is to be left unchecked. --> - [x] I have added tests that prove my fix is effective or that my feature works - Added tests for the new promise function `TryResult`, not sure it's that needed to try and write a test for the error case we hit in #14611 now that it's type safe. <!--- User-facing changes require a CHANGELOG entry. --> - [x] I have run `make changelog` and committed the `changelog/pending/<file>` documenting my change <!-- If the change(s) in this PR is a modification of an existing call to the Pulumi Cloud, then the service should honor older versions of the CLI where this change would not exist. You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add it to the service. --> - [ ] Yes, there are changes in this PR that warrants bumping the Pulumi Cloud API version <!-- @Pulumi employees: If yes, you must submit corresponding changes in the service repo. -->
2023-11-18 19:03:35 +00:00
// TryResult returns the result and true if the promise has been resolved, otherwise it returns false.
//
//nolint:revive // This error _isn't_ an error from the function, so ignore the "error should be last" rule.
func (p *Promise[T]) TryResult() (T, error, bool) {
// We don't need to lock here because we're just reading the status and the result and err are immutable
// once set.
status := p.status.Load()
if status == statusUninitialized {
panic("Promise must be initialized")
}
if status == statusPending {
var t T
return t, nil, false
}
// If the status is not pending then the promise is resolved and we can return the result and err. There
// is no race between status being set to fulfilled or rejected and result and err being changed.
return p.result, p.err, true
}
Add a strict opinionated promise library (#14552) <!--- Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation. --> # Description <!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. --> Prompted by having another case in https://github.com/pulumi/pulumi/pull/14548 that looked like "this is just a promise". We already had a mini-promise library for provider config in sdk/go/common/resource/plugin/provider_plugin.go. Plus a couple of places where we did a poor mans Go promise by having a WaitGroup plus a variable to set (see the two changes in /pkg). This adds a little promise library to safely cover all three of these cases plus the case I'll be adding in #14548. It is _minimal_ adding just the features of promises needed for these initial cases. We can add to it as needed. I suspect we've got a number of places in test code that could probably use this as well, but I haven't gone through that. I also suspect that having this type available will result in more places in the future being simpler because "its just a promise" is a fairly common scenario in async systems. In fact Output's internally _are just a promise_ so we could probably rewrite their internals using this, add a `Then()` like method for apply and all the async complexity gets handled in the promise layer while the output layer just cares about unknowns/secrets/etc. ## Checklist - [x] I have run `make tidy` to update any new dependencies - [x] I have run `make lint` to verify my code passes the lint check - [ ] I have formatted my code using `gofumpt` <!--- Please provide details if the checkbox below is to be left unchecked. --> - [x] I have added tests that prove my fix is effective or that my feature works <!--- User-facing changes require a CHANGELOG entry. --> - [ ] I have run `make changelog` and committed the `changelog/pending/<file>` documenting my change - This is in sdk/go/common and I'm considering it an non-public api for now. <!-- If the change(s) in this PR is a modification of an existing call to the Pulumi Cloud, then the service should honor older versions of the CLI where this change would not exist. You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add it to the service. --> - [ ] Yes, there are changes in this PR that warrants bumping the Pulumi Cloud API version <!-- @Pulumi employees: If yes, you must submit corresponding changes in the service repo. -->
2023-11-15 14:53:12 +00:00
// CompletionSource is a source for a promise that can be resolved or rejected. It is safe to call Resolve or
// Reject multiple times concurrently, the first will apply and all others will return that they couldn't set the
// promise.
type CompletionSource[T any] struct {
init sync.Once
promise *Promise[T]
}
func (ps *CompletionSource[T]) Promise() *Promise[T] {
ps.init.Do(func() {
p := &Promise[T]{}
p.status.Store(statusPending)
p.done = make(chan struct{})
ps.promise = p
})
return ps.promise
}
func (ps *CompletionSource[T]) Fulfill(value T) bool {
promise := ps.Promise()
promise.mutex.Lock()
defer promise.mutex.Unlock()
contract.Assertf(promise.status.Load() != statusUninitialized, "Promise must be initialized")
if promise.status.Load() != statusPending {
return false
}
promise.result = value
promise.status.Store(statusFulfilled)
close(promise.done)
return true
}
func (ps *CompletionSource[T]) MustFulfill(value T) {
if !ps.Fulfill(value) {
panic("CompletionSource already resolved")
}
}
func (ps *CompletionSource[T]) Reject(err error) bool {
contract.Requiref(err != nil, "err", "err must not be nil")
promise := ps.Promise()
promise.mutex.Lock()
defer promise.mutex.Unlock()
contract.Assertf(promise.status.Load() != statusUninitialized, "Promise must be initialized")
if promise.status.Load() != statusPending {
return false
}
promise.err = err
promise.status.Store(statusRejected)
close(promise.done)
return true
}
func (ps *CompletionSource[T]) MustReject(err error) {
if !ps.Reject(err) {
panic("CompletionSource already resolved")
}
}
// Run runs the given function in a goroutine and returns a promise that will be resolved with the result of the
// function.
func Run[T any](f func() (T, error)) *Promise[T] {
ps := &CompletionSource[T]{}
go func() {
value, err := f()
if err != nil {
ps.Reject(err)
} else {
ps.Fulfill(value)
}
}()
return ps.Promise()
}