// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
	"errors"

	"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
	"github.com/pulumi/pulumi/pkg/v3/secrets"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
	"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)

var _ = SnapshotManager((*Journal)(nil))

type JournalEntryKind int

const (
	JournalEntryBegin   JournalEntryKind = 0
	JournalEntrySuccess JournalEntryKind = 1
	JournalEntryFailure JournalEntryKind = 2
	JournalEntryOutputs JournalEntryKind = 4
)

type JournalEntry struct {
	Kind JournalEntryKind
	Step deploy.Step
}

type JournalEntries []JournalEntry

func (entries JournalEntries) Snap(base *deploy.Snapshot) (*deploy.Snapshot, error) {
	// Build up a list of current resources by replaying the journal.
	resources, dones := []*resource.State{}, make(map[*resource.State]bool)
	ops, doneOps := []resource.Operation{}, make(map[*resource.State]bool)
	for _, e := range entries {
		logging.V(7).Infof("%v %v (%v)", e.Step.Op(), e.Step.URN(), e.Kind)

		// Begin journal entries add pending operations to the snapshot. As we see success or failure
		// entries, we'll record them in doneOps.
		switch e.Kind {
		case JournalEntryBegin:
			switch e.Step.Op() {
			case deploy.OpCreate, deploy.OpCreateReplacement:
				ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeCreating))
			case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
				ops = append(ops, resource.NewOperation(e.Step.Old(), resource.OperationTypeDeleting))
			case deploy.OpRead, deploy.OpReadReplacement:
				ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeReading))
			case deploy.OpUpdate:
				ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeUpdating))
			case deploy.OpImport, deploy.OpImportReplacement:
				ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeImporting))
			}
		case JournalEntryFailure, JournalEntrySuccess:
			switch e.Step.Op() {
			//nolint:lll
			case deploy.OpCreate, deploy.OpCreateReplacement, deploy.OpRead, deploy.OpReadReplacement, deploy.OpUpdate,
				deploy.OpImport, deploy.OpImportReplacement:
				doneOps[e.Step.New()] = true
			case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
				doneOps[e.Step.Old()] = true
			}
		case JournalEntryOutputs:
			// We do nothing for outputs, since they don't affect the snapshot.
		}

		// Now mark resources done as necessary.
		if e.Kind == JournalEntrySuccess {
			switch e.Step.Op() {
			case deploy.OpSame:
				step, ok := e.Step.(*deploy.SameStep)
				contract.Assertf(ok, "expected *deploy.SameStep, got %T", e.Step)
				if !step.IsSkippedCreate() {
					resources = append(resources, e.Step.New())
					dones[e.Step.Old()] = true
				}
			case deploy.OpUpdate:
				resources = append(resources, e.Step.New())
				dones[e.Step.Old()] = true
			case deploy.OpCreate, deploy.OpCreateReplacement:
				resources = append(resources, e.Step.New())
				if old := e.Step.Old(); old != nil && old.PendingReplacement {
					dones[old] = true
				}
			case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
				if old := e.Step.Old(); !old.PendingReplacement {
					dones[old] = true
				}
			case deploy.OpReplace:
				// do nothing.
			case deploy.OpRead, deploy.OpReadReplacement:
				resources = append(resources, e.Step.New())
				if e.Step.Old() != nil {
					dones[e.Step.Old()] = true
				}
			case deploy.OpRemovePendingReplace:
				dones[e.Step.Old()] = true
			case deploy.OpImport, deploy.OpImportReplacement:
				resources = append(resources, e.Step.New())
				dones[e.Step.New()] = true
			}
		}
	}

	// Append any resources from the base snapshot that were not produced by the current snapshot.
	// See backend.SnapshotManager.snap for why this works.
	if base != nil {
		for _, res := range base.Resources {
			if !dones[res] {
				resources = append(resources, res)
			}
		}
	}

	// Append any pending operations.
	var operations []resource.Operation
	for _, op := range ops {
		if !doneOps[op.Resource] {
			operations = append(operations, op)
		}
	}

	if base != nil {
		// Track pending create operations from the base snapshot
		// and propagate them to the new snapshot: we don't want to clear pending CREATE operations
		// because these must require user intervention to be cleared or resolved.
		for _, pendingOperation := range base.PendingOperations {
			if pendingOperation.Type == resource.OperationTypeCreating {
				operations = append(operations, pendingOperation)
			}
		}
	}

	// If we have a base snapshot, copy over its secrets manager.
	var secretsManager secrets.Manager
	if base != nil {
		secretsManager = base.SecretsManager
	}

	manifest := deploy.Manifest{}
	manifest.Magic = manifest.NewMagic()

	snap := deploy.NewSnapshot(manifest, secretsManager, resources, operations)
	normSnap, err := snap.NormalizeURNReferences()
	if err != nil {
		return snap, err
	}
	return normSnap, normSnap.VerifyIntegrity()
}

type Journal struct {
	entries JournalEntries
	events  chan JournalEntry
	cancel  chan bool
	done    chan bool
}

func (j *Journal) Entries() JournalEntries {
	<-j.done

	return j.entries
}

func (j *Journal) Close() error {
	close(j.cancel)
	<-j.done

	return nil
}

func (j *Journal) BeginMutation(step deploy.Step) (SnapshotMutation, error) {
	select {
	case j.events <- JournalEntry{Kind: JournalEntryBegin, Step: step}:
		return j, nil
	case <-j.cancel:
		return nil, errors.New("journal closed")
	}
}

func (j *Journal) End(step deploy.Step, success bool) error {
	kind := JournalEntryFailure
	if success {
		kind = JournalEntrySuccess
	}
	select {
	case j.events <- JournalEntry{Kind: kind, Step: step}:
		return nil
	case <-j.cancel:
		return errors.New("journal closed")
	}
}

func (j *Journal) RegisterResourceOutputs(step deploy.Step) error {
	select {
	case j.events <- JournalEntry{Kind: JournalEntryOutputs, Step: step}:
		return nil
	case <-j.cancel:
		return errors.New("journal closed")
	}
}

func (j *Journal) RecordPlugin(plugin workspace.PluginInfo) error {
	return nil
}

func (j *Journal) Snap(base *deploy.Snapshot) (*deploy.Snapshot, error) {
	return j.entries.Snap(base)
}

func NewJournal() *Journal {
	j := &Journal{
		events: make(chan JournalEntry),
		cancel: make(chan bool),
		done:   make(chan bool),
	}
	go func() {
		for {
			select {
			case <-j.cancel:
				close(j.done)
				return
			case e := <-j.events:
				j.entries = append(j.entries, e)
			}
		}
	}()
	return j
}