pulumi/pkg/resource/deploy/plan.go

292 lines
12 KiB
Go

// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deploy
import (
"bufio"
"bytes"
"fmt"
"github.com/blang/semver"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"github.com/pulumi/pulumi/pkg/diag"
"github.com/pulumi/pulumi/pkg/resource"
"github.com/pulumi/pulumi/pkg/resource/deploy/providers"
"github.com/pulumi/pulumi/pkg/resource/graph"
"github.com/pulumi/pulumi/pkg/resource/plugin"
"github.com/pulumi/pulumi/pkg/tokens"
"github.com/pulumi/pulumi/pkg/util/contract"
)
// Options controls the planning and deployment process.
type Options struct {
Events Events // an optional events callback interface.
Parallel int // the degree of parallelism for resource operations (<=1 for serial).
}
// DegreeOfParallelism returns the degree of parallelism that should be used during the
// planning and deployment process.
func (o Options) DegreeOfParallelism() int {
if o.Parallel <= 1 {
return 1
}
return o.Parallel
}
// Events is an interface that can be used to hook interesting engine/planning events.
type Events interface {
OnResourceStepPre(step Step) (interface{}, error)
OnResourceStepPost(ctx interface{}, step Step, status resource.Status, err error) error
OnResourceOutputs(step Step) error
}
// PlanSummary is an interface for summarizing the progress of a plan.
type PlanSummary interface {
Steps() int
Creates() map[resource.URN]bool
Updates() map[resource.URN]bool
Replaces() map[resource.URN]bool
Deletes() map[resource.URN]bool
Sames() map[resource.URN]bool
}
// InvalidResourceError is returned by deploy.NewPlan when the engine observes
// that one or more resources are in an invalid state. The engine populates `InvalidResources`
// with the URNs of every resource that was invalid.
type InvalidResourceError struct {
InvalidResources []*resource.State
}
func (ire InvalidResourceError) Error() string {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
fmt.Fprintf(writer,
"error: the current deployment has %d resource(s) whose statuses are unknown:\n", len(ire.InvalidResources))
for _, res := range ire.InvalidResources {
fmt.Fprintf(writer, " * %s, interrupted while %s\n", res.URN, res.Status)
}
fmt.Fprintf(writer, "\n")
fmt.Fprintf(writer, "These resources have unknown statuses because the Pulumi CLI was interrupted before it\n")
fmt.Fprintf(writer, "had the opportunity to see if an operation that it initiated was successful. You should\n")
fmt.Fprintf(writer, "confirm whether or not the above operations completed successfully with your cloud provider.\n")
fmt.Fprintf(writer, "\n")
fmt.Fprintf(writer, "Once you have confirmed the status of the interrupted operations, you can repair your stack\n")
fmt.Fprintf(writer, "using `pulumi stack export` to export your stack to a file. For each operation that succeeded,\n")
fmt.Fprintf(writer, "remove the `status` field. Once this is complete, use `pulumi stack import` to import the\n")
fmt.Fprintf(writer, "repaired stack.")
contract.IgnoreError(writer.Flush())
return buf.String()
}
// Plan is the output of analyzing resource graphs and contains the steps necessary to perform an infrastructure
// deployment. A plan can be generated out of whole cloth from a resource graph -- in the case of new deployments --
// however, it can alternatively be generated by diffing two resource graphs -- in the case of updates to existing
// stacks (presumably more common). The plan contains step objects that can be used to drive a deployment.
type Plan struct {
ctx *plugin.Context // the plugin context (for provider operations).
target *Target // the deployment target.
prev *Snapshot // the old resource snapshot for comparison.
olds map[resource.URN]*resource.State // a map of all old resources.
source Source // the source of new resources.
analyzers []tokens.QName // the analyzers to run during this plan's generation.
preview bool // true if this plan is to be previewed rather than applied.
depGraph *graph.DependencyGraph // the dependency graph of the old snapshot
providers *providers.Registry // the provider registry for this plan.
}
// addDefaultProviders adds any necessary default provider definitions and references to the given snapshot. Version
// information for these providers is sourced from the snapshot's manifest; inputs parameters are sourced from the
// stack's configuration.
func addDefaultProviders(target *Target, source Source, prev *Snapshot) error {
if prev == nil {
return nil
}
// Pull the versions we'll use for default providers from the snapshot's manifest.
defaultProviderVersions := make(map[tokens.Package]*semver.Version)
for _, p := range prev.Manifest.Plugins {
defaultProviderVersions[tokens.Package(p.Name)] = p.Version
}
// Determine the necessary set of default providers and inject references to default providers as appropriate.
//
// We do this by scraping the snapshot for custom resources that does not reference a provider and adding
// default providers for these resources' packages. Each of these resources is rewritten to reference the default
// provider for its package.
//
// The configuration for each default provider is pulled from the stack's configuration information.
var defaultProviders []*resource.State
defaultProviderRefs := make(map[tokens.Package]providers.Reference)
for _, res := range prev.Resources {
if providers.IsProviderType(res.URN.Type()) || !res.Custom || res.Provider != "" {
continue
}
pkg := res.URN.Type().Package()
ref, ok := defaultProviderRefs[pkg]
if !ok {
cfg, err := target.GetPackageConfig(pkg)
if err != nil {
return errors.Errorf("could not fetch configuration for default provider '%v'", pkg)
}
inputs := make(resource.PropertyMap)
for k, v := range cfg {
inputs[resource.PropertyKey(k.Name())] = resource.NewStringProperty(v)
}
if version, ok := defaultProviderVersions[pkg]; ok {
inputs["version"] = resource.NewStringProperty(version.String())
}
urn, id := defaultProviderURN(target, source, pkg), resource.ID(uuid.NewV4().String())
ref, err = providers.NewReference(urn, id)
contract.Assert(err == nil)
provider := &resource.State{
Type: urn.Type(),
URN: urn,
Custom: true,
ID: id,
Inputs: inputs,
}
defaultProviders = append(defaultProviders, provider)
defaultProviderRefs[pkg] = ref
}
res.Provider = ref.String()
}
// If any default providers are necessary, prepend their definitions to the snapshot's resources. This trivially
// guarantees that all default provider references name providers that precede the referent in the snapshot.
if len(defaultProviders) != 0 {
prev.Resources = append(defaultProviders, prev.Resources...)
}
return nil
}
// NewPlan creates a new deployment plan from a resource snapshot plus a package to evaluate.
//
// From the old and new states, it understands how to orchestrate an evaluation and analyze the resulting resources.
// The plan may be used to simply inspect a series of operations, or actually perform them; these operations are
// generated based on analysis of the old and new states. If a resource exists in new, but not old, for example, it
// results in a create; if it exists in both, but is different, it results in an update; and so on and so forth.
//
// Note that a plan uses internal concurrency and parallelism in various ways, so it must be closed if for some reason
// a plan isn't carried out to its final conclusion. This will result in cancelation and reclamation of OS resources.
func NewPlan(ctx *plugin.Context, target *Target, prev *Snapshot, source Source, analyzers []tokens.QName,
preview bool) (*Plan, error) {
contract.Assert(ctx != nil)
contract.Assert(target != nil)
contract.Assert(source != nil)
// Add any necessary default provider references to the previous snapshot in order to accommodate stacks that were
// created prior to the changes that added first-class providers. We do this here rather than in the migration
// package s.t. the inputs to any default providers (which we fetch from the stacks's configuration) are as
// accurate as possible.
if err := addDefaultProviders(target, source, prev); err != nil {
return nil, err
}
var depGraph *graph.DependencyGraph
var oldResources []*resource.State
var invalidResources []*resource.State
// Produce a map of all old resources for fast resources.
olds := make(map[resource.URN]*resource.State)
if prev != nil {
oldResources = prev.Resources
for _, oldres := range oldResources {
// Ignore resources that are pending deletion; these should not be recorded in the LUT.
if oldres.Delete {
continue
}
if oldres.Status != resource.OperationStatusEmpty {
invalidResources = append(invalidResources, oldres)
continue
}
urn := oldres.URN
contract.Assert(olds[urn] == nil)
olds[urn] = oldres
}
depGraph = graph.NewDependencyGraph(oldResources)
}
// Create a new provider registry. Although we really only need to pass in any providers that were present in the
// old resource list, the registry itself will filter out other sorts of resources when processing the prior state,
// so we just pass all of the old resources.
reg, err := providers.NewRegistry(ctx.Host, oldResources, preview)
if err != nil {
return nil, err
}
if invalidResources != nil {
return nil, InvalidResourceError{invalidResources}
}
return &Plan{
ctx: ctx,
target: target,
prev: prev,
olds: olds,
source: source,
analyzers: analyzers,
preview: preview,
depGraph: depGraph,
providers: reg,
}, nil
}
func (p *Plan) Ctx() *plugin.Context { return p.ctx }
func (p *Plan) Target() *Target { return p.target }
func (p *Plan) Diag() diag.Sink { return p.ctx.Diag }
func (p *Plan) Prev() *Snapshot { return p.prev }
func (p *Plan) Olds() map[resource.URN]*resource.State { return p.olds }
func (p *Plan) Source() Source { return p.source }
func (p *Plan) IsRefresh() bool { return p.source.IsRefresh() }
func (p *Plan) SignalCancellation() error {
return p.ctx.Host.SignalCancellation()
}
func (p *Plan) GetProvider(ref providers.Reference) (plugin.Provider, bool) {
return p.providers.GetProvider(ref)
}
// generateURN generates a resource's URN from its parent, type, and name under the scope of the plan's stack and
// project.
func (p *Plan) generateURN(parent resource.URN, ty tokens.Type, name tokens.QName) resource.URN {
// Use the resource goal state name to produce a globally unique URN.
parentType := tokens.Type("")
if parent != "" && parent.Type() != resource.RootStackType {
// Skip empty parents and don't use the root stack type; otherwise, use the full qualified type.
parentType = parent.QualifiedType()
}
return resource.NewURN(p.Target().Name, p.source.Project(), parentType, ty, name)
}
// defaultProviderURN generates the URN for the global provider given a package.
func defaultProviderURN(target *Target, source Source, pkg tokens.Package) resource.URN {
return resource.NewURN(target.Name, source.Project(), "", providers.MakeProviderType(pkg), "default")
}