// Copyright 2016-2023, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package deploy import ( "context" "encoding/json" "errors" "fmt" "net/url" "path/filepath" "reflect" "strconv" "strings" "sync" "time" "github.com/blang/semver" "github.com/google/uuid" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" opentracing "github.com/opentracing/opentracing-go" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers" interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "github.com/pulumi/pulumi/sdk/v3/go/common/env" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "github.com/pulumi/pulumi/sdk/v3/go/common/resource/config" "github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin" "github.com/pulumi/pulumi/sdk/v3/go/common/slice" "github.com/pulumi/pulumi/sdk/v3/go/common/tokens" "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" "github.com/pulumi/pulumi/sdk/v3/go/common/util/result" "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror" "github.com/pulumi/pulumi/sdk/v3/go/common/workspace" pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go" mapset "github.com/deckarep/golang-set/v2" ) // EvalRunInfo provides information required to execute and deploy resources within a package. type EvalRunInfo struct { // the package metadata. Proj *workspace.Project `json:"proj" yaml:"proj"` // the package's working directory. Pwd string `json:"pwd" yaml:"pwd"` // the path to the program. Program string `json:"program" yaml:"program"` // the path to the project's directory. ProjectRoot string `json:"projectRoot,omitempty" yaml:"projectRoot,omitempty"` // any arguments to pass to the package. Args []string `json:"args,omitempty" yaml:"args,omitempty"` // the target being deployed into. Target *Target `json:"target,omitempty" yaml:"target,omitempty"` } // EvalRunInfoOptions provides options for configuring an evaluation source. type EvalSourceOptions struct { // true if the evaluation is producing resources for a dry-run/preview. DryRun bool // the degree of parallelism for resource operations (<=1 for serial). Parallel int // true to disable resource reference support. DisableResourceReferences bool // true to disable output value support. DisableOutputValues bool } // NewEvalSource returns a planning source that fetches resources by evaluating a package with a set of args and // a confgiuration map. This evaluation is performed using the given plugin context and may optionally use the // given plugin host (or the default, if this is nil). Note that closing the eval source also closes the host. func NewEvalSource( plugctx *plugin.Context, runinfo *EvalRunInfo, defaultProviderInfo map[tokens.Package]workspace.PluginSpec, opts EvalSourceOptions, ) Source { return &evalSource{ plugctx: plugctx, runinfo: runinfo, defaultProviderInfo: defaultProviderInfo, opts: opts, } } type evalSource struct { plugctx *plugin.Context // the plugin context. runinfo *EvalRunInfo // the directives to use when running the program. defaultProviderInfo map[tokens.Package]workspace.PluginSpec // the default provider versions for this source. opts EvalSourceOptions // options for the evaluation source. } func (src *evalSource) Close() error { return nil } // Project is the name of the project being run by this evaluation source. func (src *evalSource) Project() tokens.PackageName { return src.runinfo.Proj.Name } // Stack is the name of the stack being targeted by this evaluation source. func (src *evalSource) Stack() tokens.StackName { return src.runinfo.Target.Name } func (src *evalSource) Info() interface{} { return src.runinfo } // Iterate will spawn an evaluator coroutine and prepare to interact with it on subsequent calls to Next. func (src *evalSource) Iterate(ctx context.Context, providers ProviderSource) (SourceIterator, error) { tracingSpan := opentracing.SpanFromContext(ctx) // Decrypt the configuration. config, err := src.runinfo.Target.Config.Decrypt(src.runinfo.Target.Decrypter) if err != nil { return nil, fmt.Errorf("failed to decrypt config: %w", err) } // Keep track of any config keys that have secure values. configSecretKeys := src.runinfo.Target.Config.SecureKeys() configMap, err := src.runinfo.Target.Config.AsDecryptedPropertyMap(ctx, src.runinfo.Target.Decrypter) if err != nil { return nil, fmt.Errorf("failed to convert config to map: %w", err) } // First, fire up a resource monitor that will watch for and record resource creation. regChan := make(chan *registerResourceEvent) regOutChan := make(chan *registerResourceOutputsEvent) regReadChan := make(chan *readResourceEvent) mon, err := newResourceMonitor( src, providers, regChan, regOutChan, regReadChan, config, configSecretKeys, tracingSpan) if err != nil { return nil, fmt.Errorf("failed to start resource monitor: %w", err) } // Create a new iterator with appropriate channels, and gear up to go! iter := &evalSourceIterator{ mon: mon, src: src, regChan: regChan, regOutChan: regOutChan, regReadChan: regReadChan, finChan: make(chan error), } // Now invoke Run in a goroutine. All subsequent resource creation events will come in over the gRPC channel, // and we will pump them through the channel. If the Run call ultimately fails, we need to propagate the error. iter.forkRun(config, configSecretKeys, configMap) // Finally, return the fresh iterator that the caller can use to take things from here. return iter, nil } type evalSourceIterator struct { mon SourceResourceMonitor // the resource monitor, per iterator. src *evalSource // the owning eval source object. regChan chan *registerResourceEvent // the channel that contains resource registrations. regOutChan chan *registerResourceOutputsEvent // the channel that contains resource completions. regReadChan chan *readResourceEvent // the channel that contains read resource requests. finChan chan error // the channel that communicates completion. done bool // set to true when the evaluation is done. } func (iter *evalSourceIterator) Close() error { // Cancel the monitor and reclaim any associated resources. return iter.mon.Cancel() } func (iter *evalSourceIterator) ResourceMonitor() SourceResourceMonitor { return iter.mon } func (iter *evalSourceIterator) Next() (SourceEvent, error) { // If we are done, quit. if iter.done { return nil, nil } // Await the program to compute some more state and then inspect what it has to say. select { case reg := <-iter.regChan: contract.Assertf(reg != nil, "received a nil registerResourceEvent") goal := reg.Goal() logging.V(5).Infof("EvalSourceIterator produced a registration: t=%v,name=%v,#props=%v", goal.Type, goal.Name, len(goal.Properties)) return reg, nil case regOut := <-iter.regOutChan: contract.Assertf(regOut != nil, "received a nil registerResourceOutputsEvent") logging.V(5).Infof("EvalSourceIterator produced a completion: urn=%v,#outs=%v", regOut.URN(), len(regOut.Outputs())) return regOut, nil case read := <-iter.regReadChan: contract.Assertf(read != nil, "received a nil readResourceEvent") logging.V(5).Infoln("EvalSourceIterator produced a read") return read, nil case err := <-iter.finChan: // If we are finished, we can safely exit. The contract with the language provider is that this implies // that the language runtime has exited and so calling Close on the plugin is fine. iter.done = true if err != nil { if result.IsBail(err) { logging.V(5).Infof("EvalSourceIterator ended with bail.") } else { logging.V(5).Infof("EvalSourceIterator ended with an error: %v", err) } } return nil, err } } // forkRun performs the evaluation from a distinct goroutine. This function blocks until it's our turn to go. func (iter *evalSourceIterator) forkRun( config map[config.Key]string, configSecretKeys []config.Key, configPropertyMap resource.PropertyMap, ) { // Fire up the goroutine to make the RPC invocation against the language runtime. As this executes, calls // to queue things up in the resource channel will occur, and we will serve them concurrently. go func() { // Next, launch the language plugin. run := func() error { rt := iter.src.runinfo.Proj.Runtime.Name() rtopts := iter.src.runinfo.Proj.Runtime.Options() programInfo := plugin.NewProgramInfo( /* rootDirectory */ iter.src.runinfo.ProjectRoot, /* programDirectory */ iter.src.runinfo.Pwd, /* entryPoint */ iter.src.runinfo.Program, /* options */ rtopts) langhost, err := iter.src.plugctx.Host.LanguageRuntime(rt, programInfo) if err != nil { return fmt.Errorf("failed to launch language host %s: %w", rt, err) } contract.Assertf(langhost != nil, "expected non-nil language host %s", rt) // Now run the actual program. progerr, bail, err := langhost.Run(plugin.RunInfo{ MonitorAddress: iter.mon.Address(), Stack: iter.src.runinfo.Target.Name.String(), Project: string(iter.src.runinfo.Proj.Name), Pwd: iter.src.runinfo.Pwd, Args: iter.src.runinfo.Args, Config: config, ConfigSecretKeys: configSecretKeys, ConfigPropertyMap: configPropertyMap, DryRun: iter.src.opts.DryRun, Parallel: iter.src.opts.Parallel, Organization: string(iter.src.runinfo.Target.Organization), Info: programInfo, }) // Check if we were asked to Bail. This a special random constant used for that // purpose. if err == nil && bail { return result.BailErrorf("run bailed") } if err == nil && progerr != "" { // If the program had an unhandled error; propagate it to the caller. err = fmt.Errorf("an unhandled error occurred: %v", progerr) } return err } // Communicate the error, if it exists, or nil if the program exited cleanly. iter.finChan <- run() }() } // defaultProviders manages the registration of default providers. The default provider for a package is the provider // resource that will be used to manage resources that do not explicitly reference a provider. Default providers will // only be registered for packages that are used by resources registered by the user's Pulumi program. type defaultProviders struct { // A map of package identifiers to versions, used to disambiguate which plugin to load if no version is provided // by the language host. defaultProviderInfo map[tokens.Package]workspace.PluginSpec // A map of ProviderRequest strings to provider references, used to keep track of the set of default providers that // have already been loaded. providers map[string]providers.Reference config plugin.ConfigSource requests chan defaultProviderRequest providerRegChan chan<- *registerResourceEvent cancel <-chan bool } type defaultProviderResponse struct { ref providers.Reference err error } type defaultProviderRequest struct { req providers.ProviderRequest response chan<- defaultProviderResponse } func (d *defaultProviders) normalizeProviderRequest(req providers.ProviderRequest) providers.ProviderRequest { // Request that the engine instantiate a specific version of this provider, if one was requested. We'll figure out // what version to request by: // 1. Providing the Version field of the ProviderRequest verbatim, if it was provided, otherwise // 2. Querying the list of default versions provided to us on startup and returning the value associated with // the given package, if one exists, otherwise // 3. We give nothing to the engine and let the engine figure it out. // // As we tighen up our approach to provider versioning, 2 and 3 will go away and be replaced entirely by 1. 3 is // especially onerous because the engine selects the "newest" plugin available on the machine, which is generally // problematic for a lot of reasons. if req.Version() != nil { logging.V(5).Infof("normalizeProviderRequest(%s): using version %s from request", req, req.Version()) } else { if version := d.defaultProviderInfo[req.Package()].Version; version != nil { logging.V(5).Infof("normalizeProviderRequest(%s): default version hit on version %s", req, version) req = providers.NewProviderRequest( req.Package(), version, req.PluginDownloadURL(), req.PluginChecksums(), req.Parameterization()) } else { logging.V(5).Infof( "normalizeProviderRequest(%s): default provider miss, sending nil version to engine", req) } } if req.PluginDownloadURL() != "" { logging.V(5).Infof("normalizeProviderRequest(%s): using pluginDownloadURL %s from request", req, req.PluginDownloadURL()) } else { if pluginDownloadURL := d.defaultProviderInfo[req.Package()].PluginDownloadURL; pluginDownloadURL != "" { logging.V(5).Infof("normalizeProviderRequest(%s): default pluginDownloadURL hit on %s", req, pluginDownloadURL) req = providers.NewProviderRequest( req.Package(), req.Version(), pluginDownloadURL, req.PluginChecksums(), req.Parameterization()) } else { logging.V(5).Infof( "normalizeProviderRequest(%s): default pluginDownloadURL miss, sending empty string to engine", req) } } if req.PluginChecksums() != nil { logging.V(5).Infof("normalizeProviderRequest(%s): using pluginChecksums %v from request", req, req.PluginChecksums()) } else { if pluginChecksums := d.defaultProviderInfo[req.Package()].Checksums; pluginChecksums != nil { logging.V(5).Infof("normalizeProviderRequest(%s): default pluginChecksums hit on %v", req, pluginChecksums) req = providers.NewProviderRequest( req.Package(), req.Version(), req.PluginDownloadURL(), pluginChecksums, req.Parameterization()) } else { logging.V(5).Infof( "normalizeProviderRequest(%s): default pluginChecksums miss, sending empty map to engine", req) } } if req.Parameterization() != nil { logging.V(5).Infof("normalizeProviderRequest(%s): default parameterization miss, sending nil to engine", req) } else { logging.V(5).Infof("normalizeProviderRequest(%s): using parameterization %v from request", req, req.Parameterization()) // TODO: Should Parameterization be in defaultProviderInfo //if parameterization := d.defaultProviderInfo[req.Package()].Parameterization; parameterization != nil { // logging.V(5).Infof("normalizeProviderRequest(%s): default parameterization hit on %v", // req, parameterization) // req = providers.NewProviderRequest( // req.Version(), req.Package(), req.PluginDownloadURL(), req.PluginChecksums(), parameterization) //} else { // logging.V(5).Infof( // "normalizeProviderRequest(%s): default parameterization miss, sending nil to engine", req) //} } return req } // newRegisterDefaultProviderEvent creates a RegisterResourceEvent and completion channel that can be sent to the // engine to register a default provider resource for the indicated package. func (d *defaultProviders) newRegisterDefaultProviderEvent( req providers.ProviderRequest, ) (*registerResourceEvent, <-chan *RegisterResult, error) { // Attempt to get the config for the package. inputs, err := d.config.GetPackageConfig(req.Package()) if err != nil { return nil, nil, err } if req.Version() != nil { providers.SetProviderVersion(inputs, req.Version()) } if req.PluginDownloadURL() != "" { providers.SetProviderURL(inputs, req.PluginDownloadURL()) } if req.PluginChecksums() != nil { providers.SetProviderChecksums(inputs, req.PluginChecksums()) } if req.Parameterization() != nil { providers.SetProviderName(inputs, req.Name()) providers.SetProviderParameterization(inputs, req.Parameterization()) } // Create the result channel and the event. done := make(chan *RegisterResult) event := ®isterResourceEvent{ goal: resource.NewGoal( providers.MakeProviderType(req.Package()), req.DefaultName(), true, inputs, "", false, nil, "", nil, nil, nil, nil, nil, nil, "", nil, nil, false, "", ""), done: done, } return event, done, nil } // handleRequest services a single default provider request. If the request is for a default provider that we have // already loaded, we will return its reference. If the request is for a default provider that has not yet been // loaded, we will send a register resource request to the engine, wait for it to complete, and then cache and return // the reference of the loaded provider. // // Note that this function must not be called from two goroutines concurrently; it is the responsibility of d.serve() // to ensure this. func (d *defaultProviders) handleRequest(req providers.ProviderRequest) (providers.Reference, error) { logging.V(5).Infof("handling default provider request for package %s", req) req = d.normalizeProviderRequest(req) denyCreation, err := d.shouldDenyRequest(req) if err != nil { return providers.Reference{}, err } if denyCreation { logging.V(5).Infof("denied default provider request for package %s", req) return providers.NewDenyDefaultProvider(string(req.Package().Name())), nil } // Have we loaded this provider before? Use the existing reference, if so. // // Note that we are using the request's String as the key for the provider map. Go auto-derives hash and equality // functions for aggregates, but the one auto-derived for ProviderRequest does not have the semantics we want. The // use of a string key here is hacky but gets us the desired semantics - that ProviderRequest is a tuple of // optional value-typed Version and a package. ref, ok := d.providers[req.String()] if ok { return ref, nil } event, done, err := d.newRegisterDefaultProviderEvent(req) if err != nil { return providers.Reference{}, err } select { case d.providerRegChan <- event: case <-d.cancel: return providers.Reference{}, context.Canceled } logging.V(5).Infof("waiting for default provider for package %s", req) var result *RegisterResult select { case result = <-done: case <-d.cancel: return providers.Reference{}, context.Canceled } logging.V(5).Infof("registered default provider for package %s: %s", req, result.State.URN) id := result.State.ID contract.Assertf(id != "", "default provider for package %s has no ID", req) ref, err = providers.NewReference(result.State.URN, id) contract.Assertf(err == nil, "could not create provider reference with URN %s and ID %s", result.State.URN, id) d.providers[req.String()] = ref return ref, nil } // If req should be allowed, or if we should prevent the request. func (d *defaultProviders) shouldDenyRequest(req providers.ProviderRequest) (bool, error) { logging.V(9).Infof("checking if %#v should be denied", req) if req.Package().Name().String() == "pulumi" { logging.V(9).Infof("we always allow %#v through", req) return false, nil } pConfig, err := d.config.GetPackageConfig("pulumi") if err != nil { return true, err } denyCreation := false if value, ok := pConfig["disable-default-providers"]; ok { array := []interface{}{} if !value.IsString() { return true, errors.New("Unexpected encoding of pulumi:disable-default-providers") } if value.StringValue() == "" { // If the list is provided but empty, we don't encode a empty json // list, we just encode the empty string. Check to ensure we don't // get parse errors. return false, nil } if err := json.Unmarshal([]byte(value.StringValue()), &array); err != nil { return true, fmt.Errorf("Failed to parse %s: %w", value.StringValue(), err) } for i, v := range array { s, ok := v.(string) if !ok { return true, fmt.Errorf("pulumi:disable-default-providers[%d] must be a string", i) } barred := strings.TrimSpace(s) if barred == "*" || barred == req.Package().Name().String() { logging.V(7).Infof("denying %s (star=%t)", req, barred == "*") denyCreation = true break } } } else { logging.V(9).Infof("Did not find a config for 'pulumi'") } return denyCreation, nil } // serve is the primary loop responsible for handling default provider requests. func (d *defaultProviders) serve() { for { select { case req := <-d.requests: // Note that we do not need to handle cancellation when sending the response: every message we receive is // guaranteed to have something waiting on the other end of the response channel. ref, err := d.handleRequest(req.req) req.response <- defaultProviderResponse{ref: ref, err: err} case <-d.cancel: return } } } // getDefaultProviderRef fetches the provider reference for the default provider for a particular package. func (d *defaultProviders) getDefaultProviderRef(req providers.ProviderRequest) (providers.Reference, error) { response := make(chan defaultProviderResponse) select { case d.requests <- defaultProviderRequest{req: req, response: response}: case <-d.cancel: return providers.Reference{}, context.Canceled } res := <-response return res.ref, res.err } // A transformation function that can be applied to a resource. type TransformFunction func( ctx context.Context, name, typ string, custom bool, parent resource.URN, props resource.PropertyMap, opts *pulumirpc.TransformResourceOptions, ) (resource.PropertyMap, *pulumirpc.TransformResourceOptions, error) // A transformation function that can be applied to an invoke. type TransformInvokeFunction func( ctx context.Context, token string, args resource.PropertyMap, opts *pulumirpc.TransformInvokeOptions, ) (resource.PropertyMap, *pulumirpc.TransformInvokeOptions, error) type CallbacksClient struct { pulumirpc.CallbacksClient conn *grpc.ClientConn } func (c *CallbacksClient) Close() error { return c.conn.Close() } func NewCallbacksClient(conn *grpc.ClientConn) *CallbacksClient { return &CallbacksClient{ CallbacksClient: pulumirpc.NewCallbacksClient(conn), conn: conn, } } // resmon implements the pulumirpc.ResourceMonitor interface and acts as the gateway between a language runtime's // evaluation of a program and the internal resource planning and deployment logic. type resmon struct { pulumirpc.UnsafeResourceMonitorServer pendingTransforms map[string][]TransformFunction // pending transformation functions for a constructed resource pendingTransformsLock sync.Mutex parents map[resource.URN]resource.URN // map of child URNs to their parent URNs parentsLock sync.Mutex resGoals map[resource.URN]resource.Goal // map of seen URNs and their goals. resGoalsLock sync.Mutex // locks the resGoals map. diagnostics diag.Sink // logger for user-facing messages providers ProviderSource // the provider source itself. componentProviders map[resource.URN]map[string]string // which providers component resources used componentProvidersLock sync.Mutex // which locks the componentProviders map defaultProviders *defaultProviders // the default provider manager. sourcePositions *sourcePositions // source position manager. constructInfo plugin.ConstructInfo // information for construct and call calls. regChan chan *registerResourceEvent // the channel to send resource registrations to. regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to. regReadChan chan *readResourceEvent // the channel to send resource reads to. cancel chan bool // a channel that can cancel the server. done <-chan error // a channel that resolves when the server completes. opts EvalSourceOptions // options for the resource monitor. // the working directory for the resources sent to this monitor. workingDirectory string stackTransformsLock sync.Mutex stackTransforms []TransformFunction // stack transformation functions stackInvokeTransformsLock sync.Mutex stackInvokeTransforms []TransformInvokeFunction // invoke transformation functions resourceTransformsLock sync.Mutex resourceTransforms map[resource.URN][]TransformFunction // option transformation functions per resource callbacksLock sync.Mutex callbacks map[string]*CallbacksClient // callbacks clients per target address packageRefLock sync.Mutex // A map of UUIDs to the description of a provider package they correspond to packageRefMap map[string]providers.ProviderRequest } var _ SourceResourceMonitor = (*resmon)(nil) // newResourceMonitor creates a new resource monitor RPC server. func newResourceMonitor( src *evalSource, provs ProviderSource, regChan chan *registerResourceEvent, regOutChan chan *registerResourceOutputsEvent, regReadChan chan *readResourceEvent, config map[config.Key]string, configSecretKeys []config.Key, tracingSpan opentracing.Span, ) (*resmon, error) { // Create our cancellation channel. cancel := make(chan bool) // Create a new default provider manager. d := &defaultProviders{ defaultProviderInfo: src.defaultProviderInfo, providers: make(map[string]providers.Reference), config: src.runinfo.Target, requests: make(chan defaultProviderRequest), providerRegChan: regChan, cancel: cancel, } // New up an engine RPC server. resmon := &resmon{ diagnostics: src.plugctx.Diag, providers: provs, defaultProviders: d, workingDirectory: src.runinfo.Pwd, sourcePositions: newSourcePositions(src.runinfo.ProjectRoot), pendingTransforms: map[string][]TransformFunction{}, parents: map[resource.URN]resource.URN{}, resGoals: map[resource.URN]resource.Goal{}, componentProviders: map[resource.URN]map[string]string{}, regChan: regChan, regOutChan: regOutChan, regReadChan: regReadChan, cancel: cancel, opts: src.opts, callbacks: map[string]*CallbacksClient{}, resourceTransforms: map[resource.URN][]TransformFunction{}, packageRefMap: map[string]providers.ProviderRequest{}, } // Fire up a gRPC server and start listening for incomings. handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ Cancel: resmon.cancel, Init: func(srv *grpc.Server) error { pulumirpc.RegisterResourceMonitorServer(srv, resmon) return nil }, Options: sourceEvalServeOptions(src.plugctx, tracingSpan, env.DebugGRPC.Value()), }) if err != nil { return nil, err } resmon.constructInfo = plugin.ConstructInfo{ Project: string(src.runinfo.Proj.Name), Stack: src.runinfo.Target.Name.String(), Config: config, ConfigSecretKeys: configSecretKeys, DryRun: src.opts.DryRun, Parallel: src.opts.Parallel, MonitorAddress: fmt.Sprintf("127.0.0.1:%d", handle.Port), } resmon.done = handle.Done go d.serve() return resmon, nil } // Get or allocate a new grpc client for the given callback address. func (rm *resmon) GetCallbacksClient(target string) (*CallbacksClient, error) { rm.callbacksLock.Lock() defer rm.callbacksLock.Unlock() if client, has := rm.callbacks[target]; has { return client, nil } conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } client := NewCallbacksClient(conn) rm.callbacks[target] = client return client, nil } // Address returns the address at which the monitor's RPC server may be reached. func (rm *resmon) Address() string { return rm.constructInfo.MonitorAddress } // Cancel signals that the engine should be terminated, awaits its termination, and returns any errors that result. func (rm *resmon) Cancel() error { close(rm.cancel) errs := []error{<-rm.done} for _, client := range rm.callbacks { errs = append(errs, client.Close()) } return errors.Join(errs...) } func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span, logFile string) []grpc.ServerOption { serveOpts := rpcutil.OpenTracingServerInterceptorOptions( tracingSpan, otgrpc.SpanDecorator(decorateResourceSpans), ) if logFile != "" { di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{ LogFile: logFile, Mutex: ctx.DebugTraceMutex, }) if err != nil { // ignoring return nil } metadata := map[string]interface{}{ "mode": "server", } serveOpts = append(serveOpts, di.ServerOptions(interceptors.LogOptions{ Metadata: metadata, })...) } return serveOpts } // getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the // given unparsed provider reference. If the unparsed provider reference is empty, this function returns a reference // to the default provider for the indicated package. func (rm *resmon) getProviderReference(defaultProviders *defaultProviders, req providers.ProviderRequest, rawProviderRef string, ) (providers.Reference, error) { if rawProviderRef != "" { // Check if this is a real provider ref (URN::ID) or a package reference (a dashed uuid) if strings.Contains(rawProviderRef, "::") { ref, err := providers.ParseReference(rawProviderRef) if err != nil { return providers.Reference{}, fmt.Errorf("could not parse provider reference: %w", err) } return ref, nil } } ref, err := defaultProviders.getDefaultProviderRef(req) if err != nil { return providers.Reference{}, err } return ref, nil } // getProviderFromSource fetches the provider plugin for a resource, read, or invoke from the given // package with the given unparsed provider reference. If the unparsed provider reference is empty, // this function returns the plugin for the indicated package's default provider. func (rm *resmon) getProviderFromSource( providerSource ProviderSource, defaultProviders *defaultProviders, req providers.ProviderRequest, rawProviderRef string, token tokens.ModuleMember, ) (plugin.Provider, error) { providerRef, err := rm.getProviderReference(defaultProviders, req, rawProviderRef) if err != nil { return nil, fmt.Errorf("getProviderFromSource: %w", err) } else if providers.IsDenyDefaultsProvider(providerRef) { msg := diag.GetDefaultProviderDenied("Invoke").Message return nil, fmt.Errorf(msg, req.Package(), token) } provider, ok := providerSource.GetProvider(providerRef) if !ok { return nil, fmt.Errorf("unknown provider '%v' -> '%v'", rawProviderRef, providerRef) } return provider, nil } func parseProviderRequest( pkg tokens.Package, version, pluginDownloadURL string, pluginChecksums map[string][]byte, parameterization *providers.ProviderParameterization, ) (providers.ProviderRequest, error) { if version == "" { logging.V(5).Infof("parseProviderRequest(%s): semver version is the empty string", pkg) return providers.NewProviderRequest(pkg, nil, pluginDownloadURL, pluginChecksums, parameterization), nil } parsedVersion, err := semver.Parse(version) if err != nil { logging.V(5).Infof("parseProviderRequest(%s, %s): semver version string is invalid: %v", pkg, version, err) return providers.ProviderRequest{}, err } url := strings.TrimSuffix(pluginDownloadURL, "/") return providers.NewProviderRequest(pkg, &parsedVersion, url, pluginChecksums, parameterization), nil } func (rm *resmon) RegisterPackage(ctx context.Context, req *pulumirpc.RegisterPackageRequest, ) (*pulumirpc.RegisterPackageResponse, error) { logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v)", req) name := tokens.Package(req.Name) if name == "" { return nil, errors.New("package name is empty") } // First parse the request into a ProviderRequest var version *semver.Version if req.Version != "" { v, err := semver.Parse(req.Version) if err != nil { return nil, fmt.Errorf("parse package version %s: %w", req.Version, err) } version = &v } // Parse the parameterization var parameterization *providers.ProviderParameterization if req.Parameterization != nil { parameterizationVersion, err := semver.Parse(req.Parameterization.Version) if err != nil { return nil, fmt.Errorf("parse parameter version %s: %w", req.Parameterization.Version, err) } // RegisterPackageRequest keeps all the plugin information in the root fields "name", "version" etc, while the // information about the parameterized package is in the "parameterization" field. Internally in the engine, and // for resource state we need to flip that around a bit. parameterization = providers.NewProviderParameterization( tokens.Package(req.Parameterization.Name), parameterizationVersion, req.Parameterization.Value, ) } pi := providers.NewProviderRequest( tokens.Package(req.Name), version, req.DownloadUrl, req.Checksums, parameterization) rm.packageRefLock.Lock() defer rm.packageRefLock.Unlock() // See if this package is already registered, else add it to the map. for uuid, candidate := range rm.packageRefMap { if reflect.DeepEqual(candidate, pi) { logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v) matched %s", req, uuid) return &pulumirpc.RegisterPackageResponse{Ref: uuid}, nil } } // Wasn't found add it to the map uuid := uuid.New().String() rm.packageRefMap[uuid] = pi logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v) created %s", req, uuid) return &pulumirpc.RegisterPackageResponse{Ref: uuid}, nil } func (rm *resmon) SupportsFeature(ctx context.Context, req *pulumirpc.SupportsFeatureRequest, ) (*pulumirpc.SupportsFeatureResponse, error) { hasSupport := false // NOTE: DO NOT ADD ANY MORE FEATURES TO THIS LIST // // Context: https://github.com/pulumi/pulumi-dotnet/pull/88#pullrequestreview-1265714090 // // We shouldn't add any more features to this list, copying strings around codebases is prone to bugs. // Rather than adding a new feature here, setup a new SupportsFeatureV2 method, that takes a grpc enum // instead. That can then be safely code generated out to each language with no risk of typos. // // These old features have to stay as is because old engines DO support them, but wouldn't support the new // SupportsFeatureV2 method. switch req.Id { case "secrets": hasSupport = true case "resourceReferences": hasSupport = !rm.opts.DisableResourceReferences case "outputValues": hasSupport = !rm.opts.DisableOutputValues case "aliasSpecs": hasSupport = true case "deletedWith": hasSupport = true case "transforms": hasSupport = true case "invokeTransforms": hasSupport = true } logging.V(5).Infof("ResourceMonitor.SupportsFeature(id: %s) = %t", req.Id, hasSupport) return &pulumirpc.SupportsFeatureResponse{ HasSupport: hasSupport, }, nil } // Invoke performs an invocation of a member located in a resource provider. func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) { // Fetch the token. tok := tokens.ModuleMember(req.GetTok()) label := fmt.Sprintf("ResourceMonitor.Invoke(%s)", tok) args, err := plugin.UnmarshalProperties( req.GetArgs(), plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: true, KeepResources: true, WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, fmt.Errorf("failed to unmarshal %v args: %w", tok, err) } opts := &pulumirpc.TransformInvokeOptions{ Provider: req.GetProvider(), Version: req.GetVersion(), PluginDownloadUrl: req.GetPluginDownloadURL(), PluginChecksums: req.GetPluginChecksums(), } // Lock the invoke transforms and run all of those before loading the provider. err = func() error { // Function exists to scope the lock rm.stackInvokeTransformsLock.Lock() defer rm.stackInvokeTransformsLock.Unlock() for _, transform := range rm.stackInvokeTransforms { newArgs, newOpts, err := transform(ctx, string(tok), args, opts) if err != nil { return err } args = newArgs opts = newOpts } return nil }() if err != nil { return nil, err } // Load up the resource provider if necessary. providerReq, err := parseProviderRequest( tok.Package(), opts.Version, opts.PluginDownloadUrl, opts.PluginChecksums, nil) if err != nil { return nil, err } packageRef := req.GetPackageRef() if packageRef != "" { var has bool providerReq, has = rm.packageRefMap[packageRef] if !has { return nil, fmt.Errorf("unknown provider package '%v'", packageRef) } } prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, opts.Provider, tok) if err != nil { return nil, fmt.Errorf("Invoke: %w", err) } // Do the invoke and then return the arguments. logging.V(5).Infof("ResourceMonitor.Invoke received: tok=%v #args=%v", tok, len(args)) resp, err := prov.Invoke(ctx, plugin.InvokeRequest{ Tok: tok, Args: args, }) if err != nil { return nil, fmt.Errorf("invocation of %v returned an error: %w", tok, err) } // Respect `AcceptResources` unless `tok` is for the built-in `pulumi:pulumi:getResource` function, // in which case always keep resources to maintain the original behavior for older SDKs that are not // setting the `AccceptResources` flag. keepResources := req.GetAcceptResources() if tok == "pulumi:pulumi:getResource" { keepResources = true } mret, err := plugin.MarshalProperties(resp.Properties, plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: true, KeepResources: keepResources, WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err) } chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(resp.Failures)) for _, failure := range resp.Failures { chkfails = append(chkfails, &pulumirpc.CheckFailure{ Property: string(failure.Property), Reason: failure.Reason, }) } return &pulumirpc.InvokeResponse{Return: mret, Failures: chkfails}, nil } // Call dynamically executes a method in the provider associated with a component resource. func (rm *resmon) Call(ctx context.Context, req *pulumirpc.ResourceCallRequest) (*pulumirpc.CallResponse, error) { // Fetch the token and load up the resource provider if necessary. tok := tokens.ModuleMember(req.GetTok()) providerReq, err := parseProviderRequest( tok.Package(), req.GetVersion(), req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil) if err != nil { return nil, err } packageRef := req.GetPackageRef() if packageRef != "" { var has bool providerReq, has = rm.packageRefMap[packageRef] if !has { return nil, fmt.Errorf("unknown provider package '%v'", packageRef) } } prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, req.GetProvider(), tok) if err != nil { return nil, err } label := fmt.Sprintf("ResourceMonitor.Call(%s)", tok) args, err := plugin.UnmarshalProperties( req.GetArgs(), plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: true, KeepResources: true, KeepOutputValues: true, UpgradeToOutputValues: true, WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, fmt.Errorf("failed to unmarshal %v args: %w", tok, err) } argDependencies := map[resource.PropertyKey][]resource.URN{} for name, deps := range req.GetArgDependencies() { urns := make([]resource.URN, len(deps.Urns)) for i, urn := range deps.Urns { urn, err := resource.ParseURN(urn) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency on argument %d URN: %s", i, err)) } urns[i] = urn } argDependencies[resource.PropertyKey(name)] = urns } // If we have output values we can add the dependencies from them to the args dependencies map we send to the provider. for key, output := range args { argDependencies[key] = extendOutputDependencies(argDependencies[key], output) } info := plugin.CallInfo{ Project: rm.constructInfo.Project, Stack: rm.constructInfo.Stack, Config: rm.constructInfo.Config, DryRun: rm.constructInfo.DryRun, Parallel: rm.constructInfo.Parallel, MonitorAddress: rm.constructInfo.MonitorAddress, } options := plugin.CallOptions{ ArgDependencies: argDependencies, } // Do the all and then return the arguments. logging.V(5).Infof( "ResourceMonitor.Call received: tok=%v #args=%v #info=%v #options=%v", tok, len(args), info, options) ret, err := prov.Call(ctx, plugin.CallRequest{ Tok: tok, Args: args, Info: info, Options: options, }) if err != nil { return nil, fmt.Errorf("call of %v returned an error: %w", tok, err) } if ret.ReturnDependencies == nil { ret.ReturnDependencies = map[resource.PropertyKey][]resource.URN{} } for k, v := range ret.Return { ret.ReturnDependencies[k] = extendOutputDependencies(ret.ReturnDependencies[k], v) } returnDependencies := map[string]*pulumirpc.CallResponse_ReturnDependencies{} for name, deps := range ret.ReturnDependencies { urns := make([]string, len(deps)) for i, urn := range deps { urns[i] = string(urn) } returnDependencies[string(name)] = &pulumirpc.CallResponse_ReturnDependencies{Urns: urns} } mret, err := plugin.MarshalProperties(ret.Return, plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: true, KeepResources: true, WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err) } chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(ret.Failures)) for _, failure := range ret.Failures { chkfails = append(chkfails, &pulumirpc.CheckFailure{ Property: string(failure.Property), Reason: failure.Reason, }) } return &pulumirpc.CallResponse{Return: mret, ReturnDependencies: returnDependencies, Failures: chkfails}, nil } func (rm *resmon) StreamInvoke( req *pulumirpc.ResourceInvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer, ) error { tok := tokens.ModuleMember(req.GetTok()) label := fmt.Sprintf("ResourceMonitor.StreamInvoke(%s)", tok) providerReq, err := parseProviderRequest( tok.Package(), req.GetVersion(), req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil) if err != nil { return err } prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, req.GetProvider(), tok) if err != nil { return err } args, err := plugin.UnmarshalProperties( req.GetArgs(), plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: true, KeepResources: true, WorkingDirectory: rm.workingDirectory, }) if err != nil { return fmt.Errorf("failed to unmarshal %v args: %w", tok, err) } // Synchronously do the StreamInvoke and then return the arguments. This will block until the // streaming operation completes! logging.V(5).Infof("ResourceMonitor.StreamInvoke received: tok=%v #args=%v", tok, len(args)) resp, err := prov.StreamInvoke(context.TODO(), plugin.StreamInvokeRequest{ Tok: tok, Args: args, OnNext: func(event resource.PropertyMap) error { mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepResources: req.GetAcceptResources(), WorkingDirectory: rm.workingDirectory, }) if err != nil { return fmt.Errorf("failed to marshal return: %w", err) } return stream.Send(&pulumirpc.InvokeResponse{Return: mret}) }, }) if err != nil { return fmt.Errorf("streaming invocation of %v returned an error: %w", tok, err) } chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(resp.Failures)) for _, failure := range resp.Failures { chkfails = append(chkfails, &pulumirpc.CheckFailure{ Property: string(failure.Property), Reason: failure.Reason, }) } if len(chkfails) > 0 { return stream.Send(&pulumirpc.InvokeResponse{Failures: chkfails}) } return nil } // ReadResource reads the current state associated with a resource from its provider plugin. func (rm *resmon) ReadResource(ctx context.Context, req *pulumirpc.ReadResourceRequest, ) (*pulumirpc.ReadResourceResponse, error) { // Read the basic inputs necessary to identify the plugin. t, err := tokens.ParseTypeToken(req.GetType()) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, err.Error()) } name := req.GetName() parent, err := resource.ParseOptionalURN(req.GetParent()) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent URN: %s", err)) } provider := req.GetProvider() if !providers.IsProviderType(t) && provider == "" { providerReq, err := parseProviderRequest( t.Package(), req.GetVersion(), req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil) if err != nil { return nil, err } packageRef := req.GetPackageRef() if packageRef != "" { var has bool providerReq, has = rm.packageRefMap[packageRef] if !has { return nil, fmt.Errorf("unknown provider package '%v'", packageRef) } } ref, provErr := rm.defaultProviders.getDefaultProviderRef(providerReq) if provErr != nil { return nil, provErr } else if providers.IsDenyDefaultsProvider(ref) { msg := diag.GetDefaultProviderDenied("Read").Message return nil, fmt.Errorf(msg, req.GetType(), t) } provider = ref.String() } id := resource.ID(req.GetId()) label := fmt.Sprintf("ResourceMonitor.ReadResource(%s, %s, %s, %s)", id, t, name, provider) deps := slice.Prealloc[resource.URN](len(req.GetDependencies())) for _, depURN := range req.GetDependencies() { urn, err := resource.ParseURN(depURN) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency: %s", err)) } deps = append(deps, urn) } props, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: true, KeepResources: true, WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, err } additionalSecretOutputs := slice.Prealloc[resource.PropertyKey](len(req.GetAdditionalSecretOutputs())) for _, name := range req.GetAdditionalSecretOutputs() { additionalSecretOutputs = append(additionalSecretOutputs, resource.PropertyKey(name)) } event := &readResourceEvent{ id: id, name: name, baseType: t, provider: provider, parent: parent, props: props, dependencies: deps, additionalSecretOutputs: additionalSecretOutputs, sourcePosition: rm.sourcePositions.getFromRequest(req), done: make(chan *ReadResult), } select { case rm.regReadChan <- event: case <-rm.cancel: logging.V(5).Infof("ResourceMonitor.ReadResource operation canceled, name=%s", name) return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource registration") } // Now block waiting for the operation to finish. var result *ReadResult select { case result = <-event.done: case <-rm.cancel: logging.V(5).Infof("ResourceMonitor.ReadResource operation canceled, name=%s", name) return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on step's done channel") } contract.Assertf(result != nil, "ReadResource operation returned a nil result") marshaled, err := plugin.MarshalProperties(result.State.Outputs, plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: req.GetAcceptSecrets(), KeepResources: req.GetAcceptResources(), WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, fmt.Errorf("failed to marshal %s return state: %w", result.State.URN, err) } return &pulumirpc.ReadResourceResponse{ Urn: string(result.State.URN), Properties: marshaled, }, nil } // Wrap the transform callback so the engine can call the callback server, which will then execute the function. The // wrapper takes care of all the necessary marshalling and unmarshalling. func (rm *resmon) wrapTransformCallback(cb *pulumirpc.Callback) (TransformFunction, error) { client, err := rm.GetCallbacksClient(cb.Target) if err != nil { return nil, err } token := cb.Token return func( ctx context.Context, name, typ string, custom bool, parent resource.URN, props resource.PropertyMap, opts *pulumirpc.TransformResourceOptions, ) (resource.PropertyMap, *pulumirpc.TransformResourceOptions, error) { logging.V(5).Infof("Transform: name=%v type=%v custom=%v parent=%v props=%v opts=%v", name, typ, custom, parent, props, opts) mopts := plugin.MarshalOptions{ KeepUnknowns: true, KeepSecrets: true, KeepResources: true, KeepOutputValues: true, WorkingDirectory: rm.workingDirectory, } mprops, err := plugin.MarshalProperties(props, mopts) if err != nil { return nil, nil, err } request, err := proto.Marshal(&pulumirpc.TransformRequest{ Name: name, Type: typ, Custom: custom, Properties: mprops, Options: opts, }) if err != nil { return nil, nil, fmt.Errorf("marshaling request: %w", err) } resp, err := client.Invoke(ctx, &pulumirpc.CallbackInvokeRequest{ Token: token, Request: request, }) if err != nil { logging.V(5).Infof("Transform callback error: %v", err) return nil, nil, err } var response pulumirpc.TransformResponse err = proto.Unmarshal(resp.Response, &response) if err != nil { return nil, nil, fmt.Errorf("unmarshaling response: %w", err) } newOpts := opts if response.Options != nil { newOpts = response.Options } newProps := props if response.Properties != nil { newProps, err = plugin.UnmarshalProperties(response.Properties, mopts) if err != nil { return nil, nil, err } } logging.V(5).Infof("Transform: props=%v opts=%v", newProps, newOpts) return newProps, newOpts, nil }, nil } // Wrap the transform callback so the engine can call the callback server, which will then execute the function. The // wrapper takes care of all the necessary marshalling and unmarshalling. func (rm *resmon) wrapInvokeTransformCallback(cb *pulumirpc.Callback) (TransformInvokeFunction, error) { client, err := rm.GetCallbacksClient(cb.Target) if err != nil { return nil, err } token := cb.Token return func( ctx context.Context, invokeToken string, args resource.PropertyMap, opts *pulumirpc.TransformInvokeOptions, ) (resource.PropertyMap, *pulumirpc.TransformInvokeOptions, error) { logging.V(5).Infof("Invoke transform: token=%v props=%v opts=%v", invokeToken, args, opts) margs := plugin.MarshalOptions{ KeepUnknowns: true, KeepSecrets: true, KeepResources: true, KeepOutputValues: true, WorkingDirectory: rm.workingDirectory, } mprops, err := plugin.MarshalProperties(args, margs) if err != nil { return nil, nil, err } var request []byte request, err = proto.Marshal(&pulumirpc.TransformInvokeRequest{ Token: invokeToken, Args: mprops, Options: opts, }) if err != nil { return nil, nil, fmt.Errorf("marshaling request: %w", err) } resp, err := client.Invoke(ctx, &pulumirpc.CallbackInvokeRequest{ Token: token, Request: request, }) if err != nil { logging.V(5).Infof("Invoke transform callback error: %v", err) return nil, nil, err } newOpts := opts var newProps resource.PropertyMap var response pulumirpc.TransformInvokeResponse err = proto.Unmarshal(resp.Response, &response) if err != nil { return nil, nil, fmt.Errorf("unmarshaling response: %w", err) } if response.Options != nil { newOpts = response.Options } newProps = args if response.Args != nil { newProps, err = plugin.UnmarshalProperties(response.Args, margs) if err != nil { return nil, nil, err } } logging.V(5).Infof("Invoke transform: props=%v opts=%v", newProps, newOpts) return newProps, newOpts, nil }, nil } func (rm *resmon) RegisterStackTransform(ctx context.Context, cb *pulumirpc.Callback) (*emptypb.Empty, error) { rm.stackTransformsLock.Lock() defer rm.stackTransformsLock.Unlock() if cb.Target == "" { return nil, errors.New("target must be specified") } wrapped, err := rm.wrapTransformCallback(cb) if err != nil { return nil, err } rm.stackTransforms = append(rm.stackTransforms, wrapped) return &emptypb.Empty{}, nil } func (rm *resmon) RegisterStackInvokeTransform(ctx context.Context, cb *pulumirpc.Callback) (*emptypb.Empty, error) { rm.stackInvokeTransformsLock.Lock() defer rm.stackInvokeTransformsLock.Unlock() if cb.Target == "" { return nil, errors.New("target must be specified") } wrapped, err := rm.wrapInvokeTransformCallback(cb) if err != nil { return nil, err } rm.stackInvokeTransforms = append(rm.stackInvokeTransforms, wrapped) return &emptypb.Empty{}, nil } // inheritFromParent returns a new goal that inherits from the given parent goal. // Currently only inherits DeletedWith from parent. func inheritFromParent(child resource.Goal, parent resource.Goal) *resource.Goal { goal := child if goal.DeletedWith == "" { goal.DeletedWith = parent.DeletedWith } return &goal } type sourcePositions struct { projectRoot string } func newSourcePositions(projectRoot string) *sourcePositions { if projectRoot == "" { projectRoot = "/" } else { contract.Assertf(filepath.IsAbs(projectRoot), "projectRoot is not an absolute path") projectRoot = filepath.Clean(projectRoot) } return &sourcePositions{projectRoot: projectRoot} } func (s *sourcePositions) parseSourcePosition(raw *pulumirpc.SourcePosition) (string, error) { if raw == nil { return "", nil } if raw.Line <= 0 { return "", fmt.Errorf("invalid line number %v", raw.Line) } col := "" if raw.Column != 0 { if raw.Column < 0 { return "", fmt.Errorf("invalid column number %v", raw.Column) } col = "," + strconv.FormatInt(int64(raw.Column), 10) } posURL, err := url.Parse(raw.Uri) if err != nil { return "", err } if posURL.Scheme != "file" { return "", fmt.Errorf("unrecognized scheme %q", posURL.Scheme) } file := filepath.FromSlash(posURL.Path) if !filepath.IsAbs(file) { return "", errors.New("source positions must include absolute paths") } rel, err := filepath.Rel(s.projectRoot, file) if err != nil { return "", fmt.Errorf("making relative path: %w", err) } posURL.Scheme = "project" posURL.Path = "/" + filepath.ToSlash(rel) posURL.Fragment = fmt.Sprintf("%v%s", raw.Line, col) return posURL.String(), nil } // Allow getFromRequest to accept any gRPC request that has a source position (ReadResourceRequest, // RegisterResourceRequest, ResourceInvokeRequest, and CallRequest). type hasSourcePosition interface { GetSourcePosition() *pulumirpc.SourcePosition } // getFromRequest returns any source position information from an incoming request. func (s *sourcePositions) getFromRequest(req hasSourcePosition) string { pos, err := s.parseSourcePosition(req.GetSourcePosition()) if err != nil { logging.V(5).Infof("parsing source position %#v: %v", req.GetSourcePosition(), err) return "" } return pos } // requestFromNodeJS returns true if the request is coming from a Node.js language runtime // or SDK. This is determined by checking if the request has a "pulumi-runtime" metadata // header with a value of "nodejs". If no "pulumi-runtime" header is present, then it // checks if the request has a "user-agent" metadata header that has a value that starts // with "grpc-node-js/". func requestFromNodeJS(ctx context.Context) bool { if md, hasMetadata := metadata.FromIncomingContext(ctx); hasMetadata { // Check for the "pulumi-runtime" header first. // We'll always respect this header value when present. if runtime, ok := md["pulumi-runtime"]; ok { return len(runtime) == 1 && runtime[0] == "nodejs" } // Otherwise, check the "user-agent" header. if ua, ok := md["user-agent"]; ok { return len(ua) == 1 && strings.HasPrefix(ua[0], "grpc-node-js/") } } return false } // transformAliasForNodeJSCompat transforms the alias from the legacy Node.js values to properly specified values. func transformAliasForNodeJSCompat(alias *pulumirpc.Alias) *pulumirpc.Alias { switch a := alias.Alias.(type) { case *pulumirpc.Alias_Spec_: // The original implementation in the Node.js SDK did not specify aliases correctly: // // - It did not set NoParent when it should have, but instead set Parent to empty. // - It set NoParent to true and left Parent empty when both the alias and resource had no Parent specified. // // To maintain compatibility with such versions of the Node.js SDK, we transform these incorrectly // specified aliases into properly specified ones that work with this implementation of the engine: // // - { Parent: "", NoParent: false } -> { Parent: "", NoParent: true } // - { Parent: "", NoParent: true } -> { Parent: "", NoParent: false } spec := &pulumirpc.Alias_Spec{ Name: a.Spec.Name, Type: a.Spec.Type, Stack: a.Spec.Stack, Project: a.Spec.Project, } switch p := a.Spec.Parent.(type) { case *pulumirpc.Alias_Spec_ParentUrn: if p.ParentUrn == "" { spec.Parent = &pulumirpc.Alias_Spec_NoParent{NoParent: true} } else { spec.Parent = p } case *pulumirpc.Alias_Spec_NoParent: if p.NoParent { spec.Parent = nil } else { spec.Parent = p } default: spec.Parent = &pulumirpc.Alias_Spec_NoParent{NoParent: true} } return &pulumirpc.Alias{ Alias: &pulumirpc.Alias_Spec_{ Spec: spec, }, } } return alias } func (rm *resmon) resolveProvider( provider string, providers map[string]string, parent resource.URN, pkg tokens.Package, ) string { if provider != "" { return provider } if prov, ok := providers[string(pkg)]; ok { return prov } if parent != "" { rm.componentProvidersLock.Lock() defer rm.componentProvidersLock.Unlock() if parentsProvider, ok := rm.componentProviders[parent]; ok { if prov, ok := parentsProvider[string(pkg)]; ok { return prov } } } return "" } // RegisterResource is invoked by a language process when a new resource has been allocated. func (rm *resmon) RegisterResource(ctx context.Context, req *pulumirpc.RegisterResourceRequest, ) (*pulumirpc.RegisterResourceResponse, error) { // Communicate the type, name, and object information to the iterator that is awaiting us. name := req.GetName() custom := req.GetCustom() remote := req.GetRemote() parent, err := resource.ParseOptionalURN(req.GetParent()) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent URN: %s", err)) } id := resource.ID(req.GetImportId()) sourcePosition := rm.sourcePositions.getFromRequest(req) // Custom resources must have a three-part type so that we can 1) identify if they are providers and 2) retrieve the // provider responsible for managing a particular resource (based on the type's Package). var t tokens.Type if custom || remote { t, err = tokens.ParseTypeToken(req.GetType()) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, err.Error()) } } else { // Component resources may have any format type. t = tokens.Type(req.GetType()) } label := fmt.Sprintf("ResourceMonitor.RegisterResource(%s,%s)", t, name) // We need to build the full alias spec list here, so we can pass it to transforms. aliases := []*pulumirpc.Alias{} for _, aliasURN := range req.GetAliasURNs() { aliases = append(aliases, &pulumirpc.Alias{Alias: &pulumirpc.Alias_Urn{Urn: aliasURN}}) } // We assume aliases are properly specified. However, if a request hasn't explicitly // indicated that it is using properly specified aliases and the request is coming // from Node.js, transform the aliases from the incorrect Node.js values to properly // specified values, to maintain backward compatibility for users of older Node.js // SDKs that aren't sending properly specified aliases. transformAliases := !req.GetAliasSpecs() && requestFromNodeJS(ctx) for _, aliasObject := range req.GetAliases() { if transformAliases { aliasObject = transformAliasForNodeJSCompat(aliasObject) } aliases = append(aliases, aliasObject) } var deleteBeforeReplace *bool // Technically DeleteBeforeReplaceDefined should be used to decided if DeleteBeforeReplace should be looked at or // not. However the Go sdk doesn't set Defined so we have a fallback here of respecting this field if either Defined // is set or DeleteBeforeReplace is true. if req.GetDeleteBeforeReplaceDefined() || req.GetDeleteBeforeReplace() { deleteBeforeReplace = &req.DeleteBeforeReplace } props, err := plugin.UnmarshalProperties( req.GetObject(), plugin.MarshalOptions{ Label: label, KeepUnknowns: true, ComputeAssetHashes: true, KeepSecrets: true, KeepResources: true, KeepOutputValues: true, UpgradeToOutputValues: true, WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, err } // Before we pass the props to the transform function we need to ensure that they correctly carry any dependency // information. dependencies := mapset.NewSet[resource.URN]() for _, dependingURN := range req.GetDependencies() { urn, err := resource.ParseURN(dependingURN) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency URN: %s", err)) } dependencies.Add(urn) } propertyDependencies := make(map[resource.PropertyKey]mapset.Set[resource.URN]) if len(req.GetPropertyDependencies()) == 0 && !remote { // If this request did not specify property dependencies, treat each property as depending on every resource // in the request's dependency list. We don't need to do this when remote is true, because all clients that // support remote already support passing property dependencies, so there's no need to backfill here. for pk := range props { propertyDependencies[pk] = dependencies } } else { // Otherwise, unmarshal the per-property dependency information. for pk, pd := range req.GetPropertyDependencies() { deps := mapset.NewSet[resource.URN]() for _, d := range pd.Urns { urn, err := resource.ParseURN(d) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency on property %s URN: %s", pk, err)) } deps.Add(urn) } propertyDependencies[resource.PropertyKey(pk)] = deps } } // If we're running any transforms we need to update all the property values to Outputs to track dependencies. if len(req.Transforms) > 0 { props = upgradeOutputValues(props, propertyDependencies) } provider := req.GetProvider() if custom || remote { provider = rm.resolveProvider(req.GetProvider(), req.GetProviders(), parent, t.Package()) } opts := &pulumirpc.TransformResourceOptions{ DependsOn: req.GetDependencies(), Protect: req.GetProtect(), IgnoreChanges: req.GetIgnoreChanges(), ReplaceOnChanges: req.GetReplaceOnChanges(), Version: req.GetVersion(), Aliases: aliases, Provider: provider, Providers: req.GetProviders(), CustomTimeouts: req.GetCustomTimeouts(), PluginDownloadUrl: req.GetPluginDownloadURL(), RetainOnDelete: req.GetRetainOnDelete(), DeletedWith: req.GetDeletedWith(), DeleteBeforeReplace: deleteBeforeReplace, AdditionalSecretOutputs: req.GetAdditionalSecretOutputs(), PluginChecksums: req.GetPluginChecksums(), } // This might be a resource registation for a resource that another process requested to be constructed. If so we'll // have saved the pending transforms for this and we should use those rather than what is on the request. var transforms []TransformFunction pendingKey := fmt.Sprintf("%s::%s::%s", parent, t, name) err = func() error { rm.pendingTransformsLock.Lock() defer rm.pendingTransformsLock.Unlock() if pending, ok := rm.pendingTransforms[pendingKey]; ok { transforms = pending } else { transforms, err = slice.MapError(req.Transforms, rm.wrapTransformCallback) if err != nil { return err } // We only need to save this for remote calls if remote && len(transforms) > 0 { rm.pendingTransforms[pendingKey] = transforms } } return nil }() if err != nil { return nil, err } // Before we calculate anything else run the transformations. First run the transforms for this resource, // then it's parents etc etc for _, transform := range transforms { newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts) if err != nil { return nil, err } props = newProps opts = newOpts } // Lookup our parents transformations and run those err = func() error { // Function exists to scope the lock rm.resourceTransformsLock.Lock() defer rm.resourceTransformsLock.Unlock() rm.parentsLock.Lock() defer rm.parentsLock.Unlock() current := parent for current != "" { if transforms, ok := rm.resourceTransforms[current]; ok { for _, transform := range transforms { newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts) if err != nil { return err } props = newProps opts = newOpts } } current = rm.parents[current] } return nil }() if err != nil { return nil, err } // Then lock the stack transformations and run all of those err = func() error { // Function exists to scope the lock rm.stackTransformsLock.Lock() defer rm.stackTransformsLock.Unlock() for _, transform := range rm.stackTransforms { newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts) if err != nil { return err } props = newProps opts = newOpts } return nil }() if err != nil { return nil, err } // We handle updating the providers map to include the providers field of the parent if // both the current resource and its parent is a component resource. func() { // Function exists to scope the lock rm.componentProvidersLock.Lock() defer rm.componentProvidersLock.Unlock() if parentsProviders, parentIsComponent := rm.componentProviders[parent]; !custom && parent != "" && parentIsComponent { for k, v := range parentsProviders { if opts.Providers == nil { opts.Providers = map[string]string{} } if _, ok := opts.Providers[k]; !ok { opts.Providers[k] = v } } } }() var providerRef providers.Reference var providerRefs map[string]string if custom && !providers.IsProviderType(t) || remote { providerReq, err := parseProviderRequest( t.Package(), opts.GetVersion(), opts.GetPluginDownloadUrl(), opts.GetPluginChecksums(), nil) if err != nil { return nil, err } packageRef := req.GetPackageRef() if packageRef != "" { var has bool providerReq, has = rm.packageRefMap[packageRef] if !has { return nil, fmt.Errorf("unknown provider package '%v'", packageRef) } } providerRef, err = rm.getProviderReference(rm.defaultProviders, providerReq, opts.GetProvider()) if err != nil { return nil, err } providerRefs = make(map[string]string, len(opts.GetProviders())) for name, provider := range opts.GetProviders() { ref, err := rm.getProviderReference(rm.defaultProviders, providerReq, provider) if err != nil { return nil, err } providerRefs[name] = ref.String() } } parsedAliases := []resource.Alias{} for _, aliasObject := range opts.Aliases { aliasSpec := aliasObject.GetSpec() var alias resource.Alias if aliasSpec != nil { alias = resource.Alias{ Name: aliasSpec.Name, Type: aliasSpec.Type, Stack: aliasSpec.Stack, Project: aliasSpec.Project, } switch parent := aliasSpec.GetParent().(type) { case *pulumirpc.Alias_Spec_ParentUrn: // Technically an SDK shouldn't set `parent` at all to specify the default parent, but both NodeJS and // Python have buggy SDKs that set parent to an empty URN to specify the default parent. We handle this // case here to maintain backward compatibility with older SDKs but it would be good to fix this to be // strict in V4. parentURN, err := resource.ParseOptionalURN(parent.ParentUrn) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent alias URN: %s", err)) } alias.Parent = parentURN case *pulumirpc.Alias_Spec_NoParent: alias.NoParent = parent.NoParent } } else { urn, err := resource.ParseURN(aliasObject.GetUrn()) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid alias URN: %s", err)) } alias = resource.Alias{URN: urn} } parsedAliases = append(parsedAliases, alias) } // Reparse the dependency information from any transformation results if len(req.Transforms) > 0 { dependencies = mapset.NewSet[resource.URN]() for _, dependingURN := range opts.DependsOn { urn, err := resource.ParseURN(dependingURN) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency URN: %s", err)) } dependencies.Add(urn) } // Now we've run the transforms we can rebuild the property dependency maps. If we have output values we can add the // dependencies from them to the dependencies map we send to the provider and save to state. propertyDependencies = make(map[resource.PropertyKey]mapset.Set[resource.URN]) for key, output := range props { deps := mapset.NewSet[resource.URN]() addOutputDependencies(deps, output) propertyDependencies[key] = deps // Also add these to the overall dependencies dependencies = dependencies.Union(deps) } } else { // If we ran transforms we would have merged all the dependencies togther already, but if we didn't we want to // ensure any output values add their dependencies to the dependencies map we send to the provider. for key, output := range props { if propertyDependencies[key] == nil { propertyDependencies[key] = mapset.NewSet[resource.URN]() } addOutputDependencies(propertyDependencies[key], output) } } rawDependencies := dependencies.ToSlice() rawPropertyDependencies := make(map[resource.PropertyKey][]resource.URN) for key, deps := range propertyDependencies { rawPropertyDependencies[key] = deps.ToSlice() } if providers.IsProviderType(t) { if opts.GetVersion() != "" { version, err := semver.Parse(opts.GetVersion()) if err != nil { return nil, fmt.Errorf("%s: passed invalid version: %w", label, err) } providers.SetProviderVersion(props, &version) } if opts.GetPluginDownloadUrl() != "" { providers.SetProviderURL(props, opts.GetPluginDownloadUrl()) } if req.GetPackageRef() != "" { // If the provider resource has a package ref then we need to set all it's input fields as in // newRegisterDefaultProviderEvent. packageRef := req.GetPackageRef() providerReq, has := rm.packageRefMap[packageRef] if !has { return nil, fmt.Errorf("unknown provider package '%v'", packageRef) } if providerReq.Version() != nil { providers.SetProviderVersion(props, providerReq.Version()) } if providerReq.PluginDownloadURL() != "" { providers.SetProviderURL(props, providerReq.PluginDownloadURL()) } if providerReq.PluginChecksums() != nil { providers.SetProviderChecksums(props, providerReq.PluginChecksums()) } if providerReq.Parameterization() != nil { providers.SetProviderName(props, providerReq.Name()) providers.SetProviderParameterization(props, providerReq.Parameterization()) } } // Make sure that an explicit provider which doesn't specify its plugin gets the // same plugin as the default provider for the package. defaultProvider, ok := rm.defaultProviders.defaultProviderInfo[providers.GetProviderPackage(t)] if ok && opts.GetVersion() == "" && opts.GetPluginDownloadUrl() == "" { if defaultProvider.Version != nil { providers.SetProviderVersion(props, defaultProvider.Version) } if defaultProvider.PluginDownloadURL != "" { providers.SetProviderURL(props, defaultProvider.PluginDownloadURL) } } } protect := opts.Protect ignoreChanges := opts.IgnoreChanges replaceOnChanges := opts.ReplaceOnChanges retainOnDelete := opts.RetainOnDelete deletedWith, err := resource.ParseOptionalURN(opts.GetDeletedWith()) if err != nil { return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid DeletedWith URN: %s", err)) } customTimeouts := opts.CustomTimeouts additionalSecretOutputs := opts.GetAdditionalSecretOutputs() // At this point we're going to forward these properties to the rest of the engine and potentially to providers. As // we add features to the code above (most notably transforms) we could end up with more instances of `OutputValue` // than the rest of the system historically expects. To minimize the disruption we downgrade `OutputValue`s with no // dependencies down to `Computed` and `Secret` or their plain values. We only do this for non-remote resources. // Remote resources already deal with `OutputValue`s and even though it would be more consistent to downgrade them // here it would be a break change. if !remote { props = downgradeOutputValues(props) } logging.V(5).Infof( "ResourceMonitor.RegisterResource received: t=%v, name=%v, custom=%v, #props=%v, parent=%v, protect=%v, "+ "provider=%v, deps=%v, deleteBeforeReplace=%v, ignoreChanges=%v, aliases=%v, customTimeouts=%v, "+ "providers=%v, replaceOnChanges=%v, retainOnDelete=%v, deletedWith=%v", t, name, custom, len(props), parent, protect, providerRef, rawDependencies, opts.DeleteBeforeReplace, ignoreChanges, parsedAliases, customTimeouts, providerRefs, replaceOnChanges, retainOnDelete, deletedWith) // If this is a remote component, fetch its provider and issue the construct call. Otherwise, register the resource. var result *RegisterResult var outputDeps map[string]*pulumirpc.RegisterResourceResponse_PropertyDependencies if remote { provider, ok := rm.providers.GetProvider(providerRef) if providers.IsDenyDefaultsProvider(providerRef) { msg := diag.GetDefaultProviderDenied("").Message return nil, fmt.Errorf(msg, t.Package().String(), t.String()) } if !ok { return nil, fmt.Errorf("unknown provider '%v'", providerRef) } // Invoke the provider's Construct RPC method. options := plugin.ConstructOptions{ // We don't actually need to send a list of aliases to construct anymore because the engine does // all alias construction. Aliases: []resource.Alias{}, Dependencies: rawDependencies, Protect: protect, PropertyDependencies: rawPropertyDependencies, Providers: providerRefs, AdditionalSecretOutputs: additionalSecretOutputs, DeletedWith: deletedWith, IgnoreChanges: ignoreChanges, ReplaceOnChanges: replaceOnChanges, RetainOnDelete: retainOnDelete, } if customTimeouts != nil { options.CustomTimeouts = &plugin.CustomTimeouts{ Create: customTimeouts.Create, Update: customTimeouts.Update, Delete: customTimeouts.Delete, } } if opts.DeleteBeforeReplace != nil { options.DeleteBeforeReplace = *opts.DeleteBeforeReplace } constructResult, err := provider.Construct(ctx, plugin.ConstructRequest{ Info: rm.constructInfo, Type: t, Name: name, Parent: parent, Inputs: props, Options: options, }) if err != nil { return nil, err } result = &RegisterResult{State: &resource.State{URN: constructResult.URN, Outputs: constructResult.Outputs}} // The provider may have returned OutputValues in "Outputs", we need to downgrade them to Computed or // Secret but also add them to the outputDeps map. if constructResult.OutputDependencies == nil { constructResult.OutputDependencies = map[resource.PropertyKey][]resource.URN{} } for k, v := range result.State.Outputs { constructResult.OutputDependencies[k] = extendOutputDependencies(constructResult.OutputDependencies[k], v) } outputDeps = map[string]*pulumirpc.RegisterResourceResponse_PropertyDependencies{} for k, deps := range constructResult.OutputDependencies { urns := make([]string, len(deps)) for i, d := range deps { urns[i] = string(d) } outputDeps[string(k)] = &pulumirpc.RegisterResourceResponse_PropertyDependencies{Urns: urns} } } else { additionalSecretKeys := slice.Prealloc[resource.PropertyKey](len(additionalSecretOutputs)) for _, name := range additionalSecretOutputs { additionalSecretKeys = append(additionalSecretKeys, resource.PropertyKey(name)) } var timeouts resource.CustomTimeouts if customTimeouts != nil { if customTimeouts.Create != "" { seconds, err := generateTimeoutInSeconds(customTimeouts.Create) if err != nil { return nil, err } timeouts.Create = seconds } if customTimeouts.Delete != "" { seconds, err := generateTimeoutInSeconds(customTimeouts.Delete) if err != nil { return nil, err } timeouts.Delete = seconds } if customTimeouts.Update != "" { seconds, err := generateTimeoutInSeconds(customTimeouts.Update) if err != nil { return nil, err } timeouts.Update = seconds } } goal := resource.NewGoal(t, name, custom, props, parent, protect, rawDependencies, providerRef.String(), nil, rawPropertyDependencies, opts.DeleteBeforeReplace, ignoreChanges, additionalSecretKeys, parsedAliases, id, &timeouts, replaceOnChanges, retainOnDelete, deletedWith, sourcePosition, ) if goal.Parent != "" { rm.resGoalsLock.Lock() parentGoal, ok := rm.resGoals[goal.Parent] if ok { goal = inheritFromParent(*goal, parentGoal) } rm.resGoalsLock.Unlock() } // Send the goal state to the engine. step := ®isterResourceEvent{ goal: goal, done: make(chan *RegisterResult), } select { case rm.regChan <- step: case <-rm.cancel: logging.V(5).Infof("ResourceMonitor.RegisterResource operation canceled, name=%s", name) return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource registration") } // Now block waiting for the operation to finish. select { case result = <-step.done: case <-rm.cancel: logging.V(5).Infof("ResourceMonitor.RegisterResource operation canceled, name=%s", name) return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on step's done channel") } if result != nil && result.Result != ResultStateSuccess && !req.GetSupportsResultReporting() { return nil, rpcerror.New(codes.Internal, "resource registration failed") } if result != nil && result.State != nil && result.State.URN != "" { rm.resGoalsLock.Lock() rm.resGoals[result.State.URN] = *goal rm.resGoalsLock.Unlock() } } if result != nil && result.State != nil && result.State.URN != "" { // We've got a safe URN now, save the parent and transformations func() { rm.parentsLock.Lock() defer rm.parentsLock.Unlock() rm.parents[result.State.URN] = parent }() func() { rm.resourceTransformsLock.Lock() defer rm.resourceTransformsLock.Unlock() rm.resourceTransforms[result.State.URN] = transforms }() if !custom { func() { rm.componentProvidersLock.Lock() defer rm.componentProvidersLock.Unlock() rm.componentProviders[result.State.URN] = opts.GetProviders() }() } } // Filter out partially-known values if the requestor does not support them. outputs := result.State.Outputs // Local ComponentResources may contain unresolved resource refs, so ignore those outputs. if !req.GetCustom() && !remote { // In the case of a SameStep, the old resource outputs are returned to the language host after the step is // executed. The outputs of a ComponentResource may depend on resources that have not been registered at the // time the ComponentResource is itself registered, as the outputs are set by a later call to // RegisterResourceOutputs. Therefore, when the SameStep returns the old resource outputs for a // ComponentResource, it may return references to resources that have not yet been registered, which will cause // the SDK's calls to getResource to fail when it attempts to resolve those references. // // Work on a more targeted fix is tracked in https://github.com/pulumi/pulumi/issues/5978 outputs = resource.PropertyMap{} } if !req.GetSupportsPartialValues() { logging.V(5).Infof("stripping unknowns from RegisterResource response for urn %v", result.State.URN) filtered := resource.PropertyMap{} for k, v := range outputs { if !v.ContainsUnknowns() { filtered[k] = v } } outputs = filtered } // TODO(@platform): // Currently component resources ignore these options: // • ignoreChanges // • customTimeouts // • additionalSecretOutputs // • replaceOnChanges // • retainOnDelete // • deletedWith // Revisit these semantics in Pulumi v4.0 // See this issue for more: https://github.com/pulumi/pulumi/issues/9704 if !custom { rm.checkComponentOption(result.State.URN, "ignoreChanges", func() bool { return len(ignoreChanges) > 0 }) rm.checkComponentOption(result.State.URN, "customTimeouts", func() bool { if customTimeouts == nil { return false } hasUpdateTimeout := customTimeouts.Update != "" hasCreateTimeout := customTimeouts.Create != "" hasDeleteTimeout := customTimeouts.Delete != "" return hasCreateTimeout || hasUpdateTimeout || hasDeleteTimeout }) rm.checkComponentOption(result.State.URN, "additionalSecretOutputs", func() bool { return len(additionalSecretOutputs) > 0 }) rm.checkComponentOption(result.State.URN, "replaceOnChanges", func() bool { return len(replaceOnChanges) > 0 }) rm.checkComponentOption(result.State.URN, "retainOnDelete", func() bool { return retainOnDelete }) rm.checkComponentOption(result.State.URN, "deletedWith", func() bool { return deletedWith != "" }) } logging.V(5).Infof( "ResourceMonitor.RegisterResource operation finished: t=%v, urn=%v, #outs=%v", result.State.Type, result.State.URN, len(outputs)) // Finally, unpack the response into properties that we can return to the language runtime. This mostly includes // an ID, URN, and defaults and output properties that will all be blitted back onto the runtime object. obj, err := plugin.MarshalProperties(outputs, plugin.MarshalOptions{ Label: label, KeepUnknowns: true, KeepSecrets: req.GetAcceptSecrets(), KeepResources: req.GetAcceptResources(), WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, err } // Assert that we never leak the unconfigured provider ID to the language host. contract.Assertf( !providers.IsProviderType(result.State.Type) || result.State.ID != providers.UnconfiguredID, "provider resource %s has unconfigured ID", result.State.URN) reason := pulumirpc.Result_SUCCESS if result.Result == ResultStateSkipped { reason = pulumirpc.Result_SKIP } else if result.Result == ResultStateFailed { reason = pulumirpc.Result_FAIL } return &pulumirpc.RegisterResourceResponse{ Urn: string(result.State.URN), Id: string(result.State.ID), Object: obj, PropertyDependencies: outputDeps, Result: reason, }, nil } // checkComponentOption generates a warning message on the resource // 'urn' if 'check' returns true. // This function is intended to validate options passed to component resources, // so urn is expected to refer to a component. func (rm *resmon) checkComponentOption(urn resource.URN, optName string, check func() bool) { if check() { logging.V(10).Infof("The option '%s' has no automatic effect on component resource '%s', "+ "ensure it is handled correctly in the component code.", optName, urn) } } // RegisterResourceOutputs records some new output properties for a resource that have arrived after its initial // provisioning. These will make their way into the eventual checkpoint state file for that resource. func (rm *resmon) RegisterResourceOutputs(ctx context.Context, req *pulumirpc.RegisterResourceOutputsRequest, ) (*emptypb.Empty, error) { // Obtain and validate the message's inputs (a URN plus the output property map). urn, err := resource.ParseURN(req.Urn) if err != nil { return nil, fmt.Errorf("invalid resource URN: %w", err) } label := fmt.Sprintf("ResourceMonitor.RegisterResourceOutputs(%s)", urn) outs, err := plugin.UnmarshalProperties( req.GetOutputs(), plugin.MarshalOptions{ Label: label, KeepUnknowns: true, ComputeAssetHashes: true, KeepSecrets: true, KeepResources: true, WorkingDirectory: rm.workingDirectory, }) if err != nil { return nil, fmt.Errorf("cannot unmarshal output properties: %w", err) } logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs received: urn=%v, #outs=%v", urn, len(outs)) // Now send the step over to the engine to perform. step := ®isterResourceOutputsEvent{ urn: urn, outputs: outs, done: make(chan bool), } select { case rm.regOutChan <- step: case <-rm.cancel: logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs operation canceled, urn=%s", urn) return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource outputs") } // Now block waiting for the operation to finish. select { case <-step.done: case <-rm.cancel: logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs operation canceled, urn=%s", urn) return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on output step's done channel") } logging.V(5).Infof( "ResourceMonitor.RegisterResourceOutputs operation finished: urn=%v, #outs=%v", urn, len(outs)) return &emptypb.Empty{}, nil } type registerResourceEvent struct { goal *resource.Goal // the resource goal state produced by the iterator. done chan *RegisterResult // the channel to communicate with after the resource state is available. } var _ RegisterResourceEvent = (*registerResourceEvent)(nil) func (g *registerResourceEvent) event() {} func (g *registerResourceEvent) Goal() *resource.Goal { return g.goal } func (g *registerResourceEvent) Done(result *RegisterResult) { // Communicate the resulting state back to the RPC thread, which is parked awaiting our reply. g.done <- result } type registerResourceOutputsEvent struct { urn resource.URN // the URN to which this completion applies. outputs resource.PropertyMap // an optional property bag for output properties. done chan bool // the channel to communicate with after the operation completes. } var _ RegisterResourceOutputsEvent = (*registerResourceOutputsEvent)(nil) func (g *registerResourceOutputsEvent) event() {} func (g *registerResourceOutputsEvent) URN() resource.URN { return g.urn } func (g *registerResourceOutputsEvent) Outputs() resource.PropertyMap { return g.outputs } func (g *registerResourceOutputsEvent) Done() { // Communicate the resulting state back to the RPC thread, which is parked awaiting our reply. g.done <- true } type readResourceEvent struct { id resource.ID name string baseType tokens.Type provider string parent resource.URN props resource.PropertyMap dependencies []resource.URN additionalSecretOutputs []resource.PropertyKey sourcePosition string done chan *ReadResult } var _ ReadResourceEvent = (*readResourceEvent)(nil) func (g *readResourceEvent) event() {} func (g *readResourceEvent) ID() resource.ID { return g.id } func (g *readResourceEvent) Name() string { return g.name } func (g *readResourceEvent) Type() tokens.Type { return g.baseType } func (g *readResourceEvent) Provider() string { return g.provider } func (g *readResourceEvent) Parent() resource.URN { return g.parent } func (g *readResourceEvent) Properties() resource.PropertyMap { return g.props } func (g *readResourceEvent) Dependencies() []resource.URN { return g.dependencies } func (g *readResourceEvent) AdditionalSecretOutputs() []resource.PropertyKey { return g.additionalSecretOutputs } func (g *readResourceEvent) SourcePosition() string { return g.sourcePosition } func (g *readResourceEvent) Done(result *ReadResult) { g.done <- result } func generateTimeoutInSeconds(timeout string) (float64, error) { duration, err := time.ParseDuration(timeout) if err != nil { return 0, fmt.Errorf("unable to parse customTimeout Value %s", timeout) } return duration.Seconds(), nil } func decorateResourceSpans(span opentracing.Span, method string, req, resp interface{}, grpcError error) { if req == nil { return } switch method { case "/pulumirpc.ResourceMonitor/Invoke": span.SetTag("pulumi-decorator", req.(*pulumirpc.ResourceInvokeRequest).Tok) case "/pulumirpc.ResourceMonitor/ReadResource": span.SetTag("pulumi-decorator", req.(*pulumirpc.ReadResourceRequest).Type) case "/pulumirpc.ResourceMonitor/RegisterResource": span.SetTag("pulumi-decorator", req.(*pulumirpc.RegisterResourceRequest).Type) } } // downgradeOutputValues recursively replaces all Output values with `Computed`, `Secret`, or their plain // value. This loses all dependency information. func downgradeOutputValues(v resource.PropertyMap) resource.PropertyMap { var downgradeOutputPropertyValue func(v resource.PropertyValue) resource.PropertyValue downgradeOutputPropertyValue = func(v resource.PropertyValue) resource.PropertyValue { if v.IsOutput() { output := v.OutputValue() var result resource.PropertyValue if output.Known { result = downgradeOutputPropertyValue(output.Element) } else { result = resource.MakeComputed(resource.NewStringProperty("")) } if output.Secret { result = resource.MakeSecret(result) } return result } if v.IsObject() { return resource.NewObjectProperty(downgradeOutputValues(v.ObjectValue())) } if v.IsArray() { result := make([]resource.PropertyValue, len(v.ArrayValue())) for i, elem := range v.ArrayValue() { result[i] = downgradeOutputPropertyValue(elem) } return resource.NewArrayProperty(result) } if v.IsSecret() { return resource.MakeSecret(downgradeOutputPropertyValue(v.SecretValue().Element)) } if v.IsResourceReference() { ref := v.ResourceReferenceValue() return resource.NewResourceReferenceProperty( resource.ResourceReference{ URN: ref.URN, ID: downgradeOutputPropertyValue(ref.ID), PackageVersion: ref.PackageVersion, }) } return v } result := make(resource.PropertyMap) for k, pv := range v { result[k] = downgradeOutputPropertyValue(pv) } return result } func upgradeOutputValues( v resource.PropertyMap, propertyDependencies map[resource.PropertyKey]mapset.Set[resource.URN], ) resource.PropertyMap { // We assume that by the time this is being called we've upgraded all Secret/Computed values to outputs. We just // need to add the dependency information from propertyDependencies. result := make(resource.PropertyMap) for k, pv := range v { if deps, has := propertyDependencies[k]; has { currentDeps := mapset.NewSet[resource.URN]() addOutputDependencies(currentDeps, pv) if currentDeps.IsSuperset(deps) { // already has the deps, just copy across result[k] = pv } else { var output resource.Output if pv.IsOutput() { output = pv.OutputValue() } else { output = resource.Output{ Element: pv, Known: true, } } // Merge all the dependencies from the propertyDependencies map with any current dependencies on this // output value. currentDeps.Clear() currentDeps.Append(output.Dependencies...) currentDeps = currentDeps.Union(deps) output.Dependencies = currentDeps.ToSlice() result[k] = resource.NewOutputProperty(output) } } else { // no deps just copy across result[k] = pv } } return result } func extendOutputDependencies(deps []resource.URN, v resource.PropertyValue) []resource.URN { set := mapset.NewSet(deps...) addOutputDependencies(set, v) return set.ToSlice() } func addOutputDependencies(deps mapset.Set[resource.URN], v resource.PropertyValue) { if v.IsOutput() { output := v.OutputValue() if output.Known { addOutputDependencies(deps, output.Element) } deps.Append(output.Dependencies...) } if v.IsResourceReference() { ref := v.ResourceReferenceValue() addOutputDependencies(deps, ref.ID) } if v.IsObject() { for _, elem := range v.ObjectValue() { addOutputDependencies(deps, elem) } } if v.IsArray() { for _, elem := range v.ArrayValue() { addOutputDependencies(deps, elem) } } if v.IsSecret() { addOutputDependencies(deps, v.SecretValue().Element) } }