mirror of https://github.com/pulumi/pulumi.git
670 lines
21 KiB
Go
670 lines
21 KiB
Go
// Copyright 2016-2022, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package engine
|
|
|
|
import (
|
|
"bytes"
|
|
"time"
|
|
|
|
"github.com/pulumi/pulumi/pkg/v3/display"
|
|
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/diag/colors"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/deepcopy"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
|
|
)
|
|
|
|
// Event represents an event generated by the engine during an operation. The underlying
// type of the value returned by Payload differs depending on the value of the Type field.
type Event struct {
	// Type identifies the kind of event.
	Type EventType
	// payload holds the event-specific data. It is unexported and read through Payload.
	payload interface{}
}
|
|
|
|
func NewEvent(typ EventType, payload interface{}) Event {
|
|
ok := false
|
|
switch typ {
|
|
case CancelEvent:
|
|
ok = payload == nil
|
|
case StdoutColorEvent:
|
|
_, ok = payload.(StdoutEventPayload)
|
|
case DiagEvent:
|
|
_, ok = payload.(DiagEventPayload)
|
|
case PreludeEvent:
|
|
_, ok = payload.(PreludeEventPayload)
|
|
case SummaryEvent:
|
|
_, ok = payload.(SummaryEventPayload)
|
|
case ResourcePreEvent:
|
|
_, ok = payload.(ResourcePreEventPayload)
|
|
case ResourceOutputsEvent:
|
|
_, ok = payload.(ResourceOutputsEventPayload)
|
|
case ResourceOperationFailed:
|
|
_, ok = payload.(ResourceOperationFailedPayload)
|
|
case PolicyViolationEvent:
|
|
_, ok = payload.(PolicyViolationEventPayload)
|
|
case PolicyRemediationEvent:
|
|
_, ok = payload.(PolicyRemediationEventPayload)
|
|
default:
|
|
contract.Failf("unknown event type %v", typ)
|
|
}
|
|
contract.Assertf(ok, "invalid payload of type %T for event type %v", payload, typ)
|
|
return Event{
|
|
Type: typ,
|
|
payload: deepcopy.Copy(payload),
|
|
}
|
|
}
|
|
|
|
// EventType is the kind of event being emitted.
type EventType string

// The event types accepted by NewEvent. Each comment names the payload type that
// NewEvent requires for that event type.
const (
	// CancelEvent carries no payload (NewEvent requires a nil payload).
	CancelEvent EventType = "cancel"
	// StdoutColorEvent carries a StdoutEventPayload.
	StdoutColorEvent EventType = "stdoutcolor"
	// DiagEvent carries a DiagEventPayload.
	DiagEvent EventType = "diag"
	// PreludeEvent carries a PreludeEventPayload.
	PreludeEvent EventType = "prelude"
	// SummaryEvent carries a SummaryEventPayload.
	SummaryEvent EventType = "summary"
	// ResourcePreEvent carries a ResourcePreEventPayload.
	ResourcePreEvent EventType = "resource-pre"
	// ResourceOutputsEvent carries a ResourceOutputsEventPayload.
	ResourceOutputsEvent EventType = "resource-outputs"
	// ResourceOperationFailed carries a ResourceOperationFailedPayload.
	ResourceOperationFailed EventType = "resource-operationfailed"
	// PolicyViolationEvent carries a PolicyViolationEventPayload.
	PolicyViolationEvent EventType = "policy-violation"
	// PolicyRemediationEvent carries a PolicyRemediationEventPayload.
	PolicyRemediationEvent EventType = "policy-remediation"
)
|
|
|
|
// Payload returns the payload stored in the event. Its dynamic type depends on e.Type.
func (e Event) Payload() interface{} {
	return e.payload
}
|
|
|
|
func cancelEvent() Event {
|
|
return Event{Type: CancelEvent}
|
|
}
|
|
|
|
// DiagEventPayload is the payload for an event with type `diag`.
type DiagEventPayload struct {
	// URN is the URN taken from the diagnostic this event was created for.
	URN resource.URN
	// Prefix is the text that precedes the message.
	Prefix string
	// Message is the diagnostic text.
	Message string
	// Color is the colorization mode associated with Prefix and Message.
	Color colors.Colorization
	// Severity is the diagnostic's severity.
	Severity diag.Severity
	// StreamID is the stream ID taken from the diagnostic this event was created for.
	StreamID int32
	// Ephemeral is the ephemeral flag supplied by the emitter of the diagnostic.
	Ephemeral bool
}
|
|
|
|
// PolicyViolationEventPayload is the payload for an event with type `policy-violation`.
type PolicyViolationEventPayload struct {
	// ResourceURN is the URN of the resource the violation was reported for.
	ResourceURN resource.URN
	// Message is the violation message.
	Message string
	// Color is the colorization mode associated with Message and Prefix.
	Color colors.Colorization
	// PolicyName is the name of the violated policy.
	PolicyName string
	// PolicyPackName is the name of the policy pack containing the policy.
	PolicyPackName string
	// PolicyPackVersion is the version of the policy pack containing the policy.
	PolicyPackVersion string
	// EnforcementLevel is the enforcement level reported with the violation.
	EnforcementLevel apitype.EnforcementLevel
	// Prefix is the text that precedes the message.
	Prefix string
}
|
|
|
|
// PolicyRemediationEventPayload is the payload for an event with type `policy-remediation`.
type PolicyRemediationEventPayload struct {
	// ResourceURN is the URN of the resource the remediation was reported for.
	ResourceURN resource.URN
	// Color is the colorization mode associated with this event.
	Color colors.Colorization
	// PolicyName is the name of the remediation policy.
	PolicyName string
	// PolicyPackName is the name of the policy pack containing the policy.
	PolicyPackName string
	// PolicyPackVersion is the version of the policy pack containing the policy.
	PolicyPackVersion string
	// Before is the property map supplied as the "before" state of the remediation.
	Before resource.PropertyMap
	// After is the property map supplied as the "after" state of the remediation.
	After resource.PropertyMap
}
|
|
|
|
// StdoutEventPayload is the payload for an event with type `stdoutcolor`.
type StdoutEventPayload struct {
	// Message is the text of the event.
	Message string
	// Color is the colorization mode associated with Message.
	Color colors.Colorization
}
|
|
|
|
// PreludeEventPayload is the payload for an event with type `prelude`.
type PreludeEventPayload struct {
	IsPreview bool              // true if this prelude is for a plan operation
	Config    map[string]string // the keys and values for config. For encrypted config, the values may be blinded
}
|
|
|
|
// SummaryEventPayload is the payload for an event with type `summary`.
type SummaryEventPayload struct {
	IsPreview       bool                    // true if this summary is for a plan operation
	MaybeCorrupt    bool                    // true if one or more resources may be corrupt
	Duration        time.Duration           // the duration of the entire update operation (zero values for previews)
	ResourceChanges display.ResourceChanges // count of changed resources, useful for reporting
	PolicyPacks     map[string]string       // {policy-pack: version} for each policy pack applied
}
|
|
|
|
// ResourceOperationFailedPayload is the payload for an event with type
// `resource-operationfailed`.
type ResourceOperationFailedPayload struct {
	// Metadata describes the step the event was emitted for.
	Metadata StepEventMetadata
	// Status is the resource status reported with the failure.
	Status resource.Status
	// Steps is the step count supplied by the emitter.
	// NOTE(review): exact meaning is not visible in this file — confirm with the caller.
	Steps int
}
|
|
|
|
// ResourceOutputsEventPayload is the payload for an event with type `resource-outputs`.
type ResourceOutputsEventPayload struct {
	// Metadata describes the step the event was emitted for.
	Metadata StepEventMetadata
	// Planning is the planning flag supplied by the emitter.
	Planning bool
	// Debug is the debug flag supplied by the emitter; the same flag is used when
	// building Metadata.
	Debug bool
}
|
|
|
|
// ResourcePreEventPayload is the payload for an event with type `resource-pre`.
type ResourcePreEventPayload struct {
	// Metadata describes the step the event was emitted for.
	Metadata StepEventMetadata
	// Planning is the planning flag supplied by the emitter.
	Planning bool
	// Debug is the debug flag supplied by the emitter; the same flag is used when
	// building Metadata.
	Debug bool
}
|
|
|
|
// StepEventMetadata contains the metadata associated with a step the engine is performing.
//
// Keys, Diffs and DetailedDiff are only populated when the underlying step exposes the
// corresponding accessor method; otherwise they are left nil.
type StepEventMetadata struct {
	Op           display.StepOp                 // the operation performed by this step.
	URN          resource.URN                   // the resource URN (for before and after).
	Type         tokens.Type                    // the type affected by this step.
	Old          *StepEventStateMetadata        // the state of the resource before performing this step.
	New          *StepEventStateMetadata        // the state of the resource after performing this step.
	Res          *StepEventStateMetadata        // the latest state for the resource that is known (worst case, old).
	Keys         []resource.PropertyKey         // the keys causing replacement (only for CreateStep and ReplaceStep).
	Diffs        []resource.PropertyKey         // the keys causing diffs.
	DetailedDiff map[string]plugin.PropertyDiff // the rich, structured diff.
	Logical      bool                           // true if this step represents a logical operation in the program.
	Provider     string                         // the provider that performed this step.
}
|
|
|
|
// StepEventStateMetadata contains detailed metadata about a resource's state pertaining to a given step.
type StepEventStateMetadata struct {
	// State contains the raw, complete state, for this resource.
	State *resource.State
	// the resource's type.
	Type tokens.Type
	// the resource's object urn, a human-friendly, unique name for the resource.
	URN resource.URN
	// true if the resource is custom, managed by a plugin.
	Custom bool
	// true if this resource is pending deletion due to a replacement.
	Delete bool
	// the resource's unique ID, assigned by the resource provider (or blank if none/uncreated).
	ID resource.ID
	// an optional parent URN that this resource belongs to.
	Parent resource.URN
	// true to "protect" this resource (protected resources cannot be deleted).
	Protect bool
	// RetainOnDelete is true if the resource is not physically deleted when it is logically deleted.
	RetainOnDelete bool `json:"retainOnDelete"`
	// the resource's input properties (as specified by the program). Note: because this will cross
	// over rpc boundaries it will be slightly different than the Inputs found in resource_state.
	// Specifically, secrets will have been filtered out, and large values (like assets) will
	// have a simple hash-based representation. This allows clients to display this information
	// properly, without worrying about leaking sensitive data, and without having to transmit huge
	// amounts of data.
	Inputs resource.PropertyMap
	// the resource's complete output state (as returned by the resource provider). See "Inputs"
	// for additional details about how data will be transformed before going into this map.
	Outputs resource.PropertyMap
	// the resource's provider reference.
	Provider string
	// InitErrors is the set of errors encountered in the process of initializing resource (i.e.,
	// during create or update).
	InitErrors []string
}
|
|
|
|
func makeEventEmitter(events chan<- Event, update UpdateInfo) (eventEmitter, error) {
|
|
target := update.GetTarget()
|
|
var secrets []string
|
|
if target != nil && target.Config.HasSecureValue() {
|
|
for k, v := range target.Config {
|
|
if !v.Secure() {
|
|
continue
|
|
}
|
|
|
|
secureValues, err := v.SecureValues(target.Decrypter)
|
|
if err != nil {
|
|
return eventEmitter{}, DecryptError{
|
|
Key: k,
|
|
Err: err,
|
|
}
|
|
}
|
|
secrets = append(secrets, secureValues...)
|
|
}
|
|
}
|
|
|
|
logging.AddGlobalFilter(logging.CreateFilter(secrets, "[secret]"))
|
|
|
|
buffer, done := make(chan Event), make(chan bool)
|
|
go queueEvents(events, buffer, done)
|
|
|
|
return eventEmitter{
|
|
done: done,
|
|
ch: buffer,
|
|
}, nil
|
|
}
|
|
|
|
func makeQueryEventEmitter(events chan<- Event) (eventEmitter, error) {
|
|
buffer, done := make(chan Event), make(chan bool)
|
|
|
|
go queueEvents(events, buffer, done)
|
|
|
|
return eventEmitter{
|
|
done: done,
|
|
ch: buffer,
|
|
}, nil
|
|
}
|
|
|
|
// eventEmitter sends engine events into a channel. The constructors in this file
// connect both of its channels to a queueEvents goroutine.
type eventEmitter struct {
	// done is waited on by Close after ch has been closed; queueEvents closes it when
	// it returns.
	done <-chan bool
	// ch is the channel that sendEvent writes events to.
	ch chan<- Event
}
|
|
|
|
// queueEvents forwards events received on buffer to events, holding events in an
// in-memory queue while the receiver of events is not ready.
//
// It runs until buffer is closed. If buffer is closed while events are still queued,
// the remaining events are sent with trySendEvent before returning. The done channel
// is closed when the function returns. buffer must not be nil.
func queueEvents(events chan<- Event, buffer chan Event, done chan bool) {
	// Instead of sending to the source channel directly, buffer events to account for slow receivers.
	//
	// Buffering is done by a goroutine that concurrently receives from the senders and attempts to send events to the
	// receiver. Events that are received while waiting for the receiver to catch up are buffered in a slice.
	//
	// We do not use a buffered channel because it is empirically less likely that the goroutine reading from a
	// buffered channel will be scheduled when new data is placed in the channel.

	defer close(done)
	contract.Assertf(buffer != nil, "buffer channel must not be nil")

	var queue []Event
	for {
		// The queue is empty here, so block until the next event arrives or the source closes.
		e, ok := <-buffer
		if !ok {
			return
		}
		queue = append(queue, e)

		// While there are events in the queue, attempt to send them to the waiting receiver. If the receiver is
		// blocked and an event is received from the event senders, stick that event in the queue.
		for len(queue) > 0 {
			select {
			case e, ok := <-buffer:
				if !ok {
					// If the event source has been closed, flush the queue.
					for _, e := range queue {
						trySendEvent(events, e)
					}
					return
				}
				queue = append(queue, e)
			case events <- queue[0]:
				// The head of the queue was delivered; drop it.
				queue = queue[1:]
			}
		}
	}
}
|
|
|
|
func makeStepEventMetadata(op display.StepOp, step deploy.Step, debug bool) StepEventMetadata {
|
|
contract.Assertf(op == step.Op() || step.Op() == deploy.OpRefresh,
|
|
"step must be %v or %v, got %v", op, deploy.OpRefresh, step.Op())
|
|
|
|
var keys, diffs []resource.PropertyKey
|
|
if keyer, hasKeys := step.(interface{ Keys() []resource.PropertyKey }); hasKeys {
|
|
keys = keyer.Keys()
|
|
}
|
|
if differ, hasDiffs := step.(interface{ Diffs() []resource.PropertyKey }); hasDiffs {
|
|
diffs = differ.Diffs()
|
|
}
|
|
|
|
var detailedDiff map[string]plugin.PropertyDiff
|
|
if detailedDiffer, hasDetailedDiff := step.(interface {
|
|
DetailedDiff() map[string]plugin.PropertyDiff
|
|
}); hasDetailedDiff {
|
|
detailedDiff = detailedDiffer.DetailedDiff()
|
|
}
|
|
|
|
return StepEventMetadata{
|
|
Op: op,
|
|
URN: step.URN(),
|
|
Type: step.Type(),
|
|
Keys: keys,
|
|
Diffs: diffs,
|
|
DetailedDiff: detailedDiff,
|
|
Old: makeStepEventStateMetadata(step.Old(), debug),
|
|
New: makeStepEventStateMetadata(step.New(), debug),
|
|
Res: makeStepEventStateMetadata(step.Res(), debug),
|
|
Logical: step.Logical(),
|
|
Provider: step.Provider(),
|
|
}
|
|
}
|
|
|
|
func makeStepEventStateMetadata(state *resource.State, debug bool) *StepEventStateMetadata {
|
|
if state == nil {
|
|
return nil
|
|
}
|
|
|
|
return &StepEventStateMetadata{
|
|
State: state,
|
|
Type: state.Type,
|
|
URN: state.URN,
|
|
Custom: state.Custom,
|
|
Delete: state.Delete,
|
|
ID: state.ID,
|
|
Parent: state.Parent,
|
|
Protect: state.Protect,
|
|
RetainOnDelete: state.RetainOnDelete,
|
|
Inputs: filterResourceProperties(state.Inputs, debug),
|
|
Outputs: filterResourceProperties(state.Outputs, debug),
|
|
Provider: state.Provider,
|
|
InitErrors: state.InitErrors,
|
|
}
|
|
}
|
|
|
|
func (e *eventEmitter) Close() {
|
|
tryCloseEventChan(e.ch)
|
|
<-e.done
|
|
}
|
|
|
|
func (e *eventEmitter) sendEvent(event Event) {
|
|
trySendEvent(e.ch, event)
|
|
}
|
|
|
|
func (e *eventEmitter) resourceOperationFailedEvent(
|
|
step deploy.Step, status resource.Status, steps int, debug bool,
|
|
) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
e.sendEvent(NewEvent(ResourceOperationFailed, ResourceOperationFailedPayload{
|
|
Metadata: makeStepEventMetadata(step.Op(), step, debug),
|
|
Status: status,
|
|
Steps: steps,
|
|
}))
|
|
}
|
|
|
|
func (e *eventEmitter) resourceOutputsEvent(op display.StepOp, step deploy.Step, planning bool, debug bool) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
e.sendEvent(NewEvent(ResourceOutputsEvent, ResourceOutputsEventPayload{
|
|
Metadata: makeStepEventMetadata(op, step, debug),
|
|
Planning: planning,
|
|
Debug: debug,
|
|
}))
|
|
}
|
|
|
|
func (e *eventEmitter) resourcePreEvent(
|
|
step deploy.Step, planning bool, debug bool,
|
|
) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
e.sendEvent(NewEvent(ResourcePreEvent, ResourcePreEventPayload{
|
|
Metadata: makeStepEventMetadata(step.Op(), step, debug),
|
|
Planning: planning,
|
|
Debug: debug,
|
|
}))
|
|
}
|
|
|
|
func (e *eventEmitter) preludeEvent(isPreview bool, cfg config.Map) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
configStringMap := make(map[string]string, len(cfg))
|
|
for k, v := range cfg {
|
|
keyString := k.String()
|
|
valueString, err := v.Value(config.NewBlindingDecrypter())
|
|
contract.AssertNoErrorf(err, "error getting configuration value for entry %q", keyString)
|
|
configStringMap[keyString] = valueString
|
|
}
|
|
|
|
e.sendEvent(NewEvent(PreludeEvent, PreludeEventPayload{
|
|
IsPreview: isPreview,
|
|
Config: configStringMap,
|
|
}))
|
|
}
|
|
|
|
func (e *eventEmitter) summaryEvent(preview, maybeCorrupt bool, duration time.Duration,
|
|
resourceChanges display.ResourceChanges, policyPacks map[string]string,
|
|
) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
e.sendEvent(NewEvent(SummaryEvent, SummaryEventPayload{
|
|
IsPreview: preview,
|
|
MaybeCorrupt: maybeCorrupt,
|
|
Duration: duration,
|
|
ResourceChanges: resourceChanges,
|
|
PolicyPacks: policyPacks,
|
|
}))
|
|
}
|
|
|
|
func (e *eventEmitter) policyViolationEvent(urn resource.URN, d plugin.AnalyzeDiagnostic) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
// Write prefix.
|
|
var prefix bytes.Buffer
|
|
switch d.EnforcementLevel {
|
|
case apitype.Mandatory:
|
|
prefix.WriteString(colors.SpecError)
|
|
case apitype.Advisory:
|
|
prefix.WriteString(colors.SpecWarning)
|
|
default:
|
|
contract.Failf("Unrecognized diagnostic severity: %v", d)
|
|
}
|
|
|
|
prefix.WriteString(string(d.EnforcementLevel))
|
|
prefix.WriteString(": ")
|
|
prefix.WriteString(colors.Reset)
|
|
|
|
// Write the message itself.
|
|
var buffer bytes.Buffer
|
|
buffer.WriteString(colors.SpecNote)
|
|
|
|
buffer.WriteString(d.Message)
|
|
|
|
buffer.WriteString(colors.Reset)
|
|
buffer.WriteRune('\n')
|
|
|
|
e.sendEvent(NewEvent(PolicyViolationEvent, PolicyViolationEventPayload{
|
|
ResourceURN: urn,
|
|
Message: logging.FilterString(buffer.String()),
|
|
Color: colors.Raw,
|
|
PolicyName: d.PolicyName,
|
|
PolicyPackName: d.PolicyPackName,
|
|
PolicyPackVersion: d.PolicyPackVersion,
|
|
EnforcementLevel: d.EnforcementLevel,
|
|
Prefix: logging.FilterString(prefix.String()),
|
|
}))
|
|
}
|
|
|
|
func (e *eventEmitter) policyRemediationEvent(urn resource.URN, t plugin.Remediation,
|
|
before resource.PropertyMap, after resource.PropertyMap,
|
|
) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
e.sendEvent(NewEvent(PolicyRemediationEvent, PolicyRemediationEventPayload{
|
|
ResourceURN: urn,
|
|
Color: colors.Raw,
|
|
PolicyName: t.PolicyName,
|
|
PolicyPackName: t.PolicyPackName,
|
|
PolicyPackVersion: t.PolicyPackVersion,
|
|
Before: before,
|
|
After: after,
|
|
}))
|
|
}
|
|
|
|
func diagEvent(e *eventEmitter, d *diag.Diag, prefix, msg string, sev diag.Severity,
|
|
ephemeral bool,
|
|
) {
|
|
contract.Requiref(e != nil, "e", "!= nil")
|
|
|
|
e.sendEvent(NewEvent(DiagEvent, DiagEventPayload{
|
|
URN: d.URN,
|
|
Prefix: logging.FilterString(prefix),
|
|
Message: logging.FilterString(msg),
|
|
Color: colors.Raw,
|
|
Severity: sev,
|
|
StreamID: d.StreamID,
|
|
Ephemeral: ephemeral,
|
|
}))
|
|
}
|
|
|
|
// diagDebugEvent emits a diagnostic event with severity diag.Debug.
func (e *eventEmitter) diagDebugEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Debug, ephemeral)
}
|
|
|
|
// diagInfoEvent emits a diagnostic event with severity diag.Info.
func (e *eventEmitter) diagInfoEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Info, ephemeral)
}
|
|
|
|
// diagInfoerrEvent emits a diagnostic event with severity diag.Infoerr.
func (e *eventEmitter) diagInfoerrEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Infoerr, ephemeral)
}
|
|
|
|
// diagErrorEvent emits a diagnostic event with severity diag.Error.
func (e *eventEmitter) diagErrorEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Error, ephemeral)
}
|
|
|
|
// diagWarningEvent emits a diagnostic event with severity diag.Warning.
func (e *eventEmitter) diagWarningEvent(d *diag.Diag, prefix, msg string, ephemeral bool) {
	diagEvent(e, d, prefix, msg, diag.Warning, ephemeral)
}
|
|
|
|
func filterResourceProperties(m resource.PropertyMap, debug bool) resource.PropertyMap {
|
|
return filterPropertyValue(resource.NewObjectProperty(m), debug).ObjectValue()
|
|
}
|
|
|
|
func filterPropertyValue(v resource.PropertyValue, debug bool) resource.PropertyValue {
|
|
switch {
|
|
case v.IsNull(), v.IsBool(), v.IsNumber():
|
|
return v
|
|
case v.IsString():
|
|
// have to ensure we filter out secrets.
|
|
return resource.NewStringProperty(logging.FilterString(v.StringValue()))
|
|
case v.IsAsset():
|
|
return resource.NewAssetProperty(filterAsset(v.AssetValue(), debug))
|
|
case v.IsArchive():
|
|
return resource.NewArchiveProperty(filterArchive(v.ArchiveValue(), debug))
|
|
case v.IsArray():
|
|
arr := make([]resource.PropertyValue, len(v.ArrayValue()))
|
|
for i, v := range v.ArrayValue() {
|
|
arr[i] = filterPropertyValue(v, debug)
|
|
}
|
|
return resource.NewArrayProperty(arr)
|
|
case v.IsObject():
|
|
obj := make(resource.PropertyMap, len(v.ObjectValue()))
|
|
for k, v := range v.ObjectValue() {
|
|
obj[k] = filterPropertyValue(v, debug)
|
|
}
|
|
return resource.NewObjectProperty(obj)
|
|
case v.IsComputed():
|
|
return resource.MakeComputed(filterPropertyValue(v.Input().Element, debug))
|
|
case v.IsOutput():
|
|
return resource.MakeComputed(filterPropertyValue(v.OutputValue().Element, debug))
|
|
case v.IsSecret():
|
|
return resource.MakeSecret(resource.NewStringProperty("[secret]"))
|
|
case v.IsResourceReference():
|
|
ref := v.ResourceReferenceValue()
|
|
return resource.NewResourceReferenceProperty(resource.ResourceReference{
|
|
URN: resource.URN(logging.FilterString(string(ref.URN))),
|
|
ID: filterPropertyValue(ref.ID, debug),
|
|
PackageVersion: logging.FilterString(ref.PackageVersion),
|
|
})
|
|
default:
|
|
contract.Failf("unexpected property value type %T", v.V)
|
|
return resource.PropertyValue{}
|
|
}
|
|
}
|
|
|
|
func filterAsset(v *resource.Asset, debug bool) *resource.Asset {
|
|
if !v.IsText() {
|
|
return v
|
|
}
|
|
|
|
// we don't want to include the full text of an asset as we serialize it over as
|
|
// events. They represent user files and are thus are unbounded in size. Instead,
|
|
// we only include the text if it represents a user's serialized program code, as
|
|
// that is something we want the receiver to see to display as part of
|
|
// progress/diffs/etc.
|
|
var text string
|
|
if v.IsUserProgramCode() {
|
|
// also make sure we filter this in case there are any secrets in the code.
|
|
text = logging.FilterString(resource.MassageIfUserProgramCodeAsset(v, debug).Text)
|
|
} else {
|
|
// We need to have some string here so that we preserve that this is a
|
|
// text-asset
|
|
text = "<contents elided>"
|
|
}
|
|
|
|
return &resource.Asset{
|
|
Sig: v.Sig,
|
|
Hash: v.Hash,
|
|
Text: text,
|
|
}
|
|
}
|
|
|
|
func filterArchive(v *resource.Archive, debug bool) *resource.Archive {
|
|
if !v.IsAssets() {
|
|
return v
|
|
}
|
|
|
|
assets := make(map[string]interface{})
|
|
for k, v := range v.Assets {
|
|
switch v := v.(type) {
|
|
case *resource.Asset:
|
|
assets[k] = filterAsset(v, debug)
|
|
case *resource.Archive:
|
|
assets[k] = filterArchive(v, debug)
|
|
default:
|
|
contract.Failf("Unrecognized asset map type %T", v)
|
|
}
|
|
}
|
|
return &resource.Archive{
|
|
Sig: v.Sig,
|
|
Hash: v.Hash,
|
|
Assets: assets,
|
|
}
|
|
}
|
|
|
|
// Sends an event like a normal send but recovers from a panic on a
|
|
// closed channel. This is generally a design smell and should be used
|
|
// very sparingly and every use of this function needs to document the
|
|
// need.
|
|
//
|
|
// eventEmitter uses tryEventSend to recover in the scenario of
|
|
// cancelSource.Terminate being called (such as user pressing Ctrl+C
|
|
// twice), when straggler stepExecutor workers are sending diag events
|
|
// but the engine is shutting down.
|
|
//
|
|
// See https://github.com/pulumi/pulumi/issues/10431 for the details.
|
|
func trySendEvent(ch chan<- Event, ev Event) (sent bool) {
|
|
sent = true
|
|
defer func() {
|
|
if recover() != nil {
|
|
sent = false
|
|
if logging.V(9) {
|
|
logging.V(9).Infof(
|
|
"Ignoring %v send on a closed channel %p",
|
|
ev.Type, ch)
|
|
}
|
|
}
|
|
}()
|
|
ch <- ev
|
|
return sent
|
|
}
|
|
|
|
// Tries to close a channel but recovers from a panic of closing a
|
|
// closed channel. Restrictions on use are similarly to those of
|
|
// trySendEvent.
|
|
func tryCloseEventChan(ch chan<- Event) (closed bool) {
|
|
closed = true
|
|
defer func() {
|
|
if recover() != nil {
|
|
closed = false
|
|
if logging.V(9) {
|
|
logging.V(9).Infof(
|
|
"Ignoring close of a closed event channel %p",
|
|
ch)
|
|
}
|
|
}
|
|
}()
|
|
close(ch)
|
|
return closed
|
|
}
|