pulumi/pkg/backend/diy/state.go

586 lines
17 KiB
Go

// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package diy
import (
"context"
"errors"
"fmt"
"io"
"path"
"path/filepath"
"strings"
"time"
"github.com/pulumi/pulumi/sdk/v3/go/common/env"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/retry"
"github.com/pulumi/pulumi/pkg/v3/engine"
"gocloud.dev/blob"
"gocloud.dev/gcerrors"
"github.com/pulumi/pulumi/pkg/v3/backend"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/stack"
"github.com/pulumi/pulumi/pkg/v3/secrets"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/encoding"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)
// DisableIntegrityChecking can be set to true to disable checkpoint state integrity verification. This is not
// recommended, because it could mean proceeding even in the face of a corrupted checkpoint state file, but can
// be used as a last resort when a command absolutely must be run.
var DisableIntegrityChecking bool
type diyQuery struct {
root string
proj *workspace.Project
}
func (q *diyQuery) GetRoot() string {
return q.root
}
func (q *diyQuery) GetProject() *workspace.Project {
return q.proj
}
// update is an implementation of engine.Update backed by diy state.
type update struct {
root string
proj *workspace.Project
target *deploy.Target
backend *diyBackend
}
func (u *update) GetRoot() string {
return u.root
}
func (u *update) GetProject() *workspace.Project {
return u.proj
}
func (u *update) GetTarget() *deploy.Target {
return u.target
}
func (b *diyBackend) newQuery(
ctx context.Context,
op backend.QueryOperation,
) (engine.QueryInfo, error) {
return &diyQuery{root: op.Root, proj: op.Proj}, nil
}
func (b *diyBackend) newUpdate(
ctx context.Context,
secretsProvider secrets.Provider,
ref *diyBackendReference,
op backend.UpdateOperation,
) (*update, error) {
contract.Requiref(ref != nil, "ref", "must not be nil")
// Construct the deployment target.
target, err := b.getTarget(ctx, secretsProvider, ref,
op.StackConfiguration.Config, op.StackConfiguration.Decrypter)
if err != nil {
return nil, err
}
// Construct and return a new update.
return &update{
root: op.Root,
proj: op.Proj,
target: target,
backend: b,
}, nil
}
func (b *diyBackend) getTarget(
ctx context.Context,
secretsProvider secrets.Provider,
ref *diyBackendReference,
cfg config.Map,
dec config.Decrypter,
) (*deploy.Target, error) {
contract.Requiref(ref != nil, "ref", "must not be nil")
stack, err := b.GetStack(ctx, ref)
if err != nil {
return nil, err
}
snapshot, err := stack.Snapshot(ctx, secretsProvider)
if err != nil {
return nil, err
}
return &deploy.Target{
Name: ref.Name(),
Organization: "organization", // diy has no organizations really, but we just always say it's "organization"
Config: cfg,
Decrypter: dec,
Snapshot: snapshot,
}, nil
}
var errCheckpointNotFound = errors.New("checkpoint does not exist")
// stackExists simply does a check that the checkpoint file we expect for this stack exists.
func (b *diyBackend) stackExists(
ctx context.Context,
ref *diyBackendReference,
) (string, error) {
contract.Requiref(ref != nil, "ref", "must not be nil")
chkpath := b.stackPath(ctx, ref)
exists, err := b.bucket.Exists(ctx, chkpath)
if err != nil {
return chkpath, fmt.Errorf("failed to load checkpoint: %w", err)
}
if !exists {
return chkpath, errCheckpointNotFound
}
return chkpath, nil
}
func (b *diyBackend) getSnapshot(ctx context.Context,
secretsProvider secrets.Provider, ref *diyBackendReference,
) (*deploy.Snapshot, error) {
contract.Requiref(ref != nil, "ref", "must not be nil")
checkpoint, err := b.getCheckpoint(ctx, ref)
if err != nil {
return nil, fmt.Errorf("failed to load checkpoint: %w", err)
}
// Materialize an actual snapshot object.
snapshot, err := stack.DeserializeCheckpoint(ctx, secretsProvider, checkpoint)
if err != nil {
return nil, err
}
// Ensure the snapshot passes verification before returning it, to catch bugs early.
if !backend.DisableIntegrityChecking {
if err := snapshot.VerifyIntegrity(); err != nil {
if sie, ok := deploy.AsSnapshotIntegrityError(err); ok {
return nil, fmt.Errorf("snapshot integrity failure; refusing to use it: %w", sie.ForRead(snapshot))
}
return nil, fmt.Errorf("snapshot integrity failure; refusing to use it: %w", err)
}
}
return snapshot, nil
}
// GetCheckpoint loads a checkpoint file for the given stack in this project, from the current project workspace.
func (b *diyBackend) getCheckpoint(ctx context.Context, ref *diyBackendReference) (*apitype.CheckpointV3, error) {
chkpath := b.stackPath(ctx, ref)
bytes, err := b.bucket.ReadAll(ctx, chkpath)
if err != nil {
return nil, err
}
m := encoding.JSON
if encoding.IsCompressed(bytes) {
m = encoding.Gzip(m)
}
return stack.UnmarshalVersionedCheckpointToLatestCheckpoint(m, bytes)
}
func (b *diyBackend) saveCheckpoint(
ctx context.Context,
ref *diyBackendReference,
checkpoint *apitype.VersionedCheckpoint,
) (backupFile string, file string, _ error) {
// Make a serializable stack and then use the encoder to encode it.
file = b.stackPath(ctx, ref)
m, ext := encoding.Detect(strings.TrimSuffix(file, ".gz"))
if m == nil {
return "", "", fmt.Errorf("resource serialization failed; illegal markup extension: '%v'", ext)
}
if filepath.Ext(file) == "" {
file = file + ext
}
if b.gzip {
if filepath.Ext(file) != encoding.GZIPExt {
file = file + ".gz"
}
m = encoding.Gzip(m)
} else {
file = strings.TrimSuffix(file, ".gz")
}
byts, err := m.Marshal(checkpoint)
if err != nil {
return "", "", fmt.Errorf("An IO error occurred while marshalling the checkpoint: %w", err)
}
// Back up the existing file if it already exists. Don't delete the original, the following WriteAll will
// atomically replace it anyway and various other bits of the system depend on being able to find the
// .json file to know the stack currently exists (see https://github.com/pulumi/pulumi/issues/9033 for
// context).
filePlain := strings.TrimSuffix(file, ".gz")
fileGzip := filePlain + ".gz"
// We need to make sure that an out of date state file doesn't exist so we
// only keep the file of the type we are working with.
bckGzip := backupTarget(ctx, b.bucket, fileGzip, b.gzip)
bckPlain := backupTarget(ctx, b.bucket, filePlain, !b.gzip)
if b.gzip {
backupFile = bckGzip
} else {
backupFile = bckPlain
}
// And now write out the new snapshot file, overwriting that location.
if err = b.bucket.WriteAll(ctx, file, byts, nil); err != nil {
b.mutex.Lock()
defer b.mutex.Unlock()
// FIXME: Would be nice to make these configurable
delay, _ := time.ParseDuration("1s")
maxDelay, _ := time.ParseDuration("30s")
backoff := 1.2
// Retry the write 10 times in case of upstream bucket errors
_, _, err = retry.Until(ctx, retry.Acceptor{
Delay: &delay,
MaxDelay: &maxDelay,
Backoff: &backoff,
Accept: func(try int, nextRetryTime time.Duration) (bool, interface{}, error) {
// And now write out the new snapshot file, overwriting that location.
err := b.bucket.WriteAll(ctx, file, byts, nil)
if err != nil {
logging.V(7).Infof("Error while writing snapshot to: %s (attempt=%d, error=%s)", file, try, err)
if try > 10 {
return false, nil, fmt.Errorf("An IO error occurred while writing the new snapshot file: %w", err)
}
return false, nil, nil
}
return true, nil, nil
},
})
if err != nil {
return backupFile, "", err
}
}
logging.V(7).Infof("Saved stack %s checkpoint to: %s (backup=%s)", ref.FullyQualifiedName(), file, backupFile)
// And if we are retaining historical checkpoint information, write it out again
if b.Env.GetBool(env.DIYBackendRetainCheckpoints) {
if err = b.bucket.WriteAll(ctx, fmt.Sprintf("%v.%v", file, time.Now().UnixNano()), byts, nil); err != nil {
return backupFile, "", fmt.Errorf("An IO error occurred while writing the new snapshot file: %w", err)
}
}
return backupFile, file, nil
}
func (b *diyBackend) saveStack(
ctx context.Context,
ref *diyBackendReference, snap *deploy.Snapshot,
) (string, error) {
contract.Requiref(ref != nil, "ref", "ref was nil")
chk, err := stack.SerializeCheckpoint(ref.FullyQualifiedName(), snap, false /* showSecrets */)
if err != nil {
return "", fmt.Errorf("serializaing checkpoint: %w", err)
}
backup, file, err := b.saveCheckpoint(ctx, ref, chk)
if err != nil {
return "", err
}
if !backend.DisableIntegrityChecking {
// Finally, *after* writing the checkpoint, check the integrity. This is done afterwards so that we write
// out the checkpoint file since it may contain resource state updates. But we will warn the user that the
// file is already written and might be bad.
if verifyerr := snap.VerifyIntegrity(); verifyerr != nil {
return "", fmt.Errorf(
"%s: snapshot integrity failure; it was already written, but is invalid (backup available at %s): %w",
file, backup, verifyerr)
}
}
return file, nil
}
// removeStack removes information about a stack from the current workspace.
func (b *diyBackend) removeStack(ctx context.Context, ref *diyBackendReference) error {
contract.Requiref(ref != nil, "ref", "must not be nil")
// Just make a backup of the file and don't write out anything new.
file := b.stackPath(ctx, ref)
backupTarget(ctx, b.bucket, file, false)
historyDir := ref.HistoryDir()
return removeAllByPrefix(ctx, b.bucket, historyDir)
}
// backupTarget makes a backup of an existing file, in preparation for writing a new one.
func backupTarget(ctx context.Context, bucket Bucket, file string, keepOriginal bool) string {
contract.Requiref(file != "", "file", "must not be empty")
bck := file + ".bak"
err := bucket.Copy(ctx, bck, file, nil)
if err != nil {
logging.V(5).Infof("error copying %s to %s: %s", file, bck, err)
}
if !keepOriginal {
err = bucket.Delete(ctx, file)
if err != nil {
logging.V(5).Infof("error deleting source object after rename: %v (%v) skipping", file, err)
}
}
// IDEA: consider multiple backups (.bak.bak.bak...etc).
return bck
}
// backupStack copies the current Checkpoint file to ~/.pulumi/backups.
func (b *diyBackend) backupStack(ctx context.Context, ref *diyBackendReference) error {
contract.Requiref(ref != nil, "ref", "must not be nil")
// Exit early if backups are disabled.
if b.Env.GetBool(env.DIYBackendDisableCheckpointBackups) {
return nil
}
// Read the current checkpoint file. (Assuming it aleady exists.)
stackPath := b.stackPath(ctx, ref)
byts, err := b.bucket.ReadAll(ctx, stackPath)
if err != nil {
return err
}
// Get the backup directory.
backupDir := ref.BackupDir()
// Write out the new backup checkpoint file.
stackFile := filepath.Base(stackPath)
ext := filepath.Ext(stackFile)
base := strings.TrimSuffix(stackFile, ext)
if ext2 := filepath.Ext(base); ext2 != "" && ext == encoding.GZIPExt {
// base: stack-name.json, ext: .gz
// ->
// base: stack-name, ext: .json.gz
ext = ext2 + ext
base = strings.TrimSuffix(base, ext2)
}
backupFile := fmt.Sprintf("%s.%v%s", base, time.Now().UnixNano(), ext)
return b.bucket.WriteAll(ctx, filepath.Join(backupDir, backupFile), byts, nil)
}
func (b *diyBackend) stackPath(ctx context.Context, ref *diyBackendReference) string {
if ref == nil {
return StacksDir
}
// We can't use listBucket here for as we need to do a partial prefix match on filename, while the
// "dir" option to listBucket is always suffixed with "/". Also means we don't need to save any
// results in a slice.
plainPath := filepath.ToSlash(ref.StackBasePath()) + ".json"
gzipedPath := plainPath + ".gz"
bucketIter := b.bucket.List(&blob.ListOptions{
Delimiter: "/",
Prefix: plainPath,
})
var plainObj *blob.ListObject
for {
file, err := bucketIter.Next(ctx)
if err == io.EOF {
break
}
if err != nil {
// Error fetching the available ojects, assume .json
return plainPath
}
// plainObj will always come out first since allObjs is sorted by Key
if file.Key == plainPath {
plainObj = file
} else if file.Key == gzipedPath {
// We have a plain .json file and it was modified after this gzipped one so use it.
if plainObj != nil && plainObj.ModTime.After(file.ModTime) {
return plainPath
}
// else use the gzipped object
return gzipedPath
}
}
// Couldn't find any objects, assume nongzipped path?
return plainPath
}
// getHistory returns stored update history. The first element of the result will be
// the most recent update record.
func (b *diyBackend) getHistory(
ctx context.Context,
stack *diyBackendReference,
pageSize int, page int,
) ([]backend.UpdateInfo, error) {
contract.Requiref(stack != nil, "stack", "must not be nil")
dir := stack.HistoryDir()
// TODO: we could consider optimizing the list operation using `page` and `pageSize`.
// Unfortunately, this is mildly invasive given the gocloud List API.
allFiles, err := listBucket(ctx, b.bucket, dir)
if err != nil {
// History doesn't exist until a stack has been updated.
if gcerrors.Code(err) == gcerrors.NotFound {
return nil, nil
}
return nil, err
}
var historyEntries []*blob.ListObject
// filter down to just history entries, reversing list to be in most recent order.
// listBucket returns the array sorted by file name, but because of how we name files, older updates come before
// newer ones.
for i := len(allFiles) - 1; i >= 0; i-- {
file := allFiles[i]
filepath := file.Key
// ignore checkpoints
if !strings.HasSuffix(filepath, ".history.json") &&
!strings.HasSuffix(filepath, ".history.json.gz") {
continue
}
historyEntries = append(historyEntries, file)
}
start := 0
end := len(historyEntries) - 1
if pageSize > 0 {
if page < 1 {
page = 1
}
start = (page - 1) * pageSize
end = start + pageSize - 1
if end > len(historyEntries)-1 {
end = len(historyEntries) - 1
}
}
var updates []backend.UpdateInfo
for i := start; i <= end; i++ {
file := historyEntries[i]
filepath := file.Key
var update backend.UpdateInfo
b, err := b.bucket.ReadAll(ctx, filepath)
if err != nil {
return nil, fmt.Errorf("reading history file %s: %w", filepath, err)
}
m := encoding.JSON
if encoding.IsCompressed(b) {
m = encoding.Gzip(m)
}
err = m.Unmarshal(b, &update)
if err != nil {
return nil, fmt.Errorf("reading history file %s: %w", filepath, err)
}
updates = append(updates, update)
}
return updates, nil
}
func (b *diyBackend) renameHistory(ctx context.Context, oldName, newName *diyBackendReference) error {
contract.Requiref(oldName != nil, "oldName", "must not be nil")
contract.Requiref(newName != nil, "newName", "must not be nil")
oldHistory := oldName.HistoryDir()
newHistory := newName.HistoryDir()
allFiles, err := listBucket(ctx, b.bucket, oldHistory)
if err != nil {
// if there's nothing there, we don't really need to do a rename.
if gcerrors.Code(err) == gcerrors.NotFound {
return nil
}
return err
}
for _, file := range allFiles {
fileName := objectName(file)
oldBlob := path.Join(oldHistory, fileName)
// The filename format is <stack-name>-<timestamp>.[checkpoint|history].json[.gz], we need to change
// the stack name part but retain the other parts. If we find files that don't match this format
// ignore them.
dashIndex := strings.LastIndex(fileName, "-")
if dashIndex == -1 || (fileName[:dashIndex] != oldName.name.String()) {
// No dash or the string up to the dash isn't the old name
continue
}
newFileName := newName.name.String() + fileName[dashIndex:]
newBlob := path.Join(newHistory, newFileName)
if err := b.bucket.Copy(ctx, newBlob, oldBlob, nil); err != nil {
return fmt.Errorf("copying history file: %w", err)
}
if err := b.bucket.Delete(ctx, oldBlob); err != nil {
return fmt.Errorf("deleting existing history file: %w", err)
}
}
return nil
}
// addToHistory saves the UpdateInfo and makes a copy of the current Checkpoint file.
func (b *diyBackend) addToHistory(ctx context.Context, ref *diyBackendReference, update backend.UpdateInfo) error {
contract.Requiref(ref != nil, "ref", "must not be nil")
dir := ref.HistoryDir()
// Prefix for the update and checkpoint files.
pathPrefix := path.Join(dir, fmt.Sprintf("%s-%d", ref.name, time.Now().UnixNano()))
m, ext := encoding.JSON, "json"
if b.gzip {
m = encoding.Gzip(m)
ext += ".gz"
}
// Save the history file.
byts, err := m.Marshal(&update)
if err != nil {
return err
}
historyFile := fmt.Sprintf("%s.history.%s", pathPrefix, ext)
if err = b.bucket.WriteAll(ctx, historyFile, byts, nil); err != nil {
return err
}
// Make a copy of the checkpoint file. (Assuming it already exists.)
checkpointFile := fmt.Sprintf("%s.checkpoint.%s", pathPrefix, ext)
return b.bucket.Copy(ctx, checkpointFile, b.stackPath(ctx, ref), nil)
}