pulumi/pkg/engine/events.go

670 lines
21 KiB
Go

// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"bytes"
"time"
"github.com/pulumi/pulumi/pkg/v3/display"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag/colors"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/deepcopy"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)
// Event represents an event generated by the engine during an operation. The underlying
// type for the `Payload` field will differ depending on the value of the `Type` field
type Event struct {
Type EventType
payload interface{}
}
func NewEvent(typ EventType, payload interface{}) Event {
ok := false
switch typ {
case CancelEvent:
ok = payload == nil
case StdoutColorEvent:
_, ok = payload.(StdoutEventPayload)
case DiagEvent:
_, ok = payload.(DiagEventPayload)
case PreludeEvent:
_, ok = payload.(PreludeEventPayload)
case SummaryEvent:
_, ok = payload.(SummaryEventPayload)
case ResourcePreEvent:
_, ok = payload.(ResourcePreEventPayload)
case ResourceOutputsEvent:
_, ok = payload.(ResourceOutputsEventPayload)
case ResourceOperationFailed:
_, ok = payload.(ResourceOperationFailedPayload)
case PolicyViolationEvent:
_, ok = payload.(PolicyViolationEventPayload)
case PolicyRemediationEvent:
_, ok = payload.(PolicyRemediationEventPayload)
default:
contract.Failf("unknown event type %v", typ)
}
contract.Assertf(ok, "invalid payload of type %T for event type %v", payload, typ)
return Event{
Type: typ,
payload: deepcopy.Copy(payload),
}
}
// EventType is the kind of event being emitted.
type EventType string
const (
CancelEvent EventType = "cancel"
StdoutColorEvent EventType = "stdoutcolor"
DiagEvent EventType = "diag"
PreludeEvent EventType = "prelude"
SummaryEvent EventType = "summary"
ResourcePreEvent EventType = "resource-pre"
ResourceOutputsEvent EventType = "resource-outputs"
ResourceOperationFailed EventType = "resource-operationfailed"
PolicyViolationEvent EventType = "policy-violation"
PolicyRemediationEvent EventType = "policy-remediation"
)
func (e Event) Payload() interface{} {
return e.payload
}
func cancelEvent() Event {
return Event{Type: CancelEvent}
}
// DiagEventPayload is the payload for an event with type `diag`
type DiagEventPayload struct {
URN resource.URN
Prefix string
Message string
Color colors.Colorization
Severity diag.Severity
StreamID int32
Ephemeral bool
}
// PolicyViolationEventPayload is the payload for an event with type `policy-violation`.
type PolicyViolationEventPayload struct {
ResourceURN resource.URN
Message string
Color colors.Colorization
PolicyName string
PolicyPackName string
PolicyPackVersion string
EnforcementLevel apitype.EnforcementLevel
Prefix string
}
// PolicyRemediationEventPayload is the payload for an event with type `policy-remediation`.
type PolicyRemediationEventPayload struct {
ResourceURN resource.URN
Color colors.Colorization
PolicyName string
PolicyPackName string
PolicyPackVersion string
Before resource.PropertyMap
After resource.PropertyMap
}
type StdoutEventPayload struct {
Message string
Color colors.Colorization
}
type PreludeEventPayload struct {
IsPreview bool // true if this prelude is for a plan operation
Config map[string]string // the keys and values for config. For encrypted config, the values may be blinded
}
type SummaryEventPayload struct {
IsPreview bool // true if this summary is for a plan operation
MaybeCorrupt bool // true if one or more resources may be corrupt
Duration time.Duration // the duration of the entire update operation (zero values for previews)
ResourceChanges display.ResourceChanges // count of changed resources, useful for reporting
PolicyPacks map[string]string // {policy-pack: version} for each policy pack applied
}
type ResourceOperationFailedPayload struct {
Metadata StepEventMetadata
Status resource.Status
Steps int
}
type ResourceOutputsEventPayload struct {
Metadata StepEventMetadata
Planning bool
Debug bool
}
type ResourcePreEventPayload struct {
Metadata StepEventMetadata
Planning bool
Debug bool
}
// StepEventMetadata contains the metadata associated with a step the engine is performing.
type StepEventMetadata struct {
Op display.StepOp // the operation performed by this step.
URN resource.URN // the resource URN (for before and after).
Type tokens.Type // the type affected by this step.
Old *StepEventStateMetadata // the state of the resource before performing this step.
New *StepEventStateMetadata // the state of the resource after performing this step.
Res *StepEventStateMetadata // the latest state for the resource that is known (worst case, old).
Keys []resource.PropertyKey // the keys causing replacement (only for CreateStep and ReplaceStep).
Diffs []resource.PropertyKey // the keys causing diffs
DetailedDiff map[string]plugin.PropertyDiff // the rich, structured diff
Logical bool // true if this step represents a logical operation in the program.
Provider string // the provider that performed this step.
}
// StepEventStateMetadata contains detailed metadata about a resource's state pertaining to a given step.
type StepEventStateMetadata struct {
// State contains the raw, complete state, for this resource.
State *resource.State
// the resource's type.
Type tokens.Type
// the resource's object urn, a human-friendly, unique name for the resource.
URN resource.URN
// true if the resource is custom, managed by a plugin.
Custom bool
// true if this resource is pending deletion due to a replacement.
Delete bool
// the resource's unique ID, assigned by the resource provider (or blank if none/uncreated).
ID resource.ID
// an optional parent URN that this resource belongs to.
Parent resource.URN
// true to "protect" this resource (protected resources cannot be deleted).
Protect bool
// RetainOnDelete is true if the resource is not physically deleted when it is logically deleted.
RetainOnDelete bool `json:"retainOnDelete"`
// the resource's input properties (as specified by the program). Note: because this will cross
// over rpc boundaries it will be slightly different than the Inputs found in resource_state.
// Specifically, secrets will have been filtered out, and large values (like assets) will be
// have a simple hash-based representation. This allows clients to display this information
// properly, without worrying about leaking sensitive data, and without having to transmit huge
// amounts of data.
Inputs resource.PropertyMap
// the resource's complete output state (as returned by the resource provider). See "Inputs"
// for additional details about how data will be transformed before going into this map.
Outputs resource.PropertyMap
// the resource's provider reference
Provider string
// InitErrors is the set of errors encountered in the process of initializing resource (i.e.,
// during create or update).
InitErrors []string
}
func makeEventEmitter(events chan<- Event, update UpdateInfo) (eventEmitter, error) {
target := update.GetTarget()
var secrets []string
if target != nil && target.Config.HasSecureValue() {
for k, v := range target.Config {
if !v.Secure() {
continue
}
secureValues, err := v.SecureValues(target.Decrypter)
if err != nil {
return eventEmitter{}, DecryptError{
Key: k,
Err: err,
}
}
secrets = append(secrets, secureValues...)
}
}
logging.AddGlobalFilter(logging.CreateFilter(secrets, "[secret]"))
buffer, done := make(chan Event), make(chan bool)
go queueEvents(events, buffer, done)
return eventEmitter{
done: done,
ch: buffer,
}, nil
}
func makeQueryEventEmitter(events chan<- Event) (eventEmitter, error) {
buffer, done := make(chan Event), make(chan bool)
go queueEvents(events, buffer, done)
return eventEmitter{
done: done,
ch: buffer,
}, nil
}
type eventEmitter struct {
done <-chan bool
ch chan<- Event
}
func queueEvents(events chan<- Event, buffer chan Event, done chan bool) {
// Instead of sending to the source channel directly, buffer events to account for slow receivers.
//
// Buffering is done by a goroutine that concurrently receives from the senders and attempts to send events to the
// receiver. Events that are received while waiting for the receiver to catch up are buffered in a slice.
//
// We do not use a buffered channel because it is empirically less likely that the goroutine reading from a
// buffered channel will be scheduled when new data is placed in the channel.
defer close(done)
contract.Assertf(buffer != nil, "buffer channel must not be nil")
var queue []Event
for {
e, ok := <-buffer
if !ok {
return
}
queue = append(queue, e)
// While there are events in the queue, attempt to send them to the waiting receiver. If the receiver is
// blocked and an event is received from the event senders, stick that event in the queue.
for len(queue) > 0 {
select {
case e, ok := <-buffer:
if !ok {
// If the event source has been closed, flush the queue.
for _, e := range queue {
trySendEvent(events, e)
}
return
}
queue = append(queue, e)
case events <- queue[0]:
queue = queue[1:]
}
}
}
}
func makeStepEventMetadata(op display.StepOp, step deploy.Step, debug bool) StepEventMetadata {
contract.Assertf(op == step.Op() || step.Op() == deploy.OpRefresh,
"step must be %v or %v, got %v", op, deploy.OpRefresh, step.Op())
var keys, diffs []resource.PropertyKey
if keyer, hasKeys := step.(interface{ Keys() []resource.PropertyKey }); hasKeys {
keys = keyer.Keys()
}
if differ, hasDiffs := step.(interface{ Diffs() []resource.PropertyKey }); hasDiffs {
diffs = differ.Diffs()
}
var detailedDiff map[string]plugin.PropertyDiff
if detailedDiffer, hasDetailedDiff := step.(interface {
DetailedDiff() map[string]plugin.PropertyDiff
}); hasDetailedDiff {
detailedDiff = detailedDiffer.DetailedDiff()
}
return StepEventMetadata{
Op: op,
URN: step.URN(),
Type: step.Type(),
Keys: keys,
Diffs: diffs,
DetailedDiff: detailedDiff,
Old: makeStepEventStateMetadata(step.Old(), debug),
New: makeStepEventStateMetadata(step.New(), debug),
Res: makeStepEventStateMetadata(step.Res(), debug),
Logical: step.Logical(),
Provider: step.Provider(),
}
}
func makeStepEventStateMetadata(state *resource.State, debug bool) *StepEventStateMetadata {
if state == nil {
return nil
}
return &StepEventStateMetadata{
State: state,
Type: state.Type,
URN: state.URN,
Custom: state.Custom,
Delete: state.Delete,
ID: state.ID,
Parent: state.Parent,
Protect: state.Protect,
RetainOnDelete: state.RetainOnDelete,
Inputs: filterResourceProperties(state.Inputs, debug),
Outputs: filterResourceProperties(state.Outputs, debug),
Provider: state.Provider,
InitErrors: state.InitErrors,
}
}
func (e *eventEmitter) Close() {
tryCloseEventChan(e.ch)
<-e.done
}
func (e *eventEmitter) sendEvent(event Event) {
trySendEvent(e.ch, event)
}
func (e *eventEmitter) resourceOperationFailedEvent(
step deploy.Step, status resource.Status, steps int, debug bool,
) {
contract.Requiref(e != nil, "e", "!= nil")
e.sendEvent(NewEvent(ResourceOperationFailed, ResourceOperationFailedPayload{
Metadata: makeStepEventMetadata(step.Op(), step, debug),
Status: status,
Steps: steps,
}))
}
func (e *eventEmitter) resourceOutputsEvent(op display.StepOp, step deploy.Step, planning bool, debug bool) {
contract.Requiref(e != nil, "e", "!= nil")
e.sendEvent(NewEvent(ResourceOutputsEvent, ResourceOutputsEventPayload{
Metadata: makeStepEventMetadata(op, step, debug),
Planning: planning,
Debug: debug,
}))
}
func (e *eventEmitter) resourcePreEvent(
step deploy.Step, planning bool, debug bool,
) {
contract.Requiref(e != nil, "e", "!= nil")
e.sendEvent(NewEvent(ResourcePreEvent, ResourcePreEventPayload{
Metadata: makeStepEventMetadata(step.Op(), step, debug),
Planning: planning,
Debug: debug,
}))
}
func (e *eventEmitter) preludeEvent(isPreview bool, cfg config.Map) {
contract.Requiref(e != nil, "e", "!= nil")
configStringMap := make(map[string]string, len(cfg))
for k, v := range cfg {
keyString := k.String()
valueString, err := v.Value(config.NewBlindingDecrypter())
contract.AssertNoErrorf(err, "error getting configuration value for entry %q", keyString)
configStringMap[keyString] = valueString
}
e.sendEvent(NewEvent(PreludeEvent, PreludeEventPayload{
IsPreview: isPreview,
Config: configStringMap,
}))
}
func (e *eventEmitter) summaryEvent(preview, maybeCorrupt bool, duration time.Duration,
resourceChanges display.ResourceChanges, policyPacks map[string]string,
) {
contract.Requiref(e != nil, "e", "!= nil")
e.sendEvent(NewEvent(SummaryEvent, SummaryEventPayload{
IsPreview: preview,
MaybeCorrupt: maybeCorrupt,
Duration: duration,
ResourceChanges: resourceChanges,
PolicyPacks: policyPacks,
}))
}
func (e *eventEmitter) policyViolationEvent(urn resource.URN, d plugin.AnalyzeDiagnostic) {
contract.Requiref(e != nil, "e", "!= nil")
// Write prefix.
var prefix bytes.Buffer
switch d.EnforcementLevel {
case apitype.Mandatory:
prefix.WriteString(colors.SpecError)
case apitype.Advisory:
prefix.WriteString(colors.SpecWarning)
default:
contract.Failf("Unrecognized diagnostic severity: %v", d)
}
prefix.WriteString(string(d.EnforcementLevel))
prefix.WriteString(": ")
prefix.WriteString(colors.Reset)
// Write the message itself.
var buffer bytes.Buffer
buffer.WriteString(colors.SpecNote)
buffer.WriteString(d.Message)
buffer.WriteString(colors.Reset)
buffer.WriteRune('\n')
e.sendEvent(NewEvent(PolicyViolationEvent, PolicyViolationEventPayload{
ResourceURN: urn,
Message: logging.FilterString(buffer.String()),
Color: colors.Raw,
PolicyName: d.PolicyName,
PolicyPackName: d.PolicyPackName,
PolicyPackVersion: d.PolicyPackVersion,
EnforcementLevel: d.EnforcementLevel,
Prefix: logging.FilterString(prefix.String()),
}))
}
func (e *eventEmitter) policyRemediationEvent(urn resource.URN, t plugin.Remediation,
before resource.PropertyMap, after resource.PropertyMap,
) {
contract.Requiref(e != nil, "e", "!= nil")
e.sendEvent(NewEvent(PolicyRemediationEvent, PolicyRemediationEventPayload{
ResourceURN: urn,
Color: colors.Raw,
PolicyName: t.PolicyName,
PolicyPackName: t.PolicyPackName,
PolicyPackVersion: t.PolicyPackVersion,
Before: before,
After: after,
}))
}
func diagEvent(e *eventEmitter, d *diag.Diag, prefix, msg string, sev diag.Severity,
ephemeral bool,
) {
contract.Requiref(e != nil, "e", "!= nil")
e.sendEvent(NewEvent(DiagEvent, DiagEventPayload{
URN: d.URN,
Prefix: logging.FilterString(prefix),
Message: logging.FilterString(msg),
Color: colors.Raw,
Severity: sev,
StreamID: d.StreamID,
Ephemeral: ephemeral,
}))
}
func (e *eventEmitter) diagDebugEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
diagEvent(e, d, prefix, msg, diag.Debug, ephemeral)
}
func (e *eventEmitter) diagInfoEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
diagEvent(e, d, prefix, msg, diag.Info, ephemeral)
}
func (e *eventEmitter) diagInfoerrEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
diagEvent(e, d, prefix, msg, diag.Infoerr, ephemeral)
}
func (e *eventEmitter) diagErrorEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
diagEvent(e, d, prefix, msg, diag.Error, ephemeral)
}
func (e *eventEmitter) diagWarningEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
diagEvent(e, d, prefix, msg, diag.Warning, ephemeral)
}
func filterResourceProperties(m resource.PropertyMap, debug bool) resource.PropertyMap {
return filterPropertyValue(resource.NewObjectProperty(m), debug).ObjectValue()
}
func filterPropertyValue(v resource.PropertyValue, debug bool) resource.PropertyValue {
switch {
case v.IsNull(), v.IsBool(), v.IsNumber():
return v
case v.IsString():
// have to ensure we filter out secrets.
return resource.NewStringProperty(logging.FilterString(v.StringValue()))
case v.IsAsset():
return resource.NewAssetProperty(filterAsset(v.AssetValue(), debug))
case v.IsArchive():
return resource.NewArchiveProperty(filterArchive(v.ArchiveValue(), debug))
case v.IsArray():
arr := make([]resource.PropertyValue, len(v.ArrayValue()))
for i, v := range v.ArrayValue() {
arr[i] = filterPropertyValue(v, debug)
}
return resource.NewArrayProperty(arr)
case v.IsObject():
obj := make(resource.PropertyMap, len(v.ObjectValue()))
for k, v := range v.ObjectValue() {
obj[k] = filterPropertyValue(v, debug)
}
return resource.NewObjectProperty(obj)
case v.IsComputed():
return resource.MakeComputed(filterPropertyValue(v.Input().Element, debug))
case v.IsOutput():
return resource.MakeComputed(filterPropertyValue(v.OutputValue().Element, debug))
case v.IsSecret():
return resource.MakeSecret(resource.NewStringProperty("[secret]"))
case v.IsResourceReference():
ref := v.ResourceReferenceValue()
return resource.NewResourceReferenceProperty(resource.ResourceReference{
URN: resource.URN(logging.FilterString(string(ref.URN))),
ID: filterPropertyValue(ref.ID, debug),
PackageVersion: logging.FilterString(ref.PackageVersion),
})
default:
contract.Failf("unexpected property value type %T", v.V)
return resource.PropertyValue{}
}
}
func filterAsset(v *resource.Asset, debug bool) *resource.Asset {
if !v.IsText() {
return v
}
// we don't want to include the full text of an asset as we serialize it over as
// events. They represent user files and are thus are unbounded in size. Instead,
// we only include the text if it represents a user's serialized program code, as
// that is something we want the receiver to see to display as part of
// progress/diffs/etc.
var text string
if v.IsUserProgramCode() {
// also make sure we filter this in case there are any secrets in the code.
text = logging.FilterString(resource.MassageIfUserProgramCodeAsset(v, debug).Text)
} else {
// We need to have some string here so that we preserve that this is a
// text-asset
text = "<contents elided>"
}
return &resource.Asset{
Sig: v.Sig,
Hash: v.Hash,
Text: text,
}
}
func filterArchive(v *resource.Archive, debug bool) *resource.Archive {
if !v.IsAssets() {
return v
}
assets := make(map[string]interface{})
for k, v := range v.Assets {
switch v := v.(type) {
case *resource.Asset:
assets[k] = filterAsset(v, debug)
case *resource.Archive:
assets[k] = filterArchive(v, debug)
default:
contract.Failf("Unrecognized asset map type %T", v)
}
}
return &resource.Archive{
Sig: v.Sig,
Hash: v.Hash,
Assets: assets,
}
}
// Sends an event like a normal send but recovers from a panic on a
// closed channel. This is generally a design smell and should be used
// very sparingly and every use of this function needs to document the
// need.
//
// eventEmitter uses tryEventSend to recover in the scenario of
// cancelSource.Terminate being called (such as user pressing Ctrl+C
// twice), when straggler stepExecutor workers are sending diag events
// but the engine is shutting down.
//
// See https://github.com/pulumi/pulumi/issues/10431 for the details.
func trySendEvent(ch chan<- Event, ev Event) (sent bool) {
sent = true
defer func() {
if recover() != nil {
sent = false
if logging.V(9) {
logging.V(9).Infof(
"Ignoring %v send on a closed channel %p",
ev.Type, ch)
}
}
}()
ch <- ev
return sent
}
// Tries to close a channel but recovers from a panic of closing a
// closed channel. Restrictions on use are similarly to those of
// trySendEvent.
func tryCloseEventChan(ch chan<- Event) (closed bool) {
closed = true
defer func() {
if recover() != nil {
closed = false
if logging.V(9) {
logging.V(9).Infof(
"Ignoring close of a closed event channel %p",
ch)
}
}
}()
close(ch)
return closed
}