// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deploy

import (
	"context"
	"fmt"
	"math"
	"regexp"
	"strings"
	"sync"

	uuid "github.com/gofrs/uuid"

	"github.com/pulumi/pulumi/pkg/v3/codegen/schema"
	"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
	"github.com/pulumi/pulumi/pkg/v3/resource/graph"
	"github.com/pulumi/pulumi/pkg/v3/util/gsync"
	"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
	"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)

// BackendClient is used to retrieve information about stacks from a backend.
type BackendClient interface {
	// GetStackOutputs returns the outputs (if any) for the named stack or an error if the stack cannot be found.
	GetStackOutputs(ctx context.Context, name string) (resource.PropertyMap, error)

	// GetStackResourceOutputs returns the resource outputs for a stack, or an error if the stack
	// cannot be found. Resources are retrieved from the latest stack snapshot, which may include
	// ongoing updates. They are returned in a `PropertyMap` mapping resource URN to another
	// `Propertymap` with members `type` (containing the Pulumi type ID for the resource) and
	// `outputs` (containing the resource outputs themselves).
	GetStackResourceOutputs(ctx context.Context, stackName string) (resource.PropertyMap, error)
}

// Options controls the deployment process.
type Options struct {
	// true if the process is a dry run (that is, won't make any changes), such as
	// during a preview action or when previewing another action like refresh or
	// destroy.
	DryRun bool
	// the degree of parallelism for resource operations (<=1 for serial).
	Parallel int
	// whether or not to refresh before executing the deployment.
	Refresh bool
	// whether or not to exit after refreshing (i.e. this is specifically a
	// refresh operation).
	RefreshOnly bool
	// if specified, only operate on the specified resources.
	Targets UrnTargets
	// if specified, mark the specified resources for replacement.
	ReplaceTargets UrnTargets
	// true if target dependents should be computed automatically.
	TargetDependents bool
	// whether or not to use legacy diffing behavior.
	UseLegacyDiff bool
	// true if the deployment should use legacy refresh diffing behavior and
	// report only output changes, as opposed to computing diffs against desired
	// state.
	UseLegacyRefreshDiff bool
	// true to disable resource reference support.
	DisableResourceReferences bool
	// true to disable output value support.
	DisableOutputValues bool
	// true to enable plan generation.
	GeneratePlan bool
	// true if we should continue with the deployment even if a resource operation fails.
	ContinueOnError bool
}

// DegreeOfParallelism returns the degree of parallelism that should be used during the
// deployment process.
func (o Options) DegreeOfParallelism() int {
	if o.Parallel <= 1 {
		return 1
	}
	return o.Parallel
}

// InfiniteParallelism returns whether or not the requested level of parallelism is unbounded.
func (o Options) InfiniteParallelism() bool {
	return o.Parallel == math.MaxInt32
}

// An immutable set of urns to target with an operation.
//
// The zero value of UrnTargets is the set of all URNs.
type UrnTargets struct {
	// UrnTargets is internally made up of two components: literals, which are fully
	// specified URNs and globs, which are partially specified URNs.

	literals []resource.URN
	globs    map[string]*regexp.Regexp
}

// Create a new set of targets.
//
// Each element is considered a glob if it contains any '*' and an URN otherwise. No other
// URN validation is performed.
//
// If len(urnOrGlobs) == 0, an unconstrained set will be created.
func NewUrnTargets(urnOrGlobs []string) UrnTargets {
	literals, globs := []resource.URN{}, map[string]*regexp.Regexp{}
	for _, urn := range urnOrGlobs {
		if strings.ContainsRune(urn, '*') {
			globs[urn] = nil
		} else {
			literals = append(literals, resource.URN(urn))
		}
	}
	return UrnTargets{literals, globs}
}

// Create a new set of targets from fully resolved URNs.
func NewUrnTargetsFromUrns(urns []resource.URN) UrnTargets {
	return UrnTargets{urns, nil}
}

// Return a copy of the UrnTargets
func (t UrnTargets) Clone() UrnTargets {
	newLiterals := append(make([]resource.URN, 0, len(t.literals)), t.literals...)
	newGlobs := make(map[string]*regexp.Regexp, len(t.globs))
	for k, v := range t.globs {
		newGlobs[k] = v
	}
	return UrnTargets{
		literals: newLiterals,
		globs:    newGlobs,
	}
}

// Return if the target set constrains the set of acceptable URNs.
func (t UrnTargets) IsConstrained() bool {
	return len(t.literals) > 0 || len(t.globs) > 0
}

// Get a regexp that can match on the glob. This function caches regexp generation.
func (t UrnTargets) getMatcher(glob string) *regexp.Regexp {
	if r := t.globs[glob]; r != nil {
		return r
	}
	segmentGlob := strings.Split(glob, "**")
	for i, v := range segmentGlob {
		part := strings.Split(v, "*")
		for i, v := range part {
			part[i] = regexp.QuoteMeta(v)
		}
		segmentGlob[i] = strings.Join(part, "[^:]*")
	}

	// Because we have quoted all input, this is safe to compile.
	r := regexp.MustCompile("^" + strings.Join(segmentGlob, ".*") + "$")

	// We cache and return the matcher
	t.globs[glob] = r
	return r
}

// Check if Targets contains the URN.
//
// If method receiver is not initialized, `true` is always returned.
func (t UrnTargets) Contains(urn resource.URN) bool {
	if !t.IsConstrained() {
		return true
	}
	for _, literal := range t.literals {
		if literal == urn {
			return true
		}
	}
	for glob := range t.globs {
		if t.getMatcher(glob).MatchString(string(urn)) {
			return true
		}
	}
	return false
}

// URN literals specified as targets.
//
// It doesn't make sense to iterate over all targets, since the list of targets may be
// infinite.
func (t UrnTargets) Literals() []resource.URN {
	return t.literals
}

// Adds a literal iff t is already initialized.
func (t *UrnTargets) addLiteral(urn resource.URN) {
	if t.IsConstrained() {
		t.literals = append(t.literals, urn)
	}
}

// StepExecutorEvents is an interface that can be used to hook resource lifecycle events.
type StepExecutorEvents interface {
	OnResourceStepPre(step Step) (interface{}, error)
	OnResourceStepPost(ctx interface{}, step Step, status resource.Status, err error) error
	OnResourceOutputs(step Step) error
}

// PolicyEvents is an interface that can be used to hook policy events.
type PolicyEvents interface {
	OnPolicyViolation(resource.URN, plugin.AnalyzeDiagnostic)
	OnPolicyRemediation(resource.URN, plugin.Remediation, resource.PropertyMap, resource.PropertyMap)
}

// Events is an interface that can be used to hook interesting engine events.
type Events interface {
	StepExecutorEvents
	PolicyEvents
}

type resourcePlans struct {
	m     sync.RWMutex
	plans Plan
}

func newResourcePlan(config config.Map) *resourcePlans {
	return &resourcePlans{
		plans: NewPlan(config),
	}
}

func (m *resourcePlans) set(urn resource.URN, plan *ResourcePlan) {
	m.m.Lock()
	defer m.m.Unlock()

	if _, ok := m.plans.ResourcePlans[urn]; ok {
		panic(fmt.Sprintf("tried to set resource plan for %s but it's already been set", urn))
	}

	m.plans.ResourcePlans[urn] = plan
}

func (m *resourcePlans) get(urn resource.URN) (*ResourcePlan, bool) {
	m.m.RLock()
	defer m.m.RUnlock()

	p, ok := m.plans.ResourcePlans[urn]
	return p, ok
}

func (m *resourcePlans) plan() *Plan {
	return &m.plans
}

// A Deployment manages the iterative computation and execution of a deployment based on a stream of goal states.
// A running deployment emits events that indicate its progress. These events must be used to record the new state
// of the deployment target.
type Deployment struct {
	// the plugin context (for provider operations).
	ctx *plugin.Context
	// options for this deployment.
	opts *Options
	// event handlers for this deployment.
	events Events
	// the deployment target.
	target *Target
	// the old resource snapshot for comparison.
	prev *Snapshot
	// a map of all old resources.
	olds map[resource.URN]*resource.State
	// a map of all planned resource changes, if any.
	plan *Plan
	// resources to import, if this is an import deployment.
	imports []Import
	// true if this is an import deployment.
	isImport bool
	// the schema cache for this deployment, if any.
	schemaLoader schema.Loader
	// the source of new resources.
	source Source
	// the policy packs to run during this deployment's generation.
	localPolicyPackPaths []string
	// the dependency graph of the old snapshot.
	depGraph *graph.DependencyGraph
	// the provider registry for this deployment.
	providers *providers.Registry
	// the set of resource goals generated by the deployment.
	goals *gsync.Map[resource.URN, *resource.Goal]
	// the set of new resources generated by the deployment.
	news *gsync.Map[resource.URN, *resource.State]
	// the set of new resource plans.
	newPlans *resourcePlans
}

// addDefaultProviders adds any necessary default provider definitions and references to the given snapshot. Version
// information for these providers is sourced from the snapshot's manifest; inputs parameters are sourced from the
// stack's configuration.
func addDefaultProviders(target *Target, source Source, prev *Snapshot) error {
	if prev == nil {
		return nil
	}

	// Pull the versions we'll use for default providers from the snapshot's manifest.
	defaultProviderInfo := make(map[tokens.Package]workspace.PluginSpec)
	for _, p := range prev.Manifest.Plugins {
		defaultProviderInfo[tokens.Package(p.Name)] = p.Spec()
	}

	// Determine the necessary set of default providers and inject references to default providers as appropriate.
	//
	// We do this by scraping the snapshot for custom resources that does not reference a provider and adding
	// default providers for these resources' packages. Each of these resources is rewritten to reference the default
	// provider for its package.
	//
	// The configuration for each default provider is pulled from the stack's configuration information.
	var defaultProviders []*resource.State
	defaultProviderRefs := make(map[tokens.Package]providers.Reference)
	for _, res := range prev.Resources {
		if providers.IsProviderType(res.URN.Type()) || !res.Custom || res.Provider != "" {
			continue
		}

		pkg := res.URN.Type().Package()
		ref, ok := defaultProviderRefs[pkg]
		if !ok {
			inputs, err := target.GetPackageConfig(pkg)
			if err != nil {
				return fmt.Errorf("could not fetch configuration for default provider '%v'", pkg)
			}
			if pkgInfo, ok := defaultProviderInfo[pkg]; ok {
				providers.SetProviderVersion(inputs, pkgInfo.Version)
				providers.SetProviderURL(inputs, pkgInfo.PluginDownloadURL)
				providers.SetProviderChecksums(inputs, pkgInfo.Checksums)
			}

			uuid, err := uuid.NewV4()
			if err != nil {
				return err
			}

			urn, id := defaultProviderURN(target, source, pkg), resource.ID(uuid.String())
			ref, err = providers.NewReference(urn, id)
			contract.Assertf(err == nil,
				"could not create provider reference with URN %v and ID %v", urn, id)

			provider := &resource.State{
				Type:    urn.Type(),
				URN:     urn,
				Custom:  true,
				ID:      id,
				Inputs:  inputs,
				Outputs: inputs,
			}
			defaultProviders = append(defaultProviders, provider)
			defaultProviderRefs[pkg] = ref
		}
		res.Provider = ref.String()
	}

	// If any default providers are necessary, prepend their definitions to the snapshot's resources. This trivially
	// guarantees that all default provider references name providers that precede the referent in the snapshot.
	if len(defaultProviders) != 0 {
		prev.Resources = append(defaultProviders, prev.Resources...)
	}

	return nil
}

// migrateProviders is responsible for adding default providers to old snapshots and filling in output properties for
// providers that do not have them.
func migrateProviders(target *Target, prev *Snapshot, source Source) error {
	// Add any necessary default provider references to the previous snapshot in order to accommodate stacks that were
	// created prior to the changes that added first-class providers. We do this here rather than in the migration
	// package s.t. the inputs to any default providers (which we fetch from the stacks's configuration) are as
	// accurate as possible.
	if err := addDefaultProviders(target, source, prev); err != nil {
		return err
	}

	// Migrate provider resources from the old, output-less format to the new format where all inputs are reflected as
	// outputs.
	if prev != nil {
		for _, res := range prev.Resources {
			// If we have no old outputs for a provider, use its old inputs as its old outputs. This handles the
			// scenario where the CLI is being upgraded from a version that did not reflect provider inputs to
			// provider outputs, and a provider is being upgraded from a version that did not implement DiffConfig to
			// a version that does.
			if providers.IsProviderType(res.URN.Type()) && len(res.Inputs) != 0 && len(res.Outputs) == 0 {
				res.Outputs = res.Inputs
			}
		}
	}

	return nil
}

func buildResourceMap(prev *Snapshot, preview bool) ([]*resource.State, map[resource.URN]*resource.State, error) {
	olds := make(map[resource.URN]*resource.State)
	if prev == nil {
		return nil, olds, nil
	}

	for _, oldres := range prev.Resources {
		// Ignore resources that are pending deletion; these should not be recorded in the LUT.
		if oldres.Delete {
			continue
		}

		urn := oldres.URN
		if olds[urn] != nil {
			return nil, nil, fmt.Errorf("unexpected duplicate resource '%s'", urn)
		}
		olds[urn] = oldres
	}

	return prev.Resources, olds, nil
}

// NewDeployment creates a new deployment from a resource snapshot plus a package to evaluate.
//
// From the old and new states, it understands how to orchestrate an evaluation and analyze the resulting resources.
// The deployment may be used to simply inspect a series of operations, or actually perform them; these operations are
// generated based on analysis of the old and new states.  If a resource exists in new, but not old, for example, it
// results in a create; if it exists in both, but is different, it results in an update; and so on and so forth.
//
// Note that a deployment uses internal concurrency and parallelism in various ways, so it must be closed if for some
// reason it isn't carried out to its final conclusion. This will result in cancellation and reclamation of resources.
func NewDeployment(
	ctx *plugin.Context,
	opts *Options,
	events Events,
	target *Target,
	prev *Snapshot,
	plan *Plan,
	source Source,
	localPolicyPackPaths []string,
	backendClient BackendClient,
) (*Deployment, error) {
	contract.Requiref(ctx != nil, "ctx", "must not be nil")
	contract.Requiref(target != nil, "target", "must not be nil")
	contract.Requiref(source != nil, "source", "must not be nil")

	if err := migrateProviders(target, prev, source); err != nil {
		return nil, err
	}

	// Produce a map of all old resources for fast access.
	//
	// NOTE: we can and do mutate prev.Resources, olds, and depGraph during execution after performing a refresh. See
	// deploymentExecutor.refresh for details.
	oldResources, olds, err := buildResourceMap(prev, opts.DryRun)
	if err != nil {
		return nil, err
	}

	// Build the dependency graph for the old resources.
	depGraph := graph.NewDependencyGraph(oldResources)

	// Create a goal map for the deployment.
	newGoals := &gsync.Map[resource.URN, *resource.Goal]{}

	// Create a resource map for the deployment.
	newResources := &gsync.Map[resource.URN, *resource.State]{}

	// Create a new builtin provider. This provider implements features such as `getStack`.
	builtins := newBuiltinProvider(backendClient, newResources, ctx.Diag)

	// Create a new provider registry. Although we really only need to pass in any providers that were present in the
	// old resource list, the registry itself will filter out other sorts of resources when processing the prior state,
	// so we just pass all of the old resources.
	reg := providers.NewRegistry(ctx.Host, opts.DryRun, builtins)

	return &Deployment{
		ctx:                  ctx,
		opts:                 opts,
		events:               events,
		target:               target,
		prev:                 prev,
		plan:                 plan,
		olds:                 olds,
		source:               source,
		localPolicyPackPaths: localPolicyPackPaths,
		depGraph:             depGraph,
		providers:            reg,
		goals:                newGoals,
		news:                 newResources,
		newPlans:             newResourcePlan(target.Config),
	}, nil
}

func (d *Deployment) Ctx() *plugin.Context                   { return d.ctx }
func (d *Deployment) Target() *Target                        { return d.target }
func (d *Deployment) Diag() diag.Sink                        { return d.ctx.Diag }
func (d *Deployment) Prev() *Snapshot                        { return d.prev }
func (d *Deployment) Olds() map[resource.URN]*resource.State { return d.olds }
func (d *Deployment) Source() Source                         { return d.source }

func (d *Deployment) SameProvider(res *resource.State) error {
	return d.providers.Same(res)
}

// EnsureProvider ensures that the provider for the given resource is available in the registry. It assumes
// the provider is available in the previous snapshot.
func (d *Deployment) EnsureProvider(provider string) error {
	if provider == "" {
		return nil
	}

	providerRef, err := providers.ParseReference(provider)
	if err != nil {
		return fmt.Errorf("invalid provider reference %v: %w", provider, err)
	}
	_, has := d.GetProvider(providerRef)
	if !has {
		// We need to create the provider in the registry, find its old state and just "Same" it.
		var providerResource *resource.State
		for _, r := range d.prev.Resources {
			if r.URN == providerRef.URN() && r.ID == providerRef.ID() {
				providerResource = r
				break
			}
		}
		if providerResource == nil {
			return fmt.Errorf("could not find provider %v", providerRef)
		}

		err := d.SameProvider(providerResource)
		if err != nil {
			return fmt.Errorf("could not create provider %v: %w", providerRef, err)
		}
	}

	return nil
}

func (d *Deployment) GetProvider(ref providers.Reference) (plugin.Provider, bool) {
	return d.providers.GetProvider(ref)
}

// generateURN generates a resource's URN from its parent, type, and name under the scope of the deployment's stack and
// project.
func (d *Deployment) generateURN(parent resource.URN, ty tokens.Type, name string) resource.URN {
	// Use the resource goal state name to produce a globally unique URN.
	parentType := tokens.Type("")
	if parent != "" && parent.QualifiedType() != resource.RootStackType {
		// Skip empty parents and don't use the root stack type; otherwise, use the full qualified type.
		parentType = parent.QualifiedType()
	}

	return resource.NewURN(d.Target().Name.Q(), d.source.Project(), parentType, ty, name)
}

// defaultProviderURN generates the URN for the global provider given a package.
func defaultProviderURN(target *Target, source Source, pkg tokens.Package) resource.URN {
	return resource.NewURN(target.Name.Q(), source.Project(), "", providers.MakeProviderType(pkg), "default")
}

// generateEventURN generates a URN for the resource associated with the given event.
func (d *Deployment) generateEventURN(event SourceEvent) resource.URN {
	contract.Requiref(event != nil, "event", "must not be nil")

	switch e := event.(type) {
	case RegisterResourceEvent:
		goal := e.Goal()
		return d.generateURN(goal.Parent, goal.Type, goal.Name)
	case ReadResourceEvent:
		return d.generateURN(e.Parent(), e.Type(), e.Name())
	case RegisterResourceOutputsEvent:
		return e.URN()
	default:
		return ""
	}
}

// Execute executes a deployment to completion, using the given cancellation context and running a preview or update.
func (d *Deployment) Execute(ctx context.Context) (*Plan, error) {
	deploymentExec := &deploymentExecutor{deployment: d}
	return deploymentExec.Execute(ctx)
}