pulumi/pkg/engine/progress.go

107 lines
2.9 KiB
Go
Raw Normal View History

Use events to report downloads as system messages (#17019) When running a Pulumi operation such as a preview or an update, Pulumi will detect if plugins (e.g. an AWS provider) are missing and download and install them appropriately. Presently the user experience when this happens isn't great (see e.g. https://github.com/pulumi/pulumi/issues/14250), making it hard for the user to see what is happening/what is taking so long when required plugins are large. This commit attempts to rectify this by continuing the work in https://github.com/pulumi/pulumi/pull/16094 that tracks download progress using engine events. In doing so, we are able to render multiple downloads as part of the existing "system messages" panel in the Pulumi output, and provide a clean view of what is going on when downloads must occur before program execution. Moreover, we generalise that PR to handle any engine process, enabling us to play a similar trick with plugin installations (which can also take a while). To preserve existing behaviour, we introduce a new class for these events which we call "ephemeral", meaning that they are not persisted or rendered in contexts such as diffs, for instance. Specifically, ephemeral events are *not* sent to HTTP backends (i.e. the service), so this commit should not require any changes to the service before merging and releasing. Fixes #14250 Closes #16094 Closes #16937 https://github.com/user-attachments/assets/f0fac5e9-b3c8-4ea7-9cb7-075fc4b625d9 https://github.com/user-attachments/assets/7a761aa9-10ad-4f66-afa3-e4550b4553a5
2024-09-03 12:12:04 +00:00
// Copyright 2024, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"io"
"time"
)
// NewProgressReportingCloser wraps closer in a ReadCloser that reports
// ProgressEvents to events as bytes are read and when it is closed. A Done
// ProgressEvent will only be reported once, on the first call to Close().
// Subsequent calls to Close() will be forwarded to the underlying ReadCloser,
// but will not yield duplicate ProgressEvents.
//
// typ, id and message are attached unchanged to every event, and size is
// reported as the total number of bytes expected. In-progress events are
// throttled: one is emitted only when more than reportingInterval has elapsed
// since the previous one (or since construction, for the first).
//
// A negative size means the total is unknown. In that case closer is returned
// as-is and no ProgressEvents are ever emitted.
func NewProgressReportingCloser(
	events eventEmitter,
	typ ProgressType,
	id string,
	message string,
	size int64,
	reportingInterval time.Duration,
	closer io.ReadCloser,
) io.ReadCloser {
	// -1 is the conventional "unknown length" sentinel, but no negative value
	// is a meaningful total, so treat them all as unknown rather than emitting
	// events with a negative total.
	if size < 0 {
		return closer
	}

	return &progressReportingCloser{
		events:   events,
		typ:      typ,
		id:       id,
		message:  message,
		received: 0,
		total:    size,
		// Start the throttling window now so the first event is not emitted
		// until a full reportingInterval has passed.
		lastReported:      time.Now(),
		reportingInterval: reportingInterval,
		closed:            false,
		closer:            closer,
	}
}
// progressReportingCloser decorates another io.ReadCloser: reads and closes
// are delegated to the wrapped value, and ProgressEvents describing how much
// data has been read are sent to an eventEmitter along the way.
type progressReportingCloser struct {
	// Destination for the progress events.
	events eventEmitter
	// Kind of progress being tracked; attached to every event.
	typ ProgressType
	// id uniquely identifies the tracked download; message is the text
	// included in each progress event.
	id, message string

	// received is the running count of bytes read so far; total is the
	// number of bytes expected overall.
	received, total int64

	// Time at which a progress event was last reported.
	lastReported time.Time
	// Interval at which progress events should be reported.
	reportingInterval time.Duration

	// Set once Close has been called; used so that the final (done) event is
	// emitted only on the first Close.
	closed bool
	// The wrapped ReadCloser that data is actually read from.
	closer io.ReadCloser
}
// Read delegates to the wrapped ReadCloser and adds any bytes it returns to
// the running received count. A non-final progress event carrying the updated
// count is emitted only when more than reportingInterval has passed since the
// last one, so reads in quick succession do not each produce an event. The
// byte count and error from the wrapped Read are returned unchanged.
func (d *progressReportingCloser) Read(p []byte) (int, error) {
	count, readErr := d.closer.Read(p)
	if count == 0 {
		// Nothing new to account for; just pass the result through.
		return count, readErr
	}

	d.received += int64(count)

	// Throttle: only report if the reporting window has elapsed.
	if now := time.Now(); now.Sub(d.lastReported) > d.reportingInterval {
		d.lastReported = now
		d.events.progressEvent(d.typ, d.id, d.message, d.received, d.total, false)
	}

	return count, readErr
}
// Close closes the wrapped ReadCloser and returns whatever error it produced.
// The first call additionally emits a final (done) progress event carrying
// the byte counts seen so far; this happens whether or not the wrapped Close
// failed. Later calls are still forwarded to the wrapped ReadCloser but emit
// no further events.
func (d *progressReportingCloser) Close() error {
	closeErr := d.closer.Close()
	if d.closed {
		// The done event has already been sent by an earlier Close.
		return closeErr
	}

	d.events.progressEvent(d.typ, d.id, d.message, d.received, d.total, true)
	d.closed = true
	return closeErr
}