// Copyright 2016-2020, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package deploy import ( "context" cryptorand "crypto/rand" "errors" "fmt" "sort" "github.com/blang/semver" "github.com/pulumi/pulumi/pkg/v3/codegen/schema" "github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers" "github.com/pulumi/pulumi/pkg/v3/util/gsync" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin" "github.com/pulumi/pulumi/sdk/v3/go/common/slice" "github.com/pulumi/pulumi/sdk/v3/go/common/tokens" "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" ) // An Import specifies a resource to import. type Import struct { Type tokens.Type // The type token for the resource. Required. Name string // The name of the resource. Required. ID resource.ID // The ID of the resource. Required. Parent resource.URN // The parent of the resource, if any. Provider resource.URN // The specific provider to use for the resource, if any. Version *semver.Version // The provider version to use for the resource, if any. PluginDownloadURL string // The provider PluginDownloadURL to use for the resource, if any. PluginChecksums map[string][]byte // The provider checksums to use for the resource, if any. Protect bool // Whether to mark the resource as protected after import Properties []string // Which properties to include (Defaults to required properties) // True if this import should create an empty component resource. ID must not be set if this is used. Component bool // True if this is a remote component resource. Component must be true if this is true. Remote bool } // ImportOptions controls the import process. type ImportOptions struct { Events Events // an optional events callback interface. Parallel int // the degree of parallelism for resource operations (<=1 for serial). } // NewImportDeployment creates a new import deployment from a resource snapshot plus a set of resources to import. // // From the old and new states, it understands how to orchestrate an evaluation and analyze the resulting resources. // The deployment may be used to simply inspect a series of operations, or actually perform them; these operations are // generated based on analysis of the old and new states. If a resource exists in new, but not old, for example, it // results in a create; if it exists in both, but is different, it results in an update; and so on and so forth. // // Note that a deployment uses internal concurrency and parallelism in various ways, so it must be closed if for some // reason it isn't carried out to its final conclusion. This will result in cancellation and reclamation of resources. func NewImportDeployment( ctx *plugin.Context, opts *Options, events Events, target *Target, projectName tokens.PackageName, imports []Import, ) (*Deployment, error) { contract.Requiref(ctx != nil, "ctx", "must not be nil") contract.Requiref(target != nil, "target", "must not be nil") prev := target.Snapshot source := NewErrorSource(projectName) if err := migrateProviders(target, prev, source); err != nil { return nil, err } // Produce a map of all old resources for fast access. _, olds, err := buildResourceMap(prev, opts.DryRun) if err != nil { return nil, err } // Create a goal map for the deployment. newGoals := &gsync.Map[resource.URN, *resource.Goal]{} builtins := newBuiltinProvider(nil, nil, ctx.Diag) // Create a new provider registry. reg := providers.NewRegistry(ctx.Host, opts.DryRun, builtins) // Return the prepared deployment. return &Deployment{ ctx: ctx, opts: opts, events: events, target: target, prev: prev, olds: olds, goals: newGoals, imports: imports, isImport: true, schemaLoader: schema.NewPluginLoader(ctx.Host), source: NewErrorSource(projectName), providers: reg, newPlans: newResourcePlan(target.Config), news: &gsync.Map[resource.URN, *resource.State]{}, }, nil } type noopEvent int func (noopEvent) event() {} func (noopEvent) Goal() *resource.Goal { return nil } func (noopEvent) Done(result *RegisterResult) {} type noopOutputsEvent resource.URN func (noopOutputsEvent) event() {} func (e noopOutputsEvent) URN() resource.URN { return resource.URN(e) } func (noopOutputsEvent) Outputs() resource.PropertyMap { return resource.PropertyMap{} } func (noopOutputsEvent) Done() {} type importer struct { deployment *Deployment executor *stepExecutor preview bool } func (i *importer) executeSerial(ctx context.Context, steps ...Step) bool { return i.wait(ctx, i.executor.ExecuteSerial(steps)) } func (i *importer) executeParallel(ctx context.Context, steps ...Step) bool { return i.wait(ctx, i.executor.ExecuteParallel(steps)) } func (i *importer) wait(ctx context.Context, token completionToken) bool { token.Wait(ctx) return ctx.Err() == nil && i.executor.Errored() == nil } func (i *importer) registerExistingResources(ctx context.Context) bool { if i != nil && i.deployment != nil && i.deployment.prev != nil { // Issue same steps per existing resource to make sure that they are recorded in the snapshot. // We issue these steps serially s.t. the resources remain in the order in which they appear in the state. for _, r := range i.deployment.prev.Resources { if r.Delete { continue } // Clear the ID because Same asserts that the new state has no ID. new := r.Copy() new.ID = "" // Set a dummy goal so the resource is tracked as managed. i.deployment.goals.Store(r.URN, &resource.Goal{}) if !i.executeSerial(ctx, NewSameStep(i.deployment, noopEvent(0), r, new)) { return false } } } return true } func (i *importer) getOrCreateStackResource(ctx context.Context) (resource.URN, bool, bool) { // Get or create the root resource. if i.deployment.prev != nil { for _, res := range i.deployment.prev.Resources { if res.Type == resource.RootStackType && res.Parent == "" { return res.URN, false, true } } } projectName, stackName := i.deployment.source.Project(), i.deployment.target.Name typ, name := resource.RootStackType, fmt.Sprintf("%s-%s", projectName, stackName) urn := resource.NewURN(stackName.Q(), projectName, "", typ, name) state := resource.NewState(typ, urn, false, false, "", resource.PropertyMap{}, nil, "", false, false, nil, nil, "", nil, false, nil, nil, nil, "", false, "", nil, nil, "", nil) // TODO(seqnum) should stacks be created with 1? When do they ever get recreated/replaced? if !i.executeSerial(ctx, NewCreateStep(i.deployment, noopEvent(0), state)) { return "", false, false } return urn, true, true } func (i *importer) registerProviders(ctx context.Context) (map[resource.URN]string, bool, error) { urnToReference := map[resource.URN]string{} // Determine which default providers are not present in the state. If all default providers are accounted for, // we're done. // // NOTE: what if the configuration for an existing default provider has changed? If it has, we should diff it and // replace it appropriately or we should not use the ambient config at all. defaultProviderRequests := slice.Prealloc[providers.ProviderRequest](len(i.deployment.imports)) defaultProviders := map[resource.URN]struct{}{} for _, imp := range i.deployment.imports { if imp.Component && !imp.Remote { // Skip local component resources, they don't have providers. continue } if imp.Provider != "" { // If the provider for this import exists, map its URN to its provider reference. If it does not exist, // the import step will issue an appropriate error or errors. ref := string(imp.Provider) if state, ok := i.deployment.olds[imp.Provider]; ok { r, err := providers.NewReference(imp.Provider, state.ID) contract.AssertNoErrorf(err, "could not create provider reference with URN %q and ID %q", imp.Provider, state.ID) ref = r.String() } urnToReference[imp.Provider] = ref continue } if imp.Type.Package() == "" { return nil, false, errors.New("incorrect package type specified") } req := providers.NewProviderRequest(imp.Type.Package(), imp.Version, imp.PluginDownloadURL, imp.PluginChecksums, nil) typ, name := providers.MakeProviderType(req.Package()), req.DefaultName() urn := i.deployment.generateURN("", typ, name) if state, ok := i.deployment.olds[urn]; ok { ref, err := providers.NewReference(urn, state.ID) contract.AssertNoErrorf(err, "could not create provider reference with URN %q and ID %q", urn, state.ID) urnToReference[urn] = ref.String() continue } if _, ok := defaultProviders[urn]; ok { continue } defaultProviderRequests = append(defaultProviderRequests, req) defaultProviders[urn] = struct{}{} } if len(defaultProviderRequests) == 0 { return urnToReference, true, nil } steps := make([]Step, len(defaultProviderRequests)) sort.Slice(defaultProviderRequests, func(i, j int) bool { return defaultProviderRequests[i].String() < defaultProviderRequests[j].String() }) for idx, req := range defaultProviderRequests { if req.Package() == "" { return nil, false, errors.New("incorrect package type specified") } typ, name := providers.MakeProviderType(req.Package()), req.DefaultName() urn := i.deployment.generateURN("", typ, name) // Fetch, prepare, and check the configuration for this provider. inputs, err := i.deployment.target.GetPackageConfig(req.Package()) if err != nil { return nil, false, fmt.Errorf("failed to fetch provider config: %w", err) } // Calculate the inputs for the provider using the ambient config. if v := req.Version(); v != nil { providers.SetProviderVersion(inputs, v) } if url := req.PluginDownloadURL(); url != "" { providers.SetProviderURL(inputs, url) } if checksums := req.PluginChecksums(); checksums != nil { providers.SetProviderChecksums(inputs, checksums) } resp, err := i.deployment.providers.Check(ctx, plugin.CheckRequest{ URN: urn, News: inputs, }) if err != nil { return nil, false, fmt.Errorf("failed to validate provider config: %w", err) } state := resource.NewState(typ, urn, true, false, "", inputs, nil, "", false, false, nil, nil, "", nil, false, nil, nil, nil, "", false, "", nil, nil, "", nil) // TODO(seqnum) should default providers be created with 1? When do they ever get recreated/replaced? if issueCheckErrors(i.deployment, state, urn, resp.Failures) { return nil, false, nil } // Set a dummy goal so the resource is tracked as managed. i.deployment.goals.Store(urn, &resource.Goal{}) steps[idx] = NewCreateStep(i.deployment, noopEvent(0), state) } // Issue the create steps. if !i.executeParallel(ctx, steps...) { return nil, false, nil } // Update the URN to reference map. for _, s := range steps { res := s.Res() id := res.ID if i.preview { id = providers.UnknownID } ref, err := providers.NewReference(res.URN, id) contract.AssertNoErrorf(err, "could not create provider reference with URN %q and ID %q", res.URN, id) urnToReference[res.URN] = ref.String() } return urnToReference, true, nil } func (i *importer) importResources(ctx context.Context) error { contract.Assertf(len(i.deployment.imports) != 0, "no resources to import") if !i.registerExistingResources(ctx) { return nil } stackURN, createdStack, ok := i.getOrCreateStackResource(ctx) if !ok { return nil } urnToReference, ok, err := i.registerProviders(ctx) if !ok { return err } // Create a step per resource to import and execute them in parallel batches which don't depend on each other. // If there are duplicates, fail the import. urns := map[resource.URN]struct{}{} steps := slice.Prealloc[Step](len(i.deployment.imports)) for _, imp := range i.deployment.imports { parent := imp.Parent if parent == "" { parent = stackURN } urn := i.deployment.generateURN(parent, imp.Type, imp.Name) // Check for duplicate imports. if _, has := urns[urn]; has { return fmt.Errorf("duplicate import '%v' of type '%v'", imp.Name, imp.Type) } urns[urn] = struct{}{} // If the resource already exists and the ID matches the ID to import, then Same this resource. If the ID does // not match, the step itself will issue an error. if old, ok := i.deployment.olds[urn]; ok { oldID := old.ID if old.ImportID != "" { oldID = old.ImportID } if oldID == imp.ID { // Clear the ID because Same asserts that the new state has no ID. new := old.Copy() new.ID = "" // Set a dummy goal so the resource is tracked as managed. i.deployment.goals.Store(old.URN, &resource.Goal{}) steps = append(steps, NewSameStep(i.deployment, noopEvent(0), old, new)) continue } } // If the resource already exists and the ID matches the ID to import, then Same this resource. If the ID does // not match, the step itself will issue an error. if old, ok := i.deployment.olds[urn]; ok { oldID := old.ID if old.ImportID != "" { oldID = old.ImportID } if oldID == imp.ID { // Clear the ID because Same asserts that the new state has no ID. new := old.Copy() new.ID = "" // Set a dummy goal so the resource is tracked as managed. i.deployment.goals.Store(old.URN, &resource.Goal{}) steps = append(steps, NewSameStep(i.deployment, noopEvent(0), old, new)) continue } } providerURN := imp.Provider if providerURN == "" && (!imp.Component || imp.Remote) { req := providers.NewProviderRequest(imp.Type.Package(), imp.Version, imp.PluginDownloadURL, imp.PluginChecksums, nil) typ, name := providers.MakeProviderType(req.Package()), req.DefaultName() providerURN = i.deployment.generateURN("", typ, name) } var provider string if providerURN != "" { // Fetch the provider reference for this import. All provider URNs should be mapped. provider, ok = urnToReference[providerURN] contract.Assertf(ok, "provider reference for URN %v not found", providerURN) } // Create the new desired state. Note that the resource is protected. Provider might be "" at this point. new := resource.NewState( urn.Type(), urn, !imp.Component, false, imp.ID, resource.PropertyMap{}, nil, parent, imp.Protect, false, nil, nil, provider, nil, false, nil, nil, nil, "", false, "", nil, nil, "", nil) // Set a dummy goal so the resource is tracked as managed. i.deployment.goals.Store(urn, &resource.Goal{}) if imp.Component { if imp.Remote { contract.Assertf(ok, "provider reference for URN %v not found", providerURN) } steps = append(steps, newImportDeploymentStep(i.deployment, new, nil)) } else { contract.Assertf(ok, "provider reference for URN %v not found", providerURN) // If we have a plan for this resource we need to feed the saved seed to Check to remove non-determinism var randomSeed []byte if i.deployment.plan != nil { if resourcePlan, ok := i.deployment.plan.ResourcePlans[urn]; ok { randomSeed = resourcePlan.Seed } } else { randomSeed = make([]byte, 32) n, err := cryptorand.Read(randomSeed) contract.AssertNoErrorf(err, "could not read random bytes") contract.Assertf(n == len(randomSeed), "read %d random bytes, expected %d", n, len(randomSeed)) } steps = append(steps, newImportDeploymentStep(i.deployment, new, randomSeed)) } } // We've created all the steps above but we need to execute them in parallel batches which don't depend on each other for len(urns) > 0 { // Find all the steps that can be executed in parallel. `urns` is a map of every resource we still // need to import so if we need a resource from that map we can't yet build this resource. parallelSteps := []Step{} for _, step := range steps { // If we've already done this step don't do it again if _, ok := urns[step.New().URN]; !ok { continue } // If the step has no dependencies (we actually only need to look at parent), it can be executed in parallel if _, ok := urns[step.New().Parent]; !ok { parallelSteps = append(parallelSteps, step) } } // Remove all the urns we're about to import for _, step := range parallelSteps { delete(urns, step.New().URN) } if !i.executeParallel(ctx, parallelSteps...) { return nil } } if createdStack { return i.executor.ExecuteRegisterResourceOutputs(noopOutputsEvent(stackURN)) } return nil }