mirror of https://github.com/pulumi/pulumi.git
838 lines
32 KiB
Go
838 lines
32 KiB
Go
// Copyright 2016-2022, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package backend
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/pulumi/pulumi/pkg/v3/engine"
|
|
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
|
|
"github.com/pulumi/pulumi/pkg/v3/secrets"
|
|
"github.com/pulumi/pulumi/pkg/v3/version"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/env"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
|
|
)
|
|
|
|
// DisableIntegrityChecking, when set to true, turns off checkpoint state integrity verification.
// Doing so is discouraged, since a corrupted checkpoint state file may then go unnoticed; it is
// intended only as a last resort for when a command absolutely has to run.
var DisableIntegrityChecking bool
|
|
|
|
// SnapshotPersister is an interface implemented by our backends that implements snapshot
|
|
// persistence. In order to fit into our current model, snapshot persisters have two functions:
|
|
// saving snapshots and invalidating already-persisted snapshots.
|
|
type SnapshotPersister interface {
|
|
// Persists the given snapshot. Returns an error if the persistence failed.
|
|
Save(snapshot *deploy.Snapshot) error
|
|
}
|
|
|
|
// SnapshotManager is an implementation of engine.SnapshotManager that inspects steps and performs
|
|
// mutations on the global snapshot object serially. This implementation maintains two bits of state: the "base"
|
|
// snapshot, which is completely immutable and represents the state of the world prior to the application
|
|
// of the current plan, and a "new" list of resources, which consists of the resources that were operated upon
|
|
// by the current plan.
|
|
//
|
|
// Important to note is that, although this SnapshotManager is designed to be easily convertible into a thread-safe
|
|
// implementation, the code as it is today is *not thread safe*. In particular, it is not legal for there to be
|
|
// more than one `SnapshotMutation` active at any point in time. This is because this SnapshotManager invalidates
|
|
// the last persisted snapshot in `BeginSnapshot`. This is designed to match existing behavior and will not
|
|
// be the state of things going forward.
|
|
//
|
|
// The resources stored in the `resources` slice are pointers to resource objects allocated by the engine.
|
|
// This is subtle and a little confusing. The reason for this is that the engine directly mutates resource objects
|
|
// that it creates and expects those mutations to be persisted directly to the snapshot.
|
|
type SnapshotManager struct {
|
|
persister SnapshotPersister // The persister responsible for invalidating and persisting the snapshot
|
|
baseSnapshot *deploy.Snapshot // The base snapshot for this plan
|
|
secretsManager secrets.Manager // The default secrets manager to use
|
|
resources []*resource.State // The list of resources operated upon by this plan
|
|
operations []resource.Operation // The set of operations known to be outstanding in this plan
|
|
dones map[*resource.State]bool // The set of resources that have been operated upon already by this plan
|
|
completeOps map[*resource.State]bool // The set of resources that have completed their operation
|
|
mutationRequests chan<- mutationRequest // The queue of mutation requests, to be retired serially by the manager
|
|
cancel chan bool // A channel used to request cancellation of any new mutation requests.
|
|
done <-chan error // A channel that sends a single result when the manager has shut down.
|
|
}
|
|
|
|
var _ engine.SnapshotManager = (*SnapshotManager)(nil)
|
|
|
|
// mutationRequest is one unit of work handed to the manager's service loop.
type mutationRequest struct {
	// mutator performs the mutation. The default service loop saves a snapshot when it returns
	// true and elides the write when it returns false.
	mutator func() bool
	// result receives the outcome of servicing the request.
	result chan<- error
}
|
|
|
|
// TimeMonitor measures an overall interval together with nested "child" intervals inside it.
type TimeMonitor interface {
	// Start begins the overall interval.
	Start()
	// Stop ends the overall interval.
	Stop()
	// StartChild begins a child interval.
	StartChild()
	// StopChild ends the child interval most recently begun.
	StopChild()
	// GetTimes returns, in order, the overall duration, the accumulated child duration, and the
	// number of child intervals.
	GetTimes() (time.Duration, time.Duration, int)
}
|
|
|
|
type MonotonicTimeMonitor struct {
|
|
running bool
|
|
start time.Time
|
|
total time.Duration
|
|
childTotal time.Duration
|
|
childStart time.Time
|
|
childCount int
|
|
}
|
|
|
|
func (tm *MonotonicTimeMonitor) Start() {
|
|
contract.Assertf(!tm.running, "Starting a monitor that is already running is not permitted.")
|
|
tm.running = true
|
|
tm.start = time.Now()
|
|
tm.total = 0
|
|
tm.childTotal = 0
|
|
tm.childCount = 0
|
|
}
|
|
|
|
func (tm *MonotonicTimeMonitor) Stop() {
|
|
contract.Assertf(tm.running, "Stopping a monitor that is not running is not permitted.")
|
|
tm.total = time.Since(tm.start)
|
|
tm.running = false
|
|
}
|
|
|
|
func (tm *MonotonicTimeMonitor) StartChild() {
|
|
tm.childStart = time.Now()
|
|
}
|
|
|
|
func (tm *MonotonicTimeMonitor) StopChild() {
|
|
tm.childCount++
|
|
tm.childTotal += time.Since(tm.childStart)
|
|
}
|
|
|
|
func (tm *MonotonicTimeMonitor) GetTimes() (total, partial time.Duration, count int) {
|
|
return tm.total, tm.childTotal, tm.childCount
|
|
}
|
|
|
|
func (sm *SnapshotManager) Close() error {
|
|
close(sm.cancel)
|
|
return <-sm.done
|
|
}
|
|
|
|
// If you need to understand what's going on in this file, start here!
|
|
//
|
|
// mutate is the serialization point for reads and writes of the global snapshot state.
|
|
// The given function will be, at the time of its invocation, the only function allowed to
|
|
// mutate state within the SnapshotManager.
|
|
//
|
|
// Serialization is performed by pushing the mutator function onto a channel, where another
|
|
// goroutine is polling the channel and executing the mutation functions as they come.
|
|
// This function optionally verifies the integrity of the snapshot before and after mutation.
|
|
//
|
|
// The mutator may indicate that its corresponding checkpoint write may be safely elided by
|
|
// returning `false`. As of this writing, we only elide writes after same steps with no
|
|
// meaningful changes (see sameSnapshotMutation.mustWrite for details). Any elided writes
|
|
// are flushed by the next non-elided write or the next call to Close.
|
|
//
|
|
// You should never observe or mutate the global snapshot without using this function unless
|
|
// you have a very good justification.
|
|
func (sm *SnapshotManager) mutate(mutator func() bool) error {
|
|
result := make(chan error)
|
|
select {
|
|
case sm.mutationRequests <- mutationRequest{mutator: mutator, result: result}:
|
|
return <-result
|
|
case <-sm.cancel:
|
|
return errors.New("snapshot manager closed")
|
|
}
|
|
}
|
|
|
|
// RegisterResourceOutputs handles the registering of outputs on a Step that has already
|
|
// completed. This is accomplished by doing an in-place mutation of the resources currently
|
|
// resident in the snapshot.
|
|
//
|
|
// Due to the way this is currently implemented, the engine directly mutates output properties
|
|
// on the resource State object that it created. Since we are storing pointers to these objects
|
|
// in the `resources` slice, we need only to do a no-op mutation in order to flush these new
|
|
// mutations to disk.
|
|
//
|
|
// Note that this is completely not thread-safe and defeats the purpose of having a `mutate` callback
|
|
// entirely, but the hope is that this state of things will not be permament.
|
|
func (sm *SnapshotManager) RegisterResourceOutputs(step deploy.Step) error {
|
|
return sm.mutate(func() bool { return true })
|
|
}
|
|
|
|
// BeginMutation signals to the SnapshotManager that the engine intends to mutate the global snapshot
|
|
// by performing the given Step. This function gives the SnapshotManager a chance to record the
|
|
// intent to mutate before the mutation occurs.
|
|
func (sm *SnapshotManager) BeginMutation(step deploy.Step) (engine.SnapshotMutation, error) {
|
|
contract.Requiref(step != nil, "step", "cannot be nil")
|
|
logging.V(9).Infof("SnapshotManager: Beginning mutation for step `%s` on resource `%s`", step.Op(), step.URN())
|
|
|
|
switch step.Op() {
|
|
case deploy.OpSame:
|
|
return &sameSnapshotMutation{sm}, nil
|
|
case deploy.OpCreate, deploy.OpCreateReplacement:
|
|
return sm.doCreate(step)
|
|
case deploy.OpUpdate:
|
|
return sm.doUpdate(step)
|
|
case deploy.OpDelete, deploy.OpDeleteReplaced, deploy.OpReadDiscard, deploy.OpDiscardReplaced:
|
|
return sm.doDelete(step)
|
|
case deploy.OpReplace:
|
|
return &replaceSnapshotMutation{sm}, nil
|
|
case deploy.OpRead, deploy.OpReadReplacement:
|
|
return sm.doRead(step)
|
|
case deploy.OpRefresh:
|
|
return &refreshSnapshotMutation{sm}, nil
|
|
case deploy.OpRemovePendingReplace:
|
|
return &removePendingReplaceSnapshotMutation{sm}, nil
|
|
case deploy.OpImport, deploy.OpImportReplacement:
|
|
return sm.doImport(step)
|
|
}
|
|
|
|
contract.Failf("unknown StepOp: %s", step.Op())
|
|
return nil, nil
|
|
}
|
|
|
|
// All SnapshotMutation implementations in this file follow the same basic formula:
|
|
// mark the "old" state as done and mark the "new" state as new. The two special
|
|
// cases are Create (where the "old" state does not exist) and Delete (where the "new" state
|
|
// does not exist).
|
|
//
|
|
// Marking a resource state as old prevents it from being persisted to the snapshot in
|
|
// the `snap` function. Marking a resource state as new /enables/ it to be persisted to
|
|
// the snapshot in `snap`. See the comments in `snap` for more details.
|
|
|
|
type sameSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
// mustWrite returns true if any semantically meaningful difference exists between the old and new states of a same
|
|
// step that forces us to write the checkpoint. If no such difference exists, the checkpoint write that corresponds to
|
|
// this step can be elided.
|
|
func (ssm *sameSnapshotMutation) mustWrite(step *deploy.SameStep) bool {
|
|
old := step.Old()
|
|
new := step.New()
|
|
|
|
contract.Assertf(old.Delete == new.Delete,
|
|
"either both or neither resource must be pending deletion, got %v (old) != %v (new)",
|
|
old.Delete, new.Delete)
|
|
contract.Assertf(old.External == new.External,
|
|
"either both or neither resource must be external, got %v (old) != %v (new)",
|
|
old.External, new.External)
|
|
contract.Assertf(!step.IsSkippedCreate(), "create cannot be skipped for step")
|
|
|
|
// If the URN of this resource has changed, we must write the checkpoint. This should only be possible when a
|
|
// resource is aliased.
|
|
if old.URN != new.URN {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of URN")
|
|
return true
|
|
}
|
|
|
|
// If the type of this resource has changed, we must write the checkpoint. This should only be possible when a
|
|
// resource is aliased.
|
|
if old.Type != new.Type {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Type")
|
|
return true
|
|
}
|
|
|
|
// If the kind of this resource has changed, we must write the checkpoint.
|
|
if old.Custom != new.Custom {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Custom")
|
|
return true
|
|
}
|
|
|
|
// We need to persist the changes if CustomTimes have changed
|
|
if old.CustomTimeouts != new.CustomTimeouts {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of CustomTimeouts")
|
|
return true
|
|
}
|
|
|
|
// We need to persist the changes if CustomTimes have changed
|
|
if old.RetainOnDelete != new.RetainOnDelete {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of RetainOnDelete")
|
|
return true
|
|
}
|
|
|
|
contract.Assertf(old.ID == new.ID,
|
|
"old and new resource IDs must be equal, got %v (old) != %v (new)", old.ID, new.ID)
|
|
|
|
// If this resource's provider has changed, we must write the checkpoint. This can happen in scenarios involving
|
|
// aliased providers or upgrades to default providers.
|
|
if old.Provider != new.Provider {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Provider")
|
|
return true
|
|
}
|
|
|
|
// If this resource's parent has changed, we must write the checkpoint.
|
|
if old.Parent != new.Parent {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Parent")
|
|
return true
|
|
}
|
|
|
|
// If the DeletedWith attribute of this resource has changed, we must write the checkpoint.
|
|
if old.DeletedWith != new.DeletedWith {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of DeletedWith")
|
|
return true
|
|
}
|
|
|
|
// If the protection attribute of this resource has changed, we must write the checkpoint.
|
|
if old.Protect != new.Protect {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Protect")
|
|
return true
|
|
}
|
|
|
|
// If the source position of this resource has changed, we must write the checkpoint.
|
|
if old.SourcePosition != new.SourcePosition {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of SourcePosition")
|
|
return true
|
|
}
|
|
|
|
// If the inputs or outputs of this resource have changed, we must write the checkpoint. Note that it is possible
|
|
// for the inputs of a "same" resource to have changed even if the contents of the input bags are different if the
|
|
// resource's provider deems the physical change to be semantically irrelevant.
|
|
if !old.Inputs.DeepEquals(new.Inputs) {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Inputs")
|
|
return true
|
|
}
|
|
if !old.Outputs.DeepEquals(new.Outputs) {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Outputs")
|
|
return true
|
|
}
|
|
|
|
// Sort dependencies before comparing them. If the dependencies have changed, we must write the checkpoint.
|
|
sortDeps := func(deps []resource.URN) {
|
|
sort.Slice(deps, func(i, j int) bool { return deps[i] < deps[j] })
|
|
}
|
|
sortDeps(old.Dependencies)
|
|
sortDeps(new.Dependencies)
|
|
// reflect.DeepEqual does not treat `nil` and `[]URN{}` as equal, so we must check for both
|
|
// lists being empty ourselves.
|
|
if len(old.Dependencies) != 0 || len(new.Dependencies) != 0 {
|
|
if !reflect.DeepEqual(old.Dependencies, new.Dependencies) {
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() true because of Dependencies")
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Init errors are strictly advisory, so we do not consider them when deciding whether or not to write the
|
|
// checkpoint.
|
|
|
|
logging.V(9).Infof("SnapshotManager: mustWrite() false")
|
|
return false
|
|
}
|
|
|
|
func (ssm *sameSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
contract.Requiref(step.Op() == deploy.OpSame, "step.Op()", "must be %q, got %q", deploy.OpSame, step.Op())
|
|
logging.V(9).Infof("SnapshotManager: sameSnapshotMutation.End(..., %v)", successful)
|
|
return ssm.manager.mutate(func() bool {
|
|
sameStep := step.(*deploy.SameStep)
|
|
|
|
ssm.manager.markOperationComplete(step.New())
|
|
if successful {
|
|
ssm.manager.markDone(step.Old())
|
|
|
|
// In the case of a 'resource create' in a program that wasn't specified by the user in the
|
|
// --target list, we *never* want to write this to the checkpoint. We treat it as if it
|
|
// doesn't exist at all. That way when the program runs the next time, we'll actually
|
|
// create it.
|
|
if sameStep.IsSkippedCreate() {
|
|
return false
|
|
}
|
|
|
|
ssm.manager.markNew(step.New())
|
|
|
|
// Note that "Same" steps only consider input and provider diffs, so it is possible to see a same step for a
|
|
// resource with new dependencies, outputs, parent, protection. etc.
|
|
//
|
|
// As such, we diff all of the non-input properties of the resource here and write the snapshot if we find any
|
|
// changes.
|
|
if !ssm.mustWrite(sameStep) {
|
|
logging.V(9).Infof("SnapshotManager: sameSnapshotMutation.End() eliding write")
|
|
return false
|
|
}
|
|
}
|
|
|
|
logging.V(9).Infof("SnapshotManager: sameSnapshotMutation.End() not eliding write")
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (sm *SnapshotManager) doCreate(step deploy.Step) (engine.SnapshotMutation, error) {
|
|
logging.V(9).Infof("SnapshotManager.doCreate(%s)", step.URN())
|
|
err := sm.mutate(func() bool {
|
|
sm.markOperationPending(step.New(), resource.OperationTypeCreating)
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &createSnapshotMutation{sm}, nil
|
|
}
|
|
|
|
type createSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (csm *createSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
logging.V(9).Infof("SnapshotManager: createSnapshotMutation.End(..., %v)", successful)
|
|
return csm.manager.mutate(func() bool {
|
|
csm.manager.markOperationComplete(step.New())
|
|
if successful {
|
|
// There is some very subtle behind-the-scenes magic here that
|
|
// comes into play whenever this create is a CreateReplacement.
|
|
//
|
|
// Despite intending for the base snapshot to be immutable, the engine
|
|
// does in fact mutate it by setting a `Delete` flag on resources
|
|
// being replaced as part of a Create-Before-Delete replacement sequence.
|
|
// Since we are storing the base snapshot and all resources by reference
|
|
// (we have pointers to engine-allocated objects), this transparently
|
|
// "just works" for the SnapshotManager.
|
|
csm.manager.markNew(step.New())
|
|
|
|
// If we had an old state that was marked as pending-replacement, mark its replacement as complete such
|
|
// that it is flushed from the state file.
|
|
if old := step.Old(); old != nil && old.PendingReplacement {
|
|
csm.manager.markDone(old)
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (sm *SnapshotManager) doUpdate(step deploy.Step) (engine.SnapshotMutation, error) {
|
|
logging.V(9).Infof("SnapshotManager.doUpdate(%s)", step.URN())
|
|
err := sm.mutate(func() bool {
|
|
sm.markOperationPending(step.New(), resource.OperationTypeUpdating)
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &updateSnapshotMutation{sm}, nil
|
|
}
|
|
|
|
type updateSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (usm *updateSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
logging.V(9).Infof("SnapshotManager: updateSnapshotMutation.End(..., %v)", successful)
|
|
return usm.manager.mutate(func() bool {
|
|
usm.manager.markOperationComplete(step.New())
|
|
if successful {
|
|
usm.manager.markDone(step.Old())
|
|
usm.manager.markNew(step.New())
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (sm *SnapshotManager) doDelete(step deploy.Step) (engine.SnapshotMutation, error) {
|
|
logging.V(9).Infof("SnapshotManager.doDelete(%s)", step.URN())
|
|
err := sm.mutate(func() bool {
|
|
sm.markOperationPending(step.Old(), resource.OperationTypeDeleting)
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &deleteSnapshotMutation{sm}, nil
|
|
}
|
|
|
|
type deleteSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (dsm *deleteSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
logging.V(9).Infof("SnapshotManager: deleteSnapshotMutation.End(..., %v)", successful)
|
|
return dsm.manager.mutate(func() bool {
|
|
dsm.manager.markOperationComplete(step.Old())
|
|
if successful {
|
|
contract.Assertf(
|
|
!step.Old().Protect ||
|
|
step.Op() == deploy.OpDiscardReplaced ||
|
|
step.Op() == deploy.OpDeleteReplaced,
|
|
"Old must be unprotected (got %v) or the operation must be a replace (got %q)",
|
|
step.Old().Protect, step.Op())
|
|
|
|
if !step.Old().PendingReplacement {
|
|
dsm.manager.markDone(step.Old())
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
type replaceSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (rsm *replaceSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
logging.V(9).Infof("SnapshotManager: replaceSnapshotMutation.End(..., %v)", successful)
|
|
return nil
|
|
}
|
|
|
|
func (sm *SnapshotManager) doRead(step deploy.Step) (engine.SnapshotMutation, error) {
|
|
logging.V(9).Infof("SnapshotManager.doRead(%s)", step.URN())
|
|
err := sm.mutate(func() bool {
|
|
sm.markOperationPending(step.New(), resource.OperationTypeReading)
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &readSnapshotMutation{sm}, nil
|
|
}
|
|
|
|
type readSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (rsm *readSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
logging.V(9).Infof("SnapshotManager: readSnapshotMutation.End(..., %v)", successful)
|
|
return rsm.manager.mutate(func() bool {
|
|
rsm.manager.markOperationComplete(step.New())
|
|
if successful {
|
|
if step.Old() != nil {
|
|
rsm.manager.markDone(step.Old())
|
|
}
|
|
|
|
rsm.manager.markNew(step.New())
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
type refreshSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (rsm *refreshSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
contract.Requiref(step.Op() == deploy.OpRefresh, "step.Op", "must be %q, got %q", deploy.OpRefresh, step.Op())
|
|
logging.V(9).Infof("SnapshotManager: refreshSnapshotMutation.End(..., %v)", successful)
|
|
return rsm.manager.mutate(func() bool {
|
|
// We always elide refreshes. The expectation is that all of these run before any actual mutations and that
|
|
// some other component will rewrite the base snapshot in-memory, so there's no action the snapshot
|
|
// manager needs to take other than to remember that the base snapshot--and therefore the actual snapshot--may
|
|
// have changed.
|
|
return false
|
|
})
|
|
}
|
|
|
|
type removePendingReplaceSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (rsm *removePendingReplaceSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
contract.Requiref(step.Op() == deploy.OpRemovePendingReplace, "step.Op",
|
|
"must be %q, got %q", deploy.OpRemovePendingReplace, step.Op())
|
|
return rsm.manager.mutate(func() bool {
|
|
res := step.Old()
|
|
contract.Assertf(res.PendingReplacement, "resource %q must be pending replacement", res.URN)
|
|
rsm.manager.markDone(res)
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (sm *SnapshotManager) doImport(step deploy.Step) (engine.SnapshotMutation, error) {
|
|
logging.V(9).Infof("SnapshotManager.doImport(%s)", step.URN())
|
|
err := sm.mutate(func() bool {
|
|
sm.markOperationPending(step.New(), resource.OperationTypeImporting)
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &importSnapshotMutation{sm}, nil
|
|
}
|
|
|
|
type importSnapshotMutation struct {
|
|
manager *SnapshotManager
|
|
}
|
|
|
|
func (ism *importSnapshotMutation) End(step deploy.Step, successful bool) error {
|
|
contract.Requiref(step != nil, "step", "must not be nil")
|
|
contract.Requiref(step.Op() == deploy.OpImport || step.Op() == deploy.OpImportReplacement, "step.Op",
|
|
"must be %q or %q, got %q", deploy.OpImport, deploy.OpImportReplacement, step.Op())
|
|
|
|
return ism.manager.mutate(func() bool {
|
|
ism.manager.markOperationComplete(step.New())
|
|
if successful {
|
|
ism.manager.markNew(step.New())
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
// markDone marks a resource as having been processed. Resources that have been marked
|
|
// in this manner won't be persisted in the snapshot.
|
|
func (sm *SnapshotManager) markDone(state *resource.State) {
|
|
contract.Requiref(state != nil, "state", "must not be nil")
|
|
sm.dones[state] = true
|
|
logging.V(9).Infof("Marked old state snapshot as done: %v", state.URN)
|
|
}
|
|
|
|
// markNew marks a resource as existing in the new snapshot. This occurs on
|
|
// successful non-deletion operations where the given state is the new state
|
|
// of a resource that will be persisted to the snapshot.
|
|
func (sm *SnapshotManager) markNew(state *resource.State) {
|
|
contract.Requiref(state != nil, "state", "must not be nil")
|
|
sm.resources = append(sm.resources, state)
|
|
logging.V(9).Infof("Appended new state snapshot to be written: %v", state.URN)
|
|
}
|
|
|
|
// markOperationPending marks a resource as undergoing an operation that will now be considered pending.
|
|
func (sm *SnapshotManager) markOperationPending(state *resource.State, op resource.OperationType) {
|
|
contract.Requiref(state != nil, "state", "must not be nil")
|
|
sm.operations = append(sm.operations, resource.NewOperation(state, op))
|
|
logging.V(9).Infof("SnapshotManager.markPendingOperation(%s, %s)", state.URN, string(op))
|
|
}
|
|
|
|
// markOperationComplete marks a resource as having completed the operation that it previously was performing.
|
|
func (sm *SnapshotManager) markOperationComplete(state *resource.State) {
|
|
contract.Requiref(state != nil, "state", "must not be nil")
|
|
sm.completeOps[state] = true
|
|
logging.V(9).Infof("SnapshotManager.markOperationComplete(%s)", state.URN)
|
|
}
|
|
|
|
// snap produces a new Snapshot given the base snapshot and a list of resources that the current
|
|
// plan has created.
|
|
func (sm *SnapshotManager) snap() *deploy.Snapshot {
|
|
// At this point we have two resource DAGs. One of these is the base DAG for this plan; the other is the current DAG
|
|
// for this plan. Any resource r may be present in both DAGs. In order to produce a snapshot, we need to merge these
|
|
// DAGs such that all resource dependencies are correctly preserved. Conceptually, the merge proceeds as follows:
|
|
//
|
|
// - Begin with an empty merged DAG.
|
|
// - For each resource r in the current DAG, insert r and its outgoing edges into the merged DAG.
|
|
// - For each resource r in the base DAG:
|
|
// - If r is in the merged DAG, we are done: if the resource is in the merged DAG, it must have been in the
|
|
// current DAG, which accurately captures its current dependencies.
|
|
// - If r is not in the merged DAG, insert it and its outgoing edges into the merged DAG.
|
|
//
|
|
// Physically, however, each DAG is represented as list of resources without explicit dependency edges. In place of
|
|
// edges, it is assumed that the list represents a valid topological sort of its source DAG. Thus, any resource r at
|
|
// index i in a list L must be assumed to be dependent on all resources in L with index j s.t. j < i. Due to this
|
|
// representation, we implement the algorithm above as follows to produce a merged list that represents a valid
|
|
// topological sort of the merged DAG:
|
|
//
|
|
// - Begin with an empty merged list.
|
|
// - For each resource r in the current list, append r to the merged list. r must be in a correct location in the
|
|
// merged list, as its position relative to its assumed dependencies has not changed.
|
|
// - For each resource r in the base list:
|
|
// - If r is in the merged list, we are done by the logic given in the original algorithm.
|
|
// - If r is not in the merged list, append r to the merged list. r must be in a correct location in the merged
|
|
// list:
|
|
// - If any of r's dependencies were in the current list, they must already be in the merged list and their
|
|
// relative order w.r.t. r has not changed.
|
|
// - If any of r's dependencies were not in the current list, they must already be in the merged list, as
|
|
// they would have been appended to the list before r.
|
|
|
|
// Start with a copy of the resources produced during the evaluation of the current plan.
|
|
resources := make([]*resource.State, len(sm.resources))
|
|
copy(resources, sm.resources)
|
|
|
|
// Append any resources from the base plan that were not produced by the current plan.
|
|
if base := sm.baseSnapshot; base != nil {
|
|
for _, res := range base.Resources {
|
|
if !sm.dones[res] {
|
|
resources = append(resources, res)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Record any pending operations, if there are any outstanding that have not completed yet.
|
|
var operations []resource.Operation
|
|
for _, op := range sm.operations {
|
|
if !sm.completeOps[op.Resource] {
|
|
operations = append(operations, op)
|
|
}
|
|
}
|
|
|
|
// Track pending create operations from the base snapshot
|
|
// and propagate them to the new snapshot: we don't want to clear pending CREATE operations
|
|
// because these must require user intervention to be cleared or resolved.
|
|
if base := sm.baseSnapshot; base != nil {
|
|
for _, pendingOperation := range base.PendingOperations {
|
|
if pendingOperation.Type == resource.OperationTypeCreating {
|
|
operations = append(operations, pendingOperation)
|
|
}
|
|
}
|
|
}
|
|
|
|
manifest := deploy.Manifest{
|
|
Time: time.Now(),
|
|
Version: version.Version,
|
|
// Plugins: sm.plugins, - Explicitly dropped, since we don't use the plugin list in the manifest anymore.
|
|
}
|
|
|
|
// The backend.SnapshotManager and backend.SnapshotPersister will keep track of any changes to
|
|
// the Snapshot (checkpoint file) in the HTTP backend. We will reuse the snapshot's secrets manager when possible
|
|
// to ensure that secrets are not re-encrypted on each update.
|
|
secretsManager := sm.secretsManager
|
|
if sm.baseSnapshot != nil && secrets.AreCompatible(secretsManager, sm.baseSnapshot.SecretsManager) {
|
|
secretsManager = sm.baseSnapshot.SecretsManager
|
|
}
|
|
|
|
manifest.Magic = manifest.NewMagic()
|
|
return deploy.NewSnapshot(manifest, secretsManager, resources, operations)
|
|
}
|
|
|
|
// saveSnapshot persists the current snapshot and optionally verifies it afterwards.
|
|
func (sm *SnapshotManager) saveSnapshot() error {
|
|
snap, err := sm.snap().NormalizeURNReferences()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to normalize URN references: %w", err)
|
|
}
|
|
if err := sm.persister.Save(snap); err != nil {
|
|
return fmt.Errorf("failed to save snapshot: %w", err)
|
|
}
|
|
if err := snap.VerifyIntegrity(); err != nil {
|
|
return fmt.Errorf("failed to verify snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// defaultServiceLoop saves a Snapshot whenever a mutation occurs
|
|
func (sm *SnapshotManager) defaultServiceLoop(mutationRequests chan mutationRequest, done chan error, tm TimeMonitor) {
|
|
// True if we have elided writes since the last actual write.
|
|
hasElidedWrites := false
|
|
tm.Start()
|
|
|
|
// Service each mutation request in turn.
|
|
serviceLoop:
|
|
for {
|
|
select {
|
|
case request := <-mutationRequests:
|
|
var err error
|
|
if request.mutator() {
|
|
tm.StartChild()
|
|
defer tm.StopChild()
|
|
|
|
err = sm.saveSnapshot()
|
|
hasElidedWrites = false
|
|
} else {
|
|
hasElidedWrites = true
|
|
}
|
|
request.result <- err
|
|
case <-sm.cancel:
|
|
break serviceLoop
|
|
}
|
|
}
|
|
|
|
tm.Stop()
|
|
|
|
totalDuration, childDuration, childCount := tm.GetTimes()
|
|
snapshotPercent := 100 * childDuration / totalDuration
|
|
logging.V(9).Infof("Saving %d checkpoints took %d%% of the total time (%s of %s)",
|
|
childCount,
|
|
snapshotPercent,
|
|
childDuration.String(),
|
|
totalDuration.String())
|
|
|
|
// TODO (proberts) Determine the correct percentage at which to warn
|
|
if totalDuration > (30*time.Second) && snapshotPercent >= 50 {
|
|
cmdutil.Diag().Warningf(diag.Message("", "Saving checkpoints took %d%% "+
|
|
"of the total time. You can mitigate this by setting the config option "+
|
|
"\"pulumi:disable-checkpoints\" to \"true\". See "+
|
|
"<<TODO(proberts):permalink>> for more information."),
|
|
snapshotPercent)
|
|
}
|
|
|
|
// If we still have elided writes once the channel has closed, flush the snapshot.
|
|
var err error
|
|
if hasElidedWrites {
|
|
logging.V(9).Infof("SnapshotManager: flushing elided writes...")
|
|
err = sm.saveSnapshot()
|
|
}
|
|
done <- err
|
|
}
|
|
|
|
// unsafeServiceLoop doesn't save Snapshots when mutations occur and instead saves Snapshots when
|
|
// SnapshotManager.Close() is invoked. It trades reliability for speed as every mutation does not
|
|
// cause a Snapshot to be serialized to the user's state backend.
|
|
func (sm *SnapshotManager) unsafeServiceLoop(mutationRequests chan mutationRequest, done chan error, _ TimeMonitor) {
|
|
for {
|
|
select {
|
|
case request := <-mutationRequests:
|
|
request.mutator()
|
|
request.result <- nil
|
|
case <-sm.cancel:
|
|
done <- sm.saveSnapshot()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewSnapshotManager creates a new SnapshotManager for the given stack name, using the given persister, default secrets
|
|
// manager and base snapshot.
|
|
//
|
|
// It is *very important* that the baseSnap pointer refers to the same Snapshot given to the engine! The engine will
|
|
// mutate this object and correctness of the SnapshotManager depends on being able to observe this mutation. (This is
|
|
// not ideal...)
|
|
func NewSnapshotManager(
|
|
persister SnapshotPersister,
|
|
secretsManager secrets.Manager,
|
|
baseSnap *deploy.Snapshot,
|
|
stackConfig config.Map,
|
|
timeMonitor TimeMonitor,
|
|
) *SnapshotManager {
|
|
mutationRequests, cancel, done := make(chan mutationRequest), make(chan bool), make(chan error)
|
|
|
|
manager := &SnapshotManager{
|
|
persister: persister,
|
|
secretsManager: secretsManager,
|
|
baseSnapshot: baseSnap,
|
|
dones: make(map[*resource.State]bool),
|
|
completeOps: make(map[*resource.State]bool),
|
|
mutationRequests: mutationRequests,
|
|
cancel: cancel,
|
|
done: done,
|
|
}
|
|
|
|
serviceLoop := manager.defaultServiceLoop
|
|
|
|
if env.SkipCheckpoints.Value() {
|
|
// If the environment variable PULUMI_SKIP_CHECKPOINTS is true (and experiments are enabled),
|
|
// then use the unsafe service loop (which doesn't save checkpoints)
|
|
serviceLoop = manager.unsafeServiceLoop
|
|
} else if value, ok := stackConfig[config.MustMakeKey("pulumi", "disable-checkpoints")]; ok {
|
|
disableCheckpointsValue, err := value.Value(config.NopDecrypter)
|
|
contract.AssertNoErrorf(err, "error fetching config value for pulumi:disable-checkpoints")
|
|
|
|
if cmdutil.IsTruthy(disableCheckpointsValue) {
|
|
// If the configuration value pulumi:disable-checkpoints is
|
|
// explicitly true, then use the unsafe service loop
|
|
serviceLoop = manager.unsafeServiceLoop
|
|
}
|
|
}
|
|
|
|
go serviceLoop(mutationRequests, done, timeMonitor)
|
|
|
|
return manager
|
|
}
|