pulumi/pkg/resource/deploy/source_eval.go

2766 lines
96 KiB
Go

// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deploy
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/blang/semver"
"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
opentracing "github.com/opentracing/opentracing-go"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"github.com/pulumi/pulumi/pkg/v3/codegen/schema"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/env"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v3/go/common/slice"
"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/result"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
mapset "github.com/deckarep/golang-set/v2"
"github.com/ryboe/q"
)
// EvalRunInfo provides information required to execute and deploy resources within a package.
type EvalRunInfo struct {
// the package metadata.
Proj *workspace.Project `json:"proj" yaml:"proj"`
// the package's working directory.
Pwd string `json:"pwd" yaml:"pwd"`
// the path to the program.
Program string `json:"program" yaml:"program"`
// the path to the project's directory.
ProjectRoot string `json:"projectRoot,omitempty" yaml:"projectRoot,omitempty"`
// any arguments to pass to the package.
Args []string `json:"args,omitempty" yaml:"args,omitempty"`
// the target being deployed into.
Target *Target `json:"target,omitempty" yaml:"target,omitempty"`
}
// EvalRunInfoOptions provides options for configuring an evaluation source.
type EvalSourceOptions struct {
// true if the evaluation is producing resources for a dry-run/preview.
DryRun bool
// the degree of parallelism for resource operations (<=1 for serial).
Parallel int32
// true to disable resource reference support.
DisableResourceReferences bool
// true to disable output value support.
DisableOutputValues bool
// AttachDebugger to launch the language host in debug mode.
AttachDebugger bool
}
// NewEvalSource returns a planning source that fetches resources by evaluating a package with a set of args and
// a confgiuration map. This evaluation is performed using the given plugin context and may optionally use the
// given plugin host (or the default, if this is nil). Note that closing the eval source also closes the host.
func NewEvalSource(
plugctx *plugin.Context,
runinfo *EvalRunInfo,
defaultProviderInfo map[tokens.Package]workspace.PluginSpec,
opts EvalSourceOptions,
) Source {
return &evalSource{
plugctx: plugctx,
runinfo: runinfo,
defaultProviderInfo: defaultProviderInfo,
opts: opts,
}
}
type evalSource struct {
plugctx *plugin.Context // the plugin context.
runinfo *EvalRunInfo // the directives to use when running the program.
defaultProviderInfo map[tokens.Package]workspace.PluginSpec // the default provider versions for this source.
opts EvalSourceOptions // options for the evaluation source.
}
func (src *evalSource) Close() error {
return nil
}
// Project is the name of the project being run by this evaluation source.
func (src *evalSource) Project() tokens.PackageName {
return src.runinfo.Proj.Name
}
// Stack is the name of the stack being targeted by this evaluation source.
func (src *evalSource) Stack() tokens.StackName {
return src.runinfo.Target.Name
}
func (src *evalSource) Info() interface{} { return src.runinfo }
// Iterate will spawn an evaluator coroutine and prepare to interact with it on subsequent calls to Next.
func (src *evalSource) Iterate(ctx context.Context, providers ProviderSource) (SourceIterator, error) {
tracingSpan := opentracing.SpanFromContext(ctx)
// Decrypt the configuration.
config, err := src.runinfo.Target.Config.Decrypt(src.runinfo.Target.Decrypter)
if err != nil {
return nil, fmt.Errorf("failed to decrypt config: %w", err)
}
// Keep track of any config keys that have secure values.
configSecretKeys := src.runinfo.Target.Config.SecureKeys()
configMap, err := src.runinfo.Target.Config.AsDecryptedPropertyMap(ctx, src.runinfo.Target.Decrypter)
if err != nil {
return nil, fmt.Errorf("failed to convert config to map: %w", err)
}
// First, fire up a resource monitor that will watch for and record resource creation.
regChan := make(chan *registerResourceEvent)
regOutChan := make(chan *registerResourceOutputsEvent)
regReadChan := make(chan *readResourceEvent)
mon, err := newResourceMonitor(
src, providers, regChan, regOutChan, regReadChan, config, configSecretKeys, tracingSpan)
if err != nil {
return nil, fmt.Errorf("failed to start resource monitor: %w", err)
}
// Also start up a schema loader for the language runtime to use to fetch schema information.
loaderRegistration := schema.LoaderRegistration(
schema.NewLoaderServer(schema.NewPluginLoader(src.plugctx.Host)))
loaderServer, err := plugin.NewServer(src.plugctx, loaderRegistration)
if err != nil {
return nil, fmt.Errorf("failed to start loader server: %w", err)
}
// Create a new iterator with appropriate channels, and gear up to go!
iter := &evalSourceIterator{
loaderServer: loaderServer,
mon: mon,
src: src,
regChan: regChan,
regOutChan: regOutChan,
regReadChan: regReadChan,
finChan: make(chan error),
}
// Now invoke Run in a goroutine. All subsequent resource creation events will come in over the gRPC channel,
// and we will pump them through the channel. If the Run call ultimately fails, we need to propagate the error.
iter.forkRun(config, configSecretKeys, configMap)
// Finally, return the fresh iterator that the caller can use to take things from here.
return iter, nil
}
type evalSourceIterator struct {
loaderServer *plugin.GrpcServer // the grpc server for the schema loader.
mon SourceResourceMonitor // the resource monitor, per iterator.
src *evalSource // the owning eval source object.
regChan chan *registerResourceEvent // the channel that contains resource registrations.
regOutChan chan *registerResourceOutputsEvent // the channel that contains resource completions.
regReadChan chan *readResourceEvent // the channel that contains read resource requests.
finChan chan error // the channel that communicates completion.
done bool // set to true when the evaluation is done.
aborted bool // set to true when the iterator is aborted.
}
func (iter *evalSourceIterator) Close() error {
// Cancel the monitor and reclaim any associated resources.
return iter.mon.Cancel()
}
func (iter *evalSourceIterator) ResourceMonitor() SourceResourceMonitor {
return iter.mon
}
func (iter *evalSourceIterator) Next() (SourceEvent, error) {
// if the iterator is aborted, return an error.
if iter.aborted {
return nil, result.BailErrorf("EvalSourceIterator aborted")
}
// If we are done, quit.
if iter.done {
return nil, nil
}
// Await the program to compute some more state and then inspect what it has to say.
select {
case <-iter.mon.AbortChan():
iter.aborted = true
return nil, result.BailErrorf("EvalSourceIterator aborted")
case reg := <-iter.regChan:
contract.Assertf(reg != nil, "received a nil registerResourceEvent")
goal := reg.Goal()
logging.V(5).Infof("EvalSourceIterator produced a registration: t=%v,name=%v,#props=%v",
goal.Type, goal.Name, len(goal.Properties))
return reg, nil
case regOut := <-iter.regOutChan:
contract.Assertf(regOut != nil, "received a nil registerResourceOutputsEvent")
logging.V(5).Infof("EvalSourceIterator produced a completion: urn=%v,#outs=%v",
regOut.URN(), len(regOut.Outputs()))
return regOut, nil
case read := <-iter.regReadChan:
contract.Assertf(read != nil, "received a nil readResourceEvent")
logging.V(5).Infoln("EvalSourceIterator produced a read")
return read, nil
case err := <-iter.finChan:
// If we are finished, we can safely exit. The contract with the language provider is that this implies
// that the language runtime has exited and so calling Close on the plugin is fine.
iter.done = true
if err != nil {
if result.IsBail(err) {
logging.V(5).Infof("EvalSourceIterator ended with bail.")
} else {
logging.V(5).Infof("EvalSourceIterator ended with an error: %v", err)
}
}
return nil, err
}
}
// forkRun performs the evaluation from a distinct goroutine. This function blocks until it's our turn to go.
func (iter *evalSourceIterator) forkRun(
config map[config.Key]string,
configSecretKeys []config.Key,
configPropertyMap resource.PropertyMap,
) {
// Fire up the goroutine to make the RPC invocation against the language runtime. As this executes, calls
// to queue things up in the resource channel will occur, and we will serve them concurrently.
go func() {
// Next, launch the language plugin.
run := func() error {
rt := iter.src.runinfo.Proj.Runtime.Name()
rtopts := iter.src.runinfo.Proj.Runtime.Options()
programInfo := plugin.NewProgramInfo(
/* rootDirectory */ iter.src.runinfo.ProjectRoot,
/* programDirectory */ iter.src.runinfo.Pwd,
/* entryPoint */ iter.src.runinfo.Program,
/* options */ rtopts)
langhost, err := iter.src.plugctx.Host.LanguageRuntime(rt, programInfo)
if err != nil {
return fmt.Errorf("failed to launch language host %s: %w", rt, err)
}
contract.Assertf(langhost != nil, "expected non-nil language host %s", rt)
// Now run the actual program.
progerr, bail, err := langhost.Run(plugin.RunInfo{
MonitorAddress: iter.mon.Address(),
Stack: iter.src.runinfo.Target.Name.String(),
Project: string(iter.src.runinfo.Proj.Name),
Pwd: iter.src.runinfo.Pwd,
Args: iter.src.runinfo.Args,
Config: config,
ConfigSecretKeys: configSecretKeys,
ConfigPropertyMap: configPropertyMap,
DryRun: iter.src.opts.DryRun,
Parallel: iter.src.opts.Parallel,
Organization: string(iter.src.runinfo.Target.Organization),
Info: programInfo,
LoaderAddress: iter.loaderServer.Addr(),
AttachDebugger: iter.src.opts.AttachDebugger,
})
// Check if we were asked to Bail. This a special random constant used for that
// purpose.
if err == nil && bail {
return result.BailErrorf("run bailed")
}
if err == nil && progerr != "" {
// If the program had an unhandled error; propagate it to the caller.
err = fmt.Errorf("an unhandled error occurred: %v", progerr)
}
return err
}
// Communicate the error, if it exists, or nil if the program exited cleanly.
iter.finChan <- run()
}()
}
// defaultProviders manages the registration of default providers. The default provider for a package is the provider
// resource that will be used to manage resources that do not explicitly reference a provider. Default providers will
// only be registered for packages that are used by resources registered by the user's Pulumi program.
type defaultProviders struct {
// A map of package identifiers to versions, used to disambiguate which plugin to load if no version is provided
// by the language host.
defaultProviderInfo map[tokens.Package]workspace.PluginSpec
// A map of ProviderRequest strings to provider references, used to keep track of the set of default providers that
// have already been loaded.
providers map[string]providers.Reference
config plugin.ConfigSource
requests chan defaultProviderRequest
providerRegChan chan<- *registerResourceEvent
cancel <-chan bool
}
type defaultProviderResponse struct {
ref providers.Reference
err error
}
type defaultProviderRequest struct {
req providers.ProviderRequest
response chan<- defaultProviderResponse
}
func (d *defaultProviders) normalizeProviderRequest(req providers.ProviderRequest) providers.ProviderRequest {
// Request that the engine instantiate a specific version of this provider, if one was requested. We'll figure out
// what version to request by:
// 1. Providing the Version field of the ProviderRequest verbatim, if it was provided, otherwise
// 2. Querying the list of default versions provided to us on startup and returning the value associated with
// the given package, if one exists, otherwise
// 3. We give nothing to the engine and let the engine figure it out.
//
// As we tighen up our approach to provider versioning, 2 and 3 will go away and be replaced entirely by 1. 3 is
// especially onerous because the engine selects the "newest" plugin available on the machine, which is generally
// problematic for a lot of reasons.
if req.Version() != nil {
logging.V(5).Infof("normalizeProviderRequest(%s): using version %s from request", req, req.Version())
} else {
if version := d.defaultProviderInfo[req.Package()].Version; version != nil {
logging.V(5).Infof("normalizeProviderRequest(%s): default version hit on version %s", req, version)
req = providers.NewProviderRequest(
req.Package(), version, req.PluginDownloadURL(), req.PluginChecksums(), req.Parameterization())
} else {
logging.V(5).Infof(
"normalizeProviderRequest(%s): default provider miss, sending nil version to engine", req)
}
}
if req.PluginDownloadURL() != "" {
logging.V(5).Infof("normalizeProviderRequest(%s): using pluginDownloadURL %s from request",
req, req.PluginDownloadURL())
} else {
if pluginDownloadURL := d.defaultProviderInfo[req.Package()].PluginDownloadURL; pluginDownloadURL != "" {
logging.V(5).Infof("normalizeProviderRequest(%s): default pluginDownloadURL hit on %s",
req, pluginDownloadURL)
req = providers.NewProviderRequest(
req.Package(), req.Version(), pluginDownloadURL, req.PluginChecksums(), req.Parameterization())
} else {
logging.V(5).Infof(
"normalizeProviderRequest(%s): default pluginDownloadURL miss, sending empty string to engine", req)
}
}
if req.PluginChecksums() != nil {
logging.V(5).Infof("normalizeProviderRequest(%s): using pluginChecksums %v from request",
req, req.PluginChecksums())
} else {
if pluginChecksums := d.defaultProviderInfo[req.Package()].Checksums; pluginChecksums != nil {
logging.V(5).Infof("normalizeProviderRequest(%s): default pluginChecksums hit on %v",
req, pluginChecksums)
req = providers.NewProviderRequest(
req.Package(), req.Version(), req.PluginDownloadURL(), pluginChecksums, req.Parameterization())
} else {
logging.V(5).Infof(
"normalizeProviderRequest(%s): default pluginChecksums miss, sending empty map to engine", req)
}
}
if req.Parameterization() != nil {
logging.V(5).Infof("normalizeProviderRequest(%s): default parameterization miss, sending nil to engine", req)
} else {
logging.V(5).Infof("normalizeProviderRequest(%s): using parameterization %v from request",
req, req.Parameterization())
// TODO: Should Parameterization be in defaultProviderInfo
//if parameterization := d.defaultProviderInfo[req.Package()].Parameterization; parameterization != nil {
// logging.V(5).Infof("normalizeProviderRequest(%s): default parameterization hit on %v",
// req, parameterization)
// req = providers.NewProviderRequest(
// req.Version(), req.Package(), req.PluginDownloadURL(), req.PluginChecksums(), parameterization)
//} else {
// logging.V(5).Infof(
// "normalizeProviderRequest(%s): default parameterization miss, sending nil to engine", req)
//}
}
return req
}
// newRegisterDefaultProviderEvent creates a RegisterResourceEvent and completion channel that can be sent to the
// engine to register a default provider resource for the indicated package.
func (d *defaultProviders) newRegisterDefaultProviderEvent(
req providers.ProviderRequest,
) (*registerResourceEvent, <-chan *RegisterResult, error) {
// Attempt to get the config for the package.
inputs, err := d.config.GetPackageConfig(req.Package())
q.Q("d.config.GetPackageConfig", req.Package(), "==>", inputs)
if err != nil {
return nil, nil, err
}
if req.Version() != nil {
providers.SetProviderVersion(inputs, req.Version())
}
if req.PluginDownloadURL() != "" {
providers.SetProviderURL(inputs, req.PluginDownloadURL())
}
if req.PluginChecksums() != nil {
providers.SetProviderChecksums(inputs, req.PluginChecksums())
}
if req.Parameterization() != nil {
providers.SetProviderName(inputs, req.Name())
providers.SetProviderParameterization(inputs, req.Parameterization())
}
// Create the result channel and the event.
done := make(chan *RegisterResult)
event := &registerResourceEvent{
goal: resource.NewGoal(
providers.MakeProviderType(req.Package()),
req.DefaultName(), true, inputs, "", false, nil, "", nil, nil, nil,
nil, nil, nil, "", nil, nil, false, "", ""),
done: done,
}
return event, done, nil
}
// handleRequest services a single default provider request. If the request is for a default provider that we have
// already loaded, we will return its reference. If the request is for a default provider that has not yet been
// loaded, we will send a register resource request to the engine, wait for it to complete, and then cache and return
// the reference of the loaded provider.
//
// Note that this function must not be called from two goroutines concurrently; it is the responsibility of d.serve()
// to ensure this.
func (d *defaultProviders) handleRequest(req providers.ProviderRequest) (providers.Reference, error) {
logging.V(5).Infof("handling default provider request for package %s", req)
req = d.normalizeProviderRequest(req)
denyCreation, err := d.shouldDenyRequest(req)
if err != nil {
return providers.Reference{}, err
}
if denyCreation {
logging.V(5).Infof("denied default provider request for package %s", req)
return providers.NewDenyDefaultProvider(string(req.Package().Name())), nil
}
// Have we loaded this provider before? Use the existing reference, if so.
//
// Note that we are using the request's String as the key for the provider map. Go auto-derives hash and equality
// functions for aggregates, but the one auto-derived for ProviderRequest does not have the semantics we want. The
// use of a string key here is hacky but gets us the desired semantics - that ProviderRequest is a tuple of
// optional value-typed Version and a package.
ref, ok := d.providers[req.String()]
if ok {
return ref, nil
}
event, done, err := d.newRegisterDefaultProviderEvent(req)
if err != nil {
return providers.Reference{}, err
}
select {
case d.providerRegChan <- event:
case <-d.cancel:
return providers.Reference{}, context.Canceled
}
logging.V(5).Infof("waiting for default provider for package %s", req)
var result *RegisterResult
select {
case result = <-done:
case <-d.cancel:
return providers.Reference{}, context.Canceled
}
logging.V(5).Infof("registered default provider for package %s: %s", req, result.State.URN)
id := result.State.ID
contract.Assertf(id != "", "default provider for package %s has no ID", req)
ref, err = providers.NewReference(result.State.URN, id)
contract.Assertf(err == nil, "could not create provider reference with URN %s and ID %s", result.State.URN, id)
d.providers[req.String()] = ref
return ref, nil
}
// If req should be allowed, or if we should prevent the request.
func (d *defaultProviders) shouldDenyRequest(req providers.ProviderRequest) (bool, error) {
logging.V(9).Infof("checking if %#v should be denied", req)
if req.Package().Name().String() == "pulumi" {
logging.V(9).Infof("we always allow %#v through", req)
return false, nil
}
pConfig, err := d.config.GetPackageConfig("pulumi")
if err != nil {
return true, err
}
denyCreation := false
if value, ok := pConfig["disable-default-providers"]; ok {
array := []interface{}{}
if !value.IsString() {
return true, errors.New("Unexpected encoding of pulumi:disable-default-providers")
}
if value.StringValue() == "" {
// If the list is provided but empty, we don't encode a empty json
// list, we just encode the empty string. Check to ensure we don't
// get parse errors.
return false, nil
}
if err := json.Unmarshal([]byte(value.StringValue()), &array); err != nil {
return true, fmt.Errorf("Failed to parse %s: %w", value.StringValue(), err)
}
for i, v := range array {
s, ok := v.(string)
if !ok {
return true, fmt.Errorf("pulumi:disable-default-providers[%d] must be a string", i)
}
barred := strings.TrimSpace(s)
if barred == "*" || barred == req.Package().Name().String() {
logging.V(7).Infof("denying %s (star=%t)", req, barred == "*")
denyCreation = true
break
}
}
} else {
logging.V(9).Infof("Did not find a config for 'pulumi'")
}
return denyCreation, nil
}
// serve is the primary loop responsible for handling default provider requests.
func (d *defaultProviders) serve() {
for {
select {
case req := <-d.requests:
// Note that we do not need to handle cancellation when sending the response: every message we receive is
// guaranteed to have something waiting on the other end of the response channel.
ref, err := d.handleRequest(req.req)
req.response <- defaultProviderResponse{ref: ref, err: err}
case <-d.cancel:
return
}
}
}
// getDefaultProviderRef fetches the provider reference for the default provider for a particular package.
func (d *defaultProviders) getDefaultProviderRef(req providers.ProviderRequest) (providers.Reference, error) {
response := make(chan defaultProviderResponse)
select {
case d.requests <- defaultProviderRequest{req: req, response: response}:
case <-d.cancel:
return providers.Reference{}, context.Canceled
}
res := <-response
return res.ref, res.err
}
// A transformation function that can be applied to a resource.
type TransformFunction func(
ctx context.Context,
name, typ string, custom bool, parent resource.URN,
props resource.PropertyMap,
opts *pulumirpc.TransformResourceOptions,
) (resource.PropertyMap, *pulumirpc.TransformResourceOptions, error)
// A transformation function that can be applied to an invoke.
type TransformInvokeFunction func(
ctx context.Context, token string, args resource.PropertyMap,
opts *pulumirpc.TransformInvokeOptions,
) (resource.PropertyMap, *pulumirpc.TransformInvokeOptions, error)
type CallbacksClient struct {
pulumirpc.CallbacksClient
conn *grpc.ClientConn
}
func (c *CallbacksClient) Close() error {
return c.conn.Close()
}
func NewCallbacksClient(conn *grpc.ClientConn) *CallbacksClient {
return &CallbacksClient{
CallbacksClient: pulumirpc.NewCallbacksClient(conn),
conn: conn,
}
}
// resmon implements the pulumirpc.ResourceMonitor interface and acts as the gateway between a language runtime's
// evaluation of a program and the internal resource planning and deployment logic.
type resmon struct {
pulumirpc.UnsafeResourceMonitorServer
pendingTransforms map[string][]TransformFunction // pending transformation functions for a constructed resource
pendingTransformsLock sync.Mutex
parents map[resource.URN]resource.URN // map of child URNs to their parent URNs
parentsLock sync.Mutex
resGoals map[resource.URN]resource.Goal // map of seen URNs and their goals.
resGoalsLock sync.Mutex // locks the resGoals map.
diagnostics diag.Sink // logger for user-facing messages
providers ProviderSource // the provider source itself.
componentProviders map[resource.URN]map[string]string // which providers component resources used
componentProvidersLock sync.Mutex // which locks the componentProviders map
defaultProviders *defaultProviders // the default provider manager.
sourcePositions *sourcePositions // source position manager.
constructInfo plugin.ConstructInfo // information for construct and call calls.
regChan chan *registerResourceEvent // the channel to send resource registrations to.
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
abortChan chan bool // a channel that can abort iteration of resources.
cancel chan bool // a channel that can cancel the server.
done <-chan error // a channel that resolves when the server completes.
opts EvalSourceOptions // options for the resource monitor.
// the working directory for the resources sent to this monitor.
workingDirectory string
stackTransformsLock sync.Mutex
stackTransforms []TransformFunction // stack transformation functions
stackInvokeTransformsLock sync.Mutex
stackInvokeTransforms []TransformInvokeFunction // invoke transformation functions
resourceTransformsLock sync.Mutex
resourceTransforms map[resource.URN][]TransformFunction // option transformation functions per resource
callbacksLock sync.Mutex
callbacks map[string]*CallbacksClient // callbacks clients per target address
packageRefLock sync.Mutex
// A map of UUIDs to the description of a provider package they correspond to
packageRefMap map[string]providers.ProviderRequest
}
var _ SourceResourceMonitor = (*resmon)(nil)
// newResourceMonitor creates a new resource monitor RPC server.
func newResourceMonitor(
src *evalSource,
provs ProviderSource,
regChan chan *registerResourceEvent,
regOutChan chan *registerResourceOutputsEvent,
regReadChan chan *readResourceEvent,
config map[config.Key]string,
configSecretKeys []config.Key,
tracingSpan opentracing.Span,
) (*resmon, error) {
abortChan := make(chan bool)
// Create our cancellation channel.
cancel := make(chan bool)
// Create a new default provider manager.
d := &defaultProviders{
defaultProviderInfo: src.defaultProviderInfo,
providers: make(map[string]providers.Reference),
config: src.runinfo.Target,
requests: make(chan defaultProviderRequest),
providerRegChan: regChan,
cancel: cancel,
}
// New up an engine RPC server.
resmon := &resmon{
diagnostics: src.plugctx.Diag,
providers: provs,
defaultProviders: d,
workingDirectory: src.runinfo.Pwd,
sourcePositions: newSourcePositions(src.runinfo.ProjectRoot),
pendingTransforms: map[string][]TransformFunction{},
parents: map[resource.URN]resource.URN{},
resGoals: map[resource.URN]resource.Goal{},
componentProviders: map[resource.URN]map[string]string{},
regChan: regChan,
regOutChan: regOutChan,
regReadChan: regReadChan,
abortChan: abortChan,
cancel: cancel,
opts: src.opts,
callbacks: map[string]*CallbacksClient{},
resourceTransforms: map[resource.URN][]TransformFunction{},
packageRefMap: map[string]providers.ProviderRequest{},
}
// Fire up a gRPC server and start listening for incomings.
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: resmon.cancel,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, resmon)
return nil
},
Options: sourceEvalServeOptions(src.plugctx, tracingSpan, env.DebugGRPC.Value()),
})
if err != nil {
return nil, err
}
resmon.constructInfo = plugin.ConstructInfo{
Project: string(src.runinfo.Proj.Name),
Stack: src.runinfo.Target.Name.String(),
Config: config,
ConfigSecretKeys: configSecretKeys,
DryRun: src.opts.DryRun,
Parallel: src.opts.Parallel,
MonitorAddress: fmt.Sprintf("127.0.0.1:%d", handle.Port),
}
resmon.done = handle.Done
go d.serve()
return resmon, nil
}
func (rm *resmon) AbortChan() <-chan bool {
return rm.abortChan
}
// Get or allocate a new grpc client for the given callback address.
func (rm *resmon) GetCallbacksClient(target string) (*CallbacksClient, error) {
rm.callbacksLock.Lock()
defer rm.callbacksLock.Unlock()
if client, has := rm.callbacks[target]; has {
return client, nil
}
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
client := NewCallbacksClient(conn)
rm.callbacks[target] = client
return client, nil
}
// Address returns the address at which the monitor's RPC server may be reached.
func (rm *resmon) Address() string {
return rm.constructInfo.MonitorAddress
}
// Cancel signals that the engine should be terminated, awaits its termination, and returns any errors that result.
func (rm *resmon) Cancel() error {
close(rm.cancel)
errs := []error{<-rm.done}
for _, client := range rm.callbacks {
errs = append(errs, client.Close())
}
return errors.Join(errs...)
}
func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span, logFile string) []grpc.ServerOption {
serveOpts := rpcutil.OpenTracingServerInterceptorOptions(
tracingSpan,
otgrpc.SpanDecorator(decorateResourceSpans),
)
if logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: ctx.DebugTraceMutex,
})
if err != nil {
// ignoring
return nil
}
metadata := map[string]interface{}{
"mode": "server",
}
serveOpts = append(serveOpts, di.ServerOptions(interceptors.LogOptions{
Metadata: metadata,
})...)
}
return serveOpts
}
// getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the
// given unparsed provider reference. If the unparsed provider reference is empty, this function returns a reference
// to the default provider for the indicated package.
func (rm *resmon) getProviderReference(defaultProviders *defaultProviders, req providers.ProviderRequest,
rawProviderRef string,
) (providers.Reference, error) {
if rawProviderRef != "" {
// Check if this is a real provider ref (URN::ID) or a package reference (a dashed uuid)
if strings.Contains(rawProviderRef, "::") {
ref, err := providers.ParseReference(rawProviderRef)
if err != nil {
return providers.Reference{}, fmt.Errorf("could not parse provider reference: %w", err)
}
return ref, nil
}
}
ref, err := defaultProviders.getDefaultProviderRef(req)
if err != nil {
return providers.Reference{}, err
}
return ref, nil
}
// getProviderFromSource fetches the provider plugin for a resource, read, or invoke from the given
// package with the given unparsed provider reference. If the unparsed provider reference is empty,
// this function returns the plugin for the indicated package's default provider.
func (rm *resmon) getProviderFromSource(
providerSource ProviderSource, defaultProviders *defaultProviders,
req providers.ProviderRequest, rawProviderRef string,
token tokens.ModuleMember,
) (plugin.Provider, error) {
providerRef, err := rm.getProviderReference(defaultProviders, req, rawProviderRef)
if err != nil {
return nil, fmt.Errorf("getProviderFromSource: %w", err)
} else if providers.IsDenyDefaultsProvider(providerRef) {
msg := diag.GetDefaultProviderDenied("Invoke").Message
return nil, fmt.Errorf(msg, req.Package(), token)
}
provider, ok := providerSource.GetProvider(providerRef)
if !ok {
return nil, fmt.Errorf("unknown provider '%v' -> '%v'", rawProviderRef, providerRef)
}
return provider, nil
}
func parseProviderRequest(
pkg tokens.Package, version,
pluginDownloadURL string, pluginChecksums map[string][]byte,
parameterization *providers.ProviderParameterization,
) (providers.ProviderRequest, error) {
if version == "" {
logging.V(5).Infof("parseProviderRequest(%s): semver version is the empty string", pkg)
return providers.NewProviderRequest(pkg, nil, pluginDownloadURL, pluginChecksums, parameterization), nil
}
parsedVersion, err := semver.Parse(version)
if err != nil {
logging.V(5).Infof("parseProviderRequest(%s, %s): semver version string is invalid: %v", pkg, version, err)
return providers.ProviderRequest{}, err
}
url := strings.TrimSuffix(pluginDownloadURL, "/")
return providers.NewProviderRequest(pkg, &parsedVersion, url, pluginChecksums, parameterization), nil
}
func (rm *resmon) RegisterPackage(ctx context.Context,
req *pulumirpc.RegisterPackageRequest,
) (*pulumirpc.RegisterPackageResponse, error) {
logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v)", req)
name := tokens.Package(req.Name)
if name == "" {
return nil, errors.New("package name is empty")
}
// First parse the request into a ProviderRequest
var version *semver.Version
if req.Version != "" {
v, err := semver.Parse(req.Version)
if err != nil {
return nil, fmt.Errorf("parse package version %s: %w", req.Version, err)
}
version = &v
}
// Parse the parameterization
var parameterization *providers.ProviderParameterization
if req.Parameterization != nil {
parameterizationVersion, err := semver.Parse(req.Parameterization.Version)
if err != nil {
return nil, fmt.Errorf("parse parameter version %s: %w", req.Parameterization.Version, err)
}
// RegisterPackageRequest keeps all the plugin information in the root fields "name", "version" etc, while the
// information about the parameterized package is in the "parameterization" field. Internally in the engine, and
// for resource state we need to flip that around a bit.
parameterization = providers.NewProviderParameterization(
tokens.Package(req.Parameterization.Name),
parameterizationVersion,
req.Parameterization.Value,
)
}
pi := providers.NewProviderRequest(
tokens.Package(req.Name), version, req.DownloadUrl, req.Checksums,
parameterization)
rm.packageRefLock.Lock()
defer rm.packageRefLock.Unlock()
// See if this package is already registered, else add it to the map.
for uuid, candidate := range rm.packageRefMap {
if reflect.DeepEqual(candidate, pi) {
logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v) matched %s", req, uuid)
return &pulumirpc.RegisterPackageResponse{Ref: uuid}, nil
}
}
// Wasn't found add it to the map
uuid := uuid.New().String()
rm.packageRefMap[uuid] = pi
logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v) created %s", req, uuid)
return &pulumirpc.RegisterPackageResponse{Ref: uuid}, nil
}
func (rm *resmon) SupportsFeature(ctx context.Context,
req *pulumirpc.SupportsFeatureRequest,
) (*pulumirpc.SupportsFeatureResponse, error) {
hasSupport := false
// NOTE: DO NOT ADD ANY MORE FEATURES TO THIS LIST
//
// Context: https://github.com/pulumi/pulumi-dotnet/pull/88#pullrequestreview-1265714090
//
// We shouldn't add any more features to this list, copying strings around codebases is prone to bugs.
// Rather than adding a new feature here, setup a new SupportsFeatureV2 method, that takes a grpc enum
// instead. That can then be safely code generated out to each language with no risk of typos.
//
// These old features have to stay as is because old engines DO support them, but wouldn't support the new
// SupportsFeatureV2 method.
switch req.Id {
case "secrets":
hasSupport = true
case "resourceReferences":
hasSupport = !rm.opts.DisableResourceReferences
case "outputValues":
hasSupport = !rm.opts.DisableOutputValues
case "aliasSpecs":
hasSupport = true
case "deletedWith":
hasSupport = true
case "transforms":
hasSupport = true
case "invokeTransforms":
hasSupport = true
case "parameterization":
// N.B This serves a dual purpose of also indicating that package references are supported.
hasSupport = true
}
logging.V(5).Infof("ResourceMonitor.SupportsFeature(id: %s) = %t", req.Id, hasSupport)
return &pulumirpc.SupportsFeatureResponse{
HasSupport: hasSupport,
}, nil
}
// Invoke performs an invocation of a member located in a resource provider.
func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {
// Fetch the token.
tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("ResourceMonitor.Invoke(%s)", tok)
args, err := plugin.UnmarshalProperties(
req.GetArgs(), plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, fmt.Errorf("failed to unmarshal %v args: %w", tok, err)
}
opts := &pulumirpc.TransformInvokeOptions{
Provider: req.GetProvider(),
Version: req.GetVersion(),
PluginDownloadUrl: req.GetPluginDownloadURL(),
PluginChecksums: req.GetPluginChecksums(),
}
// Lock the invoke transforms and run all of those before loading the provider.
err = func() error {
// Function exists to scope the lock
rm.stackInvokeTransformsLock.Lock()
defer rm.stackInvokeTransformsLock.Unlock()
for _, transform := range rm.stackInvokeTransforms {
newArgs, newOpts, err := transform(ctx, string(tok), args, opts)
if err != nil {
return err
}
args = newArgs
opts = newOpts
}
return nil
}()
if err != nil {
return nil, err
}
// Load up the resource provider if necessary.
providerReq, err := parseProviderRequest(
tok.Package(), opts.Version, opts.PluginDownloadUrl, opts.PluginChecksums, nil)
if err != nil {
return nil, err
}
packageRef := req.GetPackageRef()
if packageRef != "" {
var has bool
providerReq, has = rm.packageRefMap[packageRef]
if !has {
return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
}
}
prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, opts.Provider, tok)
if err != nil {
return nil, fmt.Errorf("Invoke: %w", err)
}
// Do the invoke and then return the arguments.
logging.V(5).Infof("ResourceMonitor.Invoke received: tok=%v #args=%v", tok, len(args))
resp, err := prov.Invoke(ctx, plugin.InvokeRequest{
Tok: tok,
Args: args,
})
if err != nil {
return nil, fmt.Errorf("invocation of %v returned an error: %w", tok, err)
}
// Respect `AcceptResources` unless `tok` is for the built-in `pulumi:pulumi:getResource` function,
// in which case always keep resources to maintain the original behavior for older SDKs that are not
// setting the `AccceptResources` flag.
keepResources := req.GetAcceptResources()
if tok == "pulumi:pulumi:getResource" {
keepResources = true
}
mret, err := plugin.MarshalProperties(resp.Properties, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: keepResources,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)
}
chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(resp.Failures))
for _, failure := range resp.Failures {
chkfails = append(chkfails, &pulumirpc.CheckFailure{
Property: string(failure.Property),
Reason: failure.Reason,
})
}
return &pulumirpc.InvokeResponse{Return: mret, Failures: chkfails}, nil
}
// Call dynamically executes a method in the provider associated with a component resource.
func (rm *resmon) Call(ctx context.Context, req *pulumirpc.ResourceCallRequest) (*pulumirpc.CallResponse, error) {
// Fetch the token and load up the resource provider if necessary.
tok := tokens.ModuleMember(req.GetTok())
providerReq, err := parseProviderRequest(
tok.Package(), req.GetVersion(),
req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil)
if err != nil {
return nil, err
}
packageRef := req.GetPackageRef()
if packageRef != "" {
var has bool
providerReq, has = rm.packageRefMap[packageRef]
if !has {
return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
}
}
prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, req.GetProvider(), tok)
if err != nil {
return nil, err
}
label := fmt.Sprintf("ResourceMonitor.Call(%s)", tok)
args, err := plugin.UnmarshalProperties(
req.GetArgs(), plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
KeepOutputValues: true,
UpgradeToOutputValues: true,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, fmt.Errorf("failed to unmarshal %v args: %w", tok, err)
}
argDependencies := map[resource.PropertyKey][]resource.URN{}
for name, deps := range req.GetArgDependencies() {
urns := make([]resource.URN, len(deps.Urns))
for i, urn := range deps.Urns {
urn, err := resource.ParseURN(urn)
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency on argument %d URN: %s", i, err))
}
urns[i] = urn
}
argDependencies[resource.PropertyKey(name)] = urns
}
// If we have output values we can add the dependencies from them to the args dependencies map we send to the provider.
for key, output := range args {
argDependencies[key] = extendOutputDependencies(argDependencies[key], output)
}
info := plugin.CallInfo{
Project: rm.constructInfo.Project,
Stack: rm.constructInfo.Stack,
Config: rm.constructInfo.Config,
DryRun: rm.constructInfo.DryRun,
Parallel: rm.constructInfo.Parallel,
MonitorAddress: rm.constructInfo.MonitorAddress,
}
options := plugin.CallOptions{
ArgDependencies: argDependencies,
}
// Do the all and then return the arguments.
logging.V(5).Infof(
"ResourceMonitor.Call received: tok=%v #args=%v #info=%v #options=%v", tok, len(args), info, options)
ret, err := prov.Call(ctx, plugin.CallRequest{
Tok: tok,
Args: args,
Info: info,
Options: options,
})
if err != nil {
return nil, fmt.Errorf("call of %v returned an error: %w", tok, err)
}
if ret.ReturnDependencies == nil {
ret.ReturnDependencies = map[resource.PropertyKey][]resource.URN{}
}
for k, v := range ret.Return {
ret.ReturnDependencies[k] = extendOutputDependencies(ret.ReturnDependencies[k], v)
}
returnDependencies := map[string]*pulumirpc.CallResponse_ReturnDependencies{}
for name, deps := range ret.ReturnDependencies {
urns := make([]string, len(deps))
for i, urn := range deps {
urns[i] = string(urn)
}
returnDependencies[string(name)] = &pulumirpc.CallResponse_ReturnDependencies{Urns: urns}
}
mret, err := plugin.MarshalProperties(ret.Return, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)
}
chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(ret.Failures))
for _, failure := range ret.Failures {
chkfails = append(chkfails, &pulumirpc.CheckFailure{
Property: string(failure.Property),
Reason: failure.Reason,
})
}
return &pulumirpc.CallResponse{Return: mret, ReturnDependencies: returnDependencies, Failures: chkfails}, nil
}
func (rm *resmon) StreamInvoke(
req *pulumirpc.ResourceInvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer,
) error {
tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("ResourceMonitor.StreamInvoke(%s)", tok)
providerReq, err := parseProviderRequest(
tok.Package(), req.GetVersion(),
req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil)
if err != nil {
return err
}
prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, req.GetProvider(), tok)
if err != nil {
return err
}
args, err := plugin.UnmarshalProperties(
req.GetArgs(), plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return fmt.Errorf("failed to unmarshal %v args: %w", tok, err)
}
// Synchronously do the StreamInvoke and then return the arguments. This will block until the
// streaming operation completes!
logging.V(5).Infof("ResourceMonitor.StreamInvoke received: tok=%v #args=%v", tok, len(args))
resp, err := prov.StreamInvoke(context.TODO(), plugin.StreamInvokeRequest{
Tok: tok,
Args: args,
OnNext: func(event resource.PropertyMap) error {
mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepResources: req.GetAcceptResources(),
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return fmt.Errorf("failed to marshal return: %w", err)
}
return stream.Send(&pulumirpc.InvokeResponse{Return: mret})
},
})
if err != nil {
return fmt.Errorf("streaming invocation of %v returned an error: %w", tok, err)
}
chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(resp.Failures))
for _, failure := range resp.Failures {
chkfails = append(chkfails, &pulumirpc.CheckFailure{
Property: string(failure.Property),
Reason: failure.Reason,
})
}
if len(chkfails) > 0 {
return stream.Send(&pulumirpc.InvokeResponse{Failures: chkfails})
}
return nil
}
// ReadResource reads the current state associated with a resource from its provider plugin.
func (rm *resmon) ReadResource(ctx context.Context,
req *pulumirpc.ReadResourceRequest,
) (*pulumirpc.ReadResourceResponse, error) {
// Read the basic inputs necessary to identify the plugin.
t, err := tokens.ParseTypeToken(req.GetType())
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, err.Error())
}
name := req.GetName()
parent, err := resource.ParseOptionalURN(req.GetParent())
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent URN: %s", err))
}
provider := req.GetProvider()
if !providers.IsProviderType(t) && provider == "" {
providerReq, err := parseProviderRequest(
t.Package(), req.GetVersion(),
req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil)
if err != nil {
return nil, err
}
packageRef := req.GetPackageRef()
if packageRef != "" {
var has bool
providerReq, has = rm.packageRefMap[packageRef]
if !has {
return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
}
}
ref, provErr := rm.defaultProviders.getDefaultProviderRef(providerReq)
if provErr != nil {
return nil, provErr
} else if providers.IsDenyDefaultsProvider(ref) {
msg := diag.GetDefaultProviderDenied("Read").Message
return nil, fmt.Errorf(msg, req.GetType(), t)
}
provider = ref.String()
}
id := resource.ID(req.GetId())
label := fmt.Sprintf("ResourceMonitor.ReadResource(%s, %s, %s, %s)", id, t, name, provider)
deps := slice.Prealloc[resource.URN](len(req.GetDependencies()))
for _, depURN := range req.GetDependencies() {
urn, err := resource.ParseURN(depURN)
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency: %s", err))
}
deps = append(deps, urn)
}
props, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, err
}
additionalSecretOutputs := slice.Prealloc[resource.PropertyKey](len(req.GetAdditionalSecretOutputs()))
for _, name := range req.GetAdditionalSecretOutputs() {
additionalSecretOutputs = append(additionalSecretOutputs, resource.PropertyKey(name))
}
event := &readResourceEvent{
id: id,
name: name,
baseType: t,
provider: provider,
parent: parent,
props: props,
dependencies: deps,
additionalSecretOutputs: additionalSecretOutputs,
sourcePosition: rm.sourcePositions.getFromRequest(req),
done: make(chan *ReadResult),
}
select {
case rm.regReadChan <- event:
case <-rm.cancel:
logging.V(5).Infof("ResourceMonitor.ReadResource operation canceled, name=%s", name)
return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource registration")
}
// Now block waiting for the operation to finish.
var result *ReadResult
select {
case result = <-event.done:
case <-rm.cancel:
logging.V(5).Infof("ResourceMonitor.ReadResource operation canceled, name=%s", name)
return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on step's done channel")
}
contract.Assertf(result != nil, "ReadResource operation returned a nil result")
marshaled, err := plugin.MarshalProperties(result.State.Outputs, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: req.GetAcceptSecrets(),
KeepResources: req.GetAcceptResources(),
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal %s return state: %w", result.State.URN, err)
}
return &pulumirpc.ReadResourceResponse{
Urn: string(result.State.URN),
Properties: marshaled,
}, nil
}
// Wrap the transform callback so the engine can call the callback server, which will then execute the function. The
// wrapper takes care of all the necessary marshalling and unmarshalling.
func (rm *resmon) wrapTransformCallback(cb *pulumirpc.Callback) (TransformFunction, error) {
client, err := rm.GetCallbacksClient(cb.Target)
if err != nil {
return nil, err
}
token := cb.Token
return func(
ctx context.Context, name, typ string, custom bool, parent resource.URN,
props resource.PropertyMap, opts *pulumirpc.TransformResourceOptions,
) (resource.PropertyMap, *pulumirpc.TransformResourceOptions, error) {
logging.V(5).Infof("Transform: name=%v type=%v custom=%v parent=%v props=%v opts=%v",
name, typ, custom, parent, props, opts)
mopts := plugin.MarshalOptions{
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
KeepOutputValues: true,
WorkingDirectory: rm.workingDirectory,
}
mprops, err := plugin.MarshalProperties(props, mopts)
if err != nil {
return nil, nil, err
}
request, err := proto.Marshal(&pulumirpc.TransformRequest{
Name: name,
Type: typ,
Custom: custom,
Properties: mprops,
Options: opts,
})
if err != nil {
return nil, nil, fmt.Errorf("marshaling request: %w", err)
}
resp, err := client.Invoke(ctx, &pulumirpc.CallbackInvokeRequest{
Token: token,
Request: request,
})
if err != nil {
logging.V(5).Infof("Transform callback error: %v", err)
return nil, nil, err
}
var response pulumirpc.TransformResponse
err = proto.Unmarshal(resp.Response, &response)
if err != nil {
return nil, nil, fmt.Errorf("unmarshaling response: %w", err)
}
newOpts := opts
if response.Options != nil {
newOpts = response.Options
}
newProps := props
if response.Properties != nil {
newProps, err = plugin.UnmarshalProperties(response.Properties, mopts)
if err != nil {
return nil, nil, err
}
}
logging.V(5).Infof("Transform: props=%v opts=%v", newProps, newOpts)
return newProps, newOpts, nil
}, nil
}
// Wrap the transform callback so the engine can call the callback server, which will then execute the function. The
// wrapper takes care of all the necessary marshalling and unmarshalling.
func (rm *resmon) wrapInvokeTransformCallback(cb *pulumirpc.Callback) (TransformInvokeFunction, error) {
client, err := rm.GetCallbacksClient(cb.Target)
if err != nil {
return nil, err
}
token := cb.Token
return func(
ctx context.Context, invokeToken string,
args resource.PropertyMap, opts *pulumirpc.TransformInvokeOptions,
) (resource.PropertyMap, *pulumirpc.TransformInvokeOptions, error) {
logging.V(5).Infof("Invoke transform: token=%v props=%v opts=%v",
invokeToken, args, opts)
margs := plugin.MarshalOptions{
KeepUnknowns: true,
KeepSecrets: true,
KeepResources: true,
KeepOutputValues: true,
WorkingDirectory: rm.workingDirectory,
}
mprops, err := plugin.MarshalProperties(args, margs)
if err != nil {
return nil, nil, err
}
var request []byte
request, err = proto.Marshal(&pulumirpc.TransformInvokeRequest{
Token: invokeToken,
Args: mprops,
Options: opts,
})
if err != nil {
return nil, nil, fmt.Errorf("marshaling request: %w", err)
}
resp, err := client.Invoke(ctx, &pulumirpc.CallbackInvokeRequest{
Token: token,
Request: request,
})
if err != nil {
logging.V(5).Infof("Invoke transform callback error: %v", err)
return nil, nil, err
}
newOpts := opts
var newProps resource.PropertyMap
var response pulumirpc.TransformInvokeResponse
err = proto.Unmarshal(resp.Response, &response)
if err != nil {
return nil, nil, fmt.Errorf("unmarshaling response: %w", err)
}
if response.Options != nil {
newOpts = response.Options
}
newProps = args
if response.Args != nil {
newProps, err = plugin.UnmarshalProperties(response.Args, margs)
if err != nil {
return nil, nil, err
}
}
logging.V(5).Infof("Invoke transform: props=%v opts=%v", newProps, newOpts)
return newProps, newOpts, nil
}, nil
}
func (rm *resmon) RegisterStackTransform(ctx context.Context, cb *pulumirpc.Callback) (*emptypb.Empty, error) {
rm.stackTransformsLock.Lock()
defer rm.stackTransformsLock.Unlock()
if cb.Target == "" {
return nil, errors.New("target must be specified")
}
wrapped, err := rm.wrapTransformCallback(cb)
if err != nil {
return nil, err
}
rm.stackTransforms = append(rm.stackTransforms, wrapped)
return &emptypb.Empty{}, nil
}
func (rm *resmon) RegisterStackInvokeTransform(ctx context.Context, cb *pulumirpc.Callback) (*emptypb.Empty, error) {
rm.stackInvokeTransformsLock.Lock()
defer rm.stackInvokeTransformsLock.Unlock()
if cb.Target == "" {
return nil, errors.New("target must be specified")
}
wrapped, err := rm.wrapInvokeTransformCallback(cb)
if err != nil {
return nil, err
}
rm.stackInvokeTransforms = append(rm.stackInvokeTransforms, wrapped)
return &emptypb.Empty{}, nil
}
// inheritFromParent returns a new goal that inherits from the given parent goal.
// Currently only inherits DeletedWith from parent.
func inheritFromParent(child resource.Goal, parent resource.Goal) *resource.Goal {
goal := child
if goal.DeletedWith == "" {
goal.DeletedWith = parent.DeletedWith
}
return &goal
}
type sourcePositions struct {
projectRoot string
}
func newSourcePositions(projectRoot string) *sourcePositions {
if projectRoot == "" {
projectRoot = "/"
} else {
contract.Assertf(filepath.IsAbs(projectRoot), "projectRoot is not an absolute path")
projectRoot = filepath.Clean(projectRoot)
}
return &sourcePositions{projectRoot: projectRoot}
}
func (s *sourcePositions) parseSourcePosition(raw *pulumirpc.SourcePosition) (string, error) {
if raw == nil {
return "", nil
}
if raw.Line <= 0 {
return "", fmt.Errorf("invalid line number %v", raw.Line)
}
col := ""
if raw.Column != 0 {
if raw.Column < 0 {
return "", fmt.Errorf("invalid column number %v", raw.Column)
}
col = "," + strconv.FormatInt(int64(raw.Column), 10)
}
posURL, err := url.Parse(raw.Uri)
if err != nil {
return "", err
}
if posURL.Scheme != "file" {
return "", fmt.Errorf("unrecognized scheme %q", posURL.Scheme)
}
file := filepath.FromSlash(posURL.Path)
if !filepath.IsAbs(file) {
return "", errors.New("source positions must include absolute paths")
}
rel, err := filepath.Rel(s.projectRoot, file)
if err != nil {
return "", fmt.Errorf("making relative path: %w", err)
}
posURL.Scheme = "project"
posURL.Path = "/" + filepath.ToSlash(rel)
posURL.Fragment = fmt.Sprintf("%v%s", raw.Line, col)
return posURL.String(), nil
}
// Allow getFromRequest to accept any gRPC request that has a source position (ReadResourceRequest,
// RegisterResourceRequest, ResourceInvokeRequest, and CallRequest).
type hasSourcePosition interface {
GetSourcePosition() *pulumirpc.SourcePosition
}
// getFromRequest returns any source position information from an incoming request.
func (s *sourcePositions) getFromRequest(req hasSourcePosition) string {
pos, err := s.parseSourcePosition(req.GetSourcePosition())
if err != nil {
logging.V(5).Infof("parsing source position %#v: %v", req.GetSourcePosition(), err)
return ""
}
return pos
}
// requestFromNodeJS returns true if the request is coming from a Node.js language runtime
// or SDK. This is determined by checking if the request has a "pulumi-runtime" metadata
// header with a value of "nodejs". If no "pulumi-runtime" header is present, then it
// checks if the request has a "user-agent" metadata header that has a value that starts
// with "grpc-node-js/".
func requestFromNodeJS(ctx context.Context) bool {
if md, hasMetadata := metadata.FromIncomingContext(ctx); hasMetadata {
// Check for the "pulumi-runtime" header first.
// We'll always respect this header value when present.
if runtime, ok := md["pulumi-runtime"]; ok {
return len(runtime) == 1 && runtime[0] == "nodejs"
}
// Otherwise, check the "user-agent" header.
if ua, ok := md["user-agent"]; ok {
return len(ua) == 1 && strings.HasPrefix(ua[0], "grpc-node-js/")
}
}
return false
}
// transformAliasForNodeJSCompat transforms the alias from the legacy Node.js values to properly specified values.
func transformAliasForNodeJSCompat(alias *pulumirpc.Alias) *pulumirpc.Alias {
switch a := alias.Alias.(type) {
case *pulumirpc.Alias_Spec_:
// The original implementation in the Node.js SDK did not specify aliases correctly:
//
// - It did not set NoParent when it should have, but instead set Parent to empty.
// - It set NoParent to true and left Parent empty when both the alias and resource had no Parent specified.
//
// To maintain compatibility with such versions of the Node.js SDK, we transform these incorrectly
// specified aliases into properly specified ones that work with this implementation of the engine:
//
// - { Parent: "", NoParent: false } -> { Parent: "", NoParent: true }
// - { Parent: "", NoParent: true } -> { Parent: "", NoParent: false }
spec := &pulumirpc.Alias_Spec{
Name: a.Spec.Name,
Type: a.Spec.Type,
Stack: a.Spec.Stack,
Project: a.Spec.Project,
}
switch p := a.Spec.Parent.(type) {
case *pulumirpc.Alias_Spec_ParentUrn:
if p.ParentUrn == "" {
spec.Parent = &pulumirpc.Alias_Spec_NoParent{NoParent: true}
} else {
spec.Parent = p
}
case *pulumirpc.Alias_Spec_NoParent:
if p.NoParent {
spec.Parent = nil
} else {
spec.Parent = p
}
default:
spec.Parent = &pulumirpc.Alias_Spec_NoParent{NoParent: true}
}
return &pulumirpc.Alias{
Alias: &pulumirpc.Alias_Spec_{
Spec: spec,
},
}
}
return alias
}
func (rm *resmon) resolveProvider(
provider string, providers map[string]string, parent resource.URN, pkg tokens.Package,
) string {
if provider != "" {
return provider
}
if prov, ok := providers[string(pkg)]; ok {
return prov
}
if parent != "" {
rm.componentProvidersLock.Lock()
defer rm.componentProvidersLock.Unlock()
if parentsProvider, ok := rm.componentProviders[parent]; ok {
if prov, ok := parentsProvider[string(pkg)]; ok {
return prov
}
}
}
return ""
}
// Turn the GRPC status into a message, which can later be logged. Currently we only support a subset
// of the possible details types, which can be expanded later. If the details type is not recognized, we
// still return the message, but will leave out the details. This will allow us to be forward compatible
// when new details types are added.
func statusToMessage(st *status.Status, inputs resource.PropertyMap) string {
message := st.Message()
for i, d := range st.Details() {
if i == 0 && message != "" {
message = message + ":"
}
switch d := d.(type) {
case *pulumirpc.InputPropertiesError:
props := resource.NewObjectProperty(inputs)
for _, err := range d.GetErrors() {
propertyPath, e := resource.ParsePropertyPath(err.GetPropertyPath())
if e == nil {
value, ok := propertyPath.Get(props)
if ok {
message = fmt.Sprintf("%v\n\t\t- property %v with value '%v' has a problem: %v",
message, err.GetPropertyPath(), value, err.GetReason())
continue
}
}
message = fmt.Sprintf("%v\n\t\t- property %v has a problem: %v",
message, err.GetPropertyPath(), err.GetReason())
}
}
}
return message
}
// RegisterResource is invoked by a language process when a new resource has been allocated.
func (rm *resmon) RegisterResource(ctx context.Context,
req *pulumirpc.RegisterResourceRequest,
) (*pulumirpc.RegisterResourceResponse, error) {
// Communicate the type, name, and object information to the iterator that is awaiting us.
name := req.GetName()
custom := req.GetCustom()
remote := req.GetRemote()
parent, err := resource.ParseOptionalURN(req.GetParent())
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent URN: %s", err))
}
id := resource.ID(req.GetImportId())
sourcePosition := rm.sourcePositions.getFromRequest(req)
// Custom resources must have a three-part type so that we can 1) identify if they are providers and 2) retrieve the
// provider responsible for managing a particular resource (based on the type's Package).
var t tokens.Type
if custom || remote {
t, err = tokens.ParseTypeToken(req.GetType())
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, err.Error())
}
} else {
// Component resources may have any format type.
t = tokens.Type(req.GetType())
}
label := fmt.Sprintf("ResourceMonitor.RegisterResource(%s,%s)", t, name)
// We need to build the full alias spec list here, so we can pass it to transforms.
aliases := []*pulumirpc.Alias{}
for _, aliasURN := range req.GetAliasURNs() {
aliases = append(aliases, &pulumirpc.Alias{Alias: &pulumirpc.Alias_Urn{Urn: aliasURN}})
}
// We assume aliases are properly specified. However, if a request hasn't explicitly
// indicated that it is using properly specified aliases and the request is coming
// from Node.js, transform the aliases from the incorrect Node.js values to properly
// specified values, to maintain backward compatibility for users of older Node.js
// SDKs that aren't sending properly specified aliases.
transformAliases := !req.GetAliasSpecs() && requestFromNodeJS(ctx)
for _, aliasObject := range req.GetAliases() {
if transformAliases {
aliasObject = transformAliasForNodeJSCompat(aliasObject)
}
aliases = append(aliases, aliasObject)
}
var deleteBeforeReplace *bool
// Technically DeleteBeforeReplaceDefined should be used to decided if DeleteBeforeReplace should be looked at or
// not. However the Go sdk doesn't set Defined so we have a fallback here of respecting this field if either Defined
// is set or DeleteBeforeReplace is true.
if req.GetDeleteBeforeReplaceDefined() || req.GetDeleteBeforeReplace() {
deleteBeforeReplace = &req.DeleteBeforeReplace
}
props, err := plugin.UnmarshalProperties(
req.GetObject(), plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
ComputeAssetHashes: true,
KeepSecrets: true,
KeepResources: true,
KeepOutputValues: true,
UpgradeToOutputValues: true,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, err
}
// Before we pass the props to the transform function we need to ensure that they correctly carry any dependency
// information.
dependencies := mapset.NewSet[resource.URN]()
for _, dependingURN := range req.GetDependencies() {
urn, err := resource.ParseURN(dependingURN)
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency URN: %s", err))
}
dependencies.Add(urn)
}
propertyDependencies := make(map[resource.PropertyKey]mapset.Set[resource.URN])
if len(req.GetPropertyDependencies()) == 0 && !remote {
// If this request did not specify property dependencies, treat each property as depending on every resource
// in the request's dependency list. We don't need to do this when remote is true, because all clients that
// support remote already support passing property dependencies, so there's no need to backfill here.
for pk := range props {
propertyDependencies[pk] = dependencies
}
} else {
// Otherwise, unmarshal the per-property dependency information.
for pk, pd := range req.GetPropertyDependencies() {
deps := mapset.NewSet[resource.URN]()
for _, d := range pd.Urns {
urn, err := resource.ParseURN(d)
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency on property %s URN: %s", pk, err))
}
deps.Add(urn)
}
propertyDependencies[resource.PropertyKey(pk)] = deps
}
}
// If we're running any transforms we need to update all the property values to Outputs to track dependencies.
if len(req.Transforms) > 0 {
props = upgradeOutputValues(props, propertyDependencies)
}
provider := req.GetProvider()
if custom || remote {
provider = rm.resolveProvider(req.GetProvider(), req.GetProviders(), parent, t.Package())
}
opts := &pulumirpc.TransformResourceOptions{
DependsOn: req.GetDependencies(),
Protect: req.GetProtect(),
IgnoreChanges: req.GetIgnoreChanges(),
ReplaceOnChanges: req.GetReplaceOnChanges(),
Version: req.GetVersion(),
Aliases: aliases,
Provider: provider,
Providers: req.GetProviders(),
CustomTimeouts: req.GetCustomTimeouts(),
PluginDownloadUrl: req.GetPluginDownloadURL(),
RetainOnDelete: req.GetRetainOnDelete(),
DeletedWith: req.GetDeletedWith(),
DeleteBeforeReplace: deleteBeforeReplace,
AdditionalSecretOutputs: req.GetAdditionalSecretOutputs(),
PluginChecksums: req.GetPluginChecksums(),
}
// This might be a resource registation for a resource that another process requested to be constructed. If so we'll
// have saved the pending transforms for this and we should use those rather than what is on the request.
var transforms []TransformFunction
pendingKey := fmt.Sprintf("%s::%s::%s", parent, t, name)
err = func() error {
rm.pendingTransformsLock.Lock()
defer rm.pendingTransformsLock.Unlock()
if pending, ok := rm.pendingTransforms[pendingKey]; ok {
transforms = pending
} else {
transforms, err = slice.MapError(req.Transforms, rm.wrapTransformCallback)
if err != nil {
return err
}
// We only need to save this for remote calls
if remote && len(transforms) > 0 {
rm.pendingTransforms[pendingKey] = transforms
}
}
return nil
}()
if err != nil {
return nil, err
}
// Before we calculate anything else run the transformations. First run the transforms for this resource,
// then it's parents etc etc
for _, transform := range transforms {
newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts)
if err != nil {
return nil, err
}
props = newProps
opts = newOpts
}
// Lookup our parents transformations and run those
err = func() error {
// Function exists to scope the lock
rm.resourceTransformsLock.Lock()
defer rm.resourceTransformsLock.Unlock()
rm.parentsLock.Lock()
defer rm.parentsLock.Unlock()
current := parent
for current != "" {
if transforms, ok := rm.resourceTransforms[current]; ok {
for _, transform := range transforms {
newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts)
if err != nil {
return err
}
props = newProps
opts = newOpts
}
}
current = rm.parents[current]
}
return nil
}()
if err != nil {
return nil, err
}
// Then lock the stack transformations and run all of those
err = func() error {
// Function exists to scope the lock
rm.stackTransformsLock.Lock()
defer rm.stackTransformsLock.Unlock()
for _, transform := range rm.stackTransforms {
newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts)
if err != nil {
return err
}
props = newProps
opts = newOpts
}
return nil
}()
if err != nil {
return nil, err
}
// We handle updating the providers map to include the providers field of the parent if
// both the current resource and its parent is a component resource.
func() {
// Function exists to scope the lock
rm.componentProvidersLock.Lock()
defer rm.componentProvidersLock.Unlock()
if parentsProviders, parentIsComponent := rm.componentProviders[parent]; !custom &&
parent != "" && parentIsComponent {
for k, v := range parentsProviders {
if opts.Providers == nil {
opts.Providers = map[string]string{}
}
if _, ok := opts.Providers[k]; !ok {
opts.Providers[k] = v
}
}
}
}()
var providerRef providers.Reference
var providerRefs map[string]string
if custom && !providers.IsProviderType(t) || remote {
providerReq, err := parseProviderRequest(
t.Package(), opts.GetVersion(),
opts.GetPluginDownloadUrl(), opts.GetPluginChecksums(), nil)
if err != nil {
return nil, err
}
packageRef := req.GetPackageRef()
if packageRef != "" {
var has bool
providerReq, has = rm.packageRefMap[packageRef]
if !has {
return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
}
}
providerRef, err = rm.getProviderReference(rm.defaultProviders, providerReq, opts.GetProvider())
if err != nil {
return nil, err
}
providerRefs = make(map[string]string, len(opts.GetProviders()))
for name, provider := range opts.GetProviders() {
ref, err := rm.getProviderReference(rm.defaultProviders, providerReq, provider)
if err != nil {
return nil, err
}
providerRefs[name] = ref.String()
}
}
parsedAliases := []resource.Alias{}
for _, aliasObject := range opts.Aliases {
aliasSpec := aliasObject.GetSpec()
var alias resource.Alias
if aliasSpec != nil {
alias = resource.Alias{
Name: aliasSpec.Name,
Type: aliasSpec.Type,
Stack: aliasSpec.Stack,
Project: aliasSpec.Project,
}
switch parent := aliasSpec.GetParent().(type) {
case *pulumirpc.Alias_Spec_ParentUrn:
// Technically an SDK shouldn't set `parent` at all to specify the default parent, but both NodeJS and
// Python have buggy SDKs that set parent to an empty URN to specify the default parent. We handle this
// case here to maintain backward compatibility with older SDKs but it would be good to fix this to be
// strict in V4.
parentURN, err := resource.ParseOptionalURN(parent.ParentUrn)
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent alias URN: %s", err))
}
alias.Parent = parentURN
case *pulumirpc.Alias_Spec_NoParent:
alias.NoParent = parent.NoParent
}
} else {
urn, err := resource.ParseURN(aliasObject.GetUrn())
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid alias URN: %s", err))
}
alias = resource.Alias{URN: urn}
}
parsedAliases = append(parsedAliases, alias)
}
// Reparse the dependency information from any transformation results
if len(req.Transforms) > 0 {
dependencies = mapset.NewSet[resource.URN]()
for _, dependingURN := range opts.DependsOn {
urn, err := resource.ParseURN(dependingURN)
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency URN: %s", err))
}
dependencies.Add(urn)
}
// Now we've run the transforms we can rebuild the property dependency maps. If we have output values we can add the
// dependencies from them to the dependencies map we send to the provider and save to state.
propertyDependencies = make(map[resource.PropertyKey]mapset.Set[resource.URN])
for key, output := range props {
deps := mapset.NewSet[resource.URN]()
addOutputDependencies(deps, output)
propertyDependencies[key] = deps
// Also add these to the overall dependencies
dependencies = dependencies.Union(deps)
}
} else {
// If we ran transforms we would have merged all the dependencies togther already, but if we didn't we want to
// ensure any output values add their dependencies to the dependencies map we send to the provider.
for key, output := range props {
if propertyDependencies[key] == nil {
propertyDependencies[key] = mapset.NewSet[resource.URN]()
}
addOutputDependencies(propertyDependencies[key], output)
}
}
rawDependencies := dependencies.ToSlice()
rawPropertyDependencies := make(map[resource.PropertyKey][]resource.URN)
for key, deps := range propertyDependencies {
rawPropertyDependencies[key] = deps.ToSlice()
}
if providers.IsProviderType(t) {
if opts.GetVersion() != "" {
version, err := semver.Parse(opts.GetVersion())
if err != nil {
return nil, fmt.Errorf("%s: passed invalid version: %w", label, err)
}
providers.SetProviderVersion(props, &version)
}
if opts.GetPluginDownloadUrl() != "" {
providers.SetProviderURL(props, opts.GetPluginDownloadUrl())
}
if req.GetPackageRef() != "" {
// If the provider resource has a package ref then we need to set all it's input fields as in
// newRegisterDefaultProviderEvent.
packageRef := req.GetPackageRef()
providerReq, has := rm.packageRefMap[packageRef]
if !has {
return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
}
if providerReq.Version() != nil {
providers.SetProviderVersion(props, providerReq.Version())
}
if providerReq.PluginDownloadURL() != "" {
providers.SetProviderURL(props, providerReq.PluginDownloadURL())
}
if providerReq.PluginChecksums() != nil {
providers.SetProviderChecksums(props, providerReq.PluginChecksums())
}
if providerReq.Parameterization() != nil {
providers.SetProviderName(props, providerReq.Name())
providers.SetProviderParameterization(props, providerReq.Parameterization())
}
}
// Make sure that an explicit provider which doesn't specify its plugin gets the
// same plugin as the default provider for the package.
defaultProvider, ok := rm.defaultProviders.defaultProviderInfo[providers.GetProviderPackage(t)]
if ok && opts.GetVersion() == "" && opts.GetPluginDownloadUrl() == "" {
if defaultProvider.Version != nil {
providers.SetProviderVersion(props, defaultProvider.Version)
}
if defaultProvider.PluginDownloadURL != "" {
providers.SetProviderURL(props, defaultProvider.PluginDownloadURL)
}
}
}
protect := opts.Protect
ignoreChanges := opts.IgnoreChanges
replaceOnChanges := opts.ReplaceOnChanges
retainOnDelete := opts.RetainOnDelete
deletedWith, err := resource.ParseOptionalURN(opts.GetDeletedWith())
if err != nil {
return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid DeletedWith URN: %s", err))
}
customTimeouts := opts.CustomTimeouts
additionalSecretOutputs := opts.GetAdditionalSecretOutputs()
// At this point we're going to forward these properties to the rest of the engine and potentially to providers. As
// we add features to the code above (most notably transforms) we could end up with more instances of `OutputValue`
// than the rest of the system historically expects. To minimize the disruption we downgrade `OutputValue`s with no
// dependencies down to `Computed` and `Secret` or their plain values. We only do this for non-remote resources.
// Remote resources already deal with `OutputValue`s and even though it would be more consistent to downgrade them
// here it would be a break change.
if !remote {
props = downgradeOutputValues(props)
}
logging.V(5).Infof(
"ResourceMonitor.RegisterResource received: t=%v, name=%v, custom=%v, #props=%v, parent=%v, protect=%v, "+
"provider=%v, deps=%v, deleteBeforeReplace=%v, ignoreChanges=%v, aliases=%v, customTimeouts=%v, "+
"providers=%v, replaceOnChanges=%v, retainOnDelete=%v, deletedWith=%v",
t, name, custom, len(props), parent, protect, providerRef, rawDependencies, opts.DeleteBeforeReplace, ignoreChanges,
parsedAliases, customTimeouts, providerRefs, replaceOnChanges, retainOnDelete, deletedWith)
// If this is a remote component, fetch its provider and issue the construct call. Otherwise, register the resource.
var result *RegisterResult
var outputDeps map[string]*pulumirpc.RegisterResourceResponse_PropertyDependencies
if remote {
provider, ok := rm.providers.GetProvider(providerRef)
if providers.IsDenyDefaultsProvider(providerRef) {
msg := diag.GetDefaultProviderDenied("").Message
return nil, fmt.Errorf(msg, t.Package().String(), t.String())
}
if !ok {
return nil, fmt.Errorf("unknown provider '%v'", providerRef)
}
// Invoke the provider's Construct RPC method.
options := plugin.ConstructOptions{
// We don't actually need to send a list of aliases to construct anymore because the engine does
// all alias construction.
Aliases: []resource.Alias{},
Dependencies: rawDependencies,
Protect: protect,
PropertyDependencies: rawPropertyDependencies,
Providers: providerRefs,
AdditionalSecretOutputs: additionalSecretOutputs,
DeletedWith: deletedWith,
IgnoreChanges: ignoreChanges,
ReplaceOnChanges: replaceOnChanges,
RetainOnDelete: retainOnDelete,
}
if customTimeouts != nil {
options.CustomTimeouts = &plugin.CustomTimeouts{
Create: customTimeouts.Create,
Update: customTimeouts.Update,
Delete: customTimeouts.Delete,
}
}
if opts.DeleteBeforeReplace != nil {
options.DeleteBeforeReplace = *opts.DeleteBeforeReplace
}
constructResult, err := provider.Construct(ctx, plugin.ConstructRequest{
Info: rm.constructInfo,
Type: t,
Name: name,
Parent: parent,
Inputs: props,
Options: options,
})
if err != nil {
if st, ok := status.FromError(err); ok {
message := statusToMessage(st, props)
rm.diagnostics.Errorf(diag.GetResourceInvalidError(constructResult.URN), t, name, message)
} else {
rm.diagnostics.Errorf(diag.GetResourceInvalidError(constructResult.URN), t, name, err)
}
rm.abortChan <- true
<-rm.cancel
return nil, rpcerror.New(codes.Unknown, "resource monitor shut down")
}
result = &RegisterResult{State: &resource.State{URN: constructResult.URN, Outputs: constructResult.Outputs}}
// The provider may have returned OutputValues in "Outputs", we need to downgrade them to Computed or
// Secret but also add them to the outputDeps map.
if constructResult.OutputDependencies == nil {
constructResult.OutputDependencies = map[resource.PropertyKey][]resource.URN{}
}
for k, v := range result.State.Outputs {
constructResult.OutputDependencies[k] = extendOutputDependencies(constructResult.OutputDependencies[k], v)
}
outputDeps = map[string]*pulumirpc.RegisterResourceResponse_PropertyDependencies{}
for k, deps := range constructResult.OutputDependencies {
urns := make([]string, len(deps))
for i, d := range deps {
urns[i] = string(d)
}
outputDeps[string(k)] = &pulumirpc.RegisterResourceResponse_PropertyDependencies{Urns: urns}
}
} else {
additionalSecretKeys := slice.Prealloc[resource.PropertyKey](len(additionalSecretOutputs))
for _, name := range additionalSecretOutputs {
additionalSecretKeys = append(additionalSecretKeys, resource.PropertyKey(name))
}
var timeouts resource.CustomTimeouts
if customTimeouts != nil {
if customTimeouts.Create != "" {
seconds, err := generateTimeoutInSeconds(customTimeouts.Create)
if err != nil {
return nil, err
}
timeouts.Create = seconds
}
if customTimeouts.Delete != "" {
seconds, err := generateTimeoutInSeconds(customTimeouts.Delete)
if err != nil {
return nil, err
}
timeouts.Delete = seconds
}
if customTimeouts.Update != "" {
seconds, err := generateTimeoutInSeconds(customTimeouts.Update)
if err != nil {
return nil, err
}
timeouts.Update = seconds
}
}
goal := resource.NewGoal(t, name, custom, props, parent, protect, rawDependencies,
providerRef.String(), nil, rawPropertyDependencies, opts.DeleteBeforeReplace, ignoreChanges,
additionalSecretKeys, parsedAliases, id, &timeouts, replaceOnChanges, retainOnDelete, deletedWith,
sourcePosition,
)
if goal.Parent != "" {
rm.resGoalsLock.Lock()
parentGoal, ok := rm.resGoals[goal.Parent]
if ok {
goal = inheritFromParent(*goal, parentGoal)
}
rm.resGoalsLock.Unlock()
}
// Send the goal state to the engine.
step := &registerResourceEvent{
goal: goal,
done: make(chan *RegisterResult),
}
select {
case rm.regChan <- step:
case <-rm.cancel:
logging.V(5).Infof("ResourceMonitor.RegisterResource operation canceled, name=%s", name)
return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource registration")
}
// Now block waiting for the operation to finish.
select {
case result = <-step.done:
case <-rm.cancel:
logging.V(5).Infof("ResourceMonitor.RegisterResource operation canceled, name=%s", name)
return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on step's done channel")
}
if result != nil && result.Result != ResultStateSuccess && !req.GetSupportsResultReporting() {
return nil, rpcerror.New(codes.Internal, "resource registration failed")
}
if result != nil && result.State != nil && result.State.URN != "" {
rm.resGoalsLock.Lock()
rm.resGoals[result.State.URN] = *goal
rm.resGoalsLock.Unlock()
}
}
if result != nil && result.State != nil && result.State.URN != "" {
// We've got a safe URN now, save the parent and transformations
func() {
rm.parentsLock.Lock()
defer rm.parentsLock.Unlock()
rm.parents[result.State.URN] = parent
}()
func() {
rm.resourceTransformsLock.Lock()
defer rm.resourceTransformsLock.Unlock()
rm.resourceTransforms[result.State.URN] = transforms
}()
if !custom {
func() {
rm.componentProvidersLock.Lock()
defer rm.componentProvidersLock.Unlock()
rm.componentProviders[result.State.URN] = opts.GetProviders()
}()
}
}
// Filter out partially-known values if the requestor does not support them.
outputs := result.State.Outputs
// Local ComponentResources may contain unresolved resource refs, so ignore those outputs.
if !req.GetCustom() && !remote {
// In the case of a SameStep, the old resource outputs are returned to the language host after the step is
// executed. The outputs of a ComponentResource may depend on resources that have not been registered at the
// time the ComponentResource is itself registered, as the outputs are set by a later call to
// RegisterResourceOutputs. Therefore, when the SameStep returns the old resource outputs for a
// ComponentResource, it may return references to resources that have not yet been registered, which will cause
// the SDK's calls to getResource to fail when it attempts to resolve those references.
//
// Work on a more targeted fix is tracked in https://github.com/pulumi/pulumi/issues/5978
outputs = resource.PropertyMap{}
}
if !req.GetSupportsPartialValues() {
logging.V(5).Infof("stripping unknowns from RegisterResource response for urn %v", result.State.URN)
filtered := resource.PropertyMap{}
for k, v := range outputs {
if !v.ContainsUnknowns() {
filtered[k] = v
}
}
outputs = filtered
}
// TODO(@platform):
// Currently component resources ignore these options:
// • ignoreChanges
// • customTimeouts
// • additionalSecretOutputs
// • replaceOnChanges
// • retainOnDelete
// • deletedWith
// Revisit these semantics in Pulumi v4.0
// See this issue for more: https://github.com/pulumi/pulumi/issues/9704
if !custom {
rm.checkComponentOption(result.State.URN, "ignoreChanges", func() bool {
return len(ignoreChanges) > 0
})
rm.checkComponentOption(result.State.URN, "customTimeouts", func() bool {
if customTimeouts == nil {
return false
}
hasUpdateTimeout := customTimeouts.Update != ""
hasCreateTimeout := customTimeouts.Create != ""
hasDeleteTimeout := customTimeouts.Delete != ""
return hasCreateTimeout || hasUpdateTimeout || hasDeleteTimeout
})
rm.checkComponentOption(result.State.URN, "additionalSecretOutputs", func() bool {
return len(additionalSecretOutputs) > 0
})
rm.checkComponentOption(result.State.URN, "replaceOnChanges", func() bool {
return len(replaceOnChanges) > 0
})
rm.checkComponentOption(result.State.URN, "retainOnDelete", func() bool {
return retainOnDelete
})
rm.checkComponentOption(result.State.URN, "deletedWith", func() bool {
return deletedWith != ""
})
}
logging.V(5).Infof(
"ResourceMonitor.RegisterResource operation finished: t=%v, urn=%v, #outs=%v",
result.State.Type, result.State.URN, len(outputs))
// Finally, unpack the response into properties that we can return to the language runtime. This mostly includes
// an ID, URN, and defaults and output properties that will all be blitted back onto the runtime object.
obj, err := plugin.MarshalProperties(outputs, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepSecrets: req.GetAcceptSecrets(),
KeepResources: req.GetAcceptResources(),
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, err
}
// Assert that we never leak the unconfigured provider ID to the language host.
contract.Assertf(
!providers.IsProviderType(result.State.Type) || result.State.ID != providers.UnconfiguredID,
"provider resource %s has unconfigured ID", result.State.URN)
reason := pulumirpc.Result_SUCCESS
if result.Result == ResultStateSkipped {
reason = pulumirpc.Result_SKIP
} else if result.Result == ResultStateFailed {
reason = pulumirpc.Result_FAIL
}
return &pulumirpc.RegisterResourceResponse{
Urn: string(result.State.URN),
Id: string(result.State.ID),
Object: obj,
PropertyDependencies: outputDeps,
Result: reason,
}, nil
}
// checkComponentOption generates a warning message on the resource
// 'urn' if 'check' returns true.
// This function is intended to validate options passed to component resources,
// so urn is expected to refer to a component.
func (rm *resmon) checkComponentOption(urn resource.URN, optName string, check func() bool) {
if check() {
logging.V(10).Infof("The option '%s' has no automatic effect on component resource '%s', "+
"ensure it is handled correctly in the component code.", optName, urn)
}
}
// RegisterResourceOutputs records some new output properties for a resource that have arrived after its initial
// provisioning. These will make their way into the eventual checkpoint state file for that resource.
func (rm *resmon) RegisterResourceOutputs(ctx context.Context,
req *pulumirpc.RegisterResourceOutputsRequest,
) (*emptypb.Empty, error) {
// Obtain and validate the message's inputs (a URN plus the output property map).
urn, err := resource.ParseURN(req.Urn)
if err != nil {
return nil, fmt.Errorf("invalid resource URN: %w", err)
}
label := fmt.Sprintf("ResourceMonitor.RegisterResourceOutputs(%s)", urn)
outs, err := plugin.UnmarshalProperties(
req.GetOutputs(), plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
ComputeAssetHashes: true,
KeepSecrets: true,
KeepResources: true,
WorkingDirectory: rm.workingDirectory,
})
if err != nil {
return nil, fmt.Errorf("cannot unmarshal output properties: %w", err)
}
logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs received: urn=%v, #outs=%v", urn, len(outs))
// Now send the step over to the engine to perform.
step := &registerResourceOutputsEvent{
urn: urn,
outputs: outs,
done: make(chan bool),
}
select {
case rm.regOutChan <- step:
case <-rm.cancel:
logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs operation canceled, urn=%s", urn)
return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource outputs")
}
// Now block waiting for the operation to finish.
select {
case <-step.done:
case <-rm.cancel:
logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs operation canceled, urn=%s", urn)
return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on output step's done channel")
}
logging.V(5).Infof(
"ResourceMonitor.RegisterResourceOutputs operation finished: urn=%v, #outs=%v", urn, len(outs))
return &emptypb.Empty{}, nil
}
type registerResourceEvent struct {
goal *resource.Goal // the resource goal state produced by the iterator.
done chan *RegisterResult // the channel to communicate with after the resource state is available.
}
var _ RegisterResourceEvent = (*registerResourceEvent)(nil)
func (g *registerResourceEvent) event() {}
func (g *registerResourceEvent) Goal() *resource.Goal {
return g.goal
}
func (g *registerResourceEvent) Done(result *RegisterResult) {
// Communicate the resulting state back to the RPC thread, which is parked awaiting our reply.
g.done <- result
}
type registerResourceOutputsEvent struct {
urn resource.URN // the URN to which this completion applies.
outputs resource.PropertyMap // an optional property bag for output properties.
done chan bool // the channel to communicate with after the operation completes.
}
var _ RegisterResourceOutputsEvent = (*registerResourceOutputsEvent)(nil)
func (g *registerResourceOutputsEvent) event() {}
func (g *registerResourceOutputsEvent) URN() resource.URN {
return g.urn
}
func (g *registerResourceOutputsEvent) Outputs() resource.PropertyMap {
return g.outputs
}
func (g *registerResourceOutputsEvent) Done() {
// Communicate the resulting state back to the RPC thread, which is parked awaiting our reply.
g.done <- true
}
type readResourceEvent struct {
id resource.ID
name string
baseType tokens.Type
provider string
parent resource.URN
props resource.PropertyMap
dependencies []resource.URN
additionalSecretOutputs []resource.PropertyKey
sourcePosition string
done chan *ReadResult
}
var _ ReadResourceEvent = (*readResourceEvent)(nil)
func (g *readResourceEvent) event() {}
func (g *readResourceEvent) ID() resource.ID { return g.id }
func (g *readResourceEvent) Name() string { return g.name }
func (g *readResourceEvent) Type() tokens.Type { return g.baseType }
func (g *readResourceEvent) Provider() string { return g.provider }
func (g *readResourceEvent) Parent() resource.URN { return g.parent }
func (g *readResourceEvent) Properties() resource.PropertyMap { return g.props }
func (g *readResourceEvent) Dependencies() []resource.URN { return g.dependencies }
func (g *readResourceEvent) AdditionalSecretOutputs() []resource.PropertyKey {
return g.additionalSecretOutputs
}
func (g *readResourceEvent) SourcePosition() string { return g.sourcePosition }
func (g *readResourceEvent) Done(result *ReadResult) {
g.done <- result
}
func generateTimeoutInSeconds(timeout string) (float64, error) {
duration, err := time.ParseDuration(timeout)
if err != nil {
return 0, fmt.Errorf("unable to parse customTimeout Value %s", timeout)
}
return duration.Seconds(), nil
}
func decorateResourceSpans(span opentracing.Span, method string, req, resp interface{}, grpcError error) {
if req == nil {
return
}
switch method {
case "/pulumirpc.ResourceMonitor/Invoke":
span.SetTag("pulumi-decorator", req.(*pulumirpc.ResourceInvokeRequest).Tok)
case "/pulumirpc.ResourceMonitor/ReadResource":
span.SetTag("pulumi-decorator", req.(*pulumirpc.ReadResourceRequest).Type)
case "/pulumirpc.ResourceMonitor/RegisterResource":
span.SetTag("pulumi-decorator", req.(*pulumirpc.RegisterResourceRequest).Type)
}
}
// downgradeOutputValues recursively replaces all Output values with `Computed`, `Secret`, or their plain
// value. This loses all dependency information.
func downgradeOutputValues(v resource.PropertyMap) resource.PropertyMap {
var downgradeOutputPropertyValue func(v resource.PropertyValue) resource.PropertyValue
downgradeOutputPropertyValue = func(v resource.PropertyValue) resource.PropertyValue {
if v.IsOutput() {
output := v.OutputValue()
var result resource.PropertyValue
if output.Known {
result = downgradeOutputPropertyValue(output.Element)
} else {
result = resource.MakeComputed(resource.NewStringProperty(""))
}
if output.Secret {
result = resource.MakeSecret(result)
}
return result
}
if v.IsObject() {
return resource.NewObjectProperty(downgradeOutputValues(v.ObjectValue()))
}
if v.IsArray() {
result := make([]resource.PropertyValue, len(v.ArrayValue()))
for i, elem := range v.ArrayValue() {
result[i] = downgradeOutputPropertyValue(elem)
}
return resource.NewArrayProperty(result)
}
if v.IsSecret() {
return resource.MakeSecret(downgradeOutputPropertyValue(v.SecretValue().Element))
}
if v.IsResourceReference() {
ref := v.ResourceReferenceValue()
return resource.NewResourceReferenceProperty(
resource.ResourceReference{
URN: ref.URN,
ID: downgradeOutputPropertyValue(ref.ID),
PackageVersion: ref.PackageVersion,
})
}
return v
}
result := make(resource.PropertyMap)
for k, pv := range v {
result[k] = downgradeOutputPropertyValue(pv)
}
return result
}
func upgradeOutputValues(
v resource.PropertyMap, propertyDependencies map[resource.PropertyKey]mapset.Set[resource.URN],
) resource.PropertyMap {
// We assume that by the time this is being called we've upgraded all Secret/Computed values to outputs. We just
// need to add the dependency information from propertyDependencies.
result := make(resource.PropertyMap)
for k, pv := range v {
if deps, has := propertyDependencies[k]; has {
currentDeps := mapset.NewSet[resource.URN]()
addOutputDependencies(currentDeps, pv)
if currentDeps.IsSuperset(deps) {
// already has the deps, just copy across
result[k] = pv
} else {
var output resource.Output
if pv.IsOutput() {
output = pv.OutputValue()
} else {
output = resource.Output{
Element: pv,
Known: true,
}
}
// Merge all the dependencies from the propertyDependencies map with any current dependencies on this
// output value.
currentDeps.Clear()
currentDeps.Append(output.Dependencies...)
currentDeps = currentDeps.Union(deps)
output.Dependencies = currentDeps.ToSlice()
result[k] = resource.NewOutputProperty(output)
}
} else {
// no deps just copy across
result[k] = pv
}
}
return result
}
func extendOutputDependencies(deps []resource.URN, v resource.PropertyValue) []resource.URN {
set := mapset.NewSet(deps...)
addOutputDependencies(set, v)
return set.ToSlice()
}
func addOutputDependencies(deps mapset.Set[resource.URN], v resource.PropertyValue) {
if v.IsOutput() {
output := v.OutputValue()
if output.Known {
addOutputDependencies(deps, output.Element)
}
deps.Append(output.Dependencies...)
}
if v.IsResourceReference() {
ref := v.ResourceReferenceValue()
addOutputDependencies(deps, ref.ID)
}
if v.IsObject() {
for _, elem := range v.ObjectValue() {
addOutputDependencies(deps, elem)
}
}
if v.IsArray() {
for _, elem := range v.ArrayValue() {
addOutputDependencies(deps, elem)
}
}
if v.IsSecret() {
addOutputDependencies(deps, v.SecretValue().Element)
}
}