// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deploy

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/url"
	"path/filepath"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/blang/semver"
	"github.com/google/uuid"
	"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
	opentracing "github.com/opentracing/opentracing-go"
	"google.golang.org/protobuf/types/known/emptypb"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
	"google.golang.org/protobuf/proto"

	"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
	interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
	"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
	"github.com/pulumi/pulumi/sdk/v3/go/common/env"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
	"github.com/pulumi/pulumi/sdk/v3/go/common/slice"
	"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/result"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror"
	"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
	pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"

	mapset "github.com/deckarep/golang-set/v2"
)

// EvalRunInfo provides information required to execute and deploy resources within a package.
// Every field carries both JSON and YAML tags so the structure can be serialized in either format.
type EvalRunInfo struct {
	// the package metadata.
	Proj *workspace.Project `json:"proj" yaml:"proj"`
	// the package's working directory.
	Pwd string `json:"pwd" yaml:"pwd"`
	// the path to the program.
	Program string `json:"program" yaml:"program"`
	// the path to the project's directory.
	ProjectRoot string `json:"projectRoot,omitempty" yaml:"projectRoot,omitempty"`
	// any arguments to pass to the package.
	Args []string `json:"args,omitempty" yaml:"args,omitempty"`
	// the target being deployed into. The eval source reads the stack name, organization, configuration and
	// decrypter from it.
	Target *Target `json:"target,omitempty" yaml:"target,omitempty"`
}

// EvalSourceOptions provides options for configuring an evaluation source.
type EvalSourceOptions struct {
	// true if the evaluation is producing resources for a dry-run/preview.
	DryRun bool
	// the degree of parallelism for resource operations (<=1 for serial).
	Parallel int
	// true to disable resource reference support.
	DisableResourceReferences bool
	// true to disable output value support.
	DisableOutputValues bool
}

// NewEvalSource returns a planning source that fetches resources by evaluating a package with a set of args and
// a configuration map.  The given plugin context supplies the host used to launch the language runtime and the
// diagnostics sink used by the resource monitor; defaultProviderInfo supplies the plugin details used for default
// providers when a request does not specify them; opts tunes the evaluation itself.
func NewEvalSource(
	plugctx *plugin.Context,
	runinfo *EvalRunInfo,
	defaultProviderInfo map[tokens.Package]workspace.PluginSpec,
	opts EvalSourceOptions,
) Source {
	source := &evalSource{}
	source.plugctx = plugctx
	source.runinfo = runinfo
	source.defaultProviderInfo = defaultProviderInfo
	source.opts = opts
	return source
}

// evalSource is the Source implementation returned by NewEvalSource. It holds everything needed to start an
// evaluation; the per-run state lives on the iterator created by Iterate.
type evalSource struct {
	plugctx             *plugin.Context                         // the plugin context.
	runinfo             *EvalRunInfo                            // the directives to use when running the program.
	defaultProviderInfo map[tokens.Package]workspace.PluginSpec // the default provider versions for this source.
	opts                EvalSourceOptions                       // options for the evaluation source.
}

// Close is a no-op for an evaluation source: it releases nothing and always returns nil.
func (src *evalSource) Close() error { return nil }

// Project returns the name of the project being run by this evaluation source, as recorded in the run info's
// project metadata.
func (src *evalSource) Project() tokens.PackageName {
	project := src.runinfo.Proj
	return project.Name
}

// Stack returns the name of the stack being targeted by this evaluation source, as recorded on the run info's
// target.
func (src *evalSource) Stack() tokens.StackName {
	target := src.runinfo.Target
	return target.Name
}

// Info returns the run info (*EvalRunInfo) this source was created with.
func (src *evalSource) Info() interface{} {
	return src.runinfo
}

// Iterate decrypts the target's configuration, starts a resource monitor, and launches the program evaluation in
// a goroutine. The returned iterator surfaces the events the program produces on subsequent calls to Next.
//
// An error is returned if the configuration cannot be decrypted or converted to a property map, or if the
// resource monitor fails to start; in those cases no evaluation is launched.
func (src *evalSource) Iterate(ctx context.Context, providers ProviderSource) (SourceIterator, error) {
	span := opentracing.SpanFromContext(ctx)
	target := src.runinfo.Target

	// Decrypt the configuration into its plain string form.
	decrypted, err := target.Config.Decrypt(target.Decrypter)
	if err != nil {
		return nil, fmt.Errorf("failed to decrypt config: %w", err)
	}

	// Remember which config keys hold secure values.
	secretKeys := target.Config.SecureKeys()

	// The same configuration, as a decrypted property map.
	propertyMap, err := target.Config.AsDecryptedPropertyMap(ctx, target.Decrypter)
	if err != nil {
		return nil, fmt.Errorf("failed to convert config to map: %w", err)
	}

	// These unbuffered channels carry events from the resource monitor to the iterator.
	registrations := make(chan *registerResourceEvent)
	outputs := make(chan *registerResourceOutputsEvent)
	reads := make(chan *readResourceEvent)

	// Fire up the resource monitor that will watch for and record resource operations.
	monitor, err := newResourceMonitor(
		src, providers, registrations, outputs, reads, decrypted, secretKeys, span)
	if err != nil {
		return nil, fmt.Errorf("failed to start resource monitor: %w", err)
	}

	iterator := &evalSourceIterator{
		mon:         monitor,
		src:         src,
		regChan:     registrations,
		regOutChan:  outputs,
		regReadChan: reads,
		finChan:     make(chan error),
	}

	// Run the program in a goroutine.  Resource events arrive over the monitor's gRPC server and are pumped through
	// the channels above; the program's final result is delivered on the iterator's completion channel.
	iterator.forkRun(decrypted, secretKeys, propertyMap)

	return iterator, nil
}

// evalSourceIterator is the iterator produced by evalSource.Iterate. Next selects over the three event channels
// and the completion channel; forkRun writes the program's final result to the completion channel.
type evalSourceIterator struct {
	mon         SourceResourceMonitor              // the resource monitor, per iterator.
	src         *evalSource                        // the owning eval source object.
	regChan     chan *registerResourceEvent        // the channel that contains resource registrations.
	regOutChan  chan *registerResourceOutputsEvent // the channel that contains resource completions.
	regReadChan chan *readResourceEvent            // the channel that contains read resource requests.
	finChan     chan error                         // the channel that communicates completion.
	done        bool                               // set to true when the evaluation is done.
}

// Close cancels the iterator's resource monitor, reclaiming its resources, and returns whatever error the
// cancellation reports.
func (iter *evalSourceIterator) Close() error {
	monitor := iter.mon
	return monitor.Cancel()
}

// ResourceMonitor returns the resource monitor that belongs to this iterator.
func (iter *evalSourceIterator) ResourceMonitor() SourceResourceMonitor { return iter.mon }

// Next blocks until the running program produces its next event (a resource registration, a resource outputs
// registration, or a resource read) and returns it. When the program finishes, Next returns a nil event together
// with the program's error (nil on a clean exit) and marks the iterator as done; every later call returns
// (nil, nil) immediately.
func (iter *evalSourceIterator) Next() (SourceEvent, error) {
	// Once the evaluation has completed there is nothing more to produce.
	if iter.done {
		return nil, nil
	}

	// Wait for the program to compute some more state, then report what it had to say.
	select {
	case registration := <-iter.regChan:
		contract.Assertf(registration != nil, "received a nil registerResourceEvent")
		g := registration.Goal()
		logging.V(5).Infof("EvalSourceIterator produced a registration: t=%v,name=%v,#props=%v",
			g.Type, g.Name, len(g.Properties))
		return registration, nil

	case completion := <-iter.regOutChan:
		contract.Assertf(completion != nil, "received a nil registerResourceOutputsEvent")
		logging.V(5).Infof("EvalSourceIterator produced a completion: urn=%v,#outs=%v",
			completion.URN(), len(completion.Outputs()))
		return completion, nil

	case readEvent := <-iter.regReadChan:
		contract.Assertf(readEvent != nil, "received a nil readResourceEvent")
		logging.V(5).Infoln("EvalSourceIterator produced a read")
		return readEvent, nil

	case runErr := <-iter.finChan:
		// The program has finished.  The contract with the language provider is that this implies the language
		// runtime has exited, so calling Close on the plugin is fine from here on.
		iter.done = true
		switch {
		case runErr == nil:
			// Clean exit; nothing to log.
		case result.IsBail(runErr):
			logging.V(5).Infof("EvalSourceIterator ended with bail.")
		default:
			logging.V(5).Infof("EvalSourceIterator ended with an error: %v", runErr)
		}
		return nil, runErr
	}
}

// forkRun starts the program evaluation on a separate goroutine and returns immediately. The goroutine launches
// the language runtime for the project, runs the program against this iterator's resource monitor, and finally
// sends the outcome on finChan: nil for a clean exit, a bail error if the language host asked to bail, or an
// error describing what went wrong. finChan is unbuffered, so the goroutine waits until Next receives the result.
func (iter *evalSourceIterator) forkRun(
	config map[config.Key]string,
	configSecretKeys []config.Key,
	configPropertyMap resource.PropertyMap,
) {
	// evaluate launches the language plugin and runs the program to completion.
	evaluate := func() error {
		info := iter.src.runinfo
		runtimeName := info.Proj.Runtime.Name()
		runtimeOptions := info.Proj.Runtime.Options()

		programInfo := plugin.NewProgramInfo(
			/* rootDirectory */ info.ProjectRoot,
			/* programDirectory */ info.Pwd,
			/* entryPoint */ info.Program,
			/* options */ runtimeOptions)

		langhost, err := iter.src.plugctx.Host.LanguageRuntime(runtimeName, programInfo)
		if err != nil {
			return fmt.Errorf("failed to launch language host %s: %w", runtimeName, err)
		}
		contract.Assertf(langhost != nil, "expected non-nil language host %s", runtimeName)

		// Now run the actual program.  While it executes, the program calls back into the resource monitor, and
		// those calls are served concurrently with this goroutine.
		request := plugin.RunInfo{
			MonitorAddress:    iter.mon.Address(),
			Stack:             info.Target.Name.String(),
			Project:           string(info.Proj.Name),
			Pwd:               info.Pwd,
			Args:              info.Args,
			Config:            config,
			ConfigSecretKeys:  configSecretKeys,
			ConfigPropertyMap: configPropertyMap,
			DryRun:            iter.src.opts.DryRun,
			Parallel:          iter.src.opts.Parallel,
			Organization:      string(info.Target.Organization),
			Info:              programInfo,
		}
		progerr, bail, err := langhost.Run(request)

		switch {
		case err != nil:
			// The run itself failed; that error takes precedence over anything the program reported.
			return err
		case bail:
			// The language host asked us to bail.
			return result.BailErrorf("run bailed")
		case progerr != "":
			// The program had an unhandled error; propagate it to the caller.
			return fmt.Errorf("an unhandled error occurred: %v", progerr)
		default:
			return nil
		}
	}

	// Communicate the error, if it exists, or nil if the program exited cleanly.
	go func() {
		iter.finChan <- evaluate()
	}()
}

// defaultProviders manages the registration of default providers. The default provider for a package is the provider
// resource that will be used to manage resources that do not explicitly reference a provider. Default providers will
// only be registered for packages that are used by resources registered by the user's Pulumi program.
type defaultProviders struct {
	// A map of package identifiers to versions, used to disambiguate which plugin to load if no version is provided
	// by the language host.
	defaultProviderInfo map[tokens.Package]workspace.PluginSpec

	// A map of ProviderRequest strings to provider references, used to keep track of the set of default providers that
	// have already been loaded.
	providers map[string]providers.Reference
	// The source of per-package configuration, used both for a default provider's inputs and for reading the
	// "pulumi" package's disable-default-providers setting.
	config plugin.ConfigSource

	// Incoming default provider requests; received and handled one at a time by serve.
	requests chan defaultProviderRequest
	// The channel on which register resource events for new default providers are sent.
	providerRegChan chan<- *registerResourceEvent
	// Receiving from this channel succeeds once it is closed; serve and any in-flight request stop at that point.
	cancel <-chan bool
}

// defaultProviderResponse is the reply sent by serve for a single defaultProviderRequest: the provider reference
// and error returned by handleRequest.
type defaultProviderResponse struct {
	ref providers.Reference
	err error
}

// defaultProviderRequest pairs a provider request with the channel on which serve sends the reply.
type defaultProviderRequest struct {
	req      providers.ProviderRequest
	response chan<- defaultProviderResponse
}

// normalizeProviderRequest fills in the parts of a provider request that the caller left unset, using the default
// provider information this manager was created with, and returns the resulting request. The version, plugin
// download URL and plugin checksums are each defaulted independently; a value supplied in the request always wins.
// Parameterization is never defaulted and is passed through unchanged.
func (d *defaultProviders) normalizeProviderRequest(req providers.ProviderRequest) providers.ProviderRequest {
	// Request that the engine instantiate a specific version of this provider, if one was requested. We'll figure out
	// what version to request by:
	//   1. Providing the Version field of the ProviderRequest verbatim, if it was provided, otherwise
	//   2. Querying the list of default versions provided to us on startup and returning the value associated with
	//      the given package, if one exists, otherwise
	//   3. We give nothing to the engine and let the engine figure it out.
	//
	// As we tighten up our approach to provider versioning, 2 and 3 will go away and be replaced entirely by 1. 3 is
	// especially onerous because the engine selects the "newest" plugin available on the machine, which is generally
	// problematic for a lot of reasons.
	if req.Version() != nil {
		logging.V(5).Infof("normalizeProviderRequest(%s): using version %s from request", req, req.Version())
	} else {
		if version := d.defaultProviderInfo[req.Package()].Version; version != nil {
			logging.V(5).Infof("normalizeProviderRequest(%s): default version hit on version %s", req, version)
			req = providers.NewProviderRequest(
				req.Package(), version, req.PluginDownloadURL(), req.PluginChecksums(), req.Parameterization())
		} else {
			logging.V(5).Infof(
				"normalizeProviderRequest(%s): default provider miss, sending nil version to engine", req)
		}
	}

	if req.PluginDownloadURL() != "" {
		logging.V(5).Infof("normalizeProviderRequest(%s): using pluginDownloadURL %s from request",
			req, req.PluginDownloadURL())
	} else {
		if pluginDownloadURL := d.defaultProviderInfo[req.Package()].PluginDownloadURL; pluginDownloadURL != "" {
			logging.V(5).Infof("normalizeProviderRequest(%s): default pluginDownloadURL hit on %s",
				req, pluginDownloadURL)
			req = providers.NewProviderRequest(
				req.Package(), req.Version(), pluginDownloadURL, req.PluginChecksums(), req.Parameterization())
		} else {
			logging.V(5).Infof(
				"normalizeProviderRequest(%s): default pluginDownloadURL miss, sending empty string to engine", req)
		}
	}

	if req.PluginChecksums() != nil {
		logging.V(5).Infof("normalizeProviderRequest(%s): using pluginChecksums %v from request",
			req, req.PluginChecksums())
	} else {
		if pluginChecksums := d.defaultProviderInfo[req.Package()].Checksums; pluginChecksums != nil {
			logging.V(5).Infof("normalizeProviderRequest(%s): default pluginChecksums hit on %v",
				req, pluginChecksums)
			req = providers.NewProviderRequest(
				req.Package(), req.Version(), req.PluginDownloadURL(), pluginChecksums, req.Parameterization())
		} else {
			logging.V(5).Infof(
				"normalizeProviderRequest(%s): default pluginChecksums miss, sending empty map to engine", req)
		}
	}

	// Parameterization has no default source yet, so this step only reports which case we are in: a
	// parameterization carried by the request is used as is, and a missing one stays nil.
	if req.Parameterization() != nil {
		logging.V(5).Infof("normalizeProviderRequest(%s): using parameterization %v from request",
			req, req.Parameterization())
	} else {
		// TODO: Should Parameterization be in defaultProviderInfo
		//if parameterization := d.defaultProviderInfo[req.Package()].Parameterization; parameterization != nil {
		//	logging.V(5).Infof("normalizeProviderRequest(%s): default parameterization hit on %v",
		//		req, parameterization)
		//	req = providers.NewProviderRequest(
		//		req.Package(), req.Version(), req.PluginDownloadURL(), req.PluginChecksums(), parameterization)
		//} else {
		//	logging.V(5).Infof(
		//		"normalizeProviderRequest(%s): default parameterization miss, sending nil to engine", req)
		//}
		logging.V(5).Infof(
			"normalizeProviderRequest(%s): default parameterization miss, sending nil to engine", req)
	}

	return req
}

// newRegisterDefaultProviderEvent builds the register resource event used to ask the engine for a default provider
// resource for the requested package, together with the channel on which the registration result is delivered.
//
// The provider's inputs start from the package's configuration and are then extended with whichever of the version,
// plugin download URL, plugin checksums and parameterization (plus provider name) the request carries. An error is
// returned only if the package configuration cannot be fetched.
func (d *defaultProviders) newRegisterDefaultProviderEvent(
	req providers.ProviderRequest,
) (*registerResourceEvent, <-chan *RegisterResult, error) {
	pkg := req.Package()

	// Start from the package's own configuration.
	inputs, err := d.config.GetPackageConfig(pkg)
	if err != nil {
		return nil, nil, err
	}

	// Record each optional part of the request that is actually present.
	if version := req.Version(); version != nil {
		providers.SetProviderVersion(inputs, version)
	}
	if downloadURL := req.PluginDownloadURL(); downloadURL != "" {
		providers.SetProviderURL(inputs, downloadURL)
	}
	if checksums := req.PluginChecksums(); checksums != nil {
		providers.SetProviderChecksums(inputs, checksums)
	}
	if parameterization := req.Parameterization(); parameterization != nil {
		providers.SetProviderName(inputs, req.Name())
		providers.SetProviderParameterization(inputs, parameterization)
	}

	// Build the goal for the provider resource, then wrap it in an event that reports back on a fresh channel.
	goal := resource.NewGoal(
		providers.MakeProviderType(pkg),
		req.DefaultName(), true, inputs, "", false, nil, "", nil, nil, nil,
		nil, nil, nil, "", nil, nil, false, "", "")

	completion := make(chan *RegisterResult)
	return &registerResourceEvent{goal: goal, done: completion}, completion, nil
}

// handleRequest services a single default provider request. The request is first normalized against the default
// provider information. If default providers are disabled for the package, a deny-default-provider reference is
// returned. If the provider was loaded earlier, the cached reference is returned. Otherwise a register resource
// event is sent to the engine, and once the registration completes the new reference is cached and returned.
//
// context.Canceled is returned if the manager is cancelled while sending the event or awaiting its result.
//
// Note that this function must not be called from two goroutines concurrently; it is the responsibility of d.serve()
// to ensure this.
func (d *defaultProviders) handleRequest(req providers.ProviderRequest) (providers.Reference, error) {
	logging.V(5).Infof("handling default provider request for package %s", req)

	req = d.normalizeProviderRequest(req)

	deny, err := d.shouldDenyRequest(req)
	switch {
	case err != nil:
		return providers.Reference{}, err
	case deny:
		logging.V(5).Infof("denied default provider request for package %s", req)
		return providers.NewDenyDefaultProvider(string(req.Package().Name())), nil
	}

	// Have we loaded this provider before? Use the existing reference, if so.
	//
	// The cache is keyed by the request's String form. Go auto-derives hash and equality functions for aggregates,
	// but the ones derived for ProviderRequest do not have the semantics we want. A string key is hacky but gives
	// the desired semantics - that ProviderRequest is a tuple of optional value-typed Version and a package.
	key := req.String()
	if cached, seen := d.providers[key]; seen {
		return cached, nil
	}

	event, done, err := d.newRegisterDefaultProviderEvent(req)
	if err != nil {
		return providers.Reference{}, err
	}

	// Hand the registration to the engine, giving up if we are cancelled first.
	select {
	case d.providerRegChan <- event:
	case <-d.cancel:
		return providers.Reference{}, context.Canceled
	}

	logging.V(5).Infof("waiting for default provider for package %s", req)

	// Wait for the engine to finish the registration, again giving up on cancellation.
	var registered *RegisterResult
	select {
	case registered = <-done:
	case <-d.cancel:
		return providers.Reference{}, context.Canceled
	}

	urn := registered.State.URN
	logging.V(5).Infof("registered default provider for package %s: %s", req, urn)

	id := registered.State.ID
	contract.Assertf(id != "", "default provider for package %s has no ID", req)

	newRef, err := providers.NewReference(urn, id)
	contract.Assertf(err == nil, "could not create provider reference with URN %s and ID %s", urn, id)
	d.providers[key] = newRef

	return newRef, nil
}

// shouldDenyRequest reports whether creating a default provider for req must be refused.
//
// Requests for the "pulumi" package are always allowed. For every other package the decision comes from the
// "disable-default-providers" entry of the "pulumi" package configuration: a string holding a JSON array of
// package names, where "*" matches every package. A missing entry or an empty string denies nothing.
//
// Whenever an error is returned (the configuration cannot be read, or the entry is not a string, not valid JSON,
// or contains a non-string element) the boolean result is true.
func (d *defaultProviders) shouldDenyRequest(req providers.ProviderRequest) (bool, error) {
	logging.V(9).Infof("checking if %#v should be denied", req)

	if req.Package().Name().String() == "pulumi" {
		logging.V(9).Infof("we always allow %#v through", req)
		return false, nil
	}

	pConfig, err := d.config.GetPackageConfig("pulumi")
	if err != nil {
		return true, err
	}

	value, found := pConfig["disable-default-providers"]
	if !found {
		logging.V(9).Infof("Did not find a config for 'pulumi'")
		return false, nil
	}
	if !value.IsString() {
		return true, errors.New("Unexpected encoding of pulumi:disable-default-providers")
	}
	if value.StringValue() == "" {
		// A list that is provided but empty is encoded as the empty string rather than as an empty JSON list.
		// Return early so we don't report a parse error for it.
		return false, nil
	}

	var barredPackages []interface{}
	if err := json.Unmarshal([]byte(value.StringValue()), &barredPackages); err != nil {
		return true, fmt.Errorf("Failed to parse %s: %w", value.StringValue(), err)
	}

	for index, entry := range barredPackages {
		name, isString := entry.(string)
		if !isString {
			return true, fmt.Errorf("pulumi:disable-default-providers[%d] must be a string", index)
		}
		barred := strings.TrimSpace(name)
		if barred == "*" || barred == req.Package().Name().String() {
			logging.V(7).Infof("denying %s (star=%t)", req, barred == "*")
			return true, nil
		}
	}

	return false, nil
}

// serve is the primary loop responsible for handling default provider requests. It handles requests strictly one
// at a time and returns once the cancel channel is closed.
func (d *defaultProviders) serve() {
	for {
		select {
		case <-d.cancel:
			return
		case incoming := <-d.requests:
			// Note that we do not need to handle cancellation when sending the response: every message we receive is
			// guaranteed to have something waiting on the other end of the response channel.
			ref, err := d.handleRequest(incoming.req)
			incoming.response <- defaultProviderResponse{ref: ref, err: err}
		}
	}
}

// getDefaultProviderRef fetches the provider reference for the default provider for a particular package. The
// request is queued for the serve loop and this call waits for its reply; context.Canceled is returned if the
// manager is cancelled before the request could be queued.
func (d *defaultProviders) getDefaultProviderRef(req providers.ProviderRequest) (providers.Reference, error) {
	reply := make(chan defaultProviderResponse)
	request := defaultProviderRequest{req: req, response: reply}

	select {
	case d.requests <- request:
	case <-d.cancel:
		return providers.Reference{}, context.Canceled
	}

	// The serve loop accepted the request, so a reply is guaranteed to follow.
	answer := <-reply
	return answer.ref, answer.err
}

// TransformFunction is a transformation function that can be applied to a resource. It receives the resource's
// name, type, custom flag, parent URN, properties and options, and returns the properties and options to use in
// their place, or an error.
type TransformFunction func(
	ctx context.Context,
	name, typ string, custom bool, parent resource.URN,
	props resource.PropertyMap,
	opts *pulumirpc.TransformResourceOptions,
) (resource.PropertyMap, *pulumirpc.TransformResourceOptions, error)

// TransformInvokeFunction is a transformation function that can be applied to an invoke. It receives the invoke's
// token, arguments and options, and returns the arguments and options to use in their place, or an error.
type TransformInvokeFunction func(
	ctx context.Context, token string, args resource.PropertyMap,
	opts *pulumirpc.TransformInvokeOptions,
) (resource.PropertyMap, *pulumirpc.TransformInvokeOptions, error)

// CallbacksClient is a pulumirpc.CallbacksClient that also keeps hold of the gRPC connection it was created
// from, so that the connection can be closed via Close.
type CallbacksClient struct {
	pulumirpc.CallbacksClient

	// the underlying connection, closed by Close.
	conn *grpc.ClientConn
}

// Close closes the gRPC connection underlying this client and returns the error, if any, from doing so.
func (c *CallbacksClient) Close() error {
	conn := c.conn
	return conn.Close()
}

// NewCallbacksClient wraps the given gRPC connection in a CallbacksClient. The returned client keeps the
// connection so that its Close method can close it.
func NewCallbacksClient(conn *grpc.ClientConn) *CallbacksClient {
	client := &CallbacksClient{conn: conn}
	client.CallbacksClient = pulumirpc.NewCallbacksClient(conn)
	return client
}

// resmon implements the pulumirpc.ResourceMonitor interface and acts as the gateway between a language runtime's
// evaluation of a program and the internal resource planning and deployment logic.
//
// Several of the maps and slices below are declared next to a mutex named after them (for example callbacks and
// callbacksLock); those mutexes are intended to guard the corresponding field.
type resmon struct {
	pulumirpc.UnsafeResourceMonitorServer

	pendingTransforms     map[string][]TransformFunction // pending transformation functions for a constructed resource
	pendingTransformsLock sync.Mutex

	parents     map[resource.URN]resource.URN // map of child URNs to their parent URNs
	parentsLock sync.Mutex

	resGoals               map[resource.URN]resource.Goal     // map of seen URNs and their goals.
	resGoalsLock           sync.Mutex                         // locks the resGoals map.
	diagnostics            diag.Sink                          // logger for user-facing messages
	providers              ProviderSource                     // the provider source itself.
	componentProviders     map[resource.URN]map[string]string // which providers component resources used
	componentProvidersLock sync.Mutex                         // which locks the componentProviders map
	defaultProviders       *defaultProviders                  // the default provider manager.
	sourcePositions        *sourcePositions                   // source position manager.
	constructInfo          plugin.ConstructInfo               // information for construct and call calls.
	regChan                chan *registerResourceEvent        // the channel to send resource registrations to.
	regOutChan             chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
	regReadChan            chan *readResourceEvent            // the channel to send resource reads to.
	cancel                 chan bool                          // a channel that can cancel the server.
	done                   <-chan error                       // a channel that resolves when the server completes.
	opts                   EvalSourceOptions                  // options for the resource monitor.

	// the working directory for the resources sent to this monitor.
	workingDirectory string

	stackTransformsLock       sync.Mutex
	stackTransforms           []TransformFunction // stack transformation functions
	stackInvokeTransformsLock sync.Mutex
	stackInvokeTransforms     []TransformInvokeFunction // invoke transformation functions
	resourceTransformsLock    sync.Mutex
	resourceTransforms        map[resource.URN][]TransformFunction // option transformation functions per resource
	callbacksLock             sync.Mutex
	callbacks                 map[string]*CallbacksClient // callbacks clients per target address

	packageRefLock sync.Mutex
	// A map of UUIDs to the description of a provider package they correspond to
	packageRefMap map[string]providers.ProviderRequest
}

// Compile-time check that *resmon satisfies SourceResourceMonitor.
var _ SourceResourceMonitor = (*resmon)(nil)

// newResourceMonitor creates a new resource monitor RPC server. It builds the default provider manager and the
// monitor itself, starts a gRPC server with the monitor registered on it, records the server's address and
// completion channel on the monitor, and finally starts the default provider manager's serve loop.
//
// If the gRPC server cannot be started the error is returned and the serve loop is never started.
func newResourceMonitor(
	src *evalSource,
	provs ProviderSource,
	regChan chan *registerResourceEvent,
	regOutChan chan *registerResourceOutputsEvent,
	regReadChan chan *readResourceEvent,
	config map[config.Key]string,
	configSecretKeys []config.Key,
	tracingSpan opentracing.Span,
) (*resmon, error) {
	// One cancellation channel is shared by the gRPC server and the default provider manager.
	cancel := make(chan bool)

	// The default provider manager registers its providers on the same channel as ordinary registrations.
	defaults := &defaultProviders{
		defaultProviderInfo: src.defaultProviderInfo,
		providers:           make(map[string]providers.Reference),
		config:              src.runinfo.Target,
		requests:            make(chan defaultProviderRequest),
		providerRegChan:     regChan,
		cancel:              cancel,
	}

	// New up the monitor that will back the engine RPC server.
	monitor := &resmon{
		diagnostics:        src.plugctx.Diag,
		providers:          provs,
		defaultProviders:   defaults,
		workingDirectory:   src.runinfo.Pwd,
		sourcePositions:    newSourcePositions(src.runinfo.ProjectRoot),
		pendingTransforms:  map[string][]TransformFunction{},
		parents:            map[resource.URN]resource.URN{},
		resGoals:           map[resource.URN]resource.Goal{},
		componentProviders: map[resource.URN]map[string]string{},
		regChan:            regChan,
		regOutChan:         regOutChan,
		regReadChan:        regReadChan,
		cancel:             cancel,
		opts:               src.opts,
		callbacks:          map[string]*CallbacksClient{},
		resourceTransforms: map[resource.URN][]TransformFunction{},
		packageRefMap:      map[string]providers.ProviderRequest{},
	}

	// Fire up a gRPC server and start listening for incomings.
	serveOptions := rpcutil.ServeOptions{
		Cancel: monitor.cancel,
		Init: func(srv *grpc.Server) error {
			pulumirpc.RegisterResourceMonitorServer(srv, monitor)
			return nil
		},
		Options: sourceEvalServeOptions(src.plugctx, tracingSpan, env.DebugGRPC.Value()),
	}
	handle, err := rpcutil.ServeWithOptions(serveOptions)
	if err != nil {
		return nil, err
	}

	// The server's port is only known now, so the construct information is filled in after the server is started.
	monitor.constructInfo = plugin.ConstructInfo{
		Project:          string(src.runinfo.Proj.Name),
		Stack:            src.runinfo.Target.Name.String(),
		Config:           config,
		ConfigSecretKeys: configSecretKeys,
		DryRun:           src.opts.DryRun,
		Parallel:         src.opts.Parallel,
		MonitorAddress:   fmt.Sprintf("127.0.0.1:%d", handle.Port),
	}
	monitor.done = handle.Done

	go defaults.serve()

	return monitor, nil
}

// GetCallbacksClient returns the callbacks client for the given target address, dialing a new insecure gRPC
// connection and caching the resulting client the first time an address is seen. The cache is guarded by
// callbacksLock for the whole call. An error is returned if dialing fails, in which case nothing is cached.
func (rm *resmon) GetCallbacksClient(target string) (*CallbacksClient, error) {
	rm.callbacksLock.Lock()
	defer rm.callbacksLock.Unlock()

	// Reuse the client we already have for this address, if any.
	existing, cached := rm.callbacks[target]
	if cached {
		return existing, nil
	}

	credentials := grpc.WithTransportCredentials(insecure.NewCredentials())
	conn, err := grpc.Dial(target, credentials)
	if err != nil {
		return nil, err
	}

	created := NewCallbacksClient(conn)
	rm.callbacks[target] = created
	return created, nil
}

// Address returns the address at which the monitor's RPC server may be reached, as recorded in the monitor's
// construct information.
func (rm *resmon) Address() string {
	info := rm.constructInfo
	return info.MonitorAddress
}

// Cancel signals that the engine should be terminated, awaits its termination, and returns any errors that result.
//
// It closes the cancel channel, waits for the RPC server to report completion, and then closes every cached
// callbacks client. The server's error and all client close errors are joined into the returned error, which is
// nil when all of them are nil. Cancel must be called at most once: a second call would close the already closed
// cancel channel and panic.
func (rm *resmon) Cancel() error {
	close(rm.cancel)
	errs := []error{<-rm.done}

	// The callbacks map is only ever accessed under callbacksLock (see GetCallbacksClient), so take the lock here
	// too rather than iterating the map while another goroutine may still be adding a client to it.
	rm.callbacksLock.Lock()
	defer rm.callbacksLock.Unlock()
	for _, client := range rm.callbacks {
		errs = append(errs, client.Close())
	}
	return errors.Join(errs...)
}

// sourceEvalServeOptions builds the gRPC server options for the resource monitor's server. The tracing interceptor
// options for tracingSpan are always included. When logFile is non-empty, debug interceptor options that log to
// that file in "server" mode are appended as well.
//
// Debug logging is best-effort: if the debug interceptor cannot be created, the failure is logged and the tracing
// options are still returned, so a bad log file never costs the server its tracing.
func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span, logFile string) []grpc.ServerOption {
	serveOpts := rpcutil.OpenTracingServerInterceptorOptions(
		tracingSpan,
		otgrpc.SpanDecorator(decorateResourceSpans),
	)
	if logFile == "" {
		return serveOpts
	}

	di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
		LogFile: logFile,
		Mutex:   ctx.DebugTraceMutex,
	})
	if err != nil {
		// Deliberately not fatal: carry on without debug logging, but keep the tracing options built above.
		logging.V(5).Infof("sourceEvalServeOptions: ignoring debug interceptor error for %s: %v", logFile, err)
		return serveOpts
	}

	logMetadata := map[string]interface{}{
		"mode": "server",
	}
	return append(serveOpts, di.ServerOptions(interceptors.LogOptions{
		Metadata: logMetadata,
	})...)
}

// getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the
// given unparsed provider reference. A raw reference containing "::" is treated as a real provider reference
// (URN::ID) and parsed. Anything else, including the empty string and a package reference (a dashed uuid), yields
// a reference to the default provider for the request.
func (rm *resmon) getProviderReference(defaultProviders *defaultProviders, req providers.ProviderRequest,
	rawProviderRef string,
) (providers.Reference, error) {
	// An empty string never contains "::", so this single test also covers the "no reference given" case.
	if strings.Contains(rawProviderRef, "::") {
		parsed, err := providers.ParseReference(rawProviderRef)
		if err != nil {
			return providers.Reference{}, fmt.Errorf("could not parse provider reference: %w", err)
		}
		return parsed, nil
	}

	defaultRef, err := defaultProviders.getDefaultProviderRef(req)
	if err != nil {
		return providers.Reference{}, err
	}
	return defaultRef, nil
}

// getProviderFromSource fetches the provider plugin for a resource, read, or invoke from the given
// package with the given unparsed provider reference. The reference is resolved with getProviderReference
// and then looked up in providerSource.
//
// An error is returned if the reference cannot be resolved, if it resolves to the deny-default-provider
// reference (default providers are disabled for the package), or if providerSource does not know the
// resolved provider.
func (rm *resmon) getProviderFromSource(
	providerSource ProviderSource, defaultProviders *defaultProviders,
	req providers.ProviderRequest, rawProviderRef string,
	token tokens.ModuleMember,
) (plugin.Provider, error) {
	providerRef, err := rm.getProviderReference(defaultProviders, req, rawProviderRef)
	if err != nil {
		return nil, fmt.Errorf("getProviderFromSource: %w", err)
	}

	if providers.IsDenyDefaultsProvider(providerRef) {
		// The diagnostic's message is itself a format string that takes the package and the token.
		msg := diag.GetDefaultProviderDenied("Invoke").Message
		return nil, fmt.Errorf(msg, req.Package(), token)
	}

	if provider, ok := providerSource.GetProvider(providerRef); ok {
		return provider, nil
	}
	return nil, fmt.Errorf("unknown provider '%v' -> '%v'", rawProviderRef, providerRef)
}

// parseProviderRequest builds a ProviderRequest for pkg from the raw, string-typed fields of an RPC request.
//
// An empty version yields a request with a nil version and the download URL passed through unchanged. A
// non-empty version must be valid semver (otherwise the parse error is returned), and in that case a single
// trailing "/" is trimmed from the download URL.
func parseProviderRequest(
	pkg tokens.Package, version,
	pluginDownloadURL string, pluginChecksums map[string][]byte,
	parameterization *providers.ProviderParameterization,
) (providers.ProviderRequest, error) {
	if version != "" {
		parsed, err := semver.Parse(version)
		if err != nil {
			logging.V(5).Infof("parseProviderRequest(%s, %s): semver version string is invalid: %v", pkg, version, err)
			return providers.ProviderRequest{}, err
		}

		trimmedURL := strings.TrimSuffix(pluginDownloadURL, "/")
		return providers.NewProviderRequest(pkg, &parsed, trimmedURL, pluginChecksums, parameterization), nil
	}

	logging.V(5).Infof("parseProviderRequest(%s): semver version is the empty string", pkg)
	return providers.NewProviderRequest(pkg, nil, pluginDownloadURL, pluginChecksums, parameterization), nil
}

// RegisterPackage records the plugin (and optional parameterization) described by req and returns an opaque
// reference for it. Registering a package that is deeply equal to one already registered returns the existing
// reference rather than creating a new one. The package name must be non-empty, and any supplied versions must
// be valid semver.
func (rm *resmon) RegisterPackage(ctx context.Context,
	req *pulumirpc.RegisterPackageRequest,
) (*pulumirpc.RegisterPackageResponse, error) {
	logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v)", req)

	pkgName := tokens.Package(req.Name)
	if pkgName == "" {
		return nil, errors.New("package name is empty")
	}

	// Turn the request into a ProviderRequest, starting with the (optional) plugin version.
	var pluginVersion *semver.Version
	if raw := req.Version; raw != "" {
		parsed, err := semver.Parse(raw)
		if err != nil {
			return nil, fmt.Errorf("parse package version %s: %w", raw, err)
		}
		pluginVersion = &parsed
	}

	// RegisterPackageRequest keeps all the plugin information in the root fields "name", "version" etc, while the
	// information about the parameterized package is in the "parameterization" field. Internally in the engine, and
	// for resource state we need to flip that around a bit.
	var parameterization *providers.ProviderParameterization
	if p := req.Parameterization; p != nil {
		paramVersion, err := semver.Parse(p.Version)
		if err != nil {
			return nil, fmt.Errorf("parse parameter version %s: %w", p.Version, err)
		}
		parameterization = providers.NewProviderParameterization(
			tokens.Package(p.Name), paramVersion, p.Value)
	}

	request := providers.NewProviderRequest(
		pkgName, pluginVersion, req.DownloadUrl, req.Checksums, parameterization)

	rm.packageRefLock.Lock()
	defer rm.packageRefLock.Unlock()

	// Reuse the reference of an identical, previously registered package if there is one.
	for ref, existing := range rm.packageRefMap {
		if reflect.DeepEqual(existing, request) {
			logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v) matched %s", req, ref)
			return &pulumirpc.RegisterPackageResponse{Ref: ref}, nil
		}
	}

	// Otherwise mint a fresh reference and remember the request under it.
	ref := uuid.New().String()
	rm.packageRefMap[ref] = request
	logging.V(5).Infof("ResourceMonitor.RegisterPackage(%v) created %s", req, ref)
	return &pulumirpc.RegisterPackageResponse{Ref: ref}, nil
}

// SupportsFeature reports whether the engine supports the feature named by req.Id. Unrecognized feature
// names are reported as unsupported.
func (rm *resmon) SupportsFeature(ctx context.Context,
	req *pulumirpc.SupportsFeatureRequest,
) (*pulumirpc.SupportsFeatureResponse, error) {
	// NOTE: DO NOT ADD ANY MORE FEATURES TO THIS LIST
	//
	// Context: https://github.com/pulumi/pulumi-dotnet/pull/88#pullrequestreview-1265714090
	//
	// Copying feature-name strings between codebases is prone to bugs. Rather than adding a new feature here,
	// set up a new SupportsFeatureV2 method that takes a grpc enum instead; that can then be safely code
	// generated out to each language with no risk of typos.
	//
	// The features below have to stay as they are because old engines DO support them, but wouldn't support a
	// new SupportsFeatureV2 method.
	var supported bool
	switch req.Id {
	case "secrets", "aliasSpecs", "deletedWith", "transforms", "invokeTransforms":
		supported = true
	case "resourceReferences":
		supported = !rm.opts.DisableResourceReferences
	case "outputValues":
		supported = !rm.opts.DisableOutputValues
	}

	logging.V(5).Infof("ResourceMonitor.SupportsFeature(id: %s) = %t", req.Id, supported)

	return &pulumirpc.SupportsFeatureResponse{HasSupport: supported}, nil
}

// Invoke performs an invocation of a member located in a resource provider.
//
// The request's arguments are unmarshaled and passed, together with the provider-selection options, through
// every registered stack invoke transform; each transform may replace both. The provider is then chosen from
// the resulting options, or from the package registered under the request's package reference when one is
// given, and the invoke is dispatched to it. The provider's return properties and check failures are
// marshaled back into the response.
func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {
	// Fetch the token.
	tok := tokens.ModuleMember(req.GetTok())

	label := fmt.Sprintf("ResourceMonitor.Invoke(%s)", tok)
	args, err := plugin.UnmarshalProperties(
		req.GetArgs(), plugin.MarshalOptions{
			Label:            label,
			KeepUnknowns:     true,
			KeepSecrets:      true,
			KeepResources:    true,
			WorkingDirectory: rm.workingDirectory,
		})
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal %v args: %w", tok, err)
	}

	opts := &pulumirpc.TransformInvokeOptions{
		Provider:          req.GetProvider(),
		Version:           req.GetVersion(),
		PluginDownloadUrl: req.GetPluginDownloadURL(),
		PluginChecksums:   req.GetPluginChecksums(),
	}

	// Lock the invoke transforms and run all of those before loading the provider.
	err = func() error {
		// Function exists to scope the lock
		rm.stackInvokeTransformsLock.Lock()
		defer rm.stackInvokeTransformsLock.Unlock()

		for _, transform := range rm.stackInvokeTransforms {
			newArgs, newOpts, err := transform(ctx, string(tok), args, opts)
			if err != nil {
				return err
			}

			args = newArgs
			opts = newOpts
		}
		return nil
	}()
	if err != nil {
		return nil, err
	}

	// Load up the resource provider if necessary.
	providerReq, err := parseProviderRequest(
		tok.Package(), opts.Version, opts.PluginDownloadUrl, opts.PluginChecksums, nil)
	if err != nil {
		return nil, err
	}

	if packageRef := req.GetPackageRef(); packageRef != "" {
		// packageRefMap is mutated by RegisterPackage while holding packageRefLock, so the lookup must hold
		// the same lock: an unguarded map read racing with a write is a fatal error in Go.
		rm.packageRefLock.Lock()
		registered, has := rm.packageRefMap[packageRef]
		rm.packageRefLock.Unlock()
		if !has {
			return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
		}
		providerReq = registered
	}

	prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, opts.Provider, tok)
	if err != nil {
		return nil, fmt.Errorf("Invoke: %w", err)
	}

	// Do the invoke and then return the arguments.
	logging.V(5).Infof("ResourceMonitor.Invoke received: tok=%v #args=%v", tok, len(args))
	resp, err := prov.Invoke(ctx, plugin.InvokeRequest{
		Tok:  tok,
		Args: args,
	})
	if err != nil {
		return nil, fmt.Errorf("invocation of %v returned an error: %w", tok, err)
	}

	// Respect `AcceptResources` unless `tok` is for the built-in `pulumi:pulumi:getResource` function,
	// in which case always keep resources to maintain the original behavior for older SDKs that are not
	// setting the `AcceptResources` flag.
	keepResources := req.GetAcceptResources()
	if tok == "pulumi:pulumi:getResource" {
		keepResources = true
	}

	mret, err := plugin.MarshalProperties(resp.Properties, plugin.MarshalOptions{
		Label:            label,
		KeepUnknowns:     true,
		KeepSecrets:      true,
		KeepResources:    keepResources,
		WorkingDirectory: rm.workingDirectory,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)
	}

	chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(resp.Failures))
	for _, failure := range resp.Failures {
		chkfails = append(chkfails, &pulumirpc.CheckFailure{
			Property: string(failure.Property),
			Reason:   failure.Reason,
		})
	}
	return &pulumirpc.InvokeResponse{Return: mret, Failures: chkfails}, nil
}

// Call dynamically executes a method in the provider associated with a component resource.
//
// The provider is selected from the request's token, version and download information, or from the package
// registered under the request's package reference when one is given. Argument dependencies supplied in the
// request are parsed as URNs and extended with the dependencies carried by the unmarshaled arguments; the
// return dependencies reported by the provider are extended with those carried by the returned values in the
// same way before the response is marshaled.
func (rm *resmon) Call(ctx context.Context, req *pulumirpc.ResourceCallRequest) (*pulumirpc.CallResponse, error) {
	// Fetch the token and load up the resource provider if necessary.
	tok := tokens.ModuleMember(req.GetTok())
	providerReq, err := parseProviderRequest(
		tok.Package(), req.GetVersion(),
		req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil)
	if err != nil {
		return nil, err
	}

	if packageRef := req.GetPackageRef(); packageRef != "" {
		// packageRefMap is mutated by RegisterPackage while holding packageRefLock, so the lookup must hold
		// the same lock: an unguarded map read racing with a write is a fatal error in Go.
		rm.packageRefLock.Lock()
		registered, has := rm.packageRefMap[packageRef]
		rm.packageRefLock.Unlock()
		if !has {
			return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
		}
		providerReq = registered
	}

	prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, req.GetProvider(), tok)
	if err != nil {
		return nil, err
	}

	label := fmt.Sprintf("ResourceMonitor.Call(%s)", tok)

	args, err := plugin.UnmarshalProperties(
		req.GetArgs(), plugin.MarshalOptions{
			Label:                 label,
			KeepUnknowns:          true,
			KeepSecrets:           true,
			KeepResources:         true,
			KeepOutputValues:      true,
			UpgradeToOutputValues: true,
			WorkingDirectory:      rm.workingDirectory,
		})
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal %v args: %w", tok, err)
	}

	// Parse the per-argument dependency URNs supplied by the caller.
	argDependencies := map[resource.PropertyKey][]resource.URN{}
	for name, deps := range req.GetArgDependencies() {
		urns := make([]resource.URN, len(deps.Urns))
		for i, raw := range deps.Urns {
			parsed, err := resource.ParseURN(raw)
			if err != nil {
				return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency on argument %d URN: %s", i, err))
			}
			urns[i] = parsed
		}
		argDependencies[resource.PropertyKey(name)] = urns
	}

	// If we have output values we can add the dependencies from them to the args dependencies map we send to the provider.
	for key, output := range args {
		argDependencies[key] = extendOutputDependencies(argDependencies[key], output)
	}

	info := plugin.CallInfo{
		Project:        rm.constructInfo.Project,
		Stack:          rm.constructInfo.Stack,
		Config:         rm.constructInfo.Config,
		DryRun:         rm.constructInfo.DryRun,
		Parallel:       rm.constructInfo.Parallel,
		MonitorAddress: rm.constructInfo.MonitorAddress,
	}
	options := plugin.CallOptions{
		ArgDependencies: argDependencies,
	}

	// Do the call and then return the arguments.
	logging.V(5).Infof(
		"ResourceMonitor.Call received: tok=%v #args=%v #info=%v #options=%v", tok, len(args), info, options)
	ret, err := prov.Call(ctx, plugin.CallRequest{
		Tok:     tok,
		Args:    args,
		Info:    info,
		Options: options,
	})
	if err != nil {
		return nil, fmt.Errorf("call of %v returned an error: %w", tok, err)
	}

	// Fold dependencies carried by the returned values into the provider-reported return dependencies.
	if ret.ReturnDependencies == nil {
		ret.ReturnDependencies = map[resource.PropertyKey][]resource.URN{}
	}
	for k, v := range ret.Return {
		ret.ReturnDependencies[k] = extendOutputDependencies(ret.ReturnDependencies[k], v)
	}

	returnDependencies := map[string]*pulumirpc.CallResponse_ReturnDependencies{}
	for name, deps := range ret.ReturnDependencies {
		urns := make([]string, len(deps))
		for i, urn := range deps {
			urns[i] = string(urn)
		}
		returnDependencies[string(name)] = &pulumirpc.CallResponse_ReturnDependencies{Urns: urns}
	}

	mret, err := plugin.MarshalProperties(ret.Return, plugin.MarshalOptions{
		Label:            label,
		KeepUnknowns:     true,
		KeepSecrets:      true,
		KeepResources:    true,
		WorkingDirectory: rm.workingDirectory,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)
	}

	chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(ret.Failures))
	for _, failure := range ret.Failures {
		chkfails = append(chkfails, &pulumirpc.CheckFailure{
			Property: string(failure.Property),
			Reason:   failure.Reason,
		})
	}
	return &pulumirpc.CallResponse{Return: mret, ReturnDependencies: returnDependencies, Failures: chkfails}, nil
}

// StreamInvoke performs a streaming invocation of a member located in a resource provider. Every event the
// provider produces is marshaled and sent to the client over stream; once the provider finishes, any check
// failures it reported are sent as a final message. The call blocks until the streaming operation completes.
//
// The provider is selected from the request's token, version and download information, or from the package
// registered under the request's package reference when one is given.
func (rm *resmon) StreamInvoke(
	req *pulumirpc.ResourceInvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer,
) error {
	tok := tokens.ModuleMember(req.GetTok())
	label := fmt.Sprintf("ResourceMonitor.StreamInvoke(%s)", tok)

	providerReq, err := parseProviderRequest(
		tok.Package(), req.GetVersion(),
		req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil)
	if err != nil {
		return err
	}

	// Honor an explicit package reference the same way Invoke does for this request type.
	if packageRef := req.GetPackageRef(); packageRef != "" {
		// packageRefMap is mutated by RegisterPackage while holding packageRefLock, so the lookup must hold
		// the same lock: an unguarded map read racing with a write is a fatal error in Go.
		rm.packageRefLock.Lock()
		registered, has := rm.packageRefMap[packageRef]
		rm.packageRefLock.Unlock()
		if !has {
			return fmt.Errorf("unknown provider package '%v'", packageRef)
		}
		providerReq = registered
	}

	prov, err := rm.getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, req.GetProvider(), tok)
	if err != nil {
		return err
	}

	args, err := plugin.UnmarshalProperties(
		req.GetArgs(), plugin.MarshalOptions{
			Label:            label,
			KeepUnknowns:     true,
			KeepSecrets:      true,
			KeepResources:    true,
			WorkingDirectory: rm.workingDirectory,
		})
	if err != nil {
		return fmt.Errorf("failed to unmarshal %v args: %w", tok, err)
	}

	// Synchronously do the StreamInvoke and then return the arguments. This will block until the
	// streaming operation completes! The stream's own context is passed down so that the provider call is
	// tied to the lifetime of the client's RPC rather than running under a placeholder context.
	logging.V(5).Infof("ResourceMonitor.StreamInvoke received: tok=%v #args=%v", tok, len(args))
	resp, err := prov.StreamInvoke(stream.Context(), plugin.StreamInvokeRequest{
		Tok:  tok,
		Args: args,
		OnNext: func(event resource.PropertyMap) error {
			mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{
				Label:            label,
				KeepUnknowns:     true,
				KeepResources:    req.GetAcceptResources(),
				WorkingDirectory: rm.workingDirectory,
			})
			if err != nil {
				return fmt.Errorf("failed to marshal return: %w", err)
			}

			return stream.Send(&pulumirpc.InvokeResponse{Return: mret})
		},
	})
	if err != nil {
		return fmt.Errorf("streaming invocation of %v returned an error: %w", tok, err)
	}

	chkfails := slice.Prealloc[*pulumirpc.CheckFailure](len(resp.Failures))
	for _, failure := range resp.Failures {
		chkfails = append(chkfails, &pulumirpc.CheckFailure{
			Property: string(failure.Property),
			Reason:   failure.Reason,
		})
	}

	// Failures are only worth a message when there are any.
	if len(chkfails) > 0 {
		return stream.Send(&pulumirpc.InvokeResponse{Failures: chkfails})
	}
	return nil
}

// ReadResource reads the current state associated with a resource from its provider plugin.
//
// The handler validates the request (type token, parent URN, dependency URNs, properties), resolves the
// default provider when the resource is not itself a provider and no explicit provider was given, and then
// posts a readResourceEvent on the monitor's read channel. It blocks until the event's done channel yields
// a result or the monitor is cancelled, and finally marshals the resulting outputs into the response.
func (rm *resmon) ReadResource(ctx context.Context,
	req *pulumirpc.ReadResourceRequest,
) (*pulumirpc.ReadResourceResponse, error) {
	// Read the basic inputs necessary to identify the plugin.
	t, err := tokens.ParseTypeToken(req.GetType())
	if err != nil {
		return nil, rpcerror.New(codes.InvalidArgument, err.Error())
	}

	name := req.GetName()
	parent, err := resource.ParseOptionalURN(req.GetParent())
	if err != nil {
		return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent URN: %s", err))
	}

	provider := req.GetProvider()
	if !providers.IsProviderType(t) && provider == "" {
		providerReq, err := parseProviderRequest(
			t.Package(), req.GetVersion(),
			req.GetPluginDownloadURL(), req.GetPluginChecksums(), nil)
		if err != nil {
			return nil, err
		}

		if packageRef := req.GetPackageRef(); packageRef != "" {
			// packageRefMap is mutated by RegisterPackage while holding packageRefLock, so the lookup must
			// hold the same lock: an unguarded map read racing with a write is a fatal error in Go.
			rm.packageRefLock.Lock()
			registered, has := rm.packageRefMap[packageRef]
			rm.packageRefLock.Unlock()
			if !has {
				return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
			}
			providerReq = registered
		}

		ref, provErr := rm.defaultProviders.getDefaultProviderRef(providerReq)
		if provErr != nil {
			return nil, provErr
		} else if providers.IsDenyDefaultsProvider(ref) {
			// NOTE(review): getProviderFromSource formats the equivalent message with the package name as
			// its first argument, whereas the full type token is passed for both arguments here. Confirm
			// that this is intended.
			msg := diag.GetDefaultProviderDenied("Read").Message
			return nil, fmt.Errorf(msg, req.GetType(), t)
		}
		provider = ref.String()
	}

	id := resource.ID(req.GetId())
	label := fmt.Sprintf("ResourceMonitor.ReadResource(%s, %s, %s, %s)", id, t, name, provider)
	deps := slice.Prealloc[resource.URN](len(req.GetDependencies()))
	for _, depURN := range req.GetDependencies() {
		urn, err := resource.ParseURN(depURN)
		if err != nil {
			return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency: %s", err))
		}
		deps = append(deps, urn)
	}

	props, err := plugin.UnmarshalProperties(req.GetProperties(), plugin.MarshalOptions{
		Label:            label,
		KeepUnknowns:     true,
		KeepSecrets:      true,
		KeepResources:    true,
		WorkingDirectory: rm.workingDirectory,
	})
	if err != nil {
		return nil, err
	}

	additionalSecretOutputs := slice.Prealloc[resource.PropertyKey](len(req.GetAdditionalSecretOutputs()))
	for _, name := range req.GetAdditionalSecretOutputs() {
		additionalSecretOutputs = append(additionalSecretOutputs, resource.PropertyKey(name))
	}

	event := &readResourceEvent{
		id:                      id,
		name:                    name,
		baseType:                t,
		provider:                provider,
		parent:                  parent,
		props:                   props,
		dependencies:            deps,
		additionalSecretOutputs: additionalSecretOutputs,
		sourcePosition:          rm.sourcePositions.getFromRequest(req),
		done:                    make(chan *ReadResult),
	}

	// Hand the event over to whoever is receiving on the read channel, unless the monitor is cancelled first.
	select {
	case rm.regReadChan <- event:
	case <-rm.cancel:
		logging.V(5).Infof("ResourceMonitor.ReadResource operation canceled, name=%s", name)
		return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource registration")
	}

	// Now block waiting for the operation to finish.
	var result *ReadResult
	select {
	case result = <-event.done:
	case <-rm.cancel:
		logging.V(5).Infof("ResourceMonitor.ReadResource operation canceled, name=%s", name)
		return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on step's done channel")
	}

	contract.Assertf(result != nil, "ReadResource operation returned a nil result")
	// Secrets and resource references are only kept in the response if the caller declared it accepts them.
	marshaled, err := plugin.MarshalProperties(result.State.Outputs, plugin.MarshalOptions{
		Label:            label,
		KeepUnknowns:     true,
		KeepSecrets:      req.GetAcceptSecrets(),
		KeepResources:    req.GetAcceptResources(),
		WorkingDirectory: rm.workingDirectory,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal %s return state: %w", result.State.URN, err)
	}

	return &pulumirpc.ReadResourceResponse{
		Urn:        string(result.State.URN),
		Properties: marshaled,
	}, nil
}

// wrapTransformCallback wraps a resource transform callback so the engine can call the callback server, which
// will then execute the function. The wrapper takes care of all the necessary marshalling and unmarshalling.
//
// The returned function sends the resource's name, type, custom flag, parent, properties and options to the
// callback identified by cb. Properties or options missing from the callback's response are treated as
// "unchanged" and the corresponding input is returned as-is.
func (rm *resmon) wrapTransformCallback(cb *pulumirpc.Callback) (TransformFunction, error) {
	client, err := rm.GetCallbacksClient(cb.Target)
	if err != nil {
		return nil, err
	}

	token := cb.Token
	return func(
		ctx context.Context, name, typ string, custom bool, parent resource.URN,
		props resource.PropertyMap, opts *pulumirpc.TransformResourceOptions,
	) (resource.PropertyMap, *pulumirpc.TransformResourceOptions, error) {
		logging.V(5).Infof("Transform: name=%v type=%v custom=%v parent=%v props=%v opts=%v",
			name, typ, custom, parent, props, opts)

		// The same options are used in both directions so that values round-trip through the callback.
		mopts := plugin.MarshalOptions{
			KeepUnknowns:     true,
			KeepSecrets:      true,
			KeepResources:    true,
			KeepOutputValues: true,
			WorkingDirectory: rm.workingDirectory,
		}

		mprops, err := plugin.MarshalProperties(props, mopts)
		if err != nil {
			return nil, nil, err
		}

		// Forward the parent as well: it is part of what the caller hands us, and without it the callback
		// cannot tell which resource the transformed resource is parented to.
		request, err := proto.Marshal(&pulumirpc.TransformRequest{
			Name:       name,
			Type:       typ,
			Custom:     custom,
			Parent:     string(parent),
			Properties: mprops,
			Options:    opts,
		})
		if err != nil {
			return nil, nil, fmt.Errorf("marshaling request: %w", err)
		}

		resp, err := client.Invoke(ctx, &pulumirpc.CallbackInvokeRequest{
			Token:   token,
			Request: request,
		})
		if err != nil {
			logging.V(5).Infof("Transform callback error: %v", err)
			return nil, nil, err
		}

		var response pulumirpc.TransformResponse
		err = proto.Unmarshal(resp.Response, &response)
		if err != nil {
			return nil, nil, fmt.Errorf("unmarshaling response: %w", err)
		}

		// A nil field in the response means the transform left that part alone.
		newOpts := opts
		if response.Options != nil {
			newOpts = response.Options
		}

		newProps := props
		if response.Properties != nil {
			newProps, err = plugin.UnmarshalProperties(response.Properties, mopts)
			if err != nil {
				return nil, nil, err
			}
		}

		logging.V(5).Infof("Transform: props=%v opts=%v", newProps, newOpts)

		return newProps, newOpts, nil
	}, nil
}

// wrapInvokeTransformCallback wraps an invoke transform callback so the engine can call the callback server,
// which will then execute the function. The wrapper handles all marshalling and unmarshalling: the invoke
// token, arguments and options are sent to the callback identified by cb, and any arguments or options
// missing from the response are treated as unchanged.
func (rm *resmon) wrapInvokeTransformCallback(cb *pulumirpc.Callback) (TransformInvokeFunction, error) {
	callbacks, err := rm.GetCallbacksClient(cb.Target)
	if err != nil {
		return nil, err
	}
	callbackToken := cb.Token

	transform := func(
		ctx context.Context, invokeToken string,
		args resource.PropertyMap, opts *pulumirpc.TransformInvokeOptions,
	) (resource.PropertyMap, *pulumirpc.TransformInvokeOptions, error) {
		logging.V(5).Infof("Invoke transform: token=%v props=%v opts=%v",
			invokeToken, args, opts)

		// Shared by the outgoing and incoming conversions so values round-trip through the callback.
		marshalOpts := plugin.MarshalOptions{
			KeepUnknowns:     true,
			KeepSecrets:      true,
			KeepResources:    true,
			KeepOutputValues: true,
			WorkingDirectory: rm.workingDirectory,
		}

		wireArgs, err := plugin.MarshalProperties(args, marshalOpts)
		if err != nil {
			return nil, nil, err
		}

		payload, err := proto.Marshal(&pulumirpc.TransformInvokeRequest{
			Token:   invokeToken,
			Args:    wireArgs,
			Options: opts,
		})
		if err != nil {
			return nil, nil, fmt.Errorf("marshaling request: %w", err)
		}

		reply, err := callbacks.Invoke(ctx, &pulumirpc.CallbackInvokeRequest{
			Token:   callbackToken,
			Request: payload,
		})
		if err != nil {
			logging.V(5).Infof("Invoke transform callback error: %v", err)
			return nil, nil, err
		}

		var decoded pulumirpc.TransformInvokeResponse
		if err := proto.Unmarshal(reply.Response, &decoded); err != nil {
			return nil, nil, fmt.Errorf("unmarshaling response: %w", err)
		}

		// Start from the inputs and only override what the callback actually returned.
		resultOpts := opts
		if decoded.Options != nil {
			resultOpts = decoded.Options
		}
		resultArgs := args
		if decoded.Args != nil {
			resultArgs, err = plugin.UnmarshalProperties(decoded.Args, marshalOpts)
			if err != nil {
				return nil, nil, err
			}
		}

		logging.V(5).Infof("Invoke transform: props=%v opts=%v", resultArgs, resultOpts)

		return resultArgs, resultOpts, nil
	}
	return transform, nil
}

// RegisterStackTransform wraps the given callback as a resource transform and appends it to the monitor's
// list of stack transforms. The callback must name a target. The whole operation runs under
// stackTransformsLock.
func (rm *resmon) RegisterStackTransform(ctx context.Context, cb *pulumirpc.Callback) (*emptypb.Empty, error) {
	rm.stackTransformsLock.Lock()
	defer rm.stackTransformsLock.Unlock()

	if len(cb.Target) == 0 {
		return nil, errors.New("target must be specified")
	}

	transform, err := rm.wrapTransformCallback(cb)
	if err != nil {
		return nil, err
	}
	rm.stackTransforms = append(rm.stackTransforms, transform)

	return &emptypb.Empty{}, nil
}

// RegisterStackInvokeTransform wraps the given callback as an invoke transform and appends it to the
// monitor's list of stack invoke transforms. The callback must name a target. The whole operation runs under
// stackInvokeTransformsLock.
func (rm *resmon) RegisterStackInvokeTransform(ctx context.Context, cb *pulumirpc.Callback) (*emptypb.Empty, error) {
	rm.stackInvokeTransformsLock.Lock()
	defer rm.stackInvokeTransformsLock.Unlock()

	if len(cb.Target) == 0 {
		return nil, errors.New("target must be specified")
	}

	transform, err := rm.wrapInvokeTransformCallback(cb)
	if err != nil {
		return nil, err
	}
	rm.stackInvokeTransforms = append(rm.stackInvokeTransforms, transform)

	return &emptypb.Empty{}, nil
}

// inheritFromParent returns a pointer to a copy of child with settings inherited from parent filled in.
// At present the only inherited setting is DeletedWith, and only when the child does not set its own.
func inheritFromParent(child resource.Goal, parent resource.Goal) *resource.Goal {
	// child was passed by value, so it is already a private copy that can be adjusted and returned.
	if child.DeletedWith == "" {
		child.DeletedWith = parent.DeletedWith
	}
	return &child
}

// sourcePositions converts the source positions carried by incoming gRPC requests into project-relative
// position strings.
type sourcePositions struct {
	// projectRoot is the directory that reported file paths are made relative to. newSourcePositions sets it
	// to "/" when no root is supplied, and to the cleaned form of the supplied absolute path otherwise.
	projectRoot string
}

// newSourcePositions creates a sourcePositions rooted at projectRoot. An empty projectRoot falls back to "/";
// a non-empty one must be an absolute path (this is asserted) and is stored in cleaned form.
func newSourcePositions(projectRoot string) *sourcePositions {
	root := "/"
	if projectRoot != "" {
		contract.Assertf(filepath.IsAbs(projectRoot), "projectRoot is not an absolute path")
		root = filepath.Clean(projectRoot)
	}
	return &sourcePositions{projectRoot: root}
}

// parseSourcePosition converts a raw source position into a "project" URL whose path is relative to the
// project root and whose fragment is "<line>" or "<line>,<column>" (the column is omitted when it is zero).
//
// A nil position yields an empty string and no error. An error is returned when the line is not positive,
// the column is negative, the URI cannot be parsed or does not use the "file" scheme, the file path is not
// absolute, or the path cannot be made relative to the project root.
func (s *sourcePositions) parseSourcePosition(raw *pulumirpc.SourcePosition) (string, error) {
	if raw == nil {
		return "", nil
	}
	if raw.Line <= 0 {
		return "", fmt.Errorf("invalid line number %v", raw.Line)
	}
	if raw.Column < 0 {
		return "", fmt.Errorf("invalid column number %v", raw.Column)
	}

	// Build the fragment up front: the line is always present, the column only when it was supplied.
	fragment := fmt.Sprint(raw.Line)
	if raw.Column > 0 {
		fragment += "," + strconv.FormatInt(int64(raw.Column), 10)
	}

	parsed, err := url.Parse(raw.Uri)
	if err != nil {
		return "", err
	}
	if parsed.Scheme != "file" {
		return "", fmt.Errorf("unrecognized scheme %q", parsed.Scheme)
	}

	absPath := filepath.FromSlash(parsed.Path)
	if !filepath.IsAbs(absPath) {
		return "", errors.New("source positions must include absolute paths")
	}
	relPath, err := filepath.Rel(s.projectRoot, absPath)
	if err != nil {
		return "", fmt.Errorf("making relative path: %w", err)
	}

	// Reuse the parsed URL, swapping in the project scheme, the project-relative path and the fragment.
	parsed.Scheme = "project"
	parsed.Path = "/" + filepath.ToSlash(relPath)
	parsed.Fragment = fragment

	return parsed.String(), nil
}

// hasSourcePosition is satisfied by any gRPC request that carries a source position (ReadResourceRequest,
// RegisterResourceRequest, ResourceInvokeRequest, and CallRequest), which lets getFromRequest accept all of them.
type hasSourcePosition interface {
	// GetSourcePosition returns the request's source position. parseSourcePosition treats a nil result as
	// "no position".
	GetSourcePosition() *pulumirpc.SourcePosition
}

// getFromRequest extracts the source position from an incoming request and returns it in parsed form.
// Positions that fail to parse are logged and reported as the empty string rather than as an error.
func (s *sourcePositions) getFromRequest(req hasSourcePosition) string {
	raw := req.GetSourcePosition()
	position, err := s.parseSourcePosition(raw)
	if err == nil {
		return position
	}

	logging.V(5).Infof("parsing source position %#v: %v", raw, err)
	return ""
}

// requestFromNodeJS reports whether the request originates from a Node.js language runtime or SDK.
//
// A "pulumi-runtime" metadata header, when present, is authoritative: the result is true only if it has
// exactly one value and that value is "nodejs". Only when that header is absent is the "user-agent" header
// consulted, in which case the result is true if it has exactly one value starting with "grpc-node-js/".
// Requests without incoming metadata, or with neither header, are not considered Node.js requests.
func requestFromNodeJS(ctx context.Context) bool {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return false
	}

	// The explicit runtime header always wins when it is present.
	if values, present := md["pulumi-runtime"]; present {
		return len(values) == 1 && values[0] == "nodejs"
	}
	// Otherwise fall back to sniffing the gRPC user agent.
	if values, present := md["user-agent"]; present {
		return len(values) == 1 && strings.HasPrefix(values[0], "grpc-node-js/")
	}
	return false
}

// transformAliasForNodeJSCompat rewrites an alias sent with the legacy Node.js parent encoding into a properly
// specified alias. Aliases that are not spec aliases are returned unchanged; spec aliases are returned as a
// new Alias value and the input is left untouched.
func transformAliasForNodeJSCompat(alias *pulumirpc.Alias) *pulumirpc.Alias {
	specAlias, isSpec := alias.Alias.(*pulumirpc.Alias_Spec_)
	if !isSpec {
		return alias
	}

	// The original implementation in the Node.js SDK did not specify aliases correctly:
	//
	// - It did not set NoParent when it should have, but instead set Parent to empty.
	// - It set NoParent to true and left Parent empty when both the alias and resource had no Parent specified.
	//
	// To maintain compatibility with such versions of the Node.js SDK, we transform these incorrectly
	// specified aliases into properly specified ones that work with this implementation of the engine:
	//
	// - { Parent: "", NoParent: false } -> { Parent: "", NoParent: true }
	// - { Parent: "", NoParent: true }  -> { Parent: "", NoParent: false }
	fixed := &pulumirpc.Alias_Spec{
		Name:    specAlias.Spec.Name,
		Type:    specAlias.Spec.Type,
		Stack:   specAlias.Spec.Stack,
		Project: specAlias.Spec.Project,
	}

	switch parent := specAlias.Spec.Parent.(type) {
	case *pulumirpc.Alias_Spec_ParentUrn:
		// A non-empty parent URN is already correct; an empty one really meant "no parent".
		if parent.ParentUrn != "" {
			fixed.Parent = parent
		} else {
			fixed.Parent = &pulumirpc.Alias_Spec_NoParent{NoParent: true}
		}
	case *pulumirpc.Alias_Spec_NoParent:
		// A legacy NoParent of true becomes "no parent information at all", i.e. Parent stays nil.
		if !parent.NoParent {
			fixed.Parent = parent
		}
	default:
		// No parent information was sent at all, which legacy SDKs used to mean "no parent".
		fixed.Parent = &pulumirpc.Alias_Spec_NoParent{NoParent: true}
	}

	return &pulumirpc.Alias{
		Alias: &pulumirpc.Alias_Spec_{Spec: fixed},
	}
}

// resolveProvider picks the provider reference to use for a resource of package pkg. In order of precedence
// it returns: the explicitly given provider, the entry for pkg in the resource's own providers map, and the
// entry for pkg in the providers recorded for the parent component. If none applies, the empty string is
// returned.
func (rm *resmon) resolveProvider(
	provider string, providers map[string]string, parent resource.URN, pkg tokens.Package,
) string {
	if provider != "" {
		return provider
	}

	pkgName := string(pkg)
	if explicit, ok := providers[pkgName]; ok {
		return explicit
	}
	if parent == "" {
		return ""
	}

	// componentProviders is shared state, so it is only consulted while holding its lock.
	rm.componentProvidersLock.Lock()
	defer rm.componentProvidersLock.Unlock()

	// Indexing a missing parent yields a nil inner map, for which the lookup simply reports "not found".
	if inherited, ok := rm.componentProviders[parent][pkgName]; ok {
		return inherited
	}
	return ""
}

// RegisterResource is invoked by a language process when a new resource has been allocated.
//
// In order, it:
//
//  1. validates and unmarshals the request (parent URN, type token, aliases, properties, dependencies);
//  2. runs the transforms for this resource, then those registered for its ancestors, then the stack transforms,
//     each of which may replace both the properties and the resource options;
//  3. resolves provider references and parses the (possibly transformed) aliases and dependencies;
//  4. either calls the provider's Construct method (remote components) or sends a goal to the engine over
//     rm.regChan and waits for the result;
//  5. records the parent, transforms and component providers under the resulting URN, and marshals the URN, ID
//     and filtered outputs back to the caller.
//
// Malformed URNs and type tokens are reported as codes.InvalidArgument; a monitor shutdown while talking to the
// engine is reported as codes.Unavailable.
func (rm *resmon) RegisterResource(ctx context.Context,
	req *pulumirpc.RegisterResourceRequest,
) (*pulumirpc.RegisterResourceResponse, error) {
	// Communicate the type, name, and object information to the iterator that is awaiting us.
	name := req.GetName()
	custom := req.GetCustom()
	remote := req.GetRemote()
	parent, err := resource.ParseOptionalURN(req.GetParent())
	if err != nil {
		return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent URN: %s", err))
	}
	id := resource.ID(req.GetImportId())
	sourcePosition := rm.sourcePositions.getFromRequest(req)

	// Custom resources must have a three-part type so that we can 1) identify if they are providers and 2) retrieve the
	// provider responsible for managing a particular resource (based on the type's Package).
	var t tokens.Type
	if custom || remote {
		t, err = tokens.ParseTypeToken(req.GetType())
		if err != nil {
			return nil, rpcerror.New(codes.InvalidArgument, err.Error())
		}
	} else {
		// Component resources may have any format type.
		t = tokens.Type(req.GetType())
	}

	label := fmt.Sprintf("ResourceMonitor.RegisterResource(%s,%s)", t, name)

	// We need to build the full alias spec list here, so we can pass it to transforms. URN-style aliases come
	// first, followed by the alias objects from the request.
	aliases := []*pulumirpc.Alias{}
	for _, aliasURN := range req.GetAliasURNs() {
		aliases = append(aliases, &pulumirpc.Alias{Alias: &pulumirpc.Alias_Urn{Urn: aliasURN}})
	}

	// We assume aliases are properly specified. However, if a request hasn't explicitly
	// indicated that it is using properly specified aliases and the request is coming
	// from Node.js, transform the aliases from the incorrect Node.js values to properly
	// specified values, to maintain backward compatibility for users of older Node.js
	// SDKs that aren't sending properly specified aliases.
	transformAliases := !req.GetAliasSpecs() && requestFromNodeJS(ctx)

	for _, aliasObject := range req.GetAliases() {
		if transformAliases {
			aliasObject = transformAliasForNodeJSCompat(aliasObject)
		}
		aliases = append(aliases, aliasObject)
	}

	var deleteBeforeReplace *bool
	// Technically DeleteBeforeReplaceDefined should be used to decide if DeleteBeforeReplace should be looked at or
	// not. However the Go SDK doesn't set Defined so we have a fallback here of respecting this field if either Defined
	// is set or DeleteBeforeReplace is true.
	if req.GetDeleteBeforeReplaceDefined() || req.GetDeleteBeforeReplace() {
		deleteBeforeReplace = &req.DeleteBeforeReplace
	}

	props, err := plugin.UnmarshalProperties(
		req.GetObject(), plugin.MarshalOptions{
			Label:                 label,
			KeepUnknowns:          true,
			ComputeAssetHashes:    true,
			KeepSecrets:           true,
			KeepResources:         true,
			KeepOutputValues:      true,
			UpgradeToOutputValues: true,
			WorkingDirectory:      rm.workingDirectory,
		})
	if err != nil {
		return nil, err
	}

	// Before we pass the props to the transform function we need to ensure that they correctly carry any dependency
	// information.
	dependencies := mapset.NewSet[resource.URN]()
	for _, dependingURN := range req.GetDependencies() {
		urn, err := resource.ParseURN(dependingURN)
		if err != nil {
			return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency URN: %s", err))
		}
		dependencies.Add(urn)
	}

	propertyDependencies := make(map[resource.PropertyKey]mapset.Set[resource.URN])
	if len(req.GetPropertyDependencies()) == 0 && !remote {
		// If this request did not specify property dependencies, treat each property as depending on every resource
		// in the request's dependency list. We don't need to do this when remote is true, because all clients that
		// support remote already support passing property dependencies, so there's no need to backfill here.
		// Note that every property shares the same set value here rather than receiving its own copy.
		for pk := range props {
			propertyDependencies[pk] = dependencies
		}
	} else {
		// Otherwise, unmarshal the per-property dependency information.
		for pk, pd := range req.GetPropertyDependencies() {
			deps := mapset.NewSet[resource.URN]()
			for _, d := range pd.Urns {
				urn, err := resource.ParseURN(d)
				if err != nil {
					return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency on property %s URN: %s", pk, err))
				}
				deps.Add(urn)
			}
			propertyDependencies[resource.PropertyKey(pk)] = deps
		}
	}

	// If we're running any transforms we need to update all the property values to Outputs to track dependencies.
	if len(req.Transforms) > 0 {
		props = upgradeOutputValues(props, propertyDependencies)
	}

	// Only custom and remote resources have their provider resolved through the providers map and the parent's
	// component providers; for everything else the request's provider is passed through as-is.
	provider := req.GetProvider()
	if custom || remote {
		provider = rm.resolveProvider(req.GetProvider(), req.GetProviders(), parent, t.Package())
	}

	// These are the options handed to (and potentially rewritten by) each transform below.
	opts := &pulumirpc.TransformResourceOptions{
		DependsOn:               req.GetDependencies(),
		Protect:                 req.GetProtect(),
		IgnoreChanges:           req.GetIgnoreChanges(),
		ReplaceOnChanges:        req.GetReplaceOnChanges(),
		Version:                 req.GetVersion(),
		Aliases:                 aliases,
		Provider:                provider,
		Providers:               req.GetProviders(),
		CustomTimeouts:          req.GetCustomTimeouts(),
		PluginDownloadUrl:       req.GetPluginDownloadURL(),
		RetainOnDelete:          req.GetRetainOnDelete(),
		DeletedWith:             req.GetDeletedWith(),
		DeleteBeforeReplace:     deleteBeforeReplace,
		AdditionalSecretOutputs: req.GetAdditionalSecretOutputs(),
		PluginChecksums:         req.GetPluginChecksums(),
	}

	// This might be a resource registration for a resource that another process requested to be constructed. If so
	// we'll have saved the pending transforms for this and we should use those rather than what is on the request.
	var transforms []TransformFunction
	pendingKey := fmt.Sprintf("%s::%s::%s", parent, t, name)
	err = func() error {
		// Function exists to scope the lock
		rm.pendingTransformsLock.Lock()
		defer rm.pendingTransformsLock.Unlock()

		if pending, ok := rm.pendingTransforms[pendingKey]; ok {
			transforms = pending
		} else {
			// Note: this assigns to the enclosing function's err, which is then also returned from the closure.
			transforms, err = slice.MapError(req.Transforms, rm.wrapTransformCallback)
			if err != nil {
				return err
			}
			// We only need to save this for remote calls
			if remote && len(transforms) > 0 {
				rm.pendingTransforms[pendingKey] = transforms
			}
		}
		return nil
	}()
	if err != nil {
		return nil, err
	}

	// Before we calculate anything else run the transformations. First run the transforms for this resource,
	// then its parents', and so on up the chain.
	for _, transform := range transforms {
		newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts)
		if err != nil {
			return nil, err
		}
		props = newProps
		opts = newOpts
	}
	// Lookup our parents' transformations and run those
	err = func() error {
		// Function exists to scope the lock
		// NOTE(review): both locks stay held while the transform callbacks run — confirm a transform can never
		// re-enter RegisterResource in a way that needs these locks.
		rm.resourceTransformsLock.Lock()
		defer rm.resourceTransformsLock.Unlock()
		rm.parentsLock.Lock()
		defer rm.parentsLock.Unlock()

		// Walk from the direct parent up through rm.parents until we reach a resource with no recorded parent.
		current := parent
		for current != "" {
			if transforms, ok := rm.resourceTransforms[current]; ok {
				for _, transform := range transforms {
					newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts)
					if err != nil {
						return err
					}
					props = newProps
					opts = newOpts
				}
			}
			current = rm.parents[current]
		}
		return nil
	}()
	if err != nil {
		return nil, err
	}

	// Then lock the stack transformations and run all of those
	err = func() error {
		// Function exists to scope the lock
		rm.stackTransformsLock.Lock()
		defer rm.stackTransformsLock.Unlock()

		for _, transform := range rm.stackTransforms {
			newProps, newOpts, err := transform(ctx, name, string(t), custom, parent, props, opts)
			if err != nil {
				return err
			}
			props = newProps
			opts = newOpts
		}
		return nil
	}()
	if err != nil {
		return nil, err
	}

	// We handle updating the providers map to include the providers field of the parent if
	// both the current resource and its parent are component resources. Entries already present on the
	// resource win over the parent's.
	func() {
		// Function exists to scope the lock
		rm.componentProvidersLock.Lock()
		defer rm.componentProvidersLock.Unlock()
		if parentsProviders, parentIsComponent := rm.componentProviders[parent]; !custom &&
			parent != "" && parentIsComponent {
			for k, v := range parentsProviders {
				if opts.Providers == nil {
					opts.Providers = map[string]string{}
				}
				if _, ok := opts.Providers[k]; !ok {
					opts.Providers[k] = v
				}
			}
		}
	}()

	var providerRef providers.Reference
	var providerRefs map[string]string

	// Provider references are needed for custom resources that are not themselves providers, and for remote
	// components. (&& binds tighter than ||.)
	if custom && !providers.IsProviderType(t) || remote {
		providerReq, err := parseProviderRequest(
			t.Package(), opts.GetVersion(),
			opts.GetPluginDownloadUrl(), opts.GetPluginChecksums(), nil)
		if err != nil {
			return nil, err
		}

		// A package reference on the request replaces the provider request derived from the options above.
		packageRef := req.GetPackageRef()
		if packageRef != "" {
			var has bool
			providerReq, has = rm.packageRefMap[packageRef]
			if !has {
				return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
			}
		}

		providerRef, err = rm.getProviderReference(rm.defaultProviders, providerReq, opts.GetProvider())
		if err != nil {
			return nil, err
		}

		providerRefs = make(map[string]string, len(opts.GetProviders()))
		for name, provider := range opts.GetProviders() {
			ref, err := rm.getProviderReference(rm.defaultProviders, providerReq, provider)
			if err != nil {
				return nil, err
			}
			providerRefs[name] = ref.String()
		}
	}

	// Convert the (possibly transformed) wire aliases into resource.Alias values.
	parsedAliases := []resource.Alias{}
	for _, aliasObject := range opts.Aliases {
		aliasSpec := aliasObject.GetSpec()
		var alias resource.Alias
		if aliasSpec != nil {
			alias = resource.Alias{
				Name:    aliasSpec.Name,
				Type:    aliasSpec.Type,
				Stack:   aliasSpec.Stack,
				Project: aliasSpec.Project,
			}
			switch parent := aliasSpec.GetParent().(type) {
			case *pulumirpc.Alias_Spec_ParentUrn:
				// Technically an SDK shouldn't set `parent` at all to specify the default parent, but both NodeJS and
				// Python have buggy SDKs that set parent to an empty URN to specify the default parent. We handle this
				// case here to maintain backward compatibility with older SDKs but it would be good to fix this to be
				// strict in V4.
				parentURN, err := resource.ParseOptionalURN(parent.ParentUrn)
				if err != nil {
					return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid parent alias URN: %s", err))
				}
				alias.Parent = parentURN
			case *pulumirpc.Alias_Spec_NoParent:
				alias.NoParent = parent.NoParent
			}
		} else {
			// Not a spec alias, so it must carry a full URN.
			urn, err := resource.ParseURN(aliasObject.GetUrn())
			if err != nil {
				return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid alias URN: %s", err))
			}
			alias = resource.Alias{URN: urn}
		}
		parsedAliases = append(parsedAliases, alias)
	}

	// Reparse the dependency information from any transformation results
	// NOTE(review): this checks the transforms on the request, not the `transforms` slice that was actually run
	// (which may have come from rm.pendingTransforms) nor the parent/stack transforms — confirm that is intended.
	if len(req.Transforms) > 0 {
		dependencies = mapset.NewSet[resource.URN]()
		for _, dependingURN := range opts.DependsOn {
			urn, err := resource.ParseURN(dependingURN)
			if err != nil {
				return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid dependency URN: %s", err))
			}
			dependencies.Add(urn)
		}
		// Now we've run the transforms we can rebuild the property dependency maps. If we have output values we can add the
		// dependencies from them to the dependencies map we send to the provider and save to state.
		propertyDependencies = make(map[resource.PropertyKey]mapset.Set[resource.URN])
		for key, output := range props {
			deps := mapset.NewSet[resource.URN]()
			addOutputDependencies(deps, output)
			propertyDependencies[key] = deps

			// Also add these to the overall dependencies
			dependencies = dependencies.Union(deps)
		}
	} else {
		// If we ran transforms we would have merged all the dependencies together already, but if we didn't we want to
		// ensure any output values add their dependencies to the dependencies map we send to the provider.
		for key, output := range props {
			if propertyDependencies[key] == nil {
				propertyDependencies[key] = mapset.NewSet[resource.URN]()
			}
			addOutputDependencies(propertyDependencies[key], output)
		}
	}

	// Flatten the dependency sets into the slice form used by the goal and by Construct.
	rawDependencies := dependencies.ToSlice()
	rawPropertyDependencies := make(map[resource.PropertyKey][]resource.URN)
	for key, deps := range propertyDependencies {
		rawPropertyDependencies[key] = deps.ToSlice()
	}

	// For provider resources, record the plugin version/URL/etc. in the provider's own input properties.
	if providers.IsProviderType(t) {
		if opts.GetVersion() != "" {
			version, err := semver.Parse(opts.GetVersion())
			if err != nil {
				return nil, fmt.Errorf("%s: passed invalid version: %w", label, err)
			}
			providers.SetProviderVersion(props, &version)
		}
		if opts.GetPluginDownloadUrl() != "" {
			providers.SetProviderURL(props, opts.GetPluginDownloadUrl())
		}

		if req.GetPackageRef() != "" {
			// If the provider resource has a package ref then we need to set all its input fields as in
			// newRegisterDefaultProviderEvent.
			packageRef := req.GetPackageRef()
			providerReq, has := rm.packageRefMap[packageRef]
			if !has {
				return nil, fmt.Errorf("unknown provider package '%v'", packageRef)
			}

			if providerReq.Version() != nil {
				providers.SetProviderVersion(props, providerReq.Version())
			}
			if providerReq.PluginDownloadURL() != "" {
				providers.SetProviderURL(props, providerReq.PluginDownloadURL())
			}
			if providerReq.PluginChecksums() != nil {
				providers.SetProviderChecksums(props, providerReq.PluginChecksums())
			}
			if providerReq.Parameterization() != nil {
				providers.SetProviderName(props, providerReq.Name())
				providers.SetProviderParameterization(props, providerReq.Parameterization())
			}
		}

		// Make sure that an explicit provider which doesn't specify its plugin gets the
		// same plugin as the default provider for the package.
		defaultProvider, ok := rm.defaultProviders.defaultProviderInfo[providers.GetProviderPackage(t)]
		if ok && opts.GetVersion() == "" && opts.GetPluginDownloadUrl() == "" {
			if defaultProvider.Version != nil {
				providers.SetProviderVersion(props, defaultProvider.Version)
			}
			if defaultProvider.PluginDownloadURL != "" {
				providers.SetProviderURL(props, defaultProvider.PluginDownloadURL)
			}
		}
	}

	// From here on, read the options from opts (post-transform) rather than from the request.
	protect := opts.Protect
	ignoreChanges := opts.IgnoreChanges
	replaceOnChanges := opts.ReplaceOnChanges
	retainOnDelete := opts.RetainOnDelete
	deletedWith, err := resource.ParseOptionalURN(opts.GetDeletedWith())
	if err != nil {
		return nil, rpcerror.New(codes.InvalidArgument, fmt.Sprintf("invalid DeletedWith URN: %s", err))
	}
	customTimeouts := opts.CustomTimeouts

	additionalSecretOutputs := opts.GetAdditionalSecretOutputs()

	// At this point we're going to forward these properties to the rest of the engine and potentially to providers. As
	// we add features to the code above (most notably transforms) we could end up with more instances of `OutputValue`
	// than the rest of the system historically expects. To minimize the disruption we downgrade `OutputValue`s with no
	// dependencies down to `Computed` and `Secret` or their plain values. We only do this for non-remote resources.
	// Remote resources already deal with `OutputValue`s and even though it would be more consistent to downgrade them
	// here it would be a breaking change.
	if !remote {
		props = downgradeOutputValues(props)
	}

	logging.V(5).Infof(
		"ResourceMonitor.RegisterResource received: t=%v, name=%v, custom=%v, #props=%v, parent=%v, protect=%v, "+
			"provider=%v, deps=%v, deleteBeforeReplace=%v, ignoreChanges=%v, aliases=%v, customTimeouts=%v, "+
			"providers=%v, replaceOnChanges=%v, retainOnDelete=%v, deletedWith=%v",
		t, name, custom, len(props), parent, protect, providerRef, rawDependencies, opts.DeleteBeforeReplace, ignoreChanges,
		parsedAliases, customTimeouts, providerRefs, replaceOnChanges, retainOnDelete, deletedWith)

	// If this is a remote component, fetch its provider and issue the construct call. Otherwise, register the resource.
	var result *RegisterResult

	var outputDeps map[string]*pulumirpc.RegisterResourceResponse_PropertyDependencies
	if remote {
		provider, ok := rm.providers.GetProvider(providerRef)
		// The deny-defaults check runs before the !ok check so that it takes precedence in the error reported.
		if providers.IsDenyDefaultsProvider(providerRef) {
			msg := diag.GetDefaultProviderDenied("").Message
			return nil, fmt.Errorf(msg, t.Package().String(), t.String())
		}
		if !ok {
			return nil, fmt.Errorf("unknown provider '%v'", providerRef)
		}

		// Invoke the provider's Construct RPC method.
		options := plugin.ConstructOptions{
			// We don't actually need to send a list of aliases to construct anymore because the engine does
			// all alias construction.
			Aliases:                 []resource.Alias{},
			Dependencies:            rawDependencies,
			Protect:                 protect,
			PropertyDependencies:    rawPropertyDependencies,
			Providers:               providerRefs,
			AdditionalSecretOutputs: additionalSecretOutputs,
			DeletedWith:             deletedWith,
			IgnoreChanges:           ignoreChanges,
			ReplaceOnChanges:        replaceOnChanges,
			RetainOnDelete:          retainOnDelete,
		}
		if customTimeouts != nil {
			options.CustomTimeouts = &plugin.CustomTimeouts{
				Create: customTimeouts.Create,
				Update: customTimeouts.Update,
				Delete: customTimeouts.Delete,
			}
		}
		if opts.DeleteBeforeReplace != nil {
			options.DeleteBeforeReplace = *opts.DeleteBeforeReplace
		}

		constructResult, err := provider.Construct(ctx, plugin.ConstructRequest{
			Info:    rm.constructInfo,
			Type:    t,
			Name:    name,
			Parent:  parent,
			Inputs:  props,
			Options: options,
		})
		if err != nil {
			return nil, err
		}

		result = &RegisterResult{State: &resource.State{URN: constructResult.URN, Outputs: constructResult.Outputs}}

		// The provider may have returned OutputValues in "Outputs", we need to downgrade them to Computed or
		// Secret but also add them to the outputDeps map.
		if constructResult.OutputDependencies == nil {
			constructResult.OutputDependencies = map[resource.PropertyKey][]resource.URN{}
		}
		for k, v := range result.State.Outputs {
			constructResult.OutputDependencies[k] = extendOutputDependencies(constructResult.OutputDependencies[k], v)
		}

		// Convert the per-output dependencies to their wire form for the response.
		outputDeps = map[string]*pulumirpc.RegisterResourceResponse_PropertyDependencies{}
		for k, deps := range constructResult.OutputDependencies {
			urns := make([]string, len(deps))
			for i, d := range deps {
				urns[i] = string(d)
			}
			outputDeps[string(k)] = &pulumirpc.RegisterResourceResponse_PropertyDependencies{Urns: urns}
		}

	} else {
		additionalSecretKeys := slice.Prealloc[resource.PropertyKey](len(additionalSecretOutputs))
		for _, name := range additionalSecretOutputs {
			additionalSecretKeys = append(additionalSecretKeys, resource.PropertyKey(name))
		}

		// Convert any custom timeout strings to seconds; timeouts that are unset stay at their zero value.
		var timeouts resource.CustomTimeouts
		if customTimeouts != nil {
			if customTimeouts.Create != "" {
				seconds, err := generateTimeoutInSeconds(customTimeouts.Create)
				if err != nil {
					return nil, err
				}
				timeouts.Create = seconds
			}
			if customTimeouts.Delete != "" {
				seconds, err := generateTimeoutInSeconds(customTimeouts.Delete)
				if err != nil {
					return nil, err
				}
				timeouts.Delete = seconds
			}
			if customTimeouts.Update != "" {
				seconds, err := generateTimeoutInSeconds(customTimeouts.Update)
				if err != nil {
					return nil, err
				}
				timeouts.Update = seconds
			}
		}

		goal := resource.NewGoal(t, name, custom, props, parent, protect, rawDependencies,
			providerRef.String(), nil, rawPropertyDependencies, opts.DeleteBeforeReplace, ignoreChanges,
			additionalSecretKeys, parsedAliases, id, &timeouts, replaceOnChanges, retainOnDelete, deletedWith,
			sourcePosition,
		)

		// If we have already recorded a goal for the parent, let this goal inherit from it.
		if goal.Parent != "" {
			rm.resGoalsLock.Lock()
			parentGoal, ok := rm.resGoals[goal.Parent]
			if ok {
				goal = inheritFromParent(*goal, parentGoal)
			}
			rm.resGoalsLock.Unlock()
		}
		// Send the goal state to the engine.
		step := &registerResourceEvent{
			goal: goal,
			done: make(chan *RegisterResult),
		}

		select {
		case rm.regChan <- step:
		case <-rm.cancel:
			logging.V(5).Infof("ResourceMonitor.RegisterResource operation canceled, name=%s", name)
			return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while sending resource registration")
		}

		// Now block waiting for the operation to finish.
		select {
		case result = <-step.done:
		case <-rm.cancel:
			logging.V(5).Infof("ResourceMonitor.RegisterResource operation canceled, name=%s", name)
			return nil, rpcerror.New(codes.Unavailable, "resource monitor shut down while waiting on step's done channel")
		}
		// Callers that cannot interpret a non-success result in the response get an error instead.
		if result != nil && result.Result != ResultStateSuccess && !req.GetSupportsResultReporting() {
			return nil, rpcerror.New(codes.Internal, "resource registration failed")
		}
		// Remember the goal so that children registered later can inherit from it.
		if result != nil && result.State != nil && result.State.URN != "" {
			rm.resGoalsLock.Lock()
			rm.resGoals[result.State.URN] = *goal
			rm.resGoalsLock.Unlock()
		}
	}

	if result != nil && result.State != nil && result.State.URN != "" {
		// We've got a safe URN now, save the parent and transformations
		func() {
			rm.parentsLock.Lock()
			defer rm.parentsLock.Unlock()
			rm.parents[result.State.URN] = parent
		}()
		func() {
			rm.resourceTransformsLock.Lock()
			defer rm.resourceTransformsLock.Unlock()
			rm.resourceTransforms[result.State.URN] = transforms
		}()
		if !custom {
			func() {
				rm.componentProvidersLock.Lock()
				defer rm.componentProvidersLock.Unlock()
				rm.componentProviders[result.State.URN] = opts.GetProviders()
			}()
		}
	}

	// NOTE(review): result and result.State are dereferenced unconditionally from here on, although both are
	// nil-checked above — confirm the engine never delivers a nil result or a nil State.
	outputs := result.State.Outputs

	// Local ComponentResources may contain unresolved resource refs, so ignore those outputs.
	if !req.GetCustom() && !remote {
		// In the case of a SameStep, the old resource outputs are returned to the language host after the step is
		// executed. The outputs of a ComponentResource may depend on resources that have not been registered at the
		// time the ComponentResource is itself registered, as the outputs are set by a later call to
		// RegisterResourceOutputs. Therefore, when the SameStep returns the old resource outputs for a
		// ComponentResource, it may return references to resources that have not yet been registered, which will cause
		// the SDK's calls to getResource to fail when it attempts to resolve those references.
		//
		// Work on a more targeted fix is tracked in https://github.com/pulumi/pulumi/issues/5978
		outputs = resource.PropertyMap{}
	}

	// Filter out partially-known values if the requestor does not support them.
	if !req.GetSupportsPartialValues() {
		logging.V(5).Infof("stripping unknowns from RegisterResource response for urn %v", result.State.URN)
		filtered := resource.PropertyMap{}
		for k, v := range outputs {
			if !v.ContainsUnknowns() {
				filtered[k] = v
			}
		}
		outputs = filtered
	}

	// TODO(@platform):
	// Currently component resources ignore these options:
	// • ignoreChanges
	// • customTimeouts
	// • additionalSecretOutputs
	// • replaceOnChanges
	// • retainOnDelete
	// • deletedWith
	// Revisit these semantics in Pulumi v4.0
	// See this issue for more: https://github.com/pulumi/pulumi/issues/9704
	if !custom {
		rm.checkComponentOption(result.State.URN, "ignoreChanges", func() bool {
			return len(ignoreChanges) > 0
		})
		rm.checkComponentOption(result.State.URN, "customTimeouts", func() bool {
			if customTimeouts == nil {
				return false
			}
			hasUpdateTimeout := customTimeouts.Update != ""
			hasCreateTimeout := customTimeouts.Create != ""
			hasDeleteTimeout := customTimeouts.Delete != ""
			return hasCreateTimeout || hasUpdateTimeout || hasDeleteTimeout
		})
		rm.checkComponentOption(result.State.URN, "additionalSecretOutputs", func() bool {
			return len(additionalSecretOutputs) > 0
		})
		rm.checkComponentOption(result.State.URN, "replaceOnChanges", func() bool {
			return len(replaceOnChanges) > 0
		})
		rm.checkComponentOption(result.State.URN, "retainOnDelete", func() bool {
			return retainOnDelete
		})
		rm.checkComponentOption(result.State.URN, "deletedWith", func() bool {
			return deletedWith != ""
		})
	}

	logging.V(5).Infof(
		"ResourceMonitor.RegisterResource operation finished: t=%v, urn=%v, #outs=%v",
		result.State.Type, result.State.URN, len(outputs))

	// Finally, unpack the response into properties that we can return to the language runtime.  This mostly includes
	// an ID, URN, and defaults and output properties that will all be blitted back onto the runtime object.
	obj, err := plugin.MarshalProperties(outputs, plugin.MarshalOptions{
		Label:            label,
		KeepUnknowns:     true,
		KeepSecrets:      req.GetAcceptSecrets(),
		KeepResources:    req.GetAcceptResources(),
		WorkingDirectory: rm.workingDirectory,
	})
	if err != nil {
		return nil, err
	}

	// Assert that we never leak the unconfigured provider ID to the language host.
	contract.Assertf(
		!providers.IsProviderType(result.State.Type) || result.State.ID != providers.UnconfiguredID,
		"provider resource %s has unconfigured ID", result.State.URN)

	// Map the engine's result state onto the wire enum; anything other than skipped or failed is a success.
	reason := pulumirpc.Result_SUCCESS
	if result.Result == ResultStateSkipped {
		reason = pulumirpc.Result_SKIP
	} else if result.Result == ResultStateFailed {
		reason = pulumirpc.Result_FAIL
	}
	return &pulumirpc.RegisterResourceResponse{
		Urn:                  string(result.State.URN),
		Id:                   string(result.State.ID),
		Object:               obj,
		PropertyDependencies: outputDeps,
		Result:               reason,
	}, nil
}

// checkComponentOption evaluates 'check' and, when it reports true, logs (at verbosity level 10) that the
// option 'optName' has no automatic effect on the component resource 'urn'.
// It is meant for validating options passed to component resources, so 'urn' is expected to name a component.
func (rm *resmon) checkComponentOption(urn resource.URN, optName string, check func() bool) {
	if !check() {
		return
	}

	const format = "The option '%s' has no automatic effect on component resource '%s', " +
		"ensure it is handled correctly in the component code."
	logging.V(10).Infof(format, optName, urn)
}

// RegisterResourceOutputs records additional output properties for a resource after it has been provisioned.
// The request's URN and outputs are validated and unmarshaled, handed to the engine over rm.regOutChan, and
// the call then blocks until the engine signals completion. If the monitor is canceled while sending or
// waiting, a codes.Unavailable error is returned instead.
func (rm *resmon) RegisterResourceOutputs(ctx context.Context,
	req *pulumirpc.RegisterResourceOutputsRequest,
) (*emptypb.Empty, error) {
	// The request must name a valid resource...
	urn, err := resource.ParseURN(req.Urn)
	if err != nil {
		return nil, fmt.Errorf("invalid resource URN: %w", err)
	}

	// ...and carry a property map we can decode.
	unmarshalOpts := plugin.MarshalOptions{
		Label:              fmt.Sprintf("ResourceMonitor.RegisterResourceOutputs(%s)", urn),
		KeepUnknowns:       true,
		ComputeAssetHashes: true,
		KeepSecrets:        true,
		KeepResources:      true,
		WorkingDirectory:   rm.workingDirectory,
	}
	outputs, err := plugin.UnmarshalProperties(req.GetOutputs(), unmarshalOpts)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshal output properties: %w", err)
	}
	logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs received: urn=%v, #outs=%v", urn, len(outputs))

	// canceled logs the cancellation and builds the error reported to the caller.
	canceled := func(reason string) (*emptypb.Empty, error) {
		logging.V(5).Infof("ResourceMonitor.RegisterResourceOutputs operation canceled, urn=%s", urn)
		return nil, rpcerror.New(codes.Unavailable, reason)
	}

	// Hand the event to the engine.
	event := &registerResourceOutputsEvent{
		urn:     urn,
		outputs: outputs,
		done:    make(chan bool),
	}
	select {
	case rm.regOutChan <- event:
	case <-rm.cancel:
		return canceled("resource monitor shut down while sending resource outputs")
	}

	// Wait for the engine to report that it has processed the event.
	select {
	case <-event.done:
	case <-rm.cancel:
		return canceled("resource monitor shut down while waiting on output step's done channel")
	}

	logging.V(5).Infof(
		"ResourceMonitor.RegisterResourceOutputs operation finished: urn=%v, #outs=%v", urn, len(outputs))
	return &emptypb.Empty{}, nil
}

// registerResourceEvent carries a resource goal to the engine together with the channel on which the
// engine reports the outcome of the registration.
type registerResourceEvent struct {
	goal *resource.Goal       // the goal state requested for the resource.
	done chan *RegisterResult // receives the registration result once it is available.
}

var _ RegisterResourceEvent = (*registerResourceEvent)(nil)

// event marks registerResourceEvent as an event type; it has no behavior.
func (e *registerResourceEvent) event() {}

// Goal returns the goal state carried by this event.
func (e *registerResourceEvent) Goal() *resource.Goal {
	return e.goal
}

// Done delivers result to the RPC goroutine that is waiting on this event's done channel.
func (e *registerResourceEvent) Done(result *RegisterResult) {
	e.done <- result
}

// registerResourceOutputsEvent carries late-arriving outputs for an already-registered resource to the
// engine, together with the channel used to signal that they have been processed.
type registerResourceOutputsEvent struct {
	urn     resource.URN         // the resource the outputs belong to.
	outputs resource.PropertyMap // the output properties to record (optional).
	done    chan bool            // signaled once the operation completes.
}

var _ RegisterResourceOutputsEvent = (*registerResourceOutputsEvent)(nil)

// event marks registerResourceOutputsEvent as an event type; it has no behavior.
func (e *registerResourceOutputsEvent) event() {}

// URN returns the URN of the resource whose outputs are being registered.
func (e *registerResourceOutputsEvent) URN() resource.URN {
	return e.urn
}

// Outputs returns the output properties carried by this event.
func (e *registerResourceOutputsEvent) Outputs() resource.PropertyMap {
	return e.outputs
}

// Done signals completion to the RPC goroutine that is waiting on this event's done channel.
func (e *registerResourceOutputsEvent) Done() {
	e.done <- true
}

// readResourceEvent bundles the parameters of a resource read together with the channel on which the
// result of the read is delivered.
type readResourceEvent struct {
	id                      resource.ID
	name                    string
	baseType                tokens.Type
	provider                string
	parent                  resource.URN
	props                   resource.PropertyMap
	dependencies            []resource.URN
	additionalSecretOutputs []resource.PropertyKey
	sourcePosition          string
	done                    chan *ReadResult
}

var _ ReadResourceEvent = (*readResourceEvent)(nil)

// event marks readResourceEvent as an event type; it has no behavior.
func (e *readResourceEvent) event() {}

// ID returns the id field of the event.
func (e *readResourceEvent) ID() resource.ID {
	return e.id
}

// Name returns the name field of the event.
func (e *readResourceEvent) Name() string {
	return e.name
}

// Type returns the baseType field of the event.
func (e *readResourceEvent) Type() tokens.Type {
	return e.baseType
}

// Provider returns the provider field of the event.
func (e *readResourceEvent) Provider() string {
	return e.provider
}

// Parent returns the parent field of the event.
func (e *readResourceEvent) Parent() resource.URN {
	return e.parent
}

// Properties returns the props field of the event.
func (e *readResourceEvent) Properties() resource.PropertyMap {
	return e.props
}

// Dependencies returns the dependencies field of the event.
func (e *readResourceEvent) Dependencies() []resource.URN {
	return e.dependencies
}

// AdditionalSecretOutputs returns the additionalSecretOutputs field of the event.
func (e *readResourceEvent) AdditionalSecretOutputs() []resource.PropertyKey {
	return e.additionalSecretOutputs
}

// SourcePosition returns the sourcePosition field of the event.
func (e *readResourceEvent) SourcePosition() string {
	return e.sourcePosition
}

// Done sends result on the event's done channel.
func (e *readResourceEvent) Done(result *ReadResult) {
	e.done <- result
}

// generateTimeoutInSeconds converts a custom timeout string into a number of seconds.
//
// timeout must be in the syntax accepted by time.ParseDuration (for example "90s" or "1h30m"). On success the
// duration is returned in (possibly fractional) seconds. On failure the result is 0 and the error names the
// offending value and wraps the underlying parse error, so callers can see why the value was rejected.
func generateTimeoutInSeconds(timeout string) (float64, error) {
	duration, err := time.ParseDuration(timeout)
	if err != nil {
		// Keep the original message as the prefix and append the parser's own explanation.
		return 0, fmt.Errorf("unable to parse customTimeout Value %s: %w", timeout, err)
	}

	return duration.Seconds(), nil
}

// decorateResourceSpans tags span with a "pulumi-decorator" value taken from the request of selected
// ResourceMonitor RPCs: the token for Invoke, and the resource type for ReadResource and RegisterResource.
// Spans for any other method, or for a nil request, are left untouched. resp and grpcError are not used.
func decorateResourceSpans(span opentracing.Span, method string, req, resp interface{}, grpcError error) {
	if req == nil {
		return
	}

	var decorator string
	switch method {
	case "/pulumirpc.ResourceMonitor/Invoke":
		decorator = req.(*pulumirpc.ResourceInvokeRequest).Tok
	case "/pulumirpc.ResourceMonitor/ReadResource":
		decorator = req.(*pulumirpc.ReadResourceRequest).Type
	case "/pulumirpc.ResourceMonitor/RegisterResource":
		decorator = req.(*pulumirpc.RegisterResourceRequest).Type
	default:
		// No decoration for other methods.
		return
	}
	span.SetTag("pulumi-decorator", decorator)
}

// downgradeOutputValues returns a copy of v in which every Output value, at any depth, has been replaced by
// a `Computed` value (when unknown), its plain element (when known), and wrapped in `Secret` when the output
// was secret. All dependency information carried by the outputs is lost.
func downgradeOutputValues(v resource.PropertyMap) resource.PropertyMap {
	var lower func(pv resource.PropertyValue) resource.PropertyValue

	lower = func(pv resource.PropertyValue) resource.PropertyValue {
		switch {
		case pv.IsOutput():
			out := pv.OutputValue()
			var lowered resource.PropertyValue
			if !out.Known {
				// Unknown outputs become a computed string placeholder.
				lowered = resource.MakeComputed(resource.NewStringProperty(""))
			} else {
				lowered = lower(out.Element)
			}
			if out.Secret {
				return resource.MakeSecret(lowered)
			}
			return lowered

		case pv.IsObject():
			return resource.NewObjectProperty(downgradeOutputValues(pv.ObjectValue()))

		case pv.IsArray():
			elems := pv.ArrayValue()
			lowered := make([]resource.PropertyValue, 0, len(elems))
			for _, elem := range elems {
				lowered = append(lowered, lower(elem))
			}
			return resource.NewArrayProperty(lowered)

		case pv.IsSecret():
			return resource.MakeSecret(lower(pv.SecretValue().Element))

		case pv.IsResourceReference():
			// Only the ID of a resource reference is itself a property value that needs downgrading.
			ref := pv.ResourceReferenceValue()
			return resource.NewResourceReferenceProperty(resource.ResourceReference{
				URN:            ref.URN,
				ID:             lower(ref.ID),
				PackageVersion: ref.PackageVersion,
			})

		default:
			return pv
		}
	}

	downgraded := make(resource.PropertyMap)
	for key, value := range v {
		downgraded[key] = lower(value)
	}
	return downgraded
}

// upgradeOutputValues returns a new property map in which each top-level property of v carries at least the
// dependencies listed for its key in propertyDependencies.
//
// A property with no entry in propertyDependencies, or whose nested output values already cover every listed
// dependency, is copied unchanged. Otherwise the property is turned into an Output (reusing the existing one when
// it already is an output, or wrapping the value as a known output when it is not) whose dependencies are the
// union of the output's own dependencies and the listed ones.
//
// The caller is expected to have already upgraded Secret/Computed values to outputs; this function only adds the
// dependency information.
func upgradeOutputValues(
	v resource.PropertyMap, propertyDependencies map[resource.PropertyKey]mapset.Set[resource.URN],
) resource.PropertyMap {
	upgraded := make(resource.PropertyMap)
	for key, value := range v {
		wanted, listed := propertyDependencies[key]
		if !listed {
			// Nothing recorded for this property; pass it through.
			upgraded[key] = value
			continue
		}

		// Gather every dependency already reachable from the value, including nested ones.
		present := mapset.NewSet[resource.URN]()
		addOutputDependencies(present, value)
		if present.IsSuperset(wanted) {
			// The value already accounts for everything we were asked to add.
			upgraded[key] = value
			continue
		}

		output := resource.Output{
			Element: value,
			Known:   true,
		}
		if value.IsOutput() {
			output = value.OutputValue()
		}

		// Merge only the output's own top-level dependencies with the requested ones.
		merged := mapset.NewSet(output.Dependencies...).Union(wanted)
		output.Dependencies = merged.ToSlice()
		upgraded[key] = resource.NewOutputProperty(output)
	}
	return upgraded
}

// extendOutputDependencies returns the de-duplicated combination of deps and every dependency found on output
// values nested inside v. The result comes from a set, so its order is unspecified.
func extendOutputDependencies(deps []resource.URN, v resource.PropertyValue) []resource.URN {
	collected := mapset.NewSet(deps...)
	addOutputDependencies(collected, v)

	combined := collected.ToSlice()
	return combined
}

// addOutputDependencies walks v recursively and adds to deps the dependencies of every output value it finds.
// It descends into the elements of known outputs, resource reference IDs, object properties, array elements and
// secret elements. Values of any other kind contribute nothing.
func addOutputDependencies(deps mapset.Set[resource.URN], v resource.PropertyValue) {
	switch {
	case v.IsOutput():
		out := v.OutputValue()
		// Only a known output has an element worth descending into.
		if out.Known {
			addOutputDependencies(deps, out.Element)
		}
		deps.Append(out.Dependencies...)

	case v.IsResourceReference():
		addOutputDependencies(deps, v.ResourceReferenceValue().ID)

	case v.IsObject():
		for _, field := range v.ObjectValue() {
			addOutputDependencies(deps, field)
		}

	case v.IsArray():
		for _, item := range v.ArrayValue() {
			addOutputDependencies(deps, item)
		}

	case v.IsSecret():
		addOutputDependencies(deps, v.SecretValue().Element)
	}
}