mirror of https://github.com/pulumi/pulumi.git
246 lines
7.2 KiB
Go
246 lines
7.2 KiB
Go
// Copyright 2016-2022, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package engine
|
|
|
|
import (
|
|
"errors"
|
|
|
|
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
|
|
"github.com/pulumi/pulumi/pkg/v3/secrets"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
|
|
)
|
|
|
|
// Compile-time check that *Journal implements SnapshotManager.
var _ SnapshotManager = (*Journal)(nil)
|
|
|
|
// JournalEntryKind identifies what a JournalEntry records about its step.
type JournalEntryKind int

const (
	// JournalEntryBegin records that a step has started.
	JournalEntryBegin = JournalEntryKind(0)
	// JournalEntrySuccess records that a step finished successfully.
	JournalEntrySuccess = JournalEntryKind(1)
	// JournalEntryFailure records that a step finished unsuccessfully.
	JournalEntryFailure = JournalEntryKind(2)
	// JournalEntryOutputs records that outputs were registered for a step's resource.
	JournalEntryOutputs = JournalEntryKind(4)
)
|
|
|
|
type JournalEntry struct {
|
|
Kind JournalEntryKind
|
|
Step deploy.Step
|
|
}
|
|
|
|
// JournalEntries is an ordered list of journal entries that can be replayed into a snapshot with Snap.
type JournalEntries []JournalEntry
|
|
|
|
func (entries JournalEntries) Snap(base *deploy.Snapshot) (*deploy.Snapshot, error) {
|
|
// Build up a list of current resources by replaying the journal.
|
|
resources, dones := []*resource.State{}, make(map[*resource.State]bool)
|
|
ops, doneOps := []resource.Operation{}, make(map[*resource.State]bool)
|
|
for _, e := range entries {
|
|
logging.V(7).Infof("%v %v (%v)", e.Step.Op(), e.Step.URN(), e.Kind)
|
|
|
|
// Begin journal entries add pending operations to the snapshot. As we see success or failure
|
|
// entries, we'll record them in doneOps.
|
|
switch e.Kind {
|
|
case JournalEntryBegin:
|
|
switch e.Step.Op() {
|
|
case deploy.OpCreate, deploy.OpCreateReplacement:
|
|
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeCreating))
|
|
case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
|
|
ops = append(ops, resource.NewOperation(e.Step.Old(), resource.OperationTypeDeleting))
|
|
case deploy.OpRead, deploy.OpReadReplacement:
|
|
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeReading))
|
|
case deploy.OpUpdate:
|
|
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeUpdating))
|
|
case deploy.OpImport, deploy.OpImportReplacement:
|
|
ops = append(ops, resource.NewOperation(e.Step.New(), resource.OperationTypeImporting))
|
|
}
|
|
case JournalEntryFailure, JournalEntrySuccess:
|
|
switch e.Step.Op() {
|
|
//nolint:lll
|
|
case deploy.OpCreate, deploy.OpCreateReplacement, deploy.OpRead, deploy.OpReadReplacement, deploy.OpUpdate,
|
|
deploy.OpImport, deploy.OpImportReplacement:
|
|
doneOps[e.Step.New()] = true
|
|
case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
|
|
doneOps[e.Step.Old()] = true
|
|
}
|
|
case JournalEntryOutputs:
|
|
// We do nothing for outputs, since they don't affect the snapshot.
|
|
}
|
|
|
|
// Now mark resources done as necessary.
|
|
if e.Kind == JournalEntrySuccess {
|
|
switch e.Step.Op() {
|
|
case deploy.OpSame:
|
|
step, ok := e.Step.(*deploy.SameStep)
|
|
contract.Assertf(ok, "expected *deploy.SameStep, got %T", e.Step)
|
|
if !step.IsSkippedCreate() {
|
|
resources = append(resources, e.Step.New())
|
|
dones[e.Step.Old()] = true
|
|
}
|
|
case deploy.OpUpdate:
|
|
resources = append(resources, e.Step.New())
|
|
dones[e.Step.Old()] = true
|
|
case deploy.OpCreate, deploy.OpCreateReplacement:
|
|
resources = append(resources, e.Step.New())
|
|
if old := e.Step.Old(); old != nil && old.PendingReplacement {
|
|
dones[old] = true
|
|
}
|
|
case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
|
|
if old := e.Step.Old(); !old.PendingReplacement {
|
|
dones[old] = true
|
|
}
|
|
case deploy.OpReplace:
|
|
// do nothing.
|
|
case deploy.OpRead, deploy.OpReadReplacement:
|
|
resources = append(resources, e.Step.New())
|
|
if e.Step.Old() != nil {
|
|
dones[e.Step.Old()] = true
|
|
}
|
|
case deploy.OpRemovePendingReplace:
|
|
dones[e.Step.Old()] = true
|
|
case deploy.OpImport, deploy.OpImportReplacement:
|
|
resources = append(resources, e.Step.New())
|
|
dones[e.Step.New()] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Append any resources from the base snapshot that were not produced by the current snapshot.
|
|
// See backend.SnapshotManager.snap for why this works.
|
|
if base != nil {
|
|
for _, res := range base.Resources {
|
|
if !dones[res] {
|
|
resources = append(resources, res)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Append any pending operations.
|
|
var operations []resource.Operation
|
|
for _, op := range ops {
|
|
if !doneOps[op.Resource] {
|
|
operations = append(operations, op)
|
|
}
|
|
}
|
|
|
|
if base != nil {
|
|
// Track pending create operations from the base snapshot
|
|
// and propagate them to the new snapshot: we don't want to clear pending CREATE operations
|
|
// because these must require user intervention to be cleared or resolved.
|
|
for _, pendingOperation := range base.PendingOperations {
|
|
if pendingOperation.Type == resource.OperationTypeCreating {
|
|
operations = append(operations, pendingOperation)
|
|
}
|
|
}
|
|
}
|
|
|
|
// If we have a base snapshot, copy over its secrets manager and metadata.
|
|
var secretsManager secrets.Manager
|
|
var metadata deploy.SnapshotMetadata
|
|
if base != nil {
|
|
secretsManager = base.SecretsManager
|
|
metadata = base.Metadata
|
|
}
|
|
|
|
manifest := deploy.Manifest{}
|
|
manifest.Magic = manifest.NewMagic()
|
|
|
|
snap := deploy.NewSnapshot(manifest, secretsManager, resources, operations, metadata)
|
|
normSnap, err := snap.NormalizeURNReferences()
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
return normSnap, normSnap.VerifyIntegrity()
|
|
}
|
|
|
|
type Journal struct {
|
|
entries JournalEntries
|
|
events chan JournalEntry
|
|
cancel chan bool
|
|
done chan bool
|
|
}
|
|
|
|
func (j *Journal) Entries() JournalEntries {
|
|
<-j.done
|
|
|
|
return j.entries
|
|
}
|
|
|
|
func (j *Journal) Close() error {
|
|
close(j.cancel)
|
|
<-j.done
|
|
|
|
return nil
|
|
}
|
|
|
|
func (j *Journal) BeginMutation(step deploy.Step) (SnapshotMutation, error) {
|
|
select {
|
|
case j.events <- JournalEntry{Kind: JournalEntryBegin, Step: step}:
|
|
return j, nil
|
|
case <-j.cancel:
|
|
return nil, errors.New("journal closed")
|
|
}
|
|
}
|
|
|
|
func (j *Journal) End(step deploy.Step, success bool) error {
|
|
kind := JournalEntryFailure
|
|
if success {
|
|
kind = JournalEntrySuccess
|
|
}
|
|
select {
|
|
case j.events <- JournalEntry{Kind: kind, Step: step}:
|
|
return nil
|
|
case <-j.cancel:
|
|
return errors.New("journal closed")
|
|
}
|
|
}
|
|
|
|
func (j *Journal) RegisterResourceOutputs(step deploy.Step) error {
|
|
select {
|
|
case j.events <- JournalEntry{Kind: JournalEntryOutputs, Step: step}:
|
|
return nil
|
|
case <-j.cancel:
|
|
return errors.New("journal closed")
|
|
}
|
|
}
|
|
|
|
func (j *Journal) RecordPlugin(plugin workspace.PluginInfo) error {
|
|
return nil
|
|
}
|
|
|
|
func (j *Journal) Snap(base *deploy.Snapshot) (*deploy.Snapshot, error) {
|
|
return j.entries.Snap(base)
|
|
}
|
|
|
|
func NewJournal() *Journal {
|
|
j := &Journal{
|
|
events: make(chan JournalEntry),
|
|
cancel: make(chan bool),
|
|
done: make(chan bool),
|
|
}
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-j.cancel:
|
|
close(j.done)
|
|
return
|
|
case e := <-j.events:
|
|
j.entries = append(j.entries, e)
|
|
}
|
|
}
|
|
}()
|
|
return j
|
|
}
|