pulumi/sdk/go/common/promise/promise.go

138 lines
3.6 KiB
Go

// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promise
import (
"context"
"sync"
"sync/atomic"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
)
// Lifecycle states stored in Promise.status.
const (
// statusUninitialized is the zero value, so a Promise that was never set up by a CompletionSource reads as
// uninitialized; Result panics on such a promise.
statusUninitialized int32 = iota
// statusPending is the state CompletionSource.Promise stores when it creates the promise.
statusPending
// statusFulfilled is stored by CompletionSource.Fulfill once the result value has been recorded.
statusFulfilled
// statusRejected is stored by CompletionSource.Reject once the error has been recorded.
statusRejected
)
// Promise is a promise that can be resolved with a value of type T or rejected with an error. It is safe to call Result
// on it multiple times from multiple goroutines. This is much more permissive than channels.
//
// A zero-valued Promise is not usable: Result panics while the status is uninitialized. Promises are created through
// a CompletionSource.
type Promise[T any] struct {
// done is closed by the CompletionSource when the promise is fulfilled or rejected; Result waits on it.
done chan struct{}
// mutex is held by CompletionSource.Fulfill and CompletionSource.Reject while they check and update the fields below.
mutex sync.Mutex
// status holds one of the status* constants.
status atomic.Int32
// result is the value recorded by Fulfill; it stays the zero value of T otherwise.
result T
// err is the error recorded by Reject; it stays nil otherwise.
err error
}
// Result blocks until the promise has been fulfilled or rejected, or until ctx is done.
//
// Once the promise is resolved it returns the stored value and stored error; only one of the two is ever set, so the
// other is its zero value. If ctx is observed as done instead, it returns the zero value of T together with
// ctx.Err(). Result panics if the promise's status is still uninitialized, e.g. for a zero-valued Promise.
func (p *Promise[T]) Result(ctx context.Context) (T, error) {
	if p.status.Load() == statusUninitialized {
		panic("Promise must be initialized")
	}

	select {
	case <-ctx.Done():
		// The context finished first: report its error and nothing else.
		var zero T
		return zero, ctx.Err()
	case <-p.done:
		// The promise was resolved; continue below to read the outcome.
	}

	state := p.status.Load()
	contract.Assertf(state != statusPending, "Promise must be resolved")
	return p.result, p.err
}
// CompletionSource is a source for a promise that can be fulfilled or rejected. It is safe to call Fulfill or
// Reject multiple times concurrently, the first will apply and all others will return that they couldn't set the
// promise.
//
// The zero value is ready to use: the underlying promise is created lazily on first access.
type CompletionSource[T any] struct {
// init guards the one-time creation of promise.
init sync.Once
// promise is the promise controlled by this source; it is set inside init by the Promise method.
promise *Promise[T]
}
// Promise returns the promise controlled by this completion source. The promise is created in the pending state on
// the first call; every later call returns that same promise.
func (ps *CompletionSource[T]) Promise() *Promise[T] {
	ps.init.Do(func() {
		created := &Promise[T]{done: make(chan struct{})}
		created.status.Store(statusPending)
		ps.promise = created
	})
	return ps.promise
}
// Fulfill resolves the promise with value. It returns true if this call resolved the promise, and false if the
// promise had already been fulfilled or rejected, in which case value is discarded.
func (ps *CompletionSource[T]) Fulfill(value T) bool {
	p := ps.Promise()

	// Hold the lock so the check-then-set below cannot interleave with another Fulfill or Reject.
	p.mutex.Lock()
	defer p.mutex.Unlock()

	state := p.status.Load()
	contract.Assertf(state != statusUninitialized, "Promise must be initialized")
	if state != statusPending {
		return false
	}

	p.result = value
	p.status.Store(statusFulfilled)
	// Closing done releases every goroutine waiting in Result.
	close(p.done)
	return true
}
// MustFulfill resolves the promise with value and panics if the promise had already been fulfilled or rejected.
func (ps *CompletionSource[T]) MustFulfill(value T) {
	if ok := ps.Fulfill(value); ok {
		return
	}
	panic("CompletionSource already resolved")
}
// Reject resolves the promise with err, which must not be nil. It returns true if this call resolved the promise,
// and false if the promise had already been fulfilled or rejected, in which case err is discarded.
func (ps *CompletionSource[T]) Reject(err error) bool {
	contract.Requiref(err != nil, "err", "err must not be nil")

	p := ps.Promise()

	// Hold the lock so the check-then-set below cannot interleave with another Fulfill or Reject.
	p.mutex.Lock()
	defer p.mutex.Unlock()

	state := p.status.Load()
	contract.Assertf(state != statusUninitialized, "Promise must be initialized")
	if state != statusPending {
		return false
	}

	p.err = err
	p.status.Store(statusRejected)
	// Closing done releases every goroutine waiting in Result.
	close(p.done)
	return true
}
// MustReject resolves the promise with err and panics if the promise had already been fulfilled or rejected.
func (ps *CompletionSource[T]) MustReject(err error) {
	if ok := ps.Reject(err); ok {
		return
	}
	panic("CompletionSource already resolved")
}
// Run starts f in a new goroutine and immediately returns a promise for its outcome. The promise is rejected with
// f's error when that error is non-nil, and fulfilled with f's value otherwise.
func Run[T any](f func() (T, error)) *Promise[T] {
	source := new(CompletionSource[T])
	go func() {
		result, err := f()
		if err == nil {
			source.Fulfill(result)
			return
		}
		source.Reject(err)
	}()
	return source.Promise()
}