// Copyright 2016-2024, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deploy

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/pulumi/pulumi/pkg/v3/util/gsync"
	"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
	"github.com/pulumi/pulumi/sdk/v3/go/common/promise"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)

const (
	// Dummy workerID for synchronous operations.
	synchronousWorkerID = -1
	infiniteWorkerID    = -2

	// Utility constant for easy debugging.
	stepExecutorLogLevel = 4
)

// StepApplyFailed is a sentinel error for errors that arise when step application fails.
// We (the step executor) are not responsible for reporting those errors so this sentinel ensures
// that we don't do so.
type StepApplyFailed struct {
	Err error
}

func (saf StepApplyFailed) Error() string {
	return fmt.Sprintf("step application failed: %s", saf.Err)
}

func (saf StepApplyFailed) Unwrap() error {
	return saf.Err
}

// The step executor operates in terms of "chains" and "antichains". A chain is set of steps that are totally ordered
// when ordered by dependency; each step in a chain depends directly on the step that comes before it. An antichain
// is a set of steps that is completely incomparable when ordered by dependency. The step executor is aware that chains
// must be executed serially and antichains can be executed concurrently.
//
// See https://en.wikipedia.org/wiki/Antichain for more complete definitions. The below type aliases are useful for
// documentation purposes.

// A Chain is a sequence of Steps that must be executed in the given order.
type chain = []Step

// An Antichain is a set of Steps that can be executed in parallel.
type antichain = []Step

// A CompletionToken is a token returned by the step executor that is completed when the chain has completed execution.
// Callers can use it to optionally wait synchronously on the completion of a chain.
type completionToken struct {
	channel chan bool
}

// Wait blocks until the completion token is signalled or until the given context completes, whatever occurs first.
func (c completionToken) Wait(ctx context.Context) {
	select {
	case <-c.channel:
	case <-ctx.Done():
	}
}

// incomingChain represents a request to the step executor to execute a chain.
type incomingChain struct {
	Chain          chain     // The chain we intend to execute
	CompletionChan chan bool // A completion channel to be closed when the chain has completed execution
}

// stepExecutor is the component of the engine responsible for taking steps and executing
// them, possibly in parallel if requested. The step generator operates on the granularity
// of "chains", which are sequences of steps that must be executed exactly in the given order.
// Chains are a simplification of the full dependency graph DAG within Pulumi programs. Since
// Pulumi language hosts can only invoke the resource monitor once all of their dependencies have
// resolved, we (the engine) can assume that any chain given to us by the step generator is already
// ready to execute.
type stepExecutor struct {
	// The deployment currently being executed.
	deployment *Deployment
	// Resources that have been created but are pending a RegisterResourceOutputs.
	pendingNews gsync.Map[resource.URN, Step]

	// True if errors should be ignored completely, without any handling or
	// reporting. This is used in the case of imports and refreshes. It is _not_
	// the same as ContinueOnError, which allows execution to continue in the face
	// of errors that may occur during updates. If both ignoreErrors and
	// ContinueOnError are set, ignoreErrors takes precedence.
	ignoreErrors bool

	// Lock protecting the running of workers. This can be used to synchronize with step executor.
	workerLock sync.RWMutex

	workers        sync.WaitGroup     // WaitGroup tracking the worker goroutines that are owned by this step executor.
	incomingChains chan incomingChain // Incoming chains that we are to execute

	ctx    context.Context    // cancellation context for the current deployment.
	cancel context.CancelFunc // CancelFunc that cancels the above context.

	// async promise indicating an error seen by the step executor, if multiple errors are seen this will only
	// record the first.
	sawError promise.CompletionSource[struct{}]

	erroredStepLock sync.RWMutex
	erroredSteps    []Step

	// ExecuteRegisterResourceOutputs will save the event for the stack resource so that the stack outputs
	// can be finalized at the end of the deployment. We do this so we can determine whether or not the
	// deployment succeeded. If there were errors, we update any stack outputs that were updated, but don't delete
	// any old outputs.
	stackOutputsEvent RegisterResourceOutputsEvent
}

//
// The stepExecutor communicates with a stepGenerator by listening to a channel. As the step generator
// generates new chains that need to be executed, the step executor will listen to this channel to execute
// those steps.
//

// Execute submits a Chain for asynchronous execution. The execution of the chain will begin as soon as there
// is a worker available to execute it.
func (se *stepExecutor) ExecuteSerial(chain chain) completionToken {
	// The select here is to avoid blocking on a send to se.incomingChains if a cancellation is pending.
	// If one is pending, we should exit early - we will shortly be tearing down the engine and exiting.

	completion := make(chan bool)
	select {
	case se.incomingChains <- incomingChain{Chain: chain, CompletionChan: completion}:
	case <-se.ctx.Done():
		close(completion)
	}

	return completionToken{channel: completion}
}

// Locks the step executor from executing any more steps. This is used to synchronize with the step executor.
func (se *stepExecutor) Lock() {
	se.workerLock.Lock()
}

// Unlocks the step executor to allow it to execute more steps. This is used to synchronize with the step executor.
func (se *stepExecutor) Unlock() {
	se.workerLock.Unlock()
}

func (se *stepExecutor) GetErroredSteps() []Step {
	se.erroredStepLock.RLock()
	defer se.erroredStepLock.RUnlock()
	return se.erroredSteps
}

// ExecuteParallel submits an antichain for parallel execution. All of the steps within the antichain are submitted for
// concurrent execution.
func (se *stepExecutor) ExecuteParallel(antichain antichain) completionToken {
	var wg sync.WaitGroup

	// ExecuteParallel is implemented in terms of ExecuteSerial - it executes each step individually and waits for all
	// of the steps to complete.
	wg.Add(len(antichain))
	for _, step := range antichain {
		tok := se.ExecuteSerial(chain{step})
		go func() {
			defer wg.Done()
			tok.Wait(se.ctx)
		}()
	}

	done := make(chan bool)
	go func() {
		wg.Wait()
		close(done)
	}()

	return completionToken{channel: done}
}

// ExecuteRegisterResourceOutputs services a RegisterResourceOutputsEvent synchronously on the calling goroutine.
func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputsEvent) error {
	return se.executeRegisterResourceOutputs(e, false /* errored */, false /* finalizingStackOutputs */)
}

func (se *stepExecutor) executeRegisterResourceOutputs(
	e RegisterResourceOutputsEvent,
	errored,
	finalizingStackOutputs bool,
) error {
	urn := e.URN()

	if finalizingStackOutputs {
		contract.Assertf(urn.QualifiedType() == resource.RootStackType, "expected a stack resource urn, got %v", urn)
	}

	// If we're not finalizing and we've received an event for the stack's outputs, save the event for finalization
	// later. We finalize stack outputs at the end of the deployment, so we can determine whether or not the
	// deployment succeeded. If the deployment was successful, we use the new stack outputs. If there was an error,
	// we only replace outputs that have new outputs, but to otherwise leave old outputs untouched.
	if !finalizingStackOutputs && urn.QualifiedType() == resource.RootStackType {
		se.stackOutputsEvent = e
		e.Done()
		return nil
	}

	// Look up the final state in the pending registration list.
	reg, has := se.pendingNews.Load(urn)
	if !has {
		return fmt.Errorf("cannot complete a resource '%v' whose registration isn't pending", urn)
	}
	contract.Assertf(reg != nil, "expected a non-nil resource step ('%v')", urn)
	se.pendingNews.Delete(urn)
	// Unconditionally set the resource's outputs to what was provided.  This intentionally overwrites whatever
	// might already be there, since otherwise "deleting" outputs would have no affect.
	outs := e.Outputs()
	se.log(synchronousWorkerID,
		"registered resource outputs %s: old=#%d, new=#%d", urn, len(reg.New().Outputs), len(outs))

	old := se.deployment.Olds()[urn]
	var oldOuts resource.PropertyMap
	if old != nil {
		oldOuts = old.Outputs
	}

	// If we're finalizing stack outputs and there was an error, the absence of an output can't safely be assumed to
	// mean it was deleted, so we keep old outputs, overwriting new ones.
	if finalizingStackOutputs && errored {
		outs = oldOuts.Copy()
		for k, v := range e.Outputs() {
			outs[k] = v
		}
	}

	// If a plan is present check that these outputs match what we recorded before
	if se.deployment.plan != nil {
		resourcePlan, ok := se.deployment.plan.ResourcePlans[urn]
		if !ok {
			return fmt.Errorf("no plan for resource %v", urn)
		}

		if err := resourcePlan.checkOutputs(oldOuts, outs); err != nil {
			return fmt.Errorf("resource violates plan: %w", err)
		}
	}

	reg.New().Lock.Lock()
	reg.New().Outputs = outs
	reg.New().Lock.Unlock()

	// If we're generating plans save these new outputs to the plan
	if se.deployment.opts.GeneratePlan {
		if resourcePlan, ok := se.deployment.newPlans.get(urn); ok {
			resourcePlan.Goal.OutputDiff = NewPlanDiff(oldOuts.Diff(outs))
			resourcePlan.Outputs = outs
		} else {
			return fmt.Errorf(
				"resource should already have a plan from when we called register resources [urn=%v]", urn)
		}
	}

	// If there is an event subscription for finishing the resource, execute them.
	if e := se.deployment.events; e != nil {
		if eventerr := e.OnResourceOutputs(reg); eventerr != nil {
			se.log(synchronousWorkerID, "register resource outputs failed: %s", eventerr)

			// This is a bit of a kludge, but ExecuteRegisterResourceOutputs is an odd duck
			// in that it doesn't execute on worker goroutines. Arguably, it should, but today it's
			// not possible to express RegisterResourceOutputs as a step. We could 1) more generally allow
			// clients of stepExecutor to do work on worker threads by e.g. scheduling arbitrary callbacks
			// or 2) promote RRE to be step-like so that it can be scheduled as if it were a step. Neither
			// of these are particularly appealing right now.
			outErr := fmt.Errorf("resource complete event returned an error: %w", eventerr)
			diagMsg := diag.RawMessage(reg.URN(), outErr.Error())
			se.deployment.Diag().Errorf(diagMsg)
			se.cancelDueToError(eventerr, nil)
			return nil
		}
	}
	if !finalizingStackOutputs {
		e.Done()
	}
	return nil
}

// Errored returns whether or not this step executor saw a step whose execution ended in failure.
func (se *stepExecutor) Errored() error {
	// See if the sawError promise has been rejected yet
	_, err, _ := se.sawError.Promise().TryResult()
	// err will be nil if the promise has not been rejected yet
	return err
}

// SignalCompletion signals to the stepExecutor that there are no more chains left to execute. All worker
// threads will terminate as soon as they retire all of the work they are currently executing.
func (se *stepExecutor) SignalCompletion() {
	close(se.incomingChains)
}

// WaitForCompletion blocks the calling goroutine until the step executor completes execution of all in-flight
// chains.
func (se *stepExecutor) WaitForCompletion() {
	se.log(synchronousWorkerID, "StepExecutor.waitForCompletion(): waiting for worker threads to exit")
	se.workers.Wait()
	se.log(synchronousWorkerID, "StepExecutor.waitForCompletion(): worker threads all exited")
}

//
// As calls to `Execute` submit chains for execution, some number of worker goroutines will continuously
// read from `incomingChains` and execute any chains that are received. The core execution logic is in
// the next few functions.
//

// executeChain executes a chain, one step at a time. If any step in the chain fails to execute, or if the
// context is canceled, the chain stops execution.
func (se *stepExecutor) executeChain(workerID int, chain chain) {
	for _, step := range chain {
		select {
		case <-se.ctx.Done():
			se.log(workerID, "step %v on %v canceled", step.Op(), step.URN())
			return
		default:
		}

		// Take the work lock before executing the step, this uses the "read" side of the lock because we're ok with as
		// many workers as possible executing steps in parallel.
		se.workerLock.RLock()
		err := se.executeStep(workerID, step)
		// Regardless of error we need to release the lock here.
		se.workerLock.RUnlock()

		if err != nil {
			se.log(workerID, "step %v on %v failed, signalling cancellation", step.Op(), step.URN())
			se.cancelDueToError(err, step)

			var saf StepApplyFailed
			if !errors.As(err, &saf) {
				// Step application errors are recorded by the OnResourceStepPost callback. This is confusing,
				// but it means that at this level we shouldn't be logging any errors that came from there.
				//
				// The StepApplyFailed sentinel signals that the error that failed this chain was a step apply
				// error and that we shouldn't log it. Everything else should be logged to the diag system as usual.
				diagMsg := diag.RawMessage(step.URN(), err.Error())
				se.deployment.Diag().Errorf(diagMsg)
			}
			return
		}
	}
}

func (se *stepExecutor) cancelDueToError(err error, step Step) {
	set := se.sawError.Reject(err)
	if !set {
		logging.V(10).Infof("StepExecutor already recorded an error then saw: %v", err)
	}
	if se.ignoreErrors {
		// Do nothing.
	} else if se.deployment.opts.ContinueOnError {
		step.Fail()
		// Record the failure, but allow the deployment to continue.
		se.erroredStepLock.Lock()
		defer se.erroredStepLock.Unlock()
		se.erroredSteps = append(se.erroredSteps, step)
	} else {
		se.cancel()
	}
}

//
// The next few functions are responsible for executing individual steps. The basic flow of step
// execution is
//   1. The pre-step event is raised, if there are any attached callbacks to the engine
//   2. If successful, the step is executed (if not a preview)
//   3. The post-step event is raised, if there are any attached callbacks to the engine
//
// The pre-step event returns an interface{}, which is some arbitrary context that must be passed
// verbatim to the post-step event.
//

// executeStep executes a single step, returning true if the step execution was successful and
// false if it was not.
func (se *stepExecutor) executeStep(workerID int, step Step) error {
	var payload interface{}
	events := se.deployment.events
	if events != nil {
		var err error
		payload, err = events.OnResourceStepPre(step)
		if err != nil {
			se.log(workerID, "step %v on %v failed pre-resource step: %v", step.Op(), step.URN(), err)
			return fmt.Errorf("pre-step event returned an error: %w", err)
		}
	}

	se.log(workerID, "applying step %v on %v (preview %v)", step.Op(), step.URN(), se.deployment.opts.DryRun)
	status, stepComplete, err := step.Apply()

	if err == nil {
		// If we have a state object, and this is a create or update, remember it, as we may need to update it later.
		if step.Logical() && step.New() != nil {
			if prior, has := se.pendingNews.Load(step.URN()); has {
				return fmt.Errorf("resource '%s' registered twice (%s and %s)", step.URN(), prior.Op(), step.Op())
			}

			se.pendingNews.Store(step.URN(), step)
		}
	}

	// Ensure that any secrets properties in the output are marked as such and that the resource is tracked in the set
	// of registered resources. We skip this for replace steps because while they _do_ have a "new" side to them that
	// state may have already been added to the snapshot manager (in the case of create before delete replacements
	// because the Create step is run before the Replace step) and mutating the state again causes dataraces (see
	// https://github.com/pulumi/pulumi/issues/14994).
	if step.New() != nil && step.Op() != OpReplace {
		newState := step.New()
		newState.Lock.Lock()

		for _, k := range newState.AdditionalSecretOutputs {
			if k == "id" {
				se.deployment.Diag().Warningf(&diag.Diag{
					URN:     step.URN(),
					Message: "The 'id' property cannot be made secret. See pulumi/pulumi#2717 for more details.",
				})
			} else {
				if v, has := newState.Outputs[k]; has && !v.IsSecret() {
					newState.Outputs[k] = resource.MakeSecret(v)
				} else if !has { //nolint:staticcheck // https://github.com/pulumi/pulumi/issues/9926
					// TODO (https://github.com/pulumi/pulumi/issues/9926): We want to re-enable this warning
					// but it requires that providers always return back _every_ output even in preview. We
					// might need to add a new "unset" PropertyValue to do this as there might be optional
					// secret outputs and the engine needs to be able to tell the difference between "this
					// isn't a valid output of the resource" and "this value just hasn't been set in this
					// instance". Arguably for user side additionalSecretOutputs that distinction probably
					// doesn't matter (if you ask for an optional output to be made secret but then the
					// provider doesn't return it maybe you want the warning that nothing is actually being
					// affected?). But for SDK generated we always send the same list and the user doesn't
					// control it so we need to make sure that if there is an optional output that this
					// warning doesn't get triggered.

					// User asked us to make k a secret, but we don't have a property k. This is probably a
					// mistake (mostly likely due to casing, eg my_prop vs myProp) but warn the user so they know
					// the key didn't do anything.
					// msg := fmt.Sprintf("Could not find property '%s' listed in additional secret outputs.", k)
					// se.deployment.Diag().Warningf(diag.RawMessage(step.URN(), msg))
				}
			}
		}

		// If an input secret is potentially leaked as an output, preemptively mark it as secret.
		for k, out := range newState.Outputs {
			if !out.IsSecret() {
				in, has := newState.Inputs[k]
				if !has {
					continue
				}
				if in.IsSecret() {
					newState.Outputs[k] = resource.MakeSecret(out)
				}
			}
		}

		newState.Lock.Unlock()

		// If this is not a resource that is managed by Pulumi, then we can ignore it.
		if _, hasGoal := se.deployment.goals.Load(newState.URN); hasGoal {
			se.deployment.news.Store(newState.URN, newState)
		}

		// If we're generating plans update the resource's outputs in the generated plan.
		if se.deployment.opts.GeneratePlan {
			if resourcePlan, ok := se.deployment.newPlans.get(newState.URN); ok {
				resourcePlan.Outputs = newState.Outputs
			}
		}
	}

	if events != nil {
		if postErr := events.OnResourceStepPost(payload, step, status, err); postErr != nil {
			se.log(workerID, "step %v on %v failed post-resource step: %v", step.Op(), step.URN(), postErr)
			return fmt.Errorf("post-step event returned an error: %w", postErr)
		}
	}

	// Calling stepComplete allows steps that depend on this step to continue. OnResourceStepPost saved the results
	// of the step in the snapshot, so we are ready to go.
	if stepComplete != nil {
		se.log(workerID, "step %v on %v retired", step.Op(), step.URN())
		stepComplete()
	}

	if err != nil {
		se.log(workerID, "step %v on %v failed with an error: %v", step.Op(), step.URN(), err)
		return StepApplyFailed{err}
	}

	return nil
}

// log is a simple logging helper for the step executor.
func (se *stepExecutor) log(workerID int, msg string, args ...interface{}) {
	if logging.V(stepExecutorLogLevel) {
		message := fmt.Sprintf(msg, args...)
		logging.V(stepExecutorLogLevel).Infof("StepExecutor worker(%d): %s", workerID, message)
	}
}

//
// The step executor owns a number of goroutines that it considers to be "workers", responsible for
// executing steps. By default, as we ease into the waters of parallelism, there is at most one worker
// active.
//
// Workers continuously pull from se.incomingChains, executing chains as they are provided to the executor.
// There are two reasons why a worker would exit:
//
//  1. A worker exits if se.ctx is canceled. There are two ways that se.ctx gets canceled: first, if there is
//     a step error in another worker, it will cancel the context. Second, if the deployment executor experiences an
//     error when generating steps or doing pre or post-step events, it will cancel the context.
//  2. A worker exits if it experiences an error when running a step.
//

// worker is the base function for all step executor worker goroutines. It continuously polls for new chains
// and executes any that it gets from the channel. If `launchAsync` is true, worker launches a new goroutine
// that will execute the chain so that the execution continues asynchronously and this worker can proceed to
// the next chain.
func (se *stepExecutor) worker(workerID int, launchAsync bool) {
	se.log(workerID, "worker coming online")
	defer se.workers.Done()

	oneshotWorkerID := 0
	for {
		se.log(workerID, "worker waiting for incoming chains")
		select {
		case request := <-se.incomingChains:
			if request.Chain == nil {
				se.log(workerID, "worker received nil chain, exiting")
				return
			}

			se.log(workerID, "worker received chain for execution")
			if !launchAsync {
				se.executeChain(workerID, request.Chain)
				close(request.CompletionChan)
				continue
			}

			// If we're launching asynchronously, make up a new worker ID for this new oneshot worker and record its
			// launch with our worker wait group.
			se.workers.Add(1)
			newWorkerID := oneshotWorkerID
			go func() {
				defer se.workers.Done()
				se.log(newWorkerID, "launching oneshot worker")
				se.executeChain(newWorkerID, request.Chain)
				close(request.CompletionChan)
			}()

			oneshotWorkerID++
		case <-se.ctx.Done():
			se.log(workerID, "worker exiting due to cancellation")
			return
		}
	}
}

func newStepExecutor(
	ctx context.Context,
	cancel context.CancelFunc,
	deployment *Deployment,
	ignoreErrors bool,
) *stepExecutor {
	exec := &stepExecutor{
		deployment:     deployment,
		ignoreErrors:   ignoreErrors,
		incomingChains: make(chan incomingChain),
		ctx:            ctx,
		cancel:         cancel,
	}

	// If we're being asked to run as parallel as possible, spawn a single worker that launches chain executions
	// asynchronously.
	if deployment.opts.InfiniteParallelism() {
		exec.workers.Add(1)
		go exec.worker(infiniteWorkerID, true /*launchAsync*/)
		return exec
	}

	// Otherwise, launch a worker goroutine for each degree of parallelism.
	fanout := deployment.opts.DegreeOfParallelism()
	for i := 0; i < fanout; i++ {
		exec.workers.Add(1)
		go exec.worker(i, false /*launchAsync*/)
	}

	return exec
}