mirror of https://github.com/pulumi/pulumi.git
2395 lines
75 KiB
Go
2395 lines
75 KiB
Go
// Copyright 2016-2021, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
//go:generate go run generate/main.go
|
|
|
|
package pulumi
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"reflect"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/slice"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/internal"
|
|
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/structpb"
|
|
)
|
|
|
|
var disableResourceReferences = cmdutil.IsTruthy(os.Getenv("PULUMI_DISABLE_RESOURCE_REFERENCES"))
|
|
|
|
type workGroup = internal.WorkGroup
|
|
|
|
type contextState struct {
|
|
info RunInfo
|
|
stack Resource
|
|
exports map[string]Input
|
|
monitor pulumirpc.ResourceMonitorClient
|
|
monitorConn *grpc.ClientConn
|
|
engine pulumirpc.EngineClient
|
|
engineConn *grpc.ClientConn
|
|
callbacksLock sync.Mutex
|
|
callbacks *callbackServer
|
|
|
|
keepResources bool // true if resources should be marshaled as strongly-typed references.
|
|
keepOutputValues bool // true if outputs should be marshaled as strongly-type output values.
|
|
supportsDeletedWith bool // true if deletedWith supported by pulumi
|
|
supportsAliasSpecs bool // true if full alias specification is supported by pulumi
|
|
supportsTransforms bool // true if remote transforms are supported by pulumi
|
|
supportsInvokeTransforms bool // true if remote invoke transforms are supported by pulumi
|
|
rpcs int // the number of outstanding RPC requests.
|
|
rpcsDone *sync.Cond // an event signaling completion of RPCs.
|
|
rpcsLock sync.Mutex // a lock protecting the RPC count and event.
|
|
rpcError error // the first error (if any) encountered during an RPC.
|
|
|
|
join workGroup // the waitgroup for non-RPC async work associated with this context
|
|
}
|
|
|
|
// Context handles registration of resources and exposes metadata about the current deployment context.
|
|
type Context struct {
|
|
state *contextState
|
|
ctx context.Context
|
|
Log Log // the logging interface for the Pulumi log stream.
|
|
}
|
|
|
|
// NewContext creates a fresh run context out of the given metadata.
|
|
func NewContext(ctx context.Context, info RunInfo) (*Context, error) {
|
|
// Connect to the gRPC endpoints if we have addresses for them.
|
|
var monitorConn *grpc.ClientConn
|
|
var monitor pulumirpc.ResourceMonitorClient
|
|
if addr := info.MonitorAddr; addr != "" {
|
|
conn, err := grpc.Dial(
|
|
info.MonitorAddr,
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
rpcutil.GrpcChannelOptions(),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecting to resource monitor over RPC: %w", err)
|
|
}
|
|
monitorConn = conn
|
|
monitor = pulumirpc.NewResourceMonitorClient(monitorConn)
|
|
}
|
|
|
|
var engineConn *grpc.ClientConn
|
|
var engine pulumirpc.EngineClient
|
|
if info.engineConn != nil {
|
|
engineConn = info.engineConn
|
|
engine = pulumirpc.NewEngineClient(engineConn)
|
|
} else if addr := info.EngineAddr; addr != "" {
|
|
conn, err := grpc.Dial(
|
|
info.EngineAddr,
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
rpcutil.GrpcChannelOptions(),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecting to engine over RPC: %w", err)
|
|
}
|
|
engineConn = conn
|
|
engine = pulumirpc.NewEngineClient(engineConn)
|
|
}
|
|
|
|
if info.Mocks != nil {
|
|
monitor = &mockMonitor{project: info.Project, stack: info.Stack, mocks: info.Mocks}
|
|
engine = &mockEngine{}
|
|
}
|
|
|
|
if wrap := info.wrapResourceMonitorClient; wrap != nil {
|
|
monitor = wrap(monitor)
|
|
}
|
|
|
|
supportsFeature := func(id string) (bool, error) {
|
|
if monitor != nil {
|
|
resp, err := monitor.SupportsFeature(ctx, &pulumirpc.SupportsFeatureRequest{Id: id})
|
|
if err != nil {
|
|
return false, fmt.Errorf("checking monitor features: %w", err)
|
|
}
|
|
return resp.GetHasSupport(), nil
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
keepResources, err := supportsFeature("resourceReferences")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
keepOutputValues, err := supportsFeature("outputValues")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
supportsDeletedWith, err := supportsFeature("deletedWith")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
supportsAliasSpecs, err := supportsFeature("aliasSpecs")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
supportsTransforms, err := supportsFeature("transforms")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
supportsInvokeTransforms, err := supportsFeature("invokeTransforms")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
contextState := &contextState{
|
|
info: info,
|
|
exports: make(map[string]Input),
|
|
monitorConn: monitorConn,
|
|
monitor: monitor,
|
|
engineConn: engineConn,
|
|
engine: engine,
|
|
keepResources: keepResources,
|
|
keepOutputValues: keepOutputValues,
|
|
supportsDeletedWith: supportsDeletedWith,
|
|
supportsAliasSpecs: supportsAliasSpecs,
|
|
supportsTransforms: supportsTransforms,
|
|
supportsInvokeTransforms: supportsInvokeTransforms,
|
|
}
|
|
contextState.rpcsDone = sync.NewCond(&contextState.rpcsLock)
|
|
context := &Context{
|
|
state: contextState,
|
|
ctx: ctx,
|
|
}
|
|
context.Log = &logState{
|
|
engine: engine,
|
|
ctx: ctx,
|
|
join: &contextState.join,
|
|
}
|
|
|
|
return context, nil
|
|
}
|
|
|
|
// Context returns the base context used to instantiate the current context.
|
|
func (ctx *Context) Context() context.Context {
|
|
return ctx.ctx
|
|
}
|
|
|
|
// WithValue returns a copy of base context in which the value associated with key is val.
|
|
func (ctx *Context) WithValue(key, val any) *Context {
|
|
newCtx := &Context{
|
|
ctx: ctx.ctx,
|
|
state: ctx.state,
|
|
Log: ctx.Log,
|
|
}
|
|
newCtx.ctx = context.WithValue(newCtx.ctx, key, val)
|
|
return newCtx
|
|
}
|
|
|
|
// Value returns the value associated with key from base context
|
|
func (ctx *Context) Value(key any) any {
|
|
return ctx.ctx.Value(key)
|
|
}
|
|
|
|
// Close implements io.Closer and relinquishes any outstanding resources held by the context.
|
|
func (ctx *Context) Close() error {
|
|
if ctx.state.engineConn != nil {
|
|
if err := ctx.state.engineConn.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if ctx.state.monitorConn != nil {
|
|
if err := ctx.state.monitorConn.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// wait waits for all asynchronous work associated with this context to drain. RPCs may not be queued once wait
|
|
// returns.
|
|
func (ctx *Context) wait() error {
|
|
// Wait for async work to flush.
|
|
ctx.state.join.Wait()
|
|
|
|
// Ensure all outstanding RPCs have completed before proceeding. Also, prevent any new RPCs from happening.
|
|
ctx.state.rpcsLock.Lock()
|
|
defer ctx.state.rpcsLock.Unlock()
|
|
|
|
// Wait until the RPC count hits zero.
|
|
for ctx.state.rpcs > 0 {
|
|
ctx.state.rpcsDone.Wait()
|
|
}
|
|
|
|
// Mark the RPCs flag so that no more RPCs are permitted.
|
|
ctx.state.rpcs = noMoreRPCs
|
|
|
|
if ctx.state.rpcError != nil {
|
|
return fmt.Errorf("waiting for RPCs: %w", ctx.state.rpcError)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Organization returns the current organization name.
|
|
func (ctx *Context) Organization() string {
|
|
org := ctx.state.info.Organization
|
|
if org == "" {
|
|
org = "organization"
|
|
}
|
|
return org
|
|
}
|
|
|
|
// Project returns the current project name.
|
|
func (ctx *Context) Project() string { return ctx.state.info.Project }
|
|
|
|
// Stack returns the current stack name being deployed into.
|
|
func (ctx *Context) Stack() string { return ctx.state.info.Stack }
|
|
|
|
// Parallel returns the degree of parallelism currently being used by the engine (1 being entirely serial).
|
|
func (ctx *Context) Parallel() int32 { return ctx.state.info.Parallel }
|
|
|
|
// DryRun is true when evaluating a program for purposes of planning, instead of performing a true deployment.
|
|
func (ctx *Context) DryRun() bool { return ctx.state.info.DryRun }
|
|
|
|
// RunningWithMocks is true if the program is running using a Mock monitor instead of a real Pulumi engine.
|
|
func (ctx *Context) RunningWithMocks() bool {
|
|
_, isMockMonitor := ctx.state.monitor.(*mockMonitor)
|
|
return isMockMonitor
|
|
}
|
|
|
|
// GetConfig returns the config value, as a string, and a bool indicating whether it exists or not.
|
|
func (ctx *Context) GetConfig(key string) (string, bool) {
|
|
v, ok := ctx.state.info.Config[key]
|
|
return v, ok
|
|
}
|
|
|
|
// IsConfigSecret returns true if the config value is a secret.
|
|
func (ctx *Context) IsConfigSecret(key string) bool {
|
|
for _, secretKey := range ctx.state.info.ConfigSecretKeys {
|
|
if key == secretKey {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// registerTransform starts up a callback server if not already running and registers the given transform.
|
|
func (ctx *Context) registerTransform(t ResourceTransform) (*pulumirpc.Callback, error) {
|
|
if !ctx.state.supportsTransforms {
|
|
return nil, errors.New("the Pulumi CLI does not support transforms. Please update the Pulumi CLI")
|
|
}
|
|
|
|
// Wrap the transform in a callback function.
|
|
callback := func(innerCtx context.Context, req []byte) (proto.Message, error) {
|
|
var rpcReq pulumirpc.TransformRequest
|
|
if err := proto.Unmarshal(req, &rpcReq); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling request: %w", err)
|
|
}
|
|
|
|
// Unmarshal the resource inputs.
|
|
properties, err := plugin.UnmarshalProperties(rpcReq.Properties, plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
KeepOutputValues: true,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unmarshaling transform protobuf properties: %w", err)
|
|
}
|
|
|
|
props, err := unmarshalPropertyMap(ctx, properties)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unmarshaling transform properties: %w", err)
|
|
}
|
|
|
|
// Unmarshal the resource options.
|
|
var parent Resource
|
|
if rpcReq.Parent != "" {
|
|
parent = ctx.newDependencyResource(URN(rpcReq.Parent))
|
|
}
|
|
|
|
var opts ResourceOptions
|
|
if rpcReq.Options != nil {
|
|
opts.AdditionalSecretOutputs = rpcReq.Options.AdditionalSecretOutputs
|
|
opts.Aliases = slice.Map(rpcReq.Options.Aliases, func(a *pulumirpc.Alias) Alias {
|
|
switch v := a.GetAlias().(type) {
|
|
case *pulumirpc.Alias_Urn:
|
|
return Alias{URN: URN(v.Urn)}
|
|
case *pulumirpc.Alias_Spec_:
|
|
alias := Alias{
|
|
Name: String(v.Spec.Name),
|
|
Type: String(v.Spec.Type),
|
|
Stack: String(v.Spec.Stack),
|
|
Project: String(v.Spec.Project),
|
|
}
|
|
|
|
switch p := v.Spec.Parent.(type) {
|
|
case *pulumirpc.Alias_Spec_NoParent:
|
|
alias.NoParent = Bool(p.NoParent)
|
|
case *pulumirpc.Alias_Spec_ParentUrn:
|
|
alias.ParentURN = URN(p.ParentUrn)
|
|
}
|
|
return alias
|
|
}
|
|
return Alias{}
|
|
})
|
|
if rpcReq.Options.CustomTimeouts != nil {
|
|
opts.CustomTimeouts = &CustomTimeouts{
|
|
Create: rpcReq.Options.CustomTimeouts.Create,
|
|
Update: rpcReq.Options.CustomTimeouts.Update,
|
|
Delete: rpcReq.Options.CustomTimeouts.Delete,
|
|
}
|
|
}
|
|
if rpcReq.Options.DeleteBeforeReplace != nil {
|
|
opts.DeleteBeforeReplace = *rpcReq.Options.DeleteBeforeReplace
|
|
}
|
|
opts.DependsOn = slice.Map(rpcReq.Options.DependsOn, func(d string) Resource {
|
|
return ctx.newDependencyResource(URN(d))
|
|
})
|
|
opts.IgnoreChanges = rpcReq.Options.IgnoreChanges
|
|
opts.Parent = parent
|
|
opts.PluginDownloadURL = rpcReq.Options.PluginDownloadUrl
|
|
opts.Protect = rpcReq.Options.Protect
|
|
if rpcReq.Options.Provider != "" {
|
|
opts.Provider = ctx.newDependencyProviderResourceFromRef(rpcReq.Options.Provider)
|
|
}
|
|
if rpcReq.Options.Providers != nil {
|
|
opts.Providers = make([]ProviderResource, 0, len(rpcReq.Options.Providers))
|
|
for _, p := range rpcReq.Options.Providers {
|
|
opts.Providers = append(opts.Providers, ctx.newDependencyProviderResourceFromRef(p))
|
|
}
|
|
}
|
|
opts.ReplaceOnChanges = rpcReq.Options.ReplaceOnChanges
|
|
opts.RetainOnDelete = rpcReq.Options.RetainOnDelete
|
|
opts.Version = rpcReq.Options.Version
|
|
}
|
|
|
|
args := &ResourceTransformArgs{
|
|
Custom: rpcReq.Custom,
|
|
Type: rpcReq.Type,
|
|
Name: rpcReq.Name,
|
|
Props: props,
|
|
Opts: opts,
|
|
}
|
|
|
|
res := t(innerCtx, args)
|
|
rpcRes := &pulumirpc.TransformResponse{
|
|
Properties: nil,
|
|
Options: nil,
|
|
}
|
|
|
|
if res != nil {
|
|
opts := res.Opts
|
|
|
|
umProperties := res.Props
|
|
if umProperties == nil {
|
|
umProperties = Map{}
|
|
}
|
|
mProperties, _, err := marshalInput(umProperties, anyType, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling properties: %w", err)
|
|
}
|
|
|
|
properties = resource.PropertyMap{}
|
|
if mProperties.IsObject() {
|
|
properties = mProperties.ObjectValue()
|
|
}
|
|
|
|
rpcRes.Properties, err = plugin.MarshalProperties(
|
|
properties,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: ctx.state.keepResources,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling properties: %w", err)
|
|
}
|
|
|
|
// Marshal the resource options
|
|
alias, err := ctx.mapAliases(opts.Aliases, rpcReq.Type, rpcReq.Name, parent)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling aliases: %w", err)
|
|
}
|
|
|
|
// It's an error to try and change the parent and the engine doesn't even let you send it back so
|
|
// sanity check that here.
|
|
if opts.Parent != parent {
|
|
return nil, errors.New("cannot change parent in transform")
|
|
}
|
|
|
|
rpcRes.Options = &pulumirpc.TransformResourceOptions{
|
|
AdditionalSecretOutputs: opts.AdditionalSecretOutputs,
|
|
Aliases: alias,
|
|
}
|
|
|
|
if opts.CustomTimeouts != nil {
|
|
rpcRes.Options.CustomTimeouts = &pulumirpc.RegisterResourceRequest_CustomTimeouts{
|
|
Create: opts.CustomTimeouts.Create,
|
|
Update: opts.CustomTimeouts.Update,
|
|
Delete: opts.CustomTimeouts.Delete,
|
|
}
|
|
}
|
|
|
|
if opts.DeleteBeforeReplace {
|
|
v := true
|
|
rpcRes.Options.DeleteBeforeReplace = &v
|
|
}
|
|
|
|
marshalToUrn := func(resource Resource) (string, error) {
|
|
urn, _, _, err := resource.URN().awaitURN(ctx.ctx)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return string(urn), nil
|
|
}
|
|
|
|
rpcRes.Options.DependsOn, err = slice.MapError(opts.DependsOn, marshalToUrn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling dependsOn: %w", err)
|
|
}
|
|
rpcRes.Options.IgnoreChanges = opts.IgnoreChanges
|
|
rpcRes.Options.PluginDownloadUrl = opts.PluginDownloadURL
|
|
rpcRes.Options.Protect = opts.Protect
|
|
|
|
if opts.Provider != nil {
|
|
rpcRes.Options.Provider, err = ctx.resolveProviderReference(opts.Provider)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling provider: %w", err)
|
|
}
|
|
}
|
|
|
|
if opts.Providers != nil {
|
|
rpcRes.Options.Providers = make(map[string]string)
|
|
for _, p := range opts.Providers {
|
|
ref, err := ctx.resolveProviderReference(p)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling providers: %w", err)
|
|
}
|
|
|
|
rpcRes.Options.Providers[p.getPackage()] = ref
|
|
}
|
|
}
|
|
rpcRes.Options.ReplaceOnChanges = opts.ReplaceOnChanges
|
|
rpcRes.Options.RetainOnDelete = opts.RetainOnDelete
|
|
rpcRes.Options.Version = opts.Version
|
|
}
|
|
|
|
return rpcRes, nil
|
|
}
|
|
|
|
err := func() error {
|
|
ctx.state.callbacksLock.Lock()
|
|
defer ctx.state.callbacksLock.Unlock()
|
|
if ctx.state.callbacks == nil {
|
|
c, err := newCallbackServer()
|
|
if err != nil {
|
|
return fmt.Errorf("creating callback server: %w", err)
|
|
}
|
|
ctx.state.callbacks = c
|
|
}
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cb, err := ctx.state.callbacks.RegisterCallback(callback)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("registering callback: %w", err)
|
|
}
|
|
|
|
return cb, nil
|
|
}
|
|
|
|
// registerTransform starts up a callback server if not already running and registers the given transform.
|
|
func (ctx *Context) registerInvokeTransform(t InvokeTransform) (*pulumirpc.Callback, error) {
|
|
if !ctx.state.supportsInvokeTransforms {
|
|
return nil, errors.New("the Pulumi CLI does not support invoke transforms. Please update the Pulumi CLI")
|
|
}
|
|
|
|
// Wrap the transform in a callback function.
|
|
callback := func(innerCtx context.Context, req []byte) (proto.Message, error) {
|
|
var rpcReq pulumirpc.TransformInvokeRequest
|
|
if err := proto.Unmarshal(req, &rpcReq); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling request: %w", err)
|
|
}
|
|
|
|
// Unmarshal the resource inputs.
|
|
args, err := plugin.UnmarshalProperties(rpcReq.Args, plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
KeepOutputValues: true,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unmarshaling transform protobuf properties: %w", err)
|
|
}
|
|
|
|
unmarshalledArgs, err := unmarshalPropertyMap(ctx, args)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unmarshaling transform properties: %w", err)
|
|
}
|
|
|
|
var opts InvokeOptions
|
|
if rpcReq.Options != nil {
|
|
if rpcReq.Options.Provider != "" {
|
|
opts.Provider = ctx.newDependencyProviderResourceFromRef(rpcReq.Options.Provider)
|
|
}
|
|
opts.Version = rpcReq.Options.Version
|
|
opts.PluginDownloadURL = rpcReq.Options.PluginDownloadUrl
|
|
}
|
|
|
|
transformArgs := &InvokeTransformArgs{
|
|
Token: rpcReq.Token,
|
|
Args: unmarshalledArgs,
|
|
Opts: opts,
|
|
}
|
|
|
|
res := t(innerCtx, transformArgs)
|
|
rpcRes := &pulumirpc.TransformInvokeResponse{
|
|
Args: nil,
|
|
Options: nil,
|
|
}
|
|
|
|
if res != nil {
|
|
opts := res.Opts
|
|
|
|
umArgs := res.Args
|
|
if umArgs == nil {
|
|
umArgs = Map{}
|
|
}
|
|
mArgs, _, err := marshalInput(umArgs, anyType, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling properties: %w", err)
|
|
}
|
|
|
|
args = resource.PropertyMap{}
|
|
if mArgs.IsObject() {
|
|
args = mArgs.ObjectValue()
|
|
}
|
|
|
|
rpcRes.Args, err = plugin.MarshalProperties(
|
|
args,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: ctx.state.keepResources,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling properties: %w", err)
|
|
}
|
|
|
|
rpcRes.Options = &pulumirpc.TransformInvokeOptions{}
|
|
rpcRes.Options.PluginDownloadUrl = opts.PluginDownloadURL
|
|
|
|
if opts.Provider != nil {
|
|
rpcRes.Options.Provider, err = ctx.resolveProviderReference(opts.Provider)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling provider: %w", err)
|
|
}
|
|
}
|
|
rpcRes.Options.Version = opts.Version
|
|
}
|
|
|
|
return rpcRes, nil
|
|
}
|
|
|
|
err := func() error {
|
|
ctx.state.callbacksLock.Lock()
|
|
defer ctx.state.callbacksLock.Unlock()
|
|
if ctx.state.callbacks == nil {
|
|
c, err := newCallbackServer()
|
|
if err != nil {
|
|
return fmt.Errorf("creating callback server: %w", err)
|
|
}
|
|
ctx.state.callbacks = c
|
|
}
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cb, err := ctx.state.callbacks.RegisterCallback(callback)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("registering callback: %w", err)
|
|
}
|
|
|
|
return cb, nil
|
|
}
|
|
|
|
// Invoke will invoke a provider's function, identified by its token tok. This function call is synchronous.
|
|
//
|
|
// args and result must be pointers to struct values fields and appropriately tagged and typed for use with Pulumi.
|
|
func (ctx *Context) Invoke(tok string, args interface{}, result interface{}, opts ...InvokeOption) (err error) {
|
|
return ctx.InvokePackage(tok, args, result, "" /* packageRef */, opts...)
|
|
}
|
|
|
|
// InvokePackage will invoke a provider's function, identified by its token tok. This function call is synchronous.
|
|
//
|
|
// args and result must be pointers to struct values fields and appropriately tagged and typed for use with Pulumi.
|
|
func (ctx *Context) InvokePackage(
|
|
tok string, args interface{}, result interface{}, packageRef string, opts ...InvokeOption,
|
|
) (err error) {
|
|
if tok == "" {
|
|
return errors.New("invoke token must not be empty")
|
|
}
|
|
|
|
resultV := reflect.ValueOf(result)
|
|
if !(resultV.Kind() == reflect.Ptr &&
|
|
(resultV.Elem().Kind() == reflect.Struct ||
|
|
(resultV.Elem().Kind() == reflect.Map && resultV.Elem().Type().Key().Kind() == reflect.String))) {
|
|
return errors.New("result must be a pointer to a struct or map value")
|
|
}
|
|
|
|
options, err := NewInvokeOptions(opts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Note that we're about to make an outstanding RPC request, so that we can rendezvous during shutdown.
|
|
if err = ctx.beginRPC(); err != nil {
|
|
return err
|
|
}
|
|
defer ctx.endRPC(err)
|
|
|
|
var providerRef string
|
|
providers, err := ctx.mergeProviders(tok, options.Parent, options.Provider, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if provider := providers[getPackage(tok)]; provider != nil {
|
|
pr, err := ctx.resolveProviderReference(provider)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
providerRef = pr
|
|
}
|
|
|
|
// Serialize arguments. Outputs will not be awaited: instead, an error will be returned if any Outputs are present.
|
|
if args == nil {
|
|
args = struct{}{}
|
|
}
|
|
resolvedArgs, _, err := marshalInput(args, anyType, false)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling arguments: %w", err)
|
|
}
|
|
|
|
resolvedArgsMap := resource.PropertyMap{}
|
|
if resolvedArgs.IsObject() {
|
|
resolvedArgsMap = resolvedArgs.ObjectValue()
|
|
}
|
|
|
|
rpcArgs, err := plugin.MarshalProperties(
|
|
resolvedArgsMap,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: ctx.state.keepResources,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling arguments: %w", err)
|
|
}
|
|
|
|
// Now, invoke the RPC to the provider synchronously.
|
|
logging.V(9).Infof("Invoke(%s, #args=%d): RPC call being made synchronously", tok, len(resolvedArgsMap))
|
|
resp, err := ctx.state.monitor.Invoke(ctx.ctx, &pulumirpc.ResourceInvokeRequest{
|
|
Tok: tok,
|
|
Args: rpcArgs,
|
|
Provider: providerRef,
|
|
Version: options.Version,
|
|
PluginDownloadURL: options.PluginDownloadURL,
|
|
AcceptResources: !disableResourceReferences,
|
|
PackageRef: packageRef,
|
|
})
|
|
if err != nil {
|
|
logging.V(9).Infof("Invoke(%s, ...): error: %v", tok, err)
|
|
return err
|
|
}
|
|
|
|
// If there were any failures from the provider, return them.
|
|
if len(resp.Failures) > 0 {
|
|
logging.V(9).Infof("Invoke(%s, ...): success: w/ %d failures", tok, len(resp.Failures))
|
|
var ferr error
|
|
for _, failure := range resp.Failures {
|
|
ferr = multierror.Append(ferr,
|
|
fmt.Errorf("%s invoke failed: %s (%s)", tok, failure.Reason, failure.Property))
|
|
}
|
|
return ferr
|
|
}
|
|
|
|
// Otherwise, simply unmarshal the output properties and return the result.
|
|
outProps, err := plugin.UnmarshalProperties(
|
|
resp.Return,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// fail if there are secrets returned from the invoke
|
|
hasSecret, err := unmarshalOutput(ctx, resource.NewObjectProperty(outProps), resultV.Elem())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if hasSecret {
|
|
return errors.New("unexpected secret result returned to invoke call")
|
|
}
|
|
logging.V(9).Infof("Invoke(%s, ...): success: w/ %d outs (err=%v)", tok, len(outProps), err)
|
|
return nil
|
|
}
|
|
|
|
// Call will invoke a provider call function, identified by its token tok.
|
|
//
|
|
// output is used to determine the output type to return; self is optional for methods.
|
|
func (ctx *Context) Call(tok string, args Input, output Output, self Resource, opts ...InvokeOption) (Output, error) {
|
|
return ctx.CallPackage(tok, args, output, self, "" /* packageRef */, opts...)
|
|
}
|
|
|
|
// CallPackage will invoke a provider call function, identified by its token tok.
|
|
//
|
|
// output is used to determine the output type to return; self is optional for methods.
|
|
func (ctx *Context) CallPackage(
|
|
tok string, args Input, output Output, self Resource, packageRef string, opts ...InvokeOption,
|
|
) (Output, error) {
|
|
if tok == "" {
|
|
return nil, errors.New("call token must not be empty")
|
|
}
|
|
|
|
output = ctx.newOutput(reflect.TypeOf(output))
|
|
|
|
options, err := NewInvokeOptions(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Note that we're about to make an outstanding RPC request, so that we can rendezvous during shutdown.
|
|
if err := ctx.beginRPC(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
prepareCallRequest := func() (*pulumirpc.ResourceCallRequest, error) {
|
|
// Determine the provider, version and url to use.
|
|
var provider ProviderResource
|
|
var version string
|
|
var pluginURL string
|
|
if self != nil {
|
|
provider = self.getProvider()
|
|
version = self.getVersion()
|
|
pluginURL = self.getPluginDownloadURL()
|
|
} else {
|
|
providers, err := ctx.mergeProviders(tok, options.Parent, options.Provider, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
provider = providers[getPackage(tok)]
|
|
version = options.Version
|
|
pluginURL = options.PluginDownloadURL
|
|
}
|
|
var providerRef string
|
|
if provider != nil {
|
|
pr, err := ctx.resolveProviderReference(provider)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
providerRef = pr
|
|
}
|
|
|
|
// Serialize all args, first by awaiting them, and then marshaling them to the requisite gRPC values.
|
|
resolvedArgs, argDeps, _, err := marshalInputs(args)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling args: %w", err)
|
|
}
|
|
|
|
// If we have a value for self, add it to the arguments.
|
|
if self != nil {
|
|
var deps []URN
|
|
resolvedSelf, selfDeps, err := marshalInput(self, reflect.TypeOf(self), true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling __self__: %w", err)
|
|
}
|
|
for _, dep := range selfDeps {
|
|
depURN, _, _, err := dep.URN().awaitURN(context.TODO())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
deps = append(deps, depURN)
|
|
}
|
|
resolvedArgs["__self__"] = resolvedSelf
|
|
argDeps["__self__"] = deps
|
|
}
|
|
|
|
// Marshal all properties for the RPC call.
|
|
rpcArgs, err := plugin.MarshalProperties(
|
|
resolvedArgs,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: ctx.state.keepResources,
|
|
KeepOutputValues: ctx.state.keepOutputValues,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling args: %w", err)
|
|
}
|
|
|
|
// Convert the arg dependencies map for RPC and remove duplicates.
|
|
rpcArgDeps := make(map[string]*pulumirpc.ResourceCallRequest_ArgumentDependencies)
|
|
for k, deps := range argDeps {
|
|
sort.Slice(deps, func(i, j int) bool { return deps[i] < deps[j] })
|
|
|
|
urns := slice.Prealloc[string](len(deps))
|
|
for i, d := range deps {
|
|
if i > 0 && urns[i-1] == string(d) {
|
|
continue
|
|
}
|
|
urns = append(urns, string(d))
|
|
}
|
|
|
|
rpcArgDeps[k] = &pulumirpc.ResourceCallRequest_ArgumentDependencies{
|
|
Urns: urns,
|
|
}
|
|
}
|
|
|
|
return &pulumirpc.ResourceCallRequest{
|
|
Tok: tok,
|
|
Args: rpcArgs,
|
|
ArgDependencies: rpcArgDeps,
|
|
Provider: providerRef,
|
|
Version: version,
|
|
PluginDownloadURL: pluginURL,
|
|
PackageRef: packageRef,
|
|
}, nil
|
|
}
|
|
|
|
// Kick off the call.
|
|
go func() {
|
|
var ret *structpb.Struct
|
|
var deps []Resource
|
|
var err error
|
|
defer func() {
|
|
defer ctx.endRPC(err)
|
|
|
|
var outprops resource.PropertyMap
|
|
if err == nil {
|
|
outprops, err = plugin.UnmarshalProperties(ret, plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
}
|
|
if err != nil {
|
|
logging.V(9).Infof("Call(%s, ...): success: w/ unmarshal error: %v", tok, err)
|
|
internal.RejectOutput(output, err)
|
|
return
|
|
}
|
|
|
|
// Allocate storage for the unmarshalled output.
|
|
var secret bool
|
|
dest := reflect.New(output.ElementType()).Elem()
|
|
known := !outprops.ContainsUnknowns()
|
|
secret, err = unmarshalOutput(ctx, resource.NewObjectProperty(outprops), dest)
|
|
if err != nil {
|
|
internal.RejectOutput(output, err)
|
|
} else {
|
|
internal.ResolveOutput(output, dest.Interface(), known, secret, resourcesToInternal(deps))
|
|
}
|
|
|
|
logging.V(9).Infof("Call(%s, ...): success: w/ %d outs (err=%v)", tok, len(outprops), err)
|
|
}()
|
|
|
|
// Prepare the RPC request.
|
|
var req *pulumirpc.ResourceCallRequest
|
|
req, err = prepareCallRequest()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Now, call the RPC.
|
|
var resp *pulumirpc.CallResponse
|
|
logging.V(9).Infof("Call(%s): Goroutine spawned, RPC call being made", tok)
|
|
resp, err = ctx.state.monitor.Call(ctx.ctx, req)
|
|
if err != nil {
|
|
logging.V(9).Infof("Call(%s, ...): error: %v", tok, err)
|
|
} else if len(resp.Failures) > 0 {
|
|
logging.V(9).Infof("Call(%s, ...): success: w/ %d failures", tok, len(resp.Failures))
|
|
for _, failure := range resp.Failures {
|
|
err = multierror.Append(err,
|
|
fmt.Errorf("%s call failed: %s (%s)", tok, failure.Reason, failure.Property))
|
|
}
|
|
}
|
|
|
|
if resp != nil {
|
|
ret = resp.Return
|
|
|
|
// Combine the individual dependencies into a single set of dependency resources.
|
|
urns := make(map[string]struct{})
|
|
for _, returnDependencies := range resp.GetReturnDependencies() {
|
|
for _, urn := range returnDependencies.GetUrns() {
|
|
urns[urn] = struct{}{}
|
|
}
|
|
}
|
|
for urn := range urns {
|
|
deps = append(deps, ctx.newDependencyResource(URN(urn)))
|
|
}
|
|
}
|
|
}()
|
|
|
|
return output, nil
|
|
}
|
|
|
|
// ReadResource reads an existing custom resource's state from the resource monitor. t is the fully qualified type
|
|
// token and name is the "name" part to use in creating a stable and globally unique URN for the object. id is the ID
|
|
// of the resource to read, and props contains any state necessary to perform the read (typically props will be nil).
|
|
// opts contains optional settings that govern the way the resource is managed.
|
|
//
|
|
// The value passed to resource must be a pointer to a struct. The fields of this struct that correspond to output
|
|
// properties of the resource must have types that are assignable from Output, and must have a `pulumi` tag that
|
|
// records the name of the corresponding output property. The struct must embed the CustomResourceState type.
|
|
//
|
|
// For example, given a custom resource with an int-typed output "foo" and a string-typed output "bar", one would
|
|
// define the following CustomResource type:
|
|
//
|
|
// type MyResource struct {
|
|
// pulumi.CustomResourceState
|
|
//
|
|
// Foo pulumi.IntOutput `pulumi:"foo"`
|
|
// Bar pulumi.StringOutput `pulumi:"bar"`
|
|
// }
|
|
//
|
|
// And invoke ReadResource like so:
|
|
//
|
|
// var resource MyResource
|
|
// err := ctx.ReadResource(tok, name, id, nil, &resource, opts...)
|
|
func (ctx *Context) ReadResource(
|
|
t, name string, id IDInput, props Input, resource CustomResource, opts ...ResourceOption,
|
|
) error {
|
|
return ctx.readPackageResource(t, name, id, props, resource, "" /* packageRef */, opts...)
|
|
}
|
|
|
|
// ReadPackageResource reads an existing custom resource's state from the resource monitor. t is the fully qualified
|
|
// type token and name is the "name" part to use in creating a stable and globally unique URN for the object. id is the
|
|
// ID of the resource to read, and props contains any state necessary to perform the read (typically props will be nil).
|
|
// opts contains optional settings that govern the way the resource is managed.
|
|
//
|
|
// The value passed to resource must be a pointer to a struct. The fields of this struct that correspond to output
|
|
// properties of the resource must have types that are assignable from Output, and must have a `pulumi` tag that records
|
|
// the name of the corresponding output property. The struct must embed the CustomResourceState type.
|
|
//
|
|
// For example, given a custom resource with an int-typed output "foo" and a string-typed output "bar", one would define
|
|
// the following CustomResource type:
|
|
//
|
|
// type MyResource struct {
|
|
// pulumi.CustomResourceState
|
|
//
|
|
// Foo pulumi.IntOutput `pulumi:"foo"`
|
|
// Bar pulumi.StringOutput `pulumi:"bar"`
|
|
// }
|
|
//
|
|
// And invoke ReadPackageResource like so:
|
|
//
|
|
// var resource MyResource
|
|
// err := ctx.ReadPackageResource(tok, name, id, nil, &resource, opts...)
|
|
func (ctx *Context) ReadPackageResource(
|
|
t, name string, id IDInput, props Input, resource CustomResource, packageRef string, opts ...ResourceOption,
|
|
) error {
|
|
return ctx.readPackageResource(t, name, id, props, resource, packageRef, opts...)
|
|
}
|
|
|
|
func (ctx *Context) readPackageResource(
|
|
t, name string, id IDInput, props Input, resource CustomResource, packageRef string, opts ...ResourceOption,
|
|
) error {
|
|
if t == "" {
|
|
return errors.New("resource type argument cannot be empty")
|
|
} else if name == "" {
|
|
return errors.New("resource name argument (for URN creation) cannot be empty")
|
|
} else if id == nil {
|
|
return errors.New("resource ID is required for lookup and cannot be empty")
|
|
}
|
|
|
|
if props != nil {
|
|
propsType := reflect.TypeOf(props)
|
|
if propsType.Kind() == reflect.Ptr {
|
|
propsType = propsType.Elem()
|
|
}
|
|
if !(propsType.Kind() == reflect.Struct ||
|
|
(propsType.Kind() == reflect.Map && propsType.Key().Kind() == reflect.String)) {
|
|
return errors.New("props must be a struct or map or a pointer to a struct or map")
|
|
}
|
|
}
|
|
|
|
options := merge(opts...)
|
|
parent := options.Parent
|
|
if options.Parent == nil {
|
|
options.Parent = ctx.state.stack
|
|
}
|
|
|
|
// Before anything else, if there are transformations registered, give them a chance to run to modify the
|
|
// user-provided properties and options assigned to this resource.
|
|
var transformations []ResourceTransformation
|
|
var err error
|
|
props, options, transformations, err = applyTransformations(t, name, props, resource, opts, options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Collapse aliases to URNs.
|
|
var aliasURNs []URNOutput
|
|
if options.Aliases != nil {
|
|
aliasURNs = make([]URNOutput, len(options.Aliases))
|
|
project, stack := ctx.Project(), ctx.Stack()
|
|
for i, alias := range options.Aliases {
|
|
aliasURN, err := alias.collapseToURN(name, t, parent, project, stack)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to collapse alias to URN: %w", err)
|
|
}
|
|
aliasURNs[i] = aliasURN
|
|
}
|
|
}
|
|
|
|
if options.DeletedWith != nil && !ctx.state.supportsDeletedWith {
|
|
return errors.New("the Pulumi CLI does not support the DeletedWith option. Please update the Pulumi CLI")
|
|
}
|
|
|
|
// Note that we're about to make an outstanding RPC request, so that we can rendezvous during shutdown.
|
|
if err := ctx.beginRPC(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Merge providers.
|
|
providers, err := ctx.mergeProviders(t, options.Parent, options.Provider, options.Providers)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get the provider for the resource.
|
|
provider := getProvider(t, options.Provider, providers)
|
|
|
|
// Create resolvers for the resource's outputs.
|
|
res := ctx.makeResourceState(t, name, resource, providers, provider,
|
|
options.Version, options.PluginDownloadURL, aliasURNs, transformations)
|
|
|
|
// Get the source position for the resource registration. Note that this assumes that there is an intermediate
|
|
// between the this function and user code.
|
|
sourcePosition := ctx.getSourcePosition(3)
|
|
|
|
// Kick off the resource read operation. This will happen asynchronously and resolve the above properties.
|
|
go func() {
|
|
// No matter the outcome, make sure all promises are resolved and that we've signaled completion of this RPC.
|
|
var urn, resID string
|
|
var inputs *resourceInputs
|
|
var state *structpb.Struct
|
|
var err error
|
|
defer func() {
|
|
res.resolve(ctx, err, inputs, urn, resID, state, nil, false)
|
|
ctx.endRPC(err)
|
|
}()
|
|
|
|
idToRead, known, _, err := id.ToIDOutput().awaitID(context.TODO())
|
|
if !known || err != nil {
|
|
return
|
|
}
|
|
|
|
// Prepare the inputs for an impending operation.
|
|
inputs, err = ctx.prepareResourceInputs(resource, props, t, options, res, false /* remote */, true /* custom */)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
logging.V(9).Infof("ReadResource(%s, %s): Goroutine spawned, RPC call being made", t, name)
|
|
resp, err := ctx.state.monitor.ReadResource(ctx.ctx, &pulumirpc.ReadResourceRequest{
|
|
Type: t,
|
|
Name: name,
|
|
Parent: inputs.parent,
|
|
Properties: inputs.rpcProps,
|
|
Provider: inputs.provider,
|
|
Id: string(idToRead),
|
|
AcceptSecrets: true,
|
|
AcceptResources: !disableResourceReferences,
|
|
AdditionalSecretOutputs: inputs.additionalSecretOutputs,
|
|
SourcePosition: sourcePosition,
|
|
PackageRef: packageRef,
|
|
})
|
|
if err != nil {
|
|
logging.V(9).Infof("ReadResource(%s, %s): error: %v", t, name, err)
|
|
} else {
|
|
logging.V(9).Infof("ReadResource(%s, %s): success: %s %s ...", t, name, resp.Urn, id)
|
|
}
|
|
if resp != nil {
|
|
urn, resID = resp.Urn, string(idToRead)
|
|
state = resp.Properties
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ctx *Context) getResource(urn string) (*pulumirpc.RegisterResourceResponse, error) {
|
|
// This is a resource that already exists. Read its state from the engine.
|
|
resolvedArgsMap := resource.NewPropertyMapFromMap(map[string]interface{}{
|
|
"urn": urn,
|
|
})
|
|
|
|
rpcArgs, err := plugin.MarshalProperties(
|
|
resolvedArgsMap,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: ctx.state.keepResources,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling arguments: %w", err)
|
|
}
|
|
|
|
tok := "pulumi:pulumi:getResource"
|
|
logging.V(9).Infof("Invoke(%s, #args=%d): RPC call being made synchronously", tok, len(resolvedArgsMap))
|
|
resp, err := ctx.state.monitor.Invoke(ctx.ctx, &pulumirpc.ResourceInvokeRequest{
|
|
Tok: "pulumi:pulumi:getResource",
|
|
Args: rpcArgs,
|
|
AcceptResources: !disableResourceReferences,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invoke(%s, ...): error: %w", tok, err)
|
|
}
|
|
|
|
// If there were any failures from the provider, return them.
|
|
if len(resp.Failures) > 0 {
|
|
logging.V(9).Infof("Invoke(%s, ...): success: w/ %d failures", tok, len(resp.Failures))
|
|
var ferr error
|
|
for _, failure := range resp.Failures {
|
|
ferr = multierror.Append(ferr,
|
|
fmt.Errorf("%s invoke failed: %s (%s)", tok, failure.Reason, failure.Property))
|
|
}
|
|
return nil, ferr
|
|
}
|
|
|
|
return &pulumirpc.RegisterResourceResponse{
|
|
Urn: resp.Return.Fields["urn"].GetStringValue(),
|
|
Id: resp.Return.Fields["id"].GetStringValue(),
|
|
Object: resp.Return.Fields["state"].GetStructValue(),
|
|
}, nil
|
|
}
|
|
|
|
func (ctx *Context) registerResource(
|
|
t, name string, props Input, resource Resource, remote bool, packageRef string, opts ...ResourceOption,
|
|
) error {
|
|
if t == "" {
|
|
return errors.New("resource type argument cannot be empty")
|
|
} else if name == "" {
|
|
return errors.New("resource name argument (for URN creation) cannot be empty")
|
|
}
|
|
|
|
if _, isProvider := resource.(ProviderResource); isProvider && !strings.HasPrefix(t, "pulumi:providers:") {
|
|
return errors.New("provider resource type must begin with \"pulumi:providers:\"")
|
|
}
|
|
|
|
if props != nil {
|
|
propsType := reflect.TypeOf(props)
|
|
if propsType.Kind() == reflect.Ptr {
|
|
propsType = propsType.Elem()
|
|
}
|
|
if !(propsType.Kind() == reflect.Struct ||
|
|
(propsType.Kind() == reflect.Map && propsType.Key().Kind() == reflect.String)) {
|
|
return errors.New("props must be a struct or map or a pointer to a struct or map")
|
|
}
|
|
}
|
|
|
|
options := merge(opts...)
|
|
|
|
if parent := options.Parent; parent != nil && internal.GetOutputState(parent.URN()) == nil {
|
|
// Guard against uninitialized parent resources to prevent
|
|
// panics from invalid state further down the line.
|
|
// Uninitialized parent resources won't have a URN.
|
|
|
|
resourceType := "resource"
|
|
registerMethod := "RegisterResource"
|
|
if _, parentIsCustom := parent.(CustomResource); !parentIsCustom {
|
|
resourceType = "component resource"
|
|
registerMethod = "RegisterComponentResource"
|
|
}
|
|
err := ctx.Log.Warn(fmt.Sprintf(
|
|
"Ignoring %v %T (parent of %v :: %v) because it was not registered with %v",
|
|
resourceType, parent, name, t, registerMethod), nil /* args */)
|
|
contract.IgnoreError(err)
|
|
|
|
options.Parent = nil
|
|
}
|
|
|
|
_, custom := resource.(CustomResource)
|
|
isRemoteComponentOrRehydratedComponent := !custom && (remote || options.URN != "")
|
|
if isRemoteComponentOrRehydratedComponent {
|
|
resource.setKeepDependency()
|
|
}
|
|
|
|
parent := options.Parent
|
|
if options.Parent == nil {
|
|
options.Parent = ctx.state.stack
|
|
}
|
|
|
|
// Before anything else, if there are transformations registered, give them a chance to run to modify the
|
|
// user-provided properties and options assigned to this resource.
|
|
props, options, transformations, err := applyTransformations(t, name, props, resource, opts, options)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Collapse aliases to URNs.
|
|
var aliasURNs []URNOutput
|
|
if options.Aliases != nil {
|
|
aliasURNs = make([]URNOutput, len(options.Aliases))
|
|
project, stack := ctx.Project(), ctx.Stack()
|
|
for i, alias := range options.Aliases {
|
|
aliasURN, err := alias.collapseToURN(name, t, parent, project, stack)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to collapse alias to URN: %w", err)
|
|
}
|
|
aliasURNs[i] = aliasURN
|
|
}
|
|
}
|
|
|
|
// Note that we're about to make an outstanding RPC request, so that we can rendezvous during shutdown.
|
|
if err := ctx.beginRPC(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Merge providers.
|
|
providers, err := ctx.mergeProviders(t, options.Parent, options.Provider, options.Providers)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get the provider for the resource.
|
|
provider := getProvider(t, options.Provider, providers)
|
|
|
|
// Create resolvers for the resource's outputs.
|
|
resState := ctx.makeResourceState(t, name, resource, providers, provider,
|
|
options.Version, options.PluginDownloadURL, aliasURNs, transformations)
|
|
|
|
// Get the source position for the resource registration. Note that this assumes that there are two intermediate
|
|
// frames between this function and user code.
|
|
sourcePosition := ctx.getSourcePosition(4)
|
|
|
|
// Kick off the resource registration. If we are actually performing a deployment, the resulting properties
|
|
// will be resolved asynchronously as the RPC operation completes. If we're just planning, values won't resolve.
|
|
go func() {
|
|
// No matter the outcome, make sure all promises are resolved and that we've signaled completion of this RPC.
|
|
var urn, resID string
|
|
var inputs *resourceInputs
|
|
var state *structpb.Struct
|
|
deps := make(map[string][]Resource)
|
|
var err error
|
|
keepUnknowns := false
|
|
defer func() {
|
|
resState.resolve(ctx, err, inputs, urn, resID, state, deps, keepUnknowns)
|
|
ctx.endRPC(err)
|
|
}()
|
|
|
|
// Register the transform functions
|
|
transforms := make([]*pulumirpc.Callback, 0, len(options.Transforms))
|
|
for _, t := range options.Transforms {
|
|
var cb *pulumirpc.Callback
|
|
cb, err = ctx.registerTransform(t)
|
|
if err != nil {
|
|
return
|
|
}
|
|
transforms = append(transforms, cb)
|
|
}
|
|
|
|
// Prepare the inputs for an impending operation.
|
|
inputs, err = ctx.prepareResourceInputs(resource, props, t, options, resState, remote, custom)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// initialize both aliases and aliasURNs slices as nil
|
|
// depending on supportsAliasSpecs flag, one of them will be populated
|
|
// and sent off to the engine for registring the resource
|
|
var (
|
|
aliasURNs []string
|
|
aliases []*pulumirpc.Alias
|
|
)
|
|
|
|
if !ctx.state.supportsAliasSpecs {
|
|
aliasURNs = make([]string, len(inputs.aliases))
|
|
for i, alias := range inputs.aliases {
|
|
aliasURNs[i] = alias.GetUrn()
|
|
}
|
|
} else {
|
|
aliases = inputs.aliases
|
|
}
|
|
|
|
var resp *pulumirpc.RegisterResourceResponse
|
|
if options.URN != "" {
|
|
resp, err = ctx.getResource(options.URN)
|
|
if err != nil {
|
|
logging.V(9).Infof("getResource(%s, %s): error: %v", t, name, err)
|
|
} else {
|
|
logging.V(9).Infof("getResource(%s, %s): success: %s %s ...", t, name, resp.Urn, resp.Id)
|
|
}
|
|
} else {
|
|
logging.V(9).Infof("RegisterResource(%s, %s): Goroutine spawned, RPC call being made", t, name)
|
|
resp, err = ctx.state.monitor.RegisterResource(ctx.ctx, &pulumirpc.RegisterResourceRequest{
|
|
Type: t,
|
|
Name: name,
|
|
Parent: inputs.parent,
|
|
Object: inputs.rpcProps,
|
|
Custom: custom,
|
|
Protect: inputs.protect,
|
|
Dependencies: inputs.deps,
|
|
Provider: inputs.provider,
|
|
Providers: inputs.providers,
|
|
PropertyDependencies: inputs.rpcPropertyDeps,
|
|
DeleteBeforeReplace: inputs.deleteBeforeReplace,
|
|
ImportId: inputs.importID,
|
|
CustomTimeouts: inputs.customTimeouts,
|
|
IgnoreChanges: inputs.ignoreChanges,
|
|
AliasURNs: aliasURNs,
|
|
Aliases: aliases,
|
|
AcceptSecrets: true,
|
|
AcceptResources: !disableResourceReferences,
|
|
AdditionalSecretOutputs: inputs.additionalSecretOutputs,
|
|
Version: inputs.version,
|
|
PluginDownloadURL: inputs.pluginDownloadURL,
|
|
Remote: remote,
|
|
ReplaceOnChanges: inputs.replaceOnChanges,
|
|
RetainOnDelete: inputs.retainOnDelete,
|
|
DeletedWith: inputs.deletedWith,
|
|
SourcePosition: sourcePosition,
|
|
Transforms: transforms,
|
|
SupportsResultReporting: true,
|
|
PackageRef: packageRef,
|
|
})
|
|
if err != nil {
|
|
logging.V(9).Infof("RegisterResource(%s, %s): error: %v", t, name, err)
|
|
} else {
|
|
logging.V(9).Infof("RegisterResource(%s, %s): success: %s %s ...", t, name, resp.Urn, resp.Id)
|
|
}
|
|
}
|
|
|
|
if resp != nil {
|
|
urn, resID = resp.Urn, resp.Id
|
|
state = resp.Object
|
|
for key, propertyDependencies := range resp.GetPropertyDependencies() {
|
|
var resources []Resource
|
|
for _, urn := range propertyDependencies.GetUrns() {
|
|
resources = append(resources, &ResourceState{
|
|
urn: URNInput(URN(urn)).ToURNOutput(),
|
|
})
|
|
}
|
|
deps[key] = resources
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// RegisterResource creates and registers a new resource object. t is the fully qualified type token and name is
|
|
// the "name" part to use in creating a stable and globally unique URN for the object. props contains the goal state
|
|
// for the resource object and opts contains optional settings that govern the way the resource is created.
|
|
//
|
|
// The value passed to resource must be a pointer to a struct. The fields of this struct that correspond to output
|
|
// properties of the resource must have types that are assignable from Output, and must have a `pulumi` tag that
|
|
// records the name of the corresponding output property. The struct must embed either the ResourceState or the
|
|
// CustomResourceState type.
|
|
//
|
|
// For example, given a custom resource with an int-typed output "foo" and a string-typed output "bar", one would
|
|
// define the following CustomResource type:
|
|
//
|
|
// type MyResource struct {
|
|
// pulumi.CustomResourceState
|
|
//
|
|
// Foo pulumi.IntOutput `pulumi:"foo"`
|
|
// Bar pulumi.StringOutput `pulumi:"bar"`
|
|
// }
|
|
//
|
|
// And invoke RegisterResource like so:
|
|
//
|
|
// var resource MyResource
|
|
// err := ctx.RegisterResource(tok, name, props, &resource, opts...)
|
|
func (ctx *Context) RegisterResource(
|
|
t, name string, props Input, resource Resource, opts ...ResourceOption,
|
|
) error {
|
|
return ctx.registerResource(t, name, props, resource, false /*remote*/, "" /* packageRef */, opts...)
|
|
}
|
|
|
|
func (ctx *Context) RegisterComponentResource(
|
|
t, name string, resource ComponentResource, opts ...ResourceOption,
|
|
) error {
|
|
return ctx.registerResource(t, name, nil /*props*/, resource, false /*remote*/, "" /* packageRef */, opts...)
|
|
}
|
|
|
|
func (ctx *Context) RegisterRemoteComponentResource(
|
|
t, name string, props Input, resource ComponentResource, opts ...ResourceOption,
|
|
) error {
|
|
return ctx.registerResource(t, name, props, resource, true /*remote*/, "" /* packageRef */, opts...)
|
|
}
|
|
|
|
func (ctx *Context) RegisterPackageResource(
|
|
t, name string, props Input, resource Resource, packageRef string, opts ...ResourceOption,
|
|
) error {
|
|
return ctx.registerResource(t, name, props, resource, false /*remote*/, packageRef, opts...)
|
|
}
|
|
|
|
func (ctx *Context) RegisterPackageRemoteComponentResource(
|
|
t, name string, props Input, resource ComponentResource, packageRef string, opts ...ResourceOption,
|
|
) error {
|
|
return ctx.registerResource(t, name, props, resource, true /*remote*/, packageRef, opts...)
|
|
}
|
|
|
|
func (ctx *Context) RegisterPackage(
|
|
in *pulumirpc.RegisterPackageRequest,
|
|
) (*pulumirpc.RegisterPackageResponse, error) {
|
|
return ctx.state.monitor.RegisterPackage(ctx.ctx, in)
|
|
}
|
|
|
|
// resourceState contains the results of a resource registration operation.
|
|
type resourceState struct {
|
|
rawOutputs Output
|
|
outputs map[string]Output
|
|
providers map[string]ProviderResource
|
|
provider ProviderResource
|
|
version string
|
|
pluginDownloadURL string
|
|
name string
|
|
transformations []ResourceTransformation
|
|
}
|
|
|
|
// Apply transformations and return the transformations themselves, as well as the transformed props and opts.
|
|
func applyTransformations(t, name string, props Input, resource Resource, opts []ResourceOption,
|
|
options *resourceOptions,
|
|
) (Input, *resourceOptions, []ResourceTransformation, error) {
|
|
transformations := options.Transformations
|
|
if options.Parent != nil {
|
|
transformations = append(transformations, options.Parent.getTransformations()...)
|
|
}
|
|
|
|
for _, transformation := range transformations {
|
|
args := &ResourceTransformationArgs{
|
|
Resource: resource,
|
|
Type: t,
|
|
Name: name,
|
|
Props: props,
|
|
Opts: opts,
|
|
}
|
|
|
|
res := transformation(args)
|
|
if res != nil {
|
|
resOptions := merge(res.Opts...)
|
|
|
|
if resOptions.Parent != nil && resOptions.Parent.URN() != options.Parent.URN() {
|
|
return nil, nil, nil, errors.New("transformations cannot currently be used to change the `parent` of a resource")
|
|
}
|
|
props = res.Props
|
|
options = resOptions
|
|
}
|
|
}
|
|
|
|
return props, options, transformations, nil
|
|
}
|
|
|
|
// checks all possible sources of providers and merges them with preference given to the most specific
|
|
func (ctx *Context) mergeProviders(t string, parent Resource, provider ProviderResource,
|
|
providerMap map[string]ProviderResource,
|
|
) (map[string]ProviderResource, error) {
|
|
// copy parent providers
|
|
result := make(map[string]ProviderResource)
|
|
if parent != nil {
|
|
for k, v := range parent.getProviders() {
|
|
result[k] = v
|
|
}
|
|
}
|
|
|
|
// copy provider map
|
|
for k, v := range providerMap {
|
|
result[k] = v
|
|
}
|
|
|
|
// copy specific provider, if any
|
|
if provider != nil {
|
|
pkg := provider.getPackage()
|
|
// We want to warn users if there's a conflicting
|
|
// provider entry in the map.
|
|
// However, since we merge the Provider into the Providers map
|
|
// when combining the functional options,
|
|
// there will always be a conflicting entry in the map.
|
|
// So we need to also check that it's a different provider.
|
|
if other, alreadyExists := providerMap[pkg]; alreadyExists && other != provider {
|
|
err := ctx.Log.Warn(fmt.Sprintf("Provider for %s conflicts with providers map. %s %s", pkg,
|
|
"This will become an error in a future version.",
|
|
"See https://github.com/pulumi/pulumi/issues/8799 for more details.",
|
|
), nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
result[pkg] = provider
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// getProvider gets the provider for the resource.
|
|
func getProvider(t string, provider ProviderResource, providers map[string]ProviderResource) ProviderResource {
|
|
pkg := getPackage(t)
|
|
if provider == nil || provider.getPackage() != pkg {
|
|
provider = providers[pkg]
|
|
}
|
|
return provider
|
|
}
|
|
|
|
// getPackage takes in a type and returns the pkg
|
|
func getPackage(t string) string {
|
|
components := strings.Split(t, ":")
|
|
if len(components) != 3 {
|
|
return ""
|
|
}
|
|
return components[0]
|
|
}
|
|
|
|
// collapseAliases collapses a list of Aliases into a list of URNs. Parent aliases
|
|
// are also included. If there are N child aliases, and M parent aliases, there will
|
|
// be (M+1)*(N+1)-1 total aliases, or, as calculated in the logic below, N+(M*(1+N)).
|
|
func (ctx *Context) collapseAliases(aliases []Alias, t, name string, parent Resource) ([]URNOutput, error) {
|
|
project, stack := ctx.Project(), ctx.Stack()
|
|
|
|
aliasURNs := slice.Prealloc[URNOutput](len(aliases))
|
|
|
|
for _, alias := range aliases {
|
|
urn, err := alias.collapseToURN(name, t, parent, project, stack)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error collapsing alias to URN: %w", err)
|
|
}
|
|
aliasURNs = append(aliasURNs, urn)
|
|
}
|
|
|
|
if parent != nil {
|
|
parentAliases := parent.getAliases()
|
|
for i := range parentAliases {
|
|
parentAlias := parentAliases[i]
|
|
urn := inheritedChildAlias(name, parent.getName(), t, project, stack, parentAlias)
|
|
aliasURNs = append(aliasURNs, urn)
|
|
for j := range aliases {
|
|
childAlias := aliases[j]
|
|
urn, err := childAlias.collapseToURN(name, t, parent, project, stack)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error collapsing alias to URN: %w", err)
|
|
}
|
|
inheritedAlias := urn.ApplyT(func(urn URN) URNOutput {
|
|
aliasedChildName := resource.URN(urn).Name()
|
|
aliasedChildType := string(resource.URN(urn).Type())
|
|
return inheritedChildAlias(aliasedChildName, parent.getName(), aliasedChildType, project, stack, parentAlias)
|
|
}).ApplyT(func(urn interface{}) URN {
|
|
return urn.(URN)
|
|
}).(URNOutput)
|
|
aliasURNs = append(aliasURNs, inheritedAlias)
|
|
}
|
|
}
|
|
}
|
|
|
|
return aliasURNs, nil
|
|
}
|
|
|
|
var mapOutputType = reflect.TypeOf((*MapOutput)(nil)).Elem()
|
|
|
|
// makeResourceState creates a set of resolvers that we'll use to finalize state, for URNs, IDs, and output
|
|
// properties.
|
|
func (ctx *Context) makeResourceState(t, name string, resourceV Resource, providers map[string]ProviderResource,
|
|
provider ProviderResource, version, pluginDownloadURL string, aliases []URNOutput,
|
|
transformations []ResourceTransformation,
|
|
) *resourceState {
|
|
// Ensure that the input res is a pointer to a struct. Note that we don't fail if it is not, and we probably
|
|
// ought to.
|
|
res := reflect.ValueOf(resourceV)
|
|
typ := res.Type()
|
|
if typ.Kind() != reflect.Ptr || typ.Elem().Kind() != reflect.Struct {
|
|
return &resourceState{}
|
|
}
|
|
res, typ = res.Elem(), typ.Elem()
|
|
|
|
var rs *ResourceState
|
|
var crs *CustomResourceState
|
|
var prs *ProviderResourceState
|
|
|
|
// Check to see if a value of exactly `*ResourceState`, `*CustomResourceState`, or `*ProviderResourceState` was
|
|
// provided.
|
|
switch r := resourceV.(type) {
|
|
case *ResourceState:
|
|
rs = r
|
|
case *CustomResourceState:
|
|
crs = r
|
|
case *ProviderResourceState:
|
|
prs = r
|
|
}
|
|
|
|
// Find the particular Resource implementation and the settable, `pulumi`-tagged fields in the input type. The
|
|
// former is used for any URN or ID fields; the latter are used to determine the expected outputs of the resource
|
|
// after its RegisterResource call completes. For each of those fields, create an appropriately-typed Output and
|
|
// map the Output to its property name so we can resolve it later.
|
|
state := &resourceState{outputs: map[string]Output{}}
|
|
for i := 0; i < typ.NumField(); i++ {
|
|
fieldV := res.Field(i)
|
|
if !fieldV.CanSet() {
|
|
continue
|
|
}
|
|
|
|
field := typ.Field(i)
|
|
switch {
|
|
case field.Anonymous && field.Type == resourceStateType:
|
|
rs = fieldV.Addr().Interface().(*ResourceState)
|
|
case field.Anonymous && field.Type == customResourceStateType:
|
|
crs = fieldV.Addr().Interface().(*CustomResourceState)
|
|
case field.Anonymous && field.Type == providerResourceStateType:
|
|
prs = fieldV.Addr().Interface().(*ProviderResourceState)
|
|
case field.Type.Implements(outputType):
|
|
tag, has := typ.Field(i).Tag.Lookup("pulumi")
|
|
if !has {
|
|
continue
|
|
}
|
|
|
|
output := ctx.newOutput(field.Type, resourceV)
|
|
fieldV.Set(reflect.ValueOf(output))
|
|
|
|
if tag == "" && field.Type != mapOutputType {
|
|
internal.RejectOutput(output,
|
|
fmt.Errorf("the field %v must be a MapOutput or its tag must be non-empty", field.Name))
|
|
}
|
|
|
|
state.outputs[tag] = output
|
|
}
|
|
}
|
|
|
|
// Create provider- and custom resource-specific state/resolvers.
|
|
if prs != nil {
|
|
crs = &prs.CustomResourceState
|
|
prs.pkg = t[len("pulumi:providers:"):]
|
|
}
|
|
if crs != nil {
|
|
rs = &crs.ResourceState
|
|
crs.id = IDOutput{ctx.newOutputState(idType, resourceV)}
|
|
state.outputs["id"] = crs.id
|
|
}
|
|
|
|
rawOutputState := ctx.newOutputState(anyType, resourceV)
|
|
rawOutputs := AnyOutput{rawOutputState}
|
|
|
|
// Populate ResourceState resolvers. (Pulled into function to keep the nil-ness linter check happy).
|
|
populateResourceStateResolvers := func() {
|
|
contract.Assertf(rs != nil, "ResourceState must not be nil")
|
|
if rs.urn.OutputState != nil {
|
|
err := ctx.Log.Error(
|
|
fmt.Sprintf("The resource named %v (type: %v) was initialized multiple times.", name, t),
|
|
nil)
|
|
contract.IgnoreError(err)
|
|
}
|
|
state.providers = providers
|
|
rs.providers = providers
|
|
state.provider = provider
|
|
rs.provider = provider
|
|
state.version = version
|
|
rs.version = version
|
|
state.rawOutputs = rawOutputs
|
|
rs.rawOutputs = rawOutputs
|
|
state.pluginDownloadURL = pluginDownloadURL
|
|
rs.pluginDownloadURL = pluginDownloadURL
|
|
rs.urn = URNOutput{ctx.newOutputState(urnType, resourceV)}
|
|
state.outputs["urn"] = rs.urn
|
|
state.name = name
|
|
rs.name = name
|
|
rs.aliases = aliases
|
|
state.transformations = transformations
|
|
rs.transformations = transformations
|
|
}
|
|
populateResourceStateResolvers()
|
|
|
|
return state
|
|
}
|
|
|
|
// resolve resolves the resource outputs using the given error and/or values.
|
|
func (state *resourceState) resolve(ctx *Context, err error, inputs *resourceInputs, urn, id string,
|
|
result *structpb.Struct, deps map[string][]Resource, keepUnknowns bool,
|
|
) {
|
|
keepUnknowns = keepUnknowns || ctx.DryRun()
|
|
|
|
var inprops resource.PropertyMap
|
|
if inputs != nil {
|
|
inprops = inputs.resolvedProps
|
|
}
|
|
|
|
var outprops resource.PropertyMap
|
|
if err == nil {
|
|
outprops, err = plugin.UnmarshalProperties(
|
|
result,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
},
|
|
)
|
|
}
|
|
if err != nil {
|
|
// If there was an error, we must reject everything.
|
|
for _, output := range state.outputs {
|
|
internal.RejectOutput(output, err)
|
|
}
|
|
internal.RejectOutput(state.rawOutputs, err)
|
|
return
|
|
}
|
|
|
|
outprops["urn"] = resource.NewStringProperty(urn)
|
|
if id != "" || !keepUnknowns {
|
|
outprops["id"] = resource.NewStringProperty(id)
|
|
} else {
|
|
outprops["id"] = resource.MakeComputed(resource.PropertyValue{})
|
|
}
|
|
|
|
if _, hasRemainingOutput := state.outputs[""]; hasRemainingOutput {
|
|
remaining, known := resource.PropertyMap{}, true
|
|
for k, v := range outprops {
|
|
if v.IsNull() || v.IsComputed() || v.IsOutput() {
|
|
known = !keepUnknowns
|
|
}
|
|
if _, ok := state.outputs[string(k)]; !ok {
|
|
remaining[k] = v
|
|
}
|
|
}
|
|
if !known {
|
|
outprops[""] = resource.MakeComputed(resource.NewStringProperty(""))
|
|
} else {
|
|
outprops[""] = resource.NewObjectProperty(remaining)
|
|
}
|
|
}
|
|
|
|
// We need to wait until after we finish mutating outprops to resolve. Resolving
|
|
// unlocks multithreaded access to the resolved value, making mutation a data race.
|
|
internal.ResolveOutput(state.rawOutputs, outprops, true, false, resourcesToInternal(nil))
|
|
|
|
for k, output := range state.outputs {
|
|
// If this is an unknown or missing value during a dry run, do nothing.
|
|
v, ok := outprops[resource.PropertyKey(k)]
|
|
if !ok && !keepUnknowns {
|
|
v = inprops[resource.PropertyKey(k)]
|
|
}
|
|
|
|
known := true
|
|
if v.IsNull() || v.IsComputed() || v.IsOutput() {
|
|
known = !keepUnknowns
|
|
}
|
|
|
|
// Allocate storage for the unmarshalled output.
|
|
dest := reflect.New(output.ElementType()).Elem()
|
|
secret, err := unmarshalOutput(ctx, v, dest)
|
|
if err != nil {
|
|
internal.RejectOutput(output, err)
|
|
} else {
|
|
internal.ResolveOutput(output, dest.Interface(), known, secret, resourcesToInternal(deps[k]))
|
|
}
|
|
}
|
|
}
|
|
|
|
// resourceInputs reflects all of the inputs necessary to perform core resource RPC operations.
|
|
type resourceInputs struct {
|
|
parent string
|
|
deps []string
|
|
protect bool
|
|
provider string
|
|
providers map[string]string
|
|
resolvedProps resource.PropertyMap
|
|
rpcProps *structpb.Struct
|
|
rpcPropertyDeps map[string]*pulumirpc.RegisterResourceRequest_PropertyDependencies
|
|
deleteBeforeReplace bool
|
|
importID string
|
|
customTimeouts *pulumirpc.RegisterResourceRequest_CustomTimeouts
|
|
ignoreChanges []string
|
|
aliases []*pulumirpc.Alias
|
|
additionalSecretOutputs []string
|
|
version string
|
|
pluginDownloadURL string
|
|
replaceOnChanges []string
|
|
retainOnDelete bool
|
|
deletedWith string
|
|
}
|
|
|
|
func (ctx *Context) resolveAliasParent(alias Alias, spec *pulumirpc.Alias_Spec) error {
|
|
var parentURN URNOutput
|
|
if alias.ParentURN != nil {
|
|
parentURN = alias.ParentURN.ToURNOutput()
|
|
} else if alias.Parent != nil {
|
|
parentURN = alias.Parent.URN()
|
|
} else {
|
|
// alias has no original parent set
|
|
// either use the default parent when alias.NoParent == true
|
|
// or explicitly set the parent to NoParent when alias.NoParent == false
|
|
// in either case, pass the NoParent flag to the engine as is.
|
|
if alias.NoParent == nil {
|
|
spec.Parent = &pulumirpc.Alias_Spec_NoParent{
|
|
NoParent: false,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
noParent, _, _, _, err := internal.AwaitOutput(ctx.Context(), alias.NoParent.ToBoolOutput())
|
|
if err != nil {
|
|
return fmt.Errorf("alias NoParent field could not be resolved: %w", err)
|
|
}
|
|
spec.Parent = &pulumirpc.Alias_Spec_NoParent{
|
|
NoParent: noParent.(bool),
|
|
}
|
|
// We're done here.
|
|
return nil
|
|
}
|
|
|
|
resolvedParentURN, known, secret, err := parentURN.awaitURN(ctx.Context())
|
|
if err != nil {
|
|
return fmt.Errorf("alias parent could not be resolved: %w", err)
|
|
}
|
|
|
|
if !known {
|
|
return errors.New("alias parent urn must be known")
|
|
}
|
|
|
|
if secret {
|
|
return errors.New("alias parent urn must not be secret")
|
|
}
|
|
|
|
spec.Parent = &pulumirpc.Alias_Spec_ParentUrn{
|
|
ParentUrn: string(resolvedParentURN),
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// mapAliases maps a list of aliases coming from resource options
|
|
// to their RPC representation which the engine understands.
|
|
func (ctx *Context) mapAliases(aliases []Alias,
|
|
resourceType string,
|
|
name string,
|
|
parent Resource,
|
|
) ([]*pulumirpc.Alias, error) {
|
|
aliasSpecs := slice.Prealloc[*pulumirpc.Alias](len(aliases))
|
|
await := func(input StringInput) (string, error) {
|
|
if input == nil {
|
|
return "", nil
|
|
}
|
|
content, known, secret, _, err := internal.AwaitOutput(ctx.Context(), input.ToStringOutput())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if !known {
|
|
return "", errors.New("must be known")
|
|
}
|
|
|
|
if secret {
|
|
return "", errors.New("must not be secret")
|
|
}
|
|
|
|
if content == nil {
|
|
// it is fine if the value is nil, we just return an empty string
|
|
// the engine can fill this in
|
|
return "", nil
|
|
}
|
|
|
|
value, ok := content.(string)
|
|
if !ok {
|
|
return "", errors.New("must be a string")
|
|
}
|
|
return value, nil
|
|
}
|
|
|
|
if ctx.state.supportsAliasSpecs {
|
|
for _, alias := range aliases {
|
|
if alias.URN != nil {
|
|
// fully specified URN, map it as is
|
|
aliasUrn, _, _, err := alias.URN.ToURNOutput().awaitURN(ctx.Context())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("alias urn could not be resolved: %w", err)
|
|
}
|
|
newAliasSpec := &pulumirpc.Alias{
|
|
Alias: &pulumirpc.Alias_Urn{
|
|
Urn: string(aliasUrn),
|
|
},
|
|
}
|
|
|
|
aliasSpecs = append(aliasSpecs, newAliasSpec)
|
|
continue
|
|
}
|
|
|
|
aliasName, err := await(alias.Name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("alias name could not be resolved: %w", err)
|
|
}
|
|
|
|
aliasType, err := await(alias.Type)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("alias type could not be resolved: %w", err)
|
|
}
|
|
|
|
aliasProject, err := await(alias.Project)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("alias project could not be resolved: %w", err)
|
|
}
|
|
|
|
aliasStack, err := await(alias.Stack)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("alias stack could not be resolved: %w", err)
|
|
}
|
|
|
|
spec := &pulumirpc.Alias_Spec{
|
|
Name: aliasName,
|
|
Type: aliasType,
|
|
Project: aliasProject,
|
|
Stack: aliasStack,
|
|
}
|
|
|
|
if err := ctx.resolveAliasParent(alias, spec); err != nil {
|
|
return nil, fmt.Errorf("alias parent could not be resolved: %w", err)
|
|
}
|
|
|
|
newAliasSpec := &pulumirpc.Alias{
|
|
Alias: &pulumirpc.Alias_Spec_{
|
|
Spec: spec,
|
|
},
|
|
}
|
|
|
|
aliasSpecs = append(aliasSpecs, newAliasSpec)
|
|
}
|
|
} else {
|
|
// If the engine does not support full alias specs, we will use the URN format
|
|
// Collapse top level aliases into urns
|
|
// this populates the aliasURNs of the resourceInputs
|
|
// which is then used in RegisterResourceRequest
|
|
aliasURNs, err := ctx.collapseAliases(aliases, resourceType, name, parent)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to collapse alias combinations: %w", err)
|
|
}
|
|
|
|
for _, aliasURN := range aliasURNs {
|
|
urn, _, _, err := aliasURN.awaitURN(ctx.Context())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error waiting for alias URN to resolve: %w", err)
|
|
}
|
|
|
|
newAliasSpec := &pulumirpc.Alias{
|
|
Alias: &pulumirpc.Alias_Urn{
|
|
Urn: string(urn),
|
|
},
|
|
}
|
|
|
|
aliasSpecs = append(aliasSpecs, newAliasSpec)
|
|
}
|
|
}
|
|
|
|
return aliasSpecs, nil
|
|
}
|
|
|
|
// prepareResourceInputs prepares the inputs for a resource operation, shared between read and register.
|
|
func (ctx *Context) prepareResourceInputs(res Resource, props Input, t string, opts *resourceOptions,
|
|
state *resourceState, remote, custom bool,
|
|
) (*resourceInputs, error) {
|
|
// Get the parent and dependency URNs from the options, in addition to the protection bit. If there wasn't an
|
|
// explicit parent, and a root stack resource exists, we will automatically parent to that.
|
|
resOpts, err := ctx.getOpts(res, t, state.provider, opts, remote, custom)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("resolving options: %w", err)
|
|
}
|
|
|
|
// Serialize all properties, first by awaiting them, and then marshaling them to the requisite gRPC values.
|
|
resolvedProps, propertyDeps, rpcDeps, err := marshalInputs(props)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling properties: %w", err)
|
|
}
|
|
|
|
// Marshal all properties for the RPC call.
|
|
rpcProps, err := plugin.MarshalProperties(
|
|
resolvedProps,
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: ctx.state.keepResources,
|
|
// To initially scope the use of this new feature, we only keep output values when
|
|
// remote is true (for multi-lang components).
|
|
KeepOutputValues: remote && ctx.state.keepOutputValues,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling properties: %w", err)
|
|
}
|
|
|
|
// Convert the property dependencies map for RPC and remove duplicates.
|
|
rpcPropertyDeps := make(map[string]*pulumirpc.RegisterResourceRequest_PropertyDependencies)
|
|
for k, deps := range propertyDeps {
|
|
urns := make([]string, len(deps))
|
|
for i, d := range deps {
|
|
urns[i] = string(d)
|
|
}
|
|
sort.Strings(urns)
|
|
|
|
rpcPropertyDeps[k] = &pulumirpc.RegisterResourceRequest_PropertyDependencies{
|
|
Urns: urns,
|
|
}
|
|
}
|
|
|
|
// Merge all dependencies with what we got earlier from property marshaling, and remove duplicates.
|
|
var deps []string
|
|
depSet := urnSet{}
|
|
for _, dep := range append(resOpts.depURNs, rpcDeps...) {
|
|
if !depSet.has(dep) {
|
|
deps = append(deps, string(dep))
|
|
depSet.add(dep)
|
|
}
|
|
}
|
|
sort.Strings(deps)
|
|
|
|
aliases, err := ctx.mapAliases(opts.Aliases, t, state.name, opts.Parent)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("mapping aliases: %w", err)
|
|
}
|
|
|
|
var deletedWithURN URN
|
|
if opts.DeletedWith != nil {
|
|
urn, _, _, err := opts.DeletedWith.URN().awaitURN(context.Background())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error waiting for DeletedWith URN to resolve: %w", err)
|
|
}
|
|
deletedWithURN = urn
|
|
}
|
|
|
|
return &resourceInputs{
|
|
parent: string(resOpts.parentURN),
|
|
deps: deps,
|
|
protect: resOpts.protect,
|
|
provider: resOpts.providerRef,
|
|
providers: resOpts.providerRefs,
|
|
resolvedProps: resolvedProps,
|
|
rpcProps: rpcProps,
|
|
rpcPropertyDeps: rpcPropertyDeps,
|
|
deleteBeforeReplace: resOpts.deleteBeforeReplace,
|
|
importID: string(resOpts.importID),
|
|
customTimeouts: getTimeouts(opts.CustomTimeouts),
|
|
ignoreChanges: resOpts.ignoreChanges,
|
|
aliases: aliases,
|
|
additionalSecretOutputs: resOpts.additionalSecretOutputs,
|
|
version: state.version,
|
|
pluginDownloadURL: state.pluginDownloadURL,
|
|
replaceOnChanges: resOpts.replaceOnChanges,
|
|
retainOnDelete: opts.RetainOnDelete,
|
|
deletedWith: string(deletedWithURN),
|
|
}, nil
|
|
}
|
|
|
|
func getTimeouts(custom *CustomTimeouts) *pulumirpc.RegisterResourceRequest_CustomTimeouts {
|
|
var timeouts pulumirpc.RegisterResourceRequest_CustomTimeouts
|
|
if custom != nil {
|
|
timeouts.Update = custom.Update
|
|
timeouts.Create = custom.Create
|
|
timeouts.Delete = custom.Delete
|
|
}
|
|
return &timeouts
|
|
}
|
|
|
|
// Helper struct for the return type of `getOpts`.
|
|
type resourceOpts struct {
|
|
parentURN URN
|
|
depURNs []URN
|
|
protect bool
|
|
providerRef string
|
|
providerRefs map[string]string
|
|
deleteBeforeReplace bool
|
|
importID ID
|
|
ignoreChanges []string
|
|
additionalSecretOutputs []string
|
|
replaceOnChanges []string
|
|
}
|
|
|
|
// getOpts returns a set of resource options from an array of them. This includes the parent URN, any dependency URNs,
|
|
// a boolean indicating whether the resource is to be protected, and the URN and ID of the resource's provider, if any.
|
|
func (ctx *Context) getOpts(
|
|
res Resource, t string, provider ProviderResource, opts *resourceOptions, remote, custom bool,
|
|
) (resourceOpts, error) {
|
|
var importID ID
|
|
if opts.Import != nil {
|
|
id, _, _, err := opts.Import.ToIDOutput().awaitID(context.TODO())
|
|
if err != nil {
|
|
return resourceOpts{}, err
|
|
}
|
|
importID = id
|
|
}
|
|
|
|
var parentURN URN
|
|
if opts.Parent != nil {
|
|
opts.Parent.addChild(res)
|
|
|
|
urn, _, _, err := opts.Parent.URN().awaitURN(context.TODO())
|
|
if err != nil {
|
|
return resourceOpts{}, err
|
|
}
|
|
parentURN = urn
|
|
}
|
|
|
|
var depURNs []URN
|
|
if opts.DependsOn != nil {
|
|
depSet := urnSet{}
|
|
for _, ds := range opts.DependsOn {
|
|
if err := ds.addURNs(ctx.ctx, depSet, res); err != nil {
|
|
return resourceOpts{}, err
|
|
}
|
|
}
|
|
depURNs = depSet.values()
|
|
}
|
|
|
|
var providerRef string
|
|
if provider != nil {
|
|
pr, err := ctx.resolveProviderReference(provider)
|
|
if err != nil {
|
|
return resourceOpts{}, err
|
|
}
|
|
providerRef = pr
|
|
}
|
|
|
|
var providerRefs map[string]string
|
|
if remote || !custom {
|
|
if opts.Providers != nil {
|
|
providerRefs = make(map[string]string, len(opts.Providers))
|
|
for name, provider := range opts.Providers {
|
|
pr, err := ctx.resolveProviderReference(provider)
|
|
if err != nil {
|
|
return resourceOpts{}, err
|
|
}
|
|
providerRefs[name] = pr
|
|
}
|
|
}
|
|
}
|
|
|
|
return resourceOpts{
|
|
parentURN: parentURN,
|
|
depURNs: depURNs,
|
|
protect: opts.Protect,
|
|
providerRef: providerRef,
|
|
providerRefs: providerRefs,
|
|
deleteBeforeReplace: opts.DeleteBeforeReplace,
|
|
importID: importID,
|
|
ignoreChanges: opts.IgnoreChanges,
|
|
additionalSecretOutputs: opts.AdditionalSecretOutputs,
|
|
replaceOnChanges: opts.ReplaceOnChanges,
|
|
}, nil
|
|
}
|
|
|
|
func (ctx *Context) resolveProviderReference(provider ProviderResource) (string, error) {
|
|
urn, _, _, err := provider.URN().awaitURN(context.TODO())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
id, known, _, err := provider.ID().awaitID(context.TODO())
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if !known {
|
|
id = rpcTokenUnknownValue
|
|
}
|
|
return string(urn) + "::" + string(id), nil
|
|
}
|
|
|
|
// noMoreRPCs is a sentinel value used to stop subsequent RPCs from occurring.
|
|
const noMoreRPCs = -1
|
|
|
|
// beginRPC attempts to start a new RPC request, returning a non-nil error if no more RPCs are permitted
|
|
// (usually because the program is shutting down).
|
|
func (ctx *Context) beginRPC() error {
|
|
ctx.state.rpcsLock.Lock()
|
|
defer ctx.state.rpcsLock.Unlock()
|
|
|
|
// If we're done with RPCs, return an error.
|
|
if ctx.state.rpcs == noMoreRPCs {
|
|
return errors.New("attempted illegal RPC after program completion")
|
|
}
|
|
|
|
ctx.state.rpcs++
|
|
return nil
|
|
}
|
|
|
|
// endRPC signals the completion of an RPC and notifies any potential awaiters when outstanding RPCs hit zero.
|
|
func (ctx *Context) endRPC(err error) {
|
|
ctx.state.rpcsLock.Lock()
|
|
defer ctx.state.rpcsLock.Unlock()
|
|
|
|
if err != nil && ctx.state.rpcError == nil {
|
|
ctx.state.rpcError = err
|
|
}
|
|
|
|
ctx.state.rpcs--
|
|
if ctx.state.rpcs == 0 {
|
|
ctx.state.rpcsDone.Broadcast()
|
|
}
|
|
}
|
|
|
|
// RegisterResourceOutputs completes the resource registration, attaching an optional set of computed outputs.
|
|
func (ctx *Context) RegisterResourceOutputs(resource Resource, outs Map) error {
|
|
// Note that we're about to make an outstanding RPC request, so that we can rendezvous during shutdown.
|
|
if err := ctx.beginRPC(); err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
// No matter the outcome, make sure all promises are resolved and that we've signaled completion of this RPC.
|
|
var err error
|
|
defer func() {
|
|
// Signal the completion of this RPC and notify any potential awaiters.
|
|
ctx.endRPC(err)
|
|
}()
|
|
|
|
urn, _, _, err := resource.URN().awaitURN(context.TODO())
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
outsResolved, _, err := marshalInput(outs, anyType, true)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
outsMarshalled, err := plugin.MarshalProperties(
|
|
outsResolved.ObjectValue(),
|
|
plugin.MarshalOptions{
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: ctx.state.keepResources,
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Register the outputs
|
|
logging.V(9).Infof("RegisterResourceOutputs(%s): RPC call being made", urn)
|
|
_, err = ctx.state.monitor.RegisterResourceOutputs(ctx.ctx, &pulumirpc.RegisterResourceOutputsRequest{
|
|
Urn: string(urn),
|
|
Outputs: outsMarshalled,
|
|
})
|
|
|
|
logging.V(9).Infof("RegisterResourceOutputs(%s): %v", urn, err)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Export registers a key and value pair with the current context's stack.
|
|
func (ctx *Context) Export(name string, value Input) {
|
|
ctx.state.exports[name] = value
|
|
}
|
|
|
|
// RegisterStackTransformation adds a transformation to all future resources constructed in this Pulumi stack.
|
|
func (ctx *Context) RegisterStackTransformation(t ResourceTransformation) error {
|
|
ctx.state.stack.addTransformation(t)
|
|
return nil
|
|
}
|
|
|
|
// RegisterResourceTransform adds a transform to all future resources constructed in this Pulumi stack.
|
|
func (ctx *Context) RegisterResourceTransform(t ResourceTransform) error {
|
|
cb, err := ctx.registerTransform(t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = ctx.state.monitor.RegisterStackTransform(ctx.ctx, cb)
|
|
return err
|
|
}
|
|
|
|
// RegisterStackTransform adds a transform to all future resources constructed in this Pulumi stack.
|
|
//
|
|
// Deprecated: Use RegisterResourceTransform instead.
|
|
func (ctx *Context) RegisterStackTransform(t ResourceTransform) error {
|
|
return ctx.RegisterResourceTransform(t)
|
|
}
|
|
|
|
// RegisterInvokeTransform adds a transform to all future invokes in this Pulumi stack.
|
|
func (ctx *Context) RegisterInvokeTransform(t InvokeTransform) error {
|
|
cb, err := ctx.registerInvokeTransform(t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = ctx.state.monitor.RegisterStackInvokeTransform(ctx.ctx, cb)
|
|
return err
|
|
}
|
|
|
|
func (ctx *Context) newOutputState(elementType reflect.Type, deps ...Resource) *OutputState {
|
|
return internal.NewOutputState(&ctx.state.join, elementType, resourcesToInternal(deps)...)
|
|
}
|
|
|
|
func (ctx *Context) newOutput(typ reflect.Type, deps ...Resource) Output {
|
|
return internal.NewOutput(&ctx.state.join, typ, resourcesToInternal(deps)...)
|
|
}
|
|
|
|
// NewOutput creates a new output associated with this context.
|
|
func (ctx *Context) NewOutput() (Output, func(interface{}), func(error)) {
|
|
return newAnyOutput(&ctx.state.join)
|
|
}
|
|
|
|
// Returns the source position of the Nth stack frame, where N is skip+1.
|
|
//
|
|
// This is used to compute the source position of the user code that instantiated a resource. The number of frames to
|
|
// skip is parameterized in order to account for differing call stacks for different operations.
|
|
func (ctx *Context) getSourcePosition(skip int) *pulumirpc.SourcePosition {
|
|
var pcs [1]uintptr
|
|
if callers := runtime.Callers(skip+1, pcs[:]); callers != 1 {
|
|
return nil
|
|
}
|
|
frames := runtime.CallersFrames(pcs[:])
|
|
frame, _ := frames.Next()
|
|
if frame.File == "" || frame.Line == 0 {
|
|
return nil
|
|
}
|
|
elems := filepath.SplitList(frame.File)
|
|
for i := range elems {
|
|
elems[i] = url.PathEscape(elems[i])
|
|
}
|
|
var line int32
|
|
if frame.Line <= math.MaxInt32 {
|
|
//nolint:gosec
|
|
line = int32(frame.Line)
|
|
} else {
|
|
// line is out of range for int32, that's a long sourcefile!
|
|
line = -1
|
|
}
|
|
return &pulumirpc.SourcePosition{
|
|
Uri: "project://" + path.Join(elems...),
|
|
Line: line,
|
|
}
|
|
}
|