mirror of https://github.com/pulumi/pulumi.git
159 lines
4.4 KiB
Go
159 lines
4.4 KiB
Go
// Copyright 2016-2023, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package promise
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
|
|
)
|
|
|
|
const (
|
|
statusUninitialized int32 = iota
|
|
statusPending
|
|
statusFulfilled
|
|
statusRejected
|
|
)
|
|
|
|
// Promise is a promise that can be resolved with a value of type T or rejected with an error. It is safe to call Result
|
|
// on it multiple times from multiple goroutines. This is much more permissive than channels.
|
|
type Promise[T any] struct {
|
|
done chan struct{}
|
|
mutex sync.Mutex
|
|
status atomic.Int32
|
|
result T
|
|
err error
|
|
}
|
|
|
|
// Result waits for the promise to be resolved and returns the result.
|
|
func (p *Promise[T]) Result(ctx context.Context) (T, error) {
|
|
if p.status.Load() == statusUninitialized {
|
|
panic("Promise must be initialized")
|
|
}
|
|
|
|
// Wait for either the promise or context to be done, if the context is done just exit with it's error
|
|
select {
|
|
case <-p.done:
|
|
break
|
|
case <-ctx.Done():
|
|
var t T
|
|
return t, ctx.Err()
|
|
}
|
|
|
|
contract.Assertf(p.status.Load() != statusPending, "Promise must be resolved")
|
|
// Only one of result or err will be set, the other will be the zero value so we can just return both.
|
|
return p.result, p.err
|
|
}
|
|
|
|
// TryResult returns the result and true if the promise has been resolved, otherwise it returns false.
|
|
//
|
|
//nolint:revive // This error _isn't_ an error from the function, so ignore the "error should be last" rule.
|
|
func (p *Promise[T]) TryResult() (T, error, bool) {
|
|
// We don't need to lock here because we're just reading the status and the result and err are immutable
|
|
// once set.
|
|
status := p.status.Load()
|
|
|
|
if status == statusUninitialized {
|
|
panic("Promise must be initialized")
|
|
}
|
|
|
|
if status == statusPending {
|
|
var t T
|
|
return t, nil, false
|
|
}
|
|
// If the status is not pending then the promise is resolved and we can return the result and err. There
|
|
// is no race between status being set to fulfilled or rejected and result and err being changed.
|
|
return p.result, p.err, true
|
|
}
|
|
|
|
// CompletionSource is a source for a promise that can be resolved or rejected. It is safe to call Resolve or
|
|
// Reject multiple times concurrently, the first will apply and all others will return that they couldn't set the
|
|
// promise.
|
|
type CompletionSource[T any] struct {
|
|
init sync.Once
|
|
promise *Promise[T]
|
|
}
|
|
|
|
func (ps *CompletionSource[T]) Promise() *Promise[T] {
|
|
ps.init.Do(func() {
|
|
p := &Promise[T]{}
|
|
p.status.Store(statusPending)
|
|
p.done = make(chan struct{})
|
|
ps.promise = p
|
|
})
|
|
return ps.promise
|
|
}
|
|
|
|
func (ps *CompletionSource[T]) Fulfill(value T) bool {
|
|
promise := ps.Promise()
|
|
promise.mutex.Lock()
|
|
defer promise.mutex.Unlock()
|
|
|
|
contract.Assertf(promise.status.Load() != statusUninitialized, "Promise must be initialized")
|
|
if promise.status.Load() != statusPending {
|
|
return false
|
|
}
|
|
promise.result = value
|
|
promise.status.Store(statusFulfilled)
|
|
close(promise.done)
|
|
return true
|
|
}
|
|
|
|
func (ps *CompletionSource[T]) MustFulfill(value T) {
|
|
if !ps.Fulfill(value) {
|
|
panic("CompletionSource already resolved")
|
|
}
|
|
}
|
|
|
|
func (ps *CompletionSource[T]) Reject(err error) bool {
|
|
contract.Requiref(err != nil, "err", "err must not be nil")
|
|
|
|
promise := ps.Promise()
|
|
promise.mutex.Lock()
|
|
defer promise.mutex.Unlock()
|
|
|
|
contract.Assertf(promise.status.Load() != statusUninitialized, "Promise must be initialized")
|
|
if promise.status.Load() != statusPending {
|
|
return false
|
|
}
|
|
promise.err = err
|
|
promise.status.Store(statusRejected)
|
|
close(promise.done)
|
|
return true
|
|
}
|
|
|
|
func (ps *CompletionSource[T]) MustReject(err error) {
|
|
if !ps.Reject(err) {
|
|
panic("CompletionSource already resolved")
|
|
}
|
|
}
|
|
|
|
// Run runs the given function in a goroutine and returns a promise that will be resolved with the result of the
|
|
// function.
|
|
func Run[T any](f func() (T, error)) *Promise[T] {
|
|
ps := &CompletionSource[T]{}
|
|
go func() {
|
|
value, err := f()
|
|
if err != nil {
|
|
ps.Reject(err)
|
|
} else {
|
|
ps.Fulfill(value)
|
|
}
|
|
}()
|
|
return ps.Promise()
|
|
}
|