// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package deploy

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/pkg/errors"
	"github.com/pulumi/pulumi/pkg/diag"
	"github.com/pulumi/pulumi/pkg/util/contract"
	"github.com/pulumi/pulumi/pkg/util/logging"
)

const (
	// synchronousWorkerID is the placeholder worker ID handed to se.log by operations that run
	// synchronously on the caller's goroutine instead of on one of the worker goroutines.
	synchronousWorkerID = -1

	// stepExecutorLogLevel is the verbosity level at which the step executor emits its log messages.
	// Keeping it in a single constant makes it easy to adjust while debugging.
	stepExecutorLogLevel = 4
)

var (
	// errStepApplyFailed is a sentinel error for errors that arise when step application fails.
	// We (the step executor) are not responsible for reporting those errors so this sentinel ensures
	// that we don't do so: executeStep returns it in place of the underlying apply error, and
	// executeChain compares against it to decide whether to write the error to the diag sink.
	errStepApplyFailed = errors.New("step application failed")
)

// A Chain is a sequence of Steps that must be executed in the given order. It is declared as a type
// alias, so a Chain and a []Step are the same type and can be used interchangeably.
type Chain = []Step

// stepExecutor is the component of the engine responsible for taking steps and executing
// them, possibly in parallel if requested. The step generator operates on the granularity
// of "chains", which are sequences of steps that must be executed exactly in the given order.
// Chains are a simplification of the full dependency graph DAG within Pulumi programs. Since
// Pulumi language hosts can only invoke the resource monitor once all of their dependencies have
// resolved, we (the engine) can assume that any chain given to us by the step generator is already
// ready to execute.
//
// Instances are created with newStepExecutor, which also starts the worker goroutines.
type stepExecutor struct {
	plan        *Plan    // The plan currently being executed.
	opts        Options  // The options for this current plan.
	preview     bool     // Whether or not we are doing a preview.
	pendingNews sync.Map // Maps a resource URN to the Step that registered it, until RegisterResourceOutputs arrives.

	workers        sync.WaitGroup // WaitGroup tracking the worker goroutines that are owned by this step executor.
	incomingChains chan Chain     // Incoming chains that we are to execute; closed by SignalCompletion.

	ctx      context.Context    // Cancellation context for the current plan.
	cancel   context.CancelFunc // CancelFunc that cancels the above context.
	sawError atomic.Value       // Atomic boolean indicating whether or not the step executor saw an error.
}

//
// The stepExecutor communicates with a stepGenerator by listening to a channel. As the step generator
// generates new chains that need to be executed, the step executor will listen to this channel to execute
// those steps.
//

// Execute submits a Chain for asynchronous execution. The execution of the chain will begin as soon as there
// is a worker available to execute it. The call blocks until a worker receives the chain or until the
// executor's context is canceled; in the latter case the chain is dropped without being executed.
func (se *stepExecutor) Execute(chain Chain) {
	// The select here is to avoid blocking on a send to se.incomingChains if a cancellation is pending.
	// If one is pending, we should exit early - we will shortly be tearing down the engine and exiting.
	select {
	case se.incomingChains <- chain:
	case <-se.ctx.Done():
	}
}

// ExecuteRegisterResourceOutputs services a RegisterResourceOutputsEvent synchronously on the calling goroutine.
//
// The step that registered the resource is looked up in (and removed from) the pending registration list, the
// outputs of its new state are overwritten with the outputs carried by the event, and the OnResourceOutputs
// callback is raised if an event sink is configured. On success the event is acknowledged with e.Done. If the
// callback fails, the error is written to the plan's diag sink, the executor is canceled, and e.Done is not
// called.
//
// It is an assertion failure to call this for a URN that has no pending registration.
func (se *stepExecutor) ExecuteRegisterResourceOutputs(e RegisterResourceOutputsEvent) {
	// Look up the final state in the pending registration list.
	urn := e.URN()
	value, has := se.pendingNews.Load(urn)
	contract.Assertf(has, "cannot complete a resource '%v' whose registration isn't pending", urn)
	reg := value.(Step)
	contract.Assertf(reg != nil, "expected a non-nil resource step ('%v')", urn)
	se.pendingNews.Delete(urn)

	// Unconditionally set the resource's outputs to what was provided.  This intentionally overwrites whatever
	// might already be there, since otherwise "deleting" outputs would have no effect. The outputs are fetched
	// once so that the value we log about is exactly the value we store.
	outs := e.Outputs()
	se.log(synchronousWorkerID,
		"registered resource outputs %s: old=#%d, new=#%d", urn, len(reg.New().Outputs), len(outs))
	reg.New().Outputs = outs

	// If there is an event subscription for finishing the resource, execute them.
	if events := se.opts.Events; events != nil {
		if eventerr := events.OnResourceOutputs(reg); eventerr != nil {
			se.log(synchronousWorkerID, "register resource outputs failed: %s", eventerr.Error())

			// This is a bit of a kludge, but ExecuteRegisterResourceOutputs is an odd duck
			// in that it doesn't execute on worker goroutines. Arguably, it should, but today it's
			// not possible to express RegisterResourceOutputs as a step. We could 1) more generally allow
			// clients of stepExecutor to do work on worker threads by e.g. scheduling arbitrary callbacks
			// or 2) promote RRE to be step-like so that it can be scheduled as if it were a step. Neither
			// of these are particularly appealing right now.
			outErr := errors.Wrap(eventerr, "resource complete event returned an error")
			diagMsg := diag.RawMessage(reg.URN(), outErr.Error())
			se.plan.Diag().Errorf(diagMsg)
			se.cancelDueToError()
			return
		}
	}
	e.Done()
}

// Errored returns whether or not this step executor recorded a failure, i.e. whether cancelDueToError has
// been called (after a failed step or a failed resource-outputs event).
func (se *stepExecutor) Errored() bool {
	// atomic.Value.Load returns nil until the first Store. The comma-ok assertion makes an executor whose
	// flag was never stored report false instead of panicking on the nil interface.
	errored, _ := se.sawError.Load().(bool)
	return errored
}

// SignalCompletion signals to the stepExecutor that there are no more chains left to execute. All worker
// threads will terminate as soon as they retire all of the work they are currently executing.
//
// This closes se.incomingChains, so it must be called at most once, and Execute must not be called
// afterwards: closing an already-closed channel panics, as does sending on a closed channel.
func (se *stepExecutor) SignalCompletion() {
	close(se.incomingChains)
}

// WaitForCompletion blocks the calling goroutine until the step executor completes execution of all in-flight
// chains. Concretely, it waits on the WaitGroup that tracks the worker goroutines, so it returns only once
// every worker has exited.
func (se *stepExecutor) WaitForCompletion() {
	se.log(synchronousWorkerID, "StepExecutor.waitForCompletion(): waiting for worker threads to exit")
	se.workers.Wait()
	se.log(synchronousWorkerID, "StepExecutor.waitForCompletion(): worker threads all exited")
}

//
// As calls to `Execute` submit chains for execution, some number of worker goroutines will continuously
// read from `incomingChains` and execute any chains that are received. The core execution logic is in
// the next few functions.
//

// executeChain runs the steps of a chain in order on behalf of the given worker. Execution of the chain stops
// at the first step that fails, or as soon as the executor's context is observed to be canceled. A failing
// step also cancels the executor via cancelDueToError.
func (se *stepExecutor) executeChain(workerID int, chain Chain) {
	for _, step := range chain {
		// Non-blocking poll for cancellation before starting the next step.
		select {
		case <-se.ctx.Done():
			se.log(workerID, "step %v on %v canceled", step.Op(), step.URN())
			return
		default:
		}

		err := se.executeStep(workerID, step)
		if err == nil {
			continue
		}

		se.log(workerID, "step %v on %v failed, signalling cancellation", step.Op(), step.URN())
		se.cancelDueToError()

		// Step application errors are recorded by the OnResourceStepPost callback, so they must not be
		// reported a second time here. The errStepApplyFailed sentinel marks exactly those errors; anything
		// else is written to the diag system as usual.
		if err == errStepApplyFailed {
			return
		}

		msg := diag.RawMessage(step.URN(), err.Error())
		se.plan.Diag().Errorf(msg)
		return
	}
}

// cancelDueToError records that the executor saw an error and then cancels the executor's context.
// The flag is stored before the context is canceled, so Errored already reports true by the time the
// cancellation becomes visible through se.ctx.
func (se *stepExecutor) cancelDueToError() {
	se.sawError.Store(true)
	se.cancel()
}

//
// The next few functions are responsible for executing individual steps. The basic flow of step
// execution is
//   1. The pre-step event is raised, if there are any attached callbacks to the engine
//   2. If successful, the step is executed (if not a preview)
//   3. The post-step event is raised, if there are any attached callbacks to the engine
//
// The pre-step event returns an interface{}, which is some arbitrary context that must be passed
// verbatim to the post-step event.
//

// executeStep executes a single step on behalf of the given worker. It returns nil if the step was applied
// and every event callback succeeded. Otherwise it returns one of:
//
//   - the wrapped error from the pre-step or post-step event callback, if either of those failed;
//   - a "registered twice" error, if the step's resource is already awaiting RegisterResourceOutputs;
//   - errStepApplyFailed, if applying the step itself failed. The underlying apply error is handed to the
//     post-step event rather than returned, so callers should not report this sentinel themselves.
func (se *stepExecutor) executeStep(workerID int, step Step) error {
	var payload interface{}
	events := se.opts.Events
	if events != nil {
		var err error
		payload, err = events.OnResourceStepPre(step)
		if err != nil {
			se.log(workerID, "step %v on %v failed pre-resource step: %v", step.Op(), step.URN(), err)
			return errors.Wrap(err, "pre-step event returned an error")
		}
	}

	se.log(workerID, "applying step %v on %v (preview %v)", step.Op(), step.URN(), se.preview)
	status, stepComplete, err := step.Apply(se.preview)

	// If we have a state object, and this is a create or update, remember it, as we may need to update it later.
	if err == nil && step.Logical() && step.New() != nil {
		// LoadOrStore checks for an existing registration and records the new one in a single atomic
		// operation, so two workers racing on the same URN cannot both pass the duplicate check.
		// NOTE(review): on this early return neither the post-step event nor stepComplete is invoked.
		if prior, loaded := se.pendingNews.LoadOrStore(step.URN(), step); loaded {
			return errors.Errorf(
				"resource '%s' registered twice (%s and %s)", step.URN(), prior.(Step).Op(), step.Op())
		}
	}

	// The post-step event is raised whether or not the apply succeeded; it receives the apply error.
	if events != nil {
		if postErr := events.OnResourceStepPost(payload, step, status, err); postErr != nil {
			se.log(workerID, "step %v on %v failed post-resource step: %v", step.Op(), step.URN(), postErr)
			return errors.Wrap(postErr, "post-step event returned an error")
		}
	}

	// Calling stepComplete allows steps that depend on this step to continue. OnResourceStepPost saved the results
	// of the step in the snapshot, so we are ready to go.
	if stepComplete != nil {
		se.log(workerID, "step %v on %v retired", step.Op(), step.URN())
		stepComplete()
	}

	if err != nil {
		se.log(workerID, "step %v on %v failed with an error: %v", step.Op(), step.URN(), err)
		return errStepApplyFailed
	}

	return nil
}

// log writes a printf-style message, tagged with the given worker ID, to the step executor's log.
// Nothing is formatted or written unless logging at stepExecutorLogLevel is enabled.
func (se *stepExecutor) log(workerID int, msg string, args ...interface{}) {
	verbose := logging.V(stepExecutorLogLevel)
	if !verbose {
		return
	}

	verbose.Infof("StepExecutor worker(%d): %s", workerID, fmt.Sprintf(msg, args...))
}

//
// The step executor owns a number of goroutines that it considers to be "workers", responsible for
// executing steps. By default, as we ease into the waters of parallelism, there is at most one worker
// active.
//
// Workers continuously pull from se.incomingChains, executing chains as they are provided to the executor.
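// There are three reasons why a worker would exit:
//
//  1. A worker exits if se.ctx is canceled. There are two ways that se.ctx gets canceled: first, if there is
//     a step error in another worker, it will cancel the context. Second, if the plan executor experiences an
//     error when generating steps or doing pre or post-step events, it will cancel the context.
//  2. A worker that experiences an error when running a step abandons the rest of its chain and cancels
//     se.ctx itself, after which it exits as described in (1).
//  3. A worker exits once se.incomingChains has been closed by SignalCompletion, which signals that there
//     are no more chains left to execute.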
//

// worker is the base function for all step executor worker goroutines. It repeatedly receives chains from
// se.incomingChains and executes each one via executeChain, and it returns when either the channel has been
// closed (see SignalCompletion) or the executor's context has been canceled. The worker's slot in the
// se.workers WaitGroup is released on return.
func (se *stepExecutor) worker(workerID int) {
	se.log(workerID, "worker coming online")
	defer se.workers.Done()

	for {
		se.log(workerID, "worker waiting for incoming chains")
		select {
		case chain, ok := <-se.incomingChains:
			// Use the comma-ok form to detect a closed channel. Testing the received value against nil
			// instead would also shut the worker down when a nil (empty) chain is submitted via Execute.
			if !ok {
				se.log(workerID, "worker's incoming chain channel was closed, exiting")
				return
			}

			se.log(workerID, "worker received chain for execution")
			se.executeChain(workerID, chain)
		case <-se.ctx.Done():
			se.log(workerID, "worker exiting due to cancellation")
			return
		}
	}
}

// newStepExecutor builds a step executor for the given plan and options and immediately starts its worker
// goroutines, one per unit of opts.DegreeOfParallelism(). The supplied context governs cancellation of
// the executor, and cancel is the function the executor calls to cancel that context when it sees an error.
func newStepExecutor(ctx context.Context, cancel context.CancelFunc, plan *Plan, opts Options,
	preview bool) *stepExecutor {
	se := &stepExecutor{
		ctx:            ctx,
		cancel:         cancel,
		plan:           plan,
		opts:           opts,
		preview:        preview,
		incomingChains: make(chan Chain),
	}

	// Seed the error flag so that Errored has a value to read before any error has occurred.
	se.sawError.Store(false)

	// Each worker is registered with the WaitGroup before its goroutine is started.
	for id, count := 0, opts.DegreeOfParallelism(); id < count; id++ {
		se.workers.Add(1)
		go se.worker(id)
	}

	return se
}