// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deploy

import (
	"context"
	cryptorand "crypto/rand"
	"errors"
	"fmt"
	"sort"

	"github.com/blang/semver"
	"github.com/pulumi/pulumi/pkg/v3/codegen/schema"
	"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
	"github.com/pulumi/pulumi/pkg/v3/util/gsync"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
	"github.com/pulumi/pulumi/sdk/v3/go/common/slice"
	"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
)

// An Import specifies a resource to import.
type Import struct {
	Type              tokens.Type       // The type token for the resource. Required.
	Name              string            // The name of the resource. Required.
	ID                resource.ID       // The ID of the resource. Required.
	Parent            resource.URN      // The parent of the resource, if any.
	Provider          resource.URN      // The specific provider to use for the resource, if any.
	Version           *semver.Version   // The provider version to use for the resource, if any.
	PluginDownloadURL string            // The provider PluginDownloadURL to use for the resource, if any.
	PluginChecksums   map[string][]byte // The provider checksums to use for the resource, if any.
	Protect           bool              // Whether to mark the resource as protected after import
	Properties        []string          // Which properties to include (Defaults to required properties)

	// True if this import should create an empty component resource. ID must not be set if this is used.
	Component bool
	// True if this is a remote component resource. Component must be true if this is true.
	Remote bool
}

// ImportOptions controls the import process.
type ImportOptions struct {
	Events   Events // an optional events callback interface.
	Parallel int    // the degree of parallelism for resource operations (<=1 for serial).
}

// NewImportDeployment creates a new import deployment from a resource snapshot plus a set of resources to import.
//
// From the old and new states, it understands how to orchestrate an evaluation and analyze the resulting resources.
// The deployment may be used to simply inspect a series of operations, or actually perform them; these operations are
// generated based on analysis of the old and new states.  If a resource exists in new, but not old, for example, it
// results in a create; if it exists in both, but is different, it results in an update; and so on and so forth.
//
// Note that a deployment uses internal concurrency and parallelism in various ways, so it must be closed if for some
// reason it isn't carried out to its final conclusion. This will result in cancellation and reclamation of resources.
func NewImportDeployment(
	ctx *plugin.Context,
	opts *Options,
	events Events,
	target *Target,
	projectName tokens.PackageName,
	imports []Import,
) (*Deployment, error) {
	contract.Requiref(ctx != nil, "ctx", "must not be nil")
	contract.Requiref(target != nil, "target", "must not be nil")

	prev := target.Snapshot
	source := NewErrorSource(projectName)
	if err := migrateProviders(target, prev, source); err != nil {
		return nil, err
	}

	// Produce a map of all old resources for fast access.
	_, olds, err := buildResourceMap(prev, opts.DryRun)
	if err != nil {
		return nil, err
	}

	// Create a goal map for the deployment.
	newGoals := &gsync.Map[resource.URN, *resource.Goal]{}

	builtins := newBuiltinProvider(nil, nil, ctx.Diag)

	// Create a new provider registry.
	reg := providers.NewRegistry(ctx.Host, opts.DryRun, builtins)

	// Return the prepared deployment.
	return &Deployment{
		ctx:          ctx,
		opts:         opts,
		events:       events,
		target:       target,
		prev:         prev,
		olds:         olds,
		goals:        newGoals,
		imports:      imports,
		isImport:     true,
		schemaLoader: schema.NewPluginLoader(ctx.Host),
		source:       NewErrorSource(projectName),
		providers:    reg,
		newPlans:     newResourcePlan(target.Config),
		news:         &gsync.Map[resource.URN, *resource.State]{},
	}, nil
}

type noopEvent int

func (noopEvent) event()                      {}
func (noopEvent) Goal() *resource.Goal        { return nil }
func (noopEvent) Done(result *RegisterResult) {}

type noopOutputsEvent resource.URN

func (noopOutputsEvent) event()                        {}
func (e noopOutputsEvent) URN() resource.URN           { return resource.URN(e) }
func (noopOutputsEvent) Outputs() resource.PropertyMap { return resource.PropertyMap{} }
func (noopOutputsEvent) Done()                         {}

type importer struct {
	deployment *Deployment
	executor   *stepExecutor
	preview    bool
}

func (i *importer) executeSerial(ctx context.Context, steps ...Step) bool {
	return i.wait(ctx, i.executor.ExecuteSerial(steps))
}

func (i *importer) executeParallel(ctx context.Context, steps ...Step) bool {
	return i.wait(ctx, i.executor.ExecuteParallel(steps))
}

func (i *importer) wait(ctx context.Context, token completionToken) bool {
	token.Wait(ctx)
	return ctx.Err() == nil && i.executor.Errored() == nil
}

func (i *importer) registerExistingResources(ctx context.Context) bool {
	if i != nil && i.deployment != nil && i.deployment.prev != nil {
		// Issue same steps per existing resource to make sure that they are recorded in the snapshot.
		// We issue these steps serially s.t. the resources remain in the order in which they appear in the state.
		for _, r := range i.deployment.prev.Resources {
			if r.Delete {
				continue
			}

			// Clear the ID because Same asserts that the new state has no ID.
			new := r.Copy()
			new.ID = ""
			// Set a dummy goal so the resource is tracked as managed.
			i.deployment.goals.Store(r.URN, &resource.Goal{})
			if !i.executeSerial(ctx, NewSameStep(i.deployment, noopEvent(0), r, new)) {
				return false
			}
		}
	}
	return true
}

func (i *importer) getOrCreateStackResource(ctx context.Context) (resource.URN, bool, bool) {
	// Get or create the root resource.
	if i.deployment.prev != nil {
		for _, res := range i.deployment.prev.Resources {
			if res.Type == resource.RootStackType && res.Parent == "" {
				return res.URN, false, true
			}
		}
	}

	projectName, stackName := i.deployment.source.Project(), i.deployment.target.Name
	typ, name := resource.RootStackType, fmt.Sprintf("%s-%s", projectName, stackName)
	urn := resource.NewURN(stackName.Q(), projectName, "", typ, name)
	state := resource.NewState(typ, urn, false, false, "", resource.PropertyMap{}, nil, "", false, false, nil, nil, "",
		nil, false, nil, nil, nil, "", false, "", nil, nil, "", nil)
	// TODO(seqnum) should stacks be created with 1? When do they ever get recreated/replaced?
	if !i.executeSerial(ctx, NewCreateStep(i.deployment, noopEvent(0), state)) {
		return "", false, false
	}
	return urn, true, true
}

func (i *importer) registerProviders(ctx context.Context) (map[resource.URN]string, bool, error) {
	urnToReference := map[resource.URN]string{}

	// Determine which default providers are not present in the state. If all default providers are accounted for,
	// we're done.
	//
	// NOTE: what if the configuration for an existing default provider has changed? If it has, we should diff it and
	// replace it appropriately or we should not use the ambient config at all.
	defaultProviderRequests := slice.Prealloc[providers.ProviderRequest](len(i.deployment.imports))
	defaultProviders := map[resource.URN]struct{}{}
	for _, imp := range i.deployment.imports {
		if imp.Component && !imp.Remote {
			// Skip local component resources, they don't have providers.
			continue
		}

		if imp.Provider != "" {
			// If the provider for this import exists, map its URN to its provider reference. If it does not exist,
			// the import step will issue an appropriate error or errors.
			ref := string(imp.Provider)
			if state, ok := i.deployment.olds[imp.Provider]; ok {
				r, err := providers.NewReference(imp.Provider, state.ID)
				contract.AssertNoErrorf(err,
					"could not create provider reference with URN %q and ID %q", imp.Provider, state.ID)
				ref = r.String()
			}
			urnToReference[imp.Provider] = ref
			continue
		}

		if imp.Type.Package() == "" {
			return nil, false, errors.New("incorrect package type specified")
		}
		req := providers.NewProviderRequest(imp.Type.Package(), imp.Version, imp.PluginDownloadURL, imp.PluginChecksums, nil)
		typ, name := providers.MakeProviderType(req.Package()), req.DefaultName()
		urn := i.deployment.generateURN("", typ, name)
		if state, ok := i.deployment.olds[urn]; ok {
			ref, err := providers.NewReference(urn, state.ID)
			contract.AssertNoErrorf(err,
				"could not create provider reference with URN %q and ID %q", urn, state.ID)
			urnToReference[urn] = ref.String()
			continue
		}
		if _, ok := defaultProviders[urn]; ok {
			continue
		}

		defaultProviderRequests = append(defaultProviderRequests, req)
		defaultProviders[urn] = struct{}{}
	}
	if len(defaultProviderRequests) == 0 {
		return urnToReference, true, nil
	}

	steps := make([]Step, len(defaultProviderRequests))
	sort.Slice(defaultProviderRequests, func(i, j int) bool {
		return defaultProviderRequests[i].String() < defaultProviderRequests[j].String()
	})
	for idx, req := range defaultProviderRequests {
		if req.Package() == "" {
			return nil, false, errors.New("incorrect package type specified")
		}

		typ, name := providers.MakeProviderType(req.Package()), req.DefaultName()
		urn := i.deployment.generateURN("", typ, name)

		// Fetch, prepare, and check the configuration for this provider.
		inputs, err := i.deployment.target.GetPackageConfig(req.Package())
		if err != nil {
			return nil, false, fmt.Errorf("failed to fetch provider config: %w", err)
		}

		// Calculate the inputs for the provider using the ambient config.
		if v := req.Version(); v != nil {
			providers.SetProviderVersion(inputs, v)
		}
		if url := req.PluginDownloadURL(); url != "" {
			providers.SetProviderURL(inputs, url)
		}
		if checksums := req.PluginChecksums(); checksums != nil {
			providers.SetProviderChecksums(inputs, checksums)
		}
		resp, err := i.deployment.providers.Check(ctx, plugin.CheckRequest{
			URN:  urn,
			News: inputs,
		})
		if err != nil {
			return nil, false, fmt.Errorf("failed to validate provider config: %w", err)
		}

		state := resource.NewState(typ, urn, true, false, "", inputs, nil, "", false, false, nil, nil, "", nil, false,
			nil, nil, nil, "", false, "", nil, nil, "", nil)
		// TODO(seqnum) should default providers be created with 1? When do they ever get recreated/replaced?
		if issueCheckErrors(i.deployment, state, urn, resp.Failures) {
			return nil, false, nil
		}

		// Set a dummy goal so the resource is tracked as managed.
		i.deployment.goals.Store(urn, &resource.Goal{})
		steps[idx] = NewCreateStep(i.deployment, noopEvent(0), state)
	}

	// Issue the create steps.
	if !i.executeParallel(ctx, steps...) {
		return nil, false, nil
	}

	// Update the URN to reference map.
	for _, s := range steps {
		res := s.Res()
		id := res.ID
		if i.preview {
			id = providers.UnknownID
		}
		ref, err := providers.NewReference(res.URN, id)
		contract.AssertNoErrorf(err, "could not create provider reference with URN %q and ID %q", res.URN, id)
		urnToReference[res.URN] = ref.String()
	}

	return urnToReference, true, nil
}

func (i *importer) importResources(ctx context.Context) error {
	contract.Assertf(len(i.deployment.imports) != 0, "no resources to import")

	if !i.registerExistingResources(ctx) {
		return nil
	}

	stackURN, createdStack, ok := i.getOrCreateStackResource(ctx)
	if !ok {
		return nil
	}

	urnToReference, ok, err := i.registerProviders(ctx)
	if !ok {
		return err
	}

	// Create a step per resource to import and execute them in parallel batches which don't depend on each other.
	// If there are duplicates, fail the import.
	urns := map[resource.URN]struct{}{}
	steps := slice.Prealloc[Step](len(i.deployment.imports))
	for _, imp := range i.deployment.imports {
		parent := imp.Parent
		if parent == "" {
			parent = stackURN
		}
		urn := i.deployment.generateURN(parent, imp.Type, imp.Name)

		// Check for duplicate imports.
		if _, has := urns[urn]; has {
			return fmt.Errorf("duplicate import '%v' of type '%v'", imp.Name, imp.Type)
		}
		urns[urn] = struct{}{}

		// If the resource already exists and the ID matches the ID to import, then Same this resource. If the ID does
		// not match, the step itself will issue an error.
		if old, ok := i.deployment.olds[urn]; ok {
			oldID := old.ID
			if old.ImportID != "" {
				oldID = old.ImportID
			}
			if oldID == imp.ID {
				// Clear the ID because Same asserts that the new state has no ID.
				new := old.Copy()
				new.ID = ""
				// Set a dummy goal so the resource is tracked as managed.
				i.deployment.goals.Store(old.URN, &resource.Goal{})
				steps = append(steps, NewSameStep(i.deployment, noopEvent(0), old, new))
				continue
			}
		}

		// If the resource already exists and the ID matches the ID to import, then Same this resource. If the ID does
		// not match, the step itself will issue an error.
		if old, ok := i.deployment.olds[urn]; ok {
			oldID := old.ID
			if old.ImportID != "" {
				oldID = old.ImportID
			}
			if oldID == imp.ID {
				// Clear the ID because Same asserts that the new state has no ID.
				new := old.Copy()
				new.ID = ""
				// Set a dummy goal so the resource is tracked as managed.
				i.deployment.goals.Store(old.URN, &resource.Goal{})
				steps = append(steps, NewSameStep(i.deployment, noopEvent(0), old, new))
				continue
			}
		}

		providerURN := imp.Provider
		if providerURN == "" && (!imp.Component || imp.Remote) {
			req := providers.NewProviderRequest(imp.Type.Package(), imp.Version, imp.PluginDownloadURL, imp.PluginChecksums, nil)
			typ, name := providers.MakeProviderType(req.Package()), req.DefaultName()
			providerURN = i.deployment.generateURN("", typ, name)
		}

		var provider string
		if providerURN != "" {
			// Fetch the provider reference for this import. All provider URNs should be mapped.
			provider, ok = urnToReference[providerURN]
			contract.Assertf(ok, "provider reference for URN %v not found", providerURN)
		}

		// Create the new desired state. Note that the resource is protected. Provider might be "" at this point.
		new := resource.NewState(
			urn.Type(), urn, !imp.Component, false, imp.ID, resource.PropertyMap{}, nil, parent, imp.Protect,
			false, nil, nil, provider, nil, false, nil, nil, nil, "", false, "", nil, nil, "", nil)
		// Set a dummy goal so the resource is tracked as managed.
		i.deployment.goals.Store(urn, &resource.Goal{})

		if imp.Component {
			if imp.Remote {
				contract.Assertf(ok, "provider reference for URN %v not found", providerURN)
			}

			steps = append(steps, newImportDeploymentStep(i.deployment, new, nil))
		} else {
			contract.Assertf(ok, "provider reference for URN %v not found", providerURN)

			// If we have a plan for this resource we need to feed the saved seed to Check to remove non-determinism
			var randomSeed []byte
			if i.deployment.plan != nil {
				if resourcePlan, ok := i.deployment.plan.ResourcePlans[urn]; ok {
					randomSeed = resourcePlan.Seed
				}
			} else {
				randomSeed = make([]byte, 32)
				n, err := cryptorand.Read(randomSeed)
				contract.AssertNoErrorf(err, "could not read random bytes")
				contract.Assertf(n == len(randomSeed), "read %d random bytes, expected %d", n, len(randomSeed))
			}

			steps = append(steps, newImportDeploymentStep(i.deployment, new, randomSeed))
		}
	}

	// We've created all the steps above but we need to execute them in parallel batches which don't depend on each other
	for len(urns) > 0 {
		// Find all the steps that can be executed in parallel. `urns` is a map of every resource we still
		// need to import so if we need a resource from that map we can't yet build this resource.
		parallelSteps := []Step{}
		for _, step := range steps {
			// If we've already done this step don't do it again
			if _, ok := urns[step.New().URN]; !ok {
				continue
			}

			// If the step has no dependencies (we actually only need to look at parent), it can be executed in parallel
			if _, ok := urns[step.New().Parent]; !ok {
				parallelSteps = append(parallelSteps, step)
			}
		}

		// Remove all the urns we're about to import
		for _, step := range parallelSteps {
			delete(urns, step.New().URN)
		}

		if !i.executeParallel(ctx, parallelSteps...) {
			return nil
		}
	}

	if createdStack {
		return i.executor.ExecuteRegisterResourceOutputs(noopOutputsEvent(stackURN))
	}

	return nil
}