// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package diy

import (
	"context"
	"errors"
	"fmt"
	"io"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/pulumi/pulumi/sdk/v3/go/common/env"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/retry"

	"github.com/pulumi/pulumi/pkg/v3/engine"
	"gocloud.dev/blob"
	"gocloud.dev/gcerrors"

	"github.com/pulumi/pulumi/pkg/v3/backend"
	"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
	"github.com/pulumi/pulumi/pkg/v3/resource/stack"
	"github.com/pulumi/pulumi/pkg/v3/secrets"
	"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
	"github.com/pulumi/pulumi/sdk/v3/go/common/encoding"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
	"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)

// DisableIntegrityChecking can be set to true to disable checkpoint state integrity verification. This is not
// recommended, because it could mean proceeding even in the face of a corrupted checkpoint state file, but can
// be used as a last resort when a command absolutely must be run.
var DisableIntegrityChecking bool

type diyQuery struct {
	root string
	proj *workspace.Project
}

func (q *diyQuery) GetRoot() string {
	return q.root
}

func (q *diyQuery) GetProject() *workspace.Project {
	return q.proj
}

// update is an implementation of engine.Update backed by diy state.
type update struct {
	root    string
	proj    *workspace.Project
	target  *deploy.Target
	backend *diyBackend
}

func (u *update) GetRoot() string {
	return u.root
}

func (u *update) GetProject() *workspace.Project {
	return u.proj
}

func (u *update) GetTarget() *deploy.Target {
	return u.target
}

func (b *diyBackend) newQuery(
	ctx context.Context,
	op backend.QueryOperation,
) (engine.QueryInfo, error) {
	return &diyQuery{root: op.Root, proj: op.Proj}, nil
}
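
// newUpdate constructs the engine update context for an operation on the given stack, loading the
// deployment target from the stack's stored checkpoint state.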
func (b *diyBackend) newUpdate(
	ctx context.Context,
	secretsProvider secrets.Provider,
	ref *diyBackendReference,
	op backend.UpdateOperation,
) (*update, error) {
	contract.Requiref(ref != nil, "ref", "must not be nil")

	// Construct the deployment target.
	target, err := b.getTarget(ctx, secretsProvider, ref,
		op.StackConfiguration.Config, op.StackConfiguration.Decrypter)
	if err != nil {
		return nil, err
	}

	// Construct and return a new update.
	return &update{
		root:    op.Root,
		proj:    op.Proj,
		target:  target,
		backend: b,
	}, nil
}

func (b *diyBackend) getTarget(
	ctx context.Context,
	secretsProvider secrets.Provider,
	ref *diyBackendReference,
	cfg config.Map,
	dec config.Decrypter,
) (*deploy.Target, error) {
	contract.Requiref(ref != nil, "ref", "must not be nil")
	stack, err := b.GetStack(ctx, ref)
	if err != nil {
		return nil, err
	}
	snapshot, err := stack.Snapshot(ctx, secretsProvider)
	if err != nil {
		return nil, err
	}
	return &deploy.Target{
		Name: ref.Name(),
		// The diy backend has no real concept of organizations, so we always use "organization".
		Organization: "organization",
		Config:       cfg,
		Decrypter:    dec,
		Snapshot:     snapshot,
	}, nil
}

var errCheckpointNotFound = errors.New("checkpoint does not exist")

// stackExists checks that the checkpoint file we expect for this stack exists.
func (b *diyBackend) stackExists(
	ctx context.Context,
	ref *diyBackendReference,
) (string, error) {
	contract.Requiref(ref != nil, "ref", "must not be nil")

	chkpath := b.stackPath(ctx, ref)
	exists, err := b.bucket.Exists(ctx, chkpath)
	if err != nil {
		return chkpath, fmt.Errorf("failed to load checkpoint: %w", err)
	}
	if !exists {
		return chkpath, errCheckpointNotFound
	}

	return chkpath, nil
}

func (b *diyBackend) getSnapshot(
	ctx context.Context,
	secretsProvider secrets.Provider,
	ref *diyBackendReference,
) (*deploy.Snapshot, error) {
	contract.Requiref(ref != nil, "ref", "must not be nil")

	checkpoint, err := b.getCheckpoint(ctx, ref)
	if err != nil {
		return nil, fmt.Errorf("failed to load checkpoint: %w", err)
	}

	// Materialize an actual snapshot object.
	snapshot, err := stack.DeserializeCheckpoint(ctx, secretsProvider, checkpoint)
	if err != nil {
		return nil, err
	}

	// Ensure the snapshot passes verification before returning it, to catch bugs early.
	if !backend.DisableIntegrityChecking {
		if err := snapshot.VerifyIntegrity(); err != nil {
			return nil, fmt.Errorf("snapshot integrity failure; refusing to use it: %w", err)
		}
	}

	return snapshot, nil
}

// getCheckpoint loads the checkpoint file for the given stack in this project from the current project workspace.
func (b *diyBackend) getCheckpoint(ctx context.Context, ref *diyBackendReference) (*apitype.CheckpointV3, error) {
	chkpath := b.stackPath(ctx, ref)
	bytes, err := b.bucket.ReadAll(ctx, chkpath)
	if err != nil {
		return nil, err
	}

	m := encoding.JSON
	if encoding.IsCompressed(bytes) {
		m = encoding.Gzip(m)
	}

	return stack.UnmarshalVersionedCheckpointToLatestCheckpoint(m, bytes)
}
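
// saveCheckpoint writes the given versioned checkpoint to the stack's checkpoint file, backing up any
// existing file first and retrying the write on transient bucket errors. It returns the name of the backup
// file (if one was made) and the name of the checkpoint file actually written.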
func (b *diyBackend) saveCheckpoint(
	ctx context.Context,
	ref *diyBackendReference,
	checkpoint *apitype.VersionedCheckpoint,
) (backupFile string, file string, _ error) {
	// Make a serializable stack and then use the encoder to encode it.
	file = b.stackPath(ctx, ref)
	m, ext := encoding.Detect(strings.TrimSuffix(file, ".gz"))
	if m == nil {
		return "", "", fmt.Errorf("resource serialization failed; illegal markup extension: '%v'", ext)
	}
	if filepath.Ext(file) == "" {
		file = file + ext
	}
	if b.gzip {
		if filepath.Ext(file) != encoding.GZIPExt {
			file = file + ".gz"
		}
		m = encoding.Gzip(m)
	} else {
		file = strings.TrimSuffix(file, ".gz")
	}

	byts, err := m.Marshal(checkpoint)
	if err != nil {
		return "", "", fmt.Errorf("An IO error occurred while marshalling the checkpoint: %w", err)
	}

	// Back up the existing file if it already exists. Don't delete the original: the following WriteAll will
	// atomically replace it anyway, and various other parts of the system depend on being able to find the
	// .json file to know the stack currently exists (see https://github.com/pulumi/pulumi/issues/9033 for
	// context).
	filePlain := strings.TrimSuffix(file, ".gz")
	fileGzip := filePlain + ".gz"
	// Make sure an out-of-date state file doesn't linger: only keep the file of the type (gzipped or plain)
	// we are currently working with.
	bckGzip := backupTarget(ctx, b.bucket, fileGzip, b.gzip)
	bckPlain := backupTarget(ctx, b.bucket, filePlain, !b.gzip)
	if b.gzip {
		backupFile = bckGzip
	} else {
		backupFile = bckPlain
	}

	// And now write out the new snapshot file, overwriting that location.
	if err = b.bucket.WriteAll(ctx, file, byts, nil); err != nil {
		b.mutex.Lock()
		defer b.mutex.Unlock()

		// FIXME: Would be nice to make these configurable.
		// Assuming retry.Until scales the delay by the backoff factor after each failed attempt, this
		// yields waits of roughly 1s, 1.2s, 1.44s, ..., capped at 30s.
		delay, _ := time.ParseDuration("1s")
		maxDelay, _ := time.ParseDuration("30s")
		backoff := 1.2

		// Retry the write up to 10 times in case of transient upstream bucket errors.
		_, _, err = retry.Until(ctx, retry.Acceptor{
			Delay:    &delay,
			MaxDelay: &maxDelay,
			Backoff:  &backoff,
			Accept: func(try int, nextRetryTime time.Duration) (bool, interface{}, error) {
				// Write out the new snapshot file again, overwriting that location.
				err := b.bucket.WriteAll(ctx, file, byts, nil)
				if err != nil {
					logging.V(7).Infof("Error while writing snapshot to: %s (attempt=%d, error=%s)",
						file, try, err)
					if try > 10 {
						return false, nil, fmt.Errorf("An IO error occurred while writing the new snapshot file: %w", err)
					}
					return false, nil, nil
				}
				return true, nil, nil
			},
		})
		if err != nil {
			return backupFile, "", err
		}
	}

	logging.V(7).Infof("Saved stack %s checkpoint to: %s (backup=%s)", ref.FullyQualifiedName(), file, backupFile)

	// And if we are retaining historical checkpoint information, write it out again.
	if b.Env.GetBool(env.DIYBackendRetainCheckpoints) {
		if err = b.bucket.WriteAll(ctx, fmt.Sprintf("%v.%v", file, time.Now().UnixNano()), byts, nil); err != nil {
			return backupFile, "", fmt.Errorf("An IO error occurred while writing the new snapshot file: %w", err)
		}
	}

	return backupFile, file, nil
}
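
// saveStack serializes the given snapshot as a versioned checkpoint and writes it out via saveCheckpoint,
// verifying the snapshot's integrity after the write (unless integrity checking has been disabled).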
func (b *diyBackend) saveStack(
	ctx context.Context,
	ref *diyBackendReference,
	snap *deploy.Snapshot,
	sm secrets.Manager,
) (string, error) {
	contract.Requiref(ref != nil, "ref", "ref was nil")
	chk, err := stack.SerializeCheckpoint(ref.FullyQualifiedName(), snap, false /* showSecrets */)
	if err != nil {
		return "", fmt.Errorf("serializing checkpoint: %w", err)
	}

	backup, file, err := b.saveCheckpoint(ctx, ref, chk)
	if err != nil {
		return "", err
	}

	if !backend.DisableIntegrityChecking {
		// Finally, *after* writing the checkpoint, check its integrity. We verify afterwards so that the
		// checkpoint file still gets written out, since it may contain resource state updates. But we tell
		// the user that the file has already been written and might be bad.
		if verifyerr := snap.VerifyIntegrity(); verifyerr != nil {
			return "", fmt.Errorf(
				"%s: snapshot integrity failure; it was already written, but is invalid (backup available at %s): %w",
				file, backup, verifyerr)
		}
	}

	return file, nil
}

// removeStack removes information about a stack from the current workspace.
func (b *diyBackend) removeStack(ctx context.Context, ref *diyBackendReference) error {
	contract.Requiref(ref != nil, "ref", "must not be nil")

	// Just make a backup of the checkpoint file and don't write out anything new; then remove the stack's
	// history.
	file := b.stackPath(ctx, ref)
	backupTarget(ctx, b.bucket, file, false)

	historyDir := ref.HistoryDir()
	return removeAllByPrefix(ctx, b.bucket, historyDir)
}

// backupTarget makes a backup of an existing file, in preparation for writing a new one.
func backupTarget(ctx context.Context, bucket Bucket, file string, keepOriginal bool) string {
	contract.Requiref(file != "", "file", "must not be empty")
	bck := file + ".bak"
	err := bucket.Copy(ctx, bck, file, nil)
	if err != nil {
		logging.V(5).Infof("error copying %s to %s: %s", file, bck, err)
	}

	if !keepOriginal {
		err = bucket.Delete(ctx, file)
		if err != nil {
			logging.V(5).Infof("error deleting original object %v after backup: %v; skipping", file, err)
		}
	}

	// IDEA: consider multiple backups (.bak.bak.bak...etc).
	return bck
}

// backupStack copies the current Checkpoint file to ~/.pulumi/backups.
func (b *diyBackend) backupStack(ctx context.Context, ref *diyBackendReference) error {
	contract.Requiref(ref != nil, "ref", "must not be nil")

	// Exit early if backups are disabled.
	if b.Env.GetBool(env.DIYBackendDisableCheckpointBackups) {
		return nil
	}

	// Read the current checkpoint file. (Assuming it already exists.)
	stackPath := b.stackPath(ctx, ref)
	byts, err := b.bucket.ReadAll(ctx, stackPath)
	if err != nil {
		return err
	}

	// Get the backup directory.
	backupDir := ref.BackupDir()

	// Write out the new backup checkpoint file.
	stackFile := filepath.Base(stackPath)
	ext := filepath.Ext(stackFile)
	base := strings.TrimSuffix(stackFile, ext)
	if ext2 := filepath.Ext(base); ext2 != "" && ext == encoding.GZIPExt {
		// base: stack-name.json, ext: .gz
		// ->
		// base: stack-name, ext: .json.gz
		ext = ext2 + ext
		base = strings.TrimSuffix(base, ext2)
	}
	backupFile := fmt.Sprintf("%s.%v%s", base, time.Now().UnixNano(), ext)
	return b.bucket.WriteAll(ctx, filepath.Join(backupDir, backupFile), byts, nil)
}

func (b *diyBackend) stackPath(ctx context.Context, ref *diyBackendReference) string {
	if ref == nil {
		return StacksDir
	}

	// We can't use listBucket here, as we need to do a partial prefix match on the filename, while the
	// "dir" option to listBucket is always suffixed with "/". This also means we don't need to collect
	// the results into a slice.
	plainPath := filepath.ToSlash(ref.StackBasePath()) + ".json"
	gzipedPath := plainPath + ".gz"

	bucketIter := b.bucket.List(&blob.ListOptions{
		Delimiter: "/",
		Prefix:    plainPath,
	})

	var plainObj *blob.ListObject
	for {
		file, err := bucketIter.Next(ctx)
		if err == io.EOF {
			break
		}
		if err != nil {
			// Error fetching the available objects; assume .json.
			return plainPath
		}

		// plainObj will always come out first, since the listing is sorted by key.
		if file.Key == plainPath {
			plainObj = file
		} else if file.Key == gzipedPath {
			// We have a plain .json file and it was modified after this gzipped one, so use it.
			if plainObj != nil && plainObj.ModTime.After(file.ModTime) {
				return plainPath
			}
			// Otherwise use the gzipped object.
			return gzipedPath
		}
	}

	// Couldn't find any objects, so assume the non-gzipped path.
	return plainPath
}
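
// For illustration, assuming the standard project-scoped DIY layout rooted at StacksDir: a stack "dev" in
// project "proj" resolves to .pulumi/stacks/proj/dev.json, or to .pulumi/stacks/proj/dev.json.gz when gzip
// is enabled. When both files exist, stackPath above returns whichever was modified more recently.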
// getHistory returns stored update history. The first element of the result will be
// the most recent update record.
func (b *diyBackend) getHistory(
	ctx context.Context,
	stack *diyBackendReference,
	pageSize int,
	page int,
) ([]backend.UpdateInfo, error) {
	contract.Requiref(stack != nil, "stack", "must not be nil")

	dir := stack.HistoryDir()
	// TODO: we could consider optimizing the list operation using `page` and `pageSize`.
	// Unfortunately, this is mildly invasive given the gocloud List API.
	allFiles, err := listBucket(ctx, b.bucket, dir)
	if err != nil {
		// History doesn't exist until a stack has been updated.
		if gcerrors.Code(err) == gcerrors.NotFound {
			return nil, nil
		}
		return nil, err
	}

	var historyEntries []*blob.ListObject

	// Filter down to just history entries, reversing the list to be in most-recent order.
	// listBucket returns the array sorted by file name, and because of how we name files, older updates come
	// before newer ones.
	for i := len(allFiles) - 1; i >= 0; i-- {
		file := allFiles[i]
		key := file.Key

		// Ignore checkpoints.
		if !strings.HasSuffix(key, ".history.json") && !strings.HasSuffix(key, ".history.json.gz") {
			continue
		}

		historyEntries = append(historyEntries, file)
	}

	start := 0
	end := len(historyEntries) - 1
	if pageSize > 0 {
		if page < 1 {
			page = 1
		}
		// E.g. pageSize=10, page=2 selects entries 10 through 19, clamped to the available range.
		start = (page - 1) * pageSize
		end = start + pageSize - 1
		if end > len(historyEntries)-1 {
			end = len(historyEntries) - 1
		}
	}

	var updates []backend.UpdateInfo

	for i := start; i <= end; i++ {
		file := historyEntries[i]
		key := file.Key

		var info backend.UpdateInfo
		data, err := b.bucket.ReadAll(ctx, key)
		if err != nil {
			return nil, fmt.Errorf("reading history file %s: %w", key, err)
		}
		m := encoding.JSON
		if encoding.IsCompressed(data) {
			m = encoding.Gzip(m)
		}
		err = m.Unmarshal(data, &info)
		if err != nil {
			return nil, fmt.Errorf("reading history file %s: %w", key, err)
		}

		updates = append(updates, info)
	}

	return updates, nil
}

func (b *diyBackend) renameHistory(ctx context.Context, oldName, newName *diyBackendReference) error {
	contract.Requiref(oldName != nil, "oldName", "must not be nil")
	contract.Requiref(newName != nil, "newName", "must not be nil")

	oldHistory := oldName.HistoryDir()
	newHistory := newName.HistoryDir()

	allFiles, err := listBucket(ctx, b.bucket, oldHistory)
	if err != nil {
		// If there's nothing there, we don't really need to do a rename.
		if gcerrors.Code(err) == gcerrors.NotFound {
			return nil
		}
		return err
	}

	for _, file := range allFiles {
		fileName := objectName(file)
		oldBlob := path.Join(oldHistory, fileName)

		// The filename format is <stack-name>-<timestamp>.[checkpoint|history].json[.gz]; we need to change
		// the stack name part but retain the other parts. If we find files that don't match this format,
		// ignore them.
		dashIndex := strings.LastIndex(fileName, "-")
		if dashIndex == -1 || (fileName[:dashIndex] != oldName.name.String()) {
			// No dash, or the string up to the dash isn't the old name.
			continue
		}

		newFileName := newName.name.String() + fileName[dashIndex:]
		newBlob := path.Join(newHistory, newFileName)

		if err := b.bucket.Copy(ctx, newBlob, oldBlob, nil); err != nil {
			return fmt.Errorf("copying history file: %w", err)
		}
		if err := b.bucket.Delete(ctx, oldBlob); err != nil {
			return fmt.Errorf("deleting existing history file: %w", err)
		}
	}

	return nil
}
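
// For example (hypothetical timestamp): renaming stack "dev" to "prod" via renameHistory above copies
// "dev-1699999999999999999.history.json" to "prod-1699999999999999999.history.json" under the new history
// directory and deletes the original.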
// addToHistory saves the UpdateInfo and makes a copy of the current Checkpoint file.
func (b *diyBackend) addToHistory(ctx context.Context, ref *diyBackendReference, update backend.UpdateInfo) error {
	contract.Requiref(ref != nil, "ref", "must not be nil")

	dir := ref.HistoryDir()

	// Prefix for the update and checkpoint files.
	pathPrefix := path.Join(dir, fmt.Sprintf("%s-%d", ref.name, time.Now().UnixNano()))

	m, ext := encoding.JSON, "json"
	if b.gzip {
		m = encoding.Gzip(m)
		ext += ".gz"
	}

	// Save the history file.
	byts, err := m.Marshal(&update)
	if err != nil {
		return err
	}

	historyFile := fmt.Sprintf("%s.history.%s", pathPrefix, ext)
	if err = b.bucket.WriteAll(ctx, historyFile, byts, nil); err != nil {
		return err
	}

	// Make a copy of the checkpoint file. (Assuming it already exists.)
	checkpointFile := fmt.Sprintf("%s.checkpoint.%s", pathPrefix, ext)
	return b.bucket.Copy(ctx, checkpointFile, b.stackPath(ctx, ref), nil)
}
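
// As an illustration of addToHistory (hypothetical timestamp, gzip enabled): recording an update to stack
// "dev" writes "dev-1699999999999999999.history.json.gz" and copies the current checkpoint to
// "dev-1699999999999999999.checkpoint.json.gz" in the stack's history directory.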