Add tokens.StackName (#14487)
<!---
Thanks so much for your contribution! If this is your first time
contributing, please ensure that you have read the
[CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md)
documentation.
-->
# Description
<!--- Please include a summary of the change and which issue is fixed.
Please also include relevant motivation and context. -->
This adds a new type `tokens.StackName` which is a relatively strongly
typed container for a stack name. The only weakly typed aspect of it is
Go will always allow the "zero" value to be created for a struct, which
for a stack name is the empty string which is invalid. To prevent
introducing unexpected empty strings when working with stack names the
`String()` method will panic for zero initialized stack names.
Apart from the zero value, all other instances of `StackName` are via
`ParseStackName` which returns a descriptive error if the string is not
valid.
This PR only updates "pkg/" to use this type. There are a number of
places in "sdk/" which could do with this type as well, but there's no
harm in doing a staggered roll out, and some parts of "sdk/" are user
facing and will probably have to stay on the current `tokens.Name` and
`tokens.QName` types.
There are two places in the system where we panic on invalid stack
names, both in the http backend. This _should_ be fine as we've had long
standing validation that stacks created in the service are valid stack
names.
Just in case people have managed to introduce invalid stack names, there
is the `PULUMI_DISABLE_VALIDATION` environment variable which will turn
off the validation _and_ panicing for stack names. Users can use that to
temporarily disable the validation and continue working, but it should
only be seen as a temporary measure. If they have invalid names they
should rename them, or if they think they should be valid raise an issue
with us to change the validation code.
## Checklist
- [x] I have run `make tidy` to update any new dependencies
- [x] I have run `make lint` to verify my code passes the lint check
- [ ] I have formatted my code using `gofumpt`
<!--- Please provide details if the checkbox below is to be left
unchecked. -->
- [x] I have added tests that prove my fix is effective or that my
feature works
<!---
User-facing changes require a CHANGELOG entry.
-->
- [ ] I have run `make changelog` and committed the
`changelog/pending/<file>` documenting my change
<!--
If the change(s) in this PR is a modification of an existing call to the
Pulumi Cloud,
then the service should honor older versions of the CLI where this
change would not exist.
You must then bump the API version in
/pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrants bumping the Pulumi
Cloud API version
<!-- @Pulumi employees: If yes, you must submit corresponding changes in
the service repo. -->
2023-11-15 07:44:54 +00:00
|
|
|
// Copyright 2016-2023, Pulumi Corporation.
|
2018-05-22 19:43:36 +00:00
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2018-03-22 17:42:43 +00:00
|
|
|
|
2018-09-04 19:38:58 +00:00
|
|
|
package httpstate
|
2018-03-22 17:42:43 +00:00
|
|
|
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/pulumi/pulumi/sdk/v3/go/common/channel"
	"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
	"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"

	"github.com/pulumi/pulumi/pkg/v3/backend"
	"github.com/pulumi/pulumi/pkg/v3/backend/display"
	"github.com/pulumi/pulumi/pkg/v3/backend/httpstate/client"
	"github.com/pulumi/pulumi/pkg/v3/engine"
	"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
	"github.com/pulumi/pulumi/pkg/v3/resource/stack"
	"github.com/pulumi/pulumi/pkg/v3/secrets"
	"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
	"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)
|
|
|
|
|
2019-04-30 18:48:41 +00:00
|
|
|
// cloudQuery carries the information needed to run a query operation:
// the project root on disk and the project metadata.
type cloudQuery struct {
	// root is the project root directory the query runs from.
	root string
	// proj is the Pulumi project the query operates on.
	proj *workspace.Project
}
|
|
|
|
|
|
|
|
// GetRoot returns the project root directory for this query.
func (q *cloudQuery) GetRoot() string {
	return q.root
}
|
|
|
|
|
|
|
|
// GetProject returns the Pulumi project this query operates on.
func (q *cloudQuery) GetProject() *workspace.Project {
	return q.proj
}
|
|
|
|
|
2018-03-22 17:42:43 +00:00
|
|
|
// cloudUpdate is an implementation of engine.Update backed by remote state and a local program.
type cloudUpdate struct {
	// context scopes the lifetime of the update's requests to the service.
	context context.Context
	// backend is the cloud backend this update was created by.
	backend *cloudBackend

	// update identifies this update on the Pulumi service.
	update client.UpdateIdentifier
	// tokenSource keeps the update's lease token fresh; nil when no token
	// was supplied at creation time (see newUpdate).
	tokenSource *tokenSource

	// root is the project root directory the update runs from.
	root string
	// proj is the Pulumi project being updated.
	proj *workspace.Project
	// target is the deployment target (stack name, config, snapshot) the
	// update applies to.
	target *deploy.Target
}
|
|
|
|
|
|
|
|
// GetRoot returns the project root directory for this update.
func (u *cloudUpdate) GetRoot() string {
	return u.root
}
|
|
|
|
|
|
|
|
// GetProject returns the Pulumi project being updated.
func (u *cloudUpdate) GetProject() *workspace.Project {
	return u.proj
}
|
|
|
|
|
|
|
|
// GetTarget returns the deployment target this update applies to.
func (u *cloudUpdate) GetTarget() *deploy.Target {
	return u.target
}
|
|
|
|
|
|
|
|
// Complete marks the update as finished on the Pulumi service with the given
// status, and stops renewing the update's lease token.
func (u *cloudUpdate) Complete(status apitype.UpdateStatus) error {
	// Stop the token renewal loop once the completion call has gone out.
	// NOTE(review): assumes u.tokenSource is non-nil here or that Close is
	// nil-safe — newUpdate leaves it nil when no token was supplied; confirm.
	defer u.tokenSource.Close()

	return u.backend.client.CompleteUpdate(u.context, u.update, status, u.tokenSource)
}
|
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
// recordEngineEvents will record the events with the Pulumi Service, enabling things like viewing
|
2018-12-20 23:34:30 +00:00
|
|
|
// the update logs or drilling into the timeline of an update.
|
2019-06-28 16:40:21 +00:00
|
|
|
func (u *cloudUpdate) recordEngineEvents(startingSeqNumber int, events []engine.Event) error {
|
2023-02-16 20:36:43 +00:00
|
|
|
contract.Assertf(u.tokenSource != nil, "cloud update requires a token source")
|
2018-03-31 19:08:48 +00:00
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
var apiEvents apitype.EngineEventBatch
|
|
|
|
for idx, event := range events {
|
2022-04-07 16:03:19 +00:00
|
|
|
apiEvent, convErr := display.ConvertEngineEvent(event, false /* showSecrets */)
|
2019-06-28 16:40:21 +00:00
|
|
|
if convErr != nil {
|
2021-11-13 02:37:17 +00:00
|
|
|
return fmt.Errorf("converting engine event: %w", convErr)
|
2019-06-28 16:40:21 +00:00
|
|
|
}
|
2018-12-20 23:34:30 +00:00
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
// Each event within an update must have a unique sequence number. Any request to
|
|
|
|
// emit an update with the same sequence number will fail. (Read: the caller needs
|
|
|
|
// to be accurate about this.)
|
|
|
|
apiEvent.Sequence = idx + startingSeqNumber
|
|
|
|
apiEvent.Timestamp = int(time.Now().Unix())
|
2018-12-20 23:34:30 +00:00
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
apiEvents.Events = append(apiEvents.Events, apiEvent)
|
|
|
|
}
|
2018-03-22 17:42:43 +00:00
|
|
|
|
2023-03-01 22:03:34 +00:00
|
|
|
return u.backend.client.RecordEngineEvents(u.context, u.update, apiEvents, u.tokenSource)
|
2019-01-15 02:19:59 +00:00
|
|
|
}
|
|
|
|
|
2018-11-09 18:01:29 +00:00
|
|
|
// RecordAndDisplayEvents inspects engine events from the given channel, and prints them to the CLI as well as
|
2019-05-22 19:22:40 +00:00
|
|
|
// posting them to the Pulumi service.
|
2018-10-30 22:42:33 +00:00
|
|
|
func (u *cloudUpdate) RecordAndDisplayEvents(
|
|
|
|
label string, action apitype.UpdateKind, stackRef backend.StackReference, op backend.UpdateOperation,
|
2023-03-06 17:06:48 +00:00
|
|
|
permalink string, events <-chan engine.Event, done chan<- bool, opts display.Options, isPreview bool,
|
2023-03-03 16:36:39 +00:00
|
|
|
) {
|
2019-06-28 16:40:21 +00:00
|
|
|
// We take the channel of engine events and pass them to separate components that will display
|
|
|
|
// them to the console or persist them on the Pulumi Service. Both should terminate as soon as
|
|
|
|
// they see a CancelEvent, and when finished, close the "done" channel.
|
|
|
|
displayEvents := make(chan engine.Event) // Note: unbuffered, but we assume it won't matter in practice.
|
|
|
|
displayEventsDone := make(chan bool)
|
|
|
|
|
|
|
|
persistEvents := make(chan engine.Event, 100)
|
|
|
|
persistEventsDone := make(chan bool)
|
2018-03-22 17:42:43 +00:00
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
// We close our own done channel when both of the dependent components have finished.
|
2018-12-14 03:58:26 +00:00
|
|
|
defer func() {
|
2019-06-28 16:40:21 +00:00
|
|
|
<-displayEventsDone
|
|
|
|
<-persistEventsDone
|
2018-12-14 03:58:26 +00:00
|
|
|
close(done)
|
|
|
|
}()
|
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
// Start the Go-routines for displaying and persisting events.
|
|
|
|
go display.ShowEvents(
|
2023-03-06 17:06:48 +00:00
|
|
|
label, action, stackRef.Name(), op.Proj.Name, permalink,
|
2019-06-28 16:40:21 +00:00
|
|
|
displayEvents, displayEventsDone, opts, isPreview)
|
|
|
|
go persistEngineEvents(
|
|
|
|
u, opts.Debug, /* persist debug events */
|
|
|
|
persistEvents, persistEventsDone)
|
2018-12-20 23:34:30 +00:00
|
|
|
|
|
|
|
for e := range events {
|
2018-03-22 17:42:43 +00:00
|
|
|
displayEvents <- e
|
2019-06-28 16:40:21 +00:00
|
|
|
persistEvents <- e
|
2018-03-22 17:42:43 +00:00
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
// We stop reading from the event stream as soon as we see the CancelEvent,
|
|
|
|
// which will also signal the display/persist components to shutdown too.
|
2018-03-22 17:42:43 +00:00
|
|
|
if e.Type == engine.CancelEvent {
|
2018-12-24 18:23:32 +00:00
|
|
|
break
|
2018-03-22 17:42:43 +00:00
|
|
|
}
|
|
|
|
}
|
2018-12-20 23:34:30 +00:00
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
// Note that we don't return immediately, the defer'd function will block until
|
|
|
|
// the display and persistence go-routines are finished processing events.
|
2018-03-22 17:42:43 +00:00
|
|
|
}
|
|
|
|
|
2019-08-12 07:22:42 +00:00
|
|
|
func (b *cloudBackend) newQuery(ctx context.Context,
|
2023-03-03 16:36:39 +00:00
|
|
|
op backend.QueryOperation,
|
|
|
|
) (engine.QueryInfo, error) {
|
2019-08-12 07:22:42 +00:00
|
|
|
return &cloudQuery{root: op.Root, proj: op.Proj}, nil
|
2019-04-30 18:48:41 +00:00
|
|
|
}
|
|
|
|
|
2019-04-18 22:57:54 +00:00
|
|
|
// newUpdate constructs a cloudUpdate for the given stack and operation. If a
// non-empty lease token is supplied, a token source is started that renews the
// lease against the service for as long as the update runs; otherwise
// tokenSource is left nil.
func (b *cloudBackend) newUpdate(ctx context.Context, stackRef backend.StackReference, op backend.UpdateOperation,
	update client.UpdateIdentifier, token string,
) (*cloudUpdate, error) {
	// Create a token source for this update if necessary.
	var tokenSource *tokenSource
	if token != "" {
		// TODO[pulumi/pulumi#10482] instead of assuming
		// expiration, consider expiration times returned by
		// the backend, if any.
		duration := 5 * time.Minute
		assumedExpires := func() time.Time {
			return time.Now().Add(duration)
		}

		// renewLease asks the service for a fresh lease token; the expiry we
		// report is assumed (now + duration), not one returned by the backend.
		renewLease := func(
			ctx context.Context,
			duration time.Duration,
			currentToken string,
		) (string, time.Time, error) {
			tok, err := b.Client().RenewUpdateLease(
				ctx, update, currentToken, duration)
			if err != nil {
				return "", time.Time{}, err
			}
			return tok, assumedExpires(), err
		}

		ts, err := newTokenSource(ctx, token, assumedExpires(), duration, renewLease)
		if err != nil {
			return nil, err
		}
		tokenSource = ts
	}

	// Construct the deployment target.
	target, err := b.getTarget(ctx, op.SecretsProvider, stackRef,
		op.StackConfiguration.Config, op.StackConfiguration.Decrypter)
	if err != nil {
		return nil, err
	}

	// Construct and return a new update.
	return &cloudUpdate{
		context:     ctx,
		backend:     b,
		update:      update,
		tokenSource: tokenSource,
		root:        op.Root,
		proj:        op.Proj,
		target:      target,
	}, nil
}
|
|
|
|
|
2023-01-11 16:04:14 +00:00
|
|
|
func (b *cloudBackend) getSnapshot(ctx context.Context,
|
2023-03-03 16:36:39 +00:00
|
|
|
secretsProvider secrets.Provider, stackRef backend.StackReference,
|
|
|
|
) (*deploy.Snapshot, error) {
|
2020-02-13 20:25:57 +00:00
|
|
|
untypedDeployment, err := b.exportDeployment(ctx, stackRef, nil /* get latest */)
|
2018-03-22 17:42:43 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-05-25 20:29:59 +00:00
|
|
|
|
2023-01-11 16:04:14 +00:00
|
|
|
snapshot, err := stack.DeserializeUntypedDeployment(ctx, untypedDeployment, secretsProvider)
|
2018-03-22 17:42:43 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-12-04 15:12:56 +00:00
|
|
|
// Ensure the snapshot passes verification before returning it, to catch bugs early.
|
|
|
|
if !backend.DisableIntegrityChecking {
|
|
|
|
if err := snapshot.VerifyIntegrity(); err != nil {
|
|
|
|
return nil, fmt.Errorf("snapshot integrity failure; refusing to use it: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-22 22:39:13 +00:00
|
|
|
return snapshot, nil
|
|
|
|
}
|
|
|
|
|
2023-01-11 16:04:14 +00:00
|
|
|
func (b *cloudBackend) getTarget(ctx context.Context, secretsProvider secrets.Provider, stackRef backend.StackReference,
|
2023-03-03 16:36:39 +00:00
|
|
|
cfg config.Map, dec config.Decrypter,
|
|
|
|
) (*deploy.Target, error) {
|
2022-08-31 09:33:29 +00:00
|
|
|
stackID, err := b.getCloudStackIdentifier(stackRef)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-08-12 07:22:42 +00:00
|
|
|
|
2023-01-11 16:04:14 +00:00
|
|
|
snapshot, err := b.getSnapshot(ctx, secretsProvider, stackRef)
|
2018-03-22 17:42:43 +00:00
|
|
|
if err != nil {
|
2018-05-25 20:29:59 +00:00
|
|
|
switch err {
|
|
|
|
case stack.ErrDeploymentSchemaVersionTooOld:
|
|
|
|
return nil, fmt.Errorf("the stack '%s' is too old to be used by this version of the Pulumi CLI",
|
2018-09-05 14:20:25 +00:00
|
|
|
stackRef.Name())
|
2018-05-25 20:29:59 +00:00
|
|
|
case stack.ErrDeploymentSchemaVersionTooNew:
|
|
|
|
return nil, fmt.Errorf("the stack '%s' is newer than what this version of the Pulumi CLI understands. "+
|
2018-09-05 14:20:25 +00:00
|
|
|
"Please update your version of the Pulumi CLI", stackRef.Name())
|
2018-05-25 20:29:59 +00:00
|
|
|
default:
|
2021-11-13 02:37:17 +00:00
|
|
|
return nil, fmt.Errorf("could not deserialize deployment: %w", err)
|
2018-05-25 20:29:59 +00:00
|
|
|
}
|
2018-03-22 17:42:43 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return &deploy.Target{
|
Add tokens.StackName (#14487)
<!---
Thanks so much for your contribution! If this is your first time
contributing, please ensure that you have read the
[CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md)
documentation.
-->
# Description
<!--- Please include a summary of the change and which issue is fixed.
Please also include relevant motivation and context. -->
This adds a new type `tokens.StackName` which is a relatively strongly
typed container for a stack name. The only weakly typed aspect of it is
Go will always allow the "zero" value to be created for a struct, which
for a stack name is the empty string which is invalid. To prevent
introducing unexpected empty strings when working with stack names the
`String()` method will panic for zero initialized stack names.
Apart from the zero value, all other instances of `StackName` are via
`ParseStackName` which returns a descriptive error if the string is not
valid.
This PR only updates "pkg/" to use this type. There are a number of
places in "sdk/" which could do with this type as well, but there's no
harm in doing a staggered roll out, and some parts of "sdk/" are user
facing and will probably have to stay on the current `tokens.Name` and
`tokens.QName` types.
There are two places in the system where we panic on invalid stack
names, both in the http backend. This _should_ be fine as we've had long
standing validation that stacks created in the service are valid stack
names.
Just in case people have managed to introduce invalid stack names, there
is the `PULUMI_DISABLE_VALIDATION` environment variable which will turn
off the validation _and_ panicing for stack names. Users can use that to
temporarily disable the validation and continue working, but it should
only be seen as a temporary measure. If they have invalid names they
should rename them, or if they think they should be valid raise an issue
with us to change the validation code.
## Checklist
- [x] I have run `make tidy` to update any new dependencies
- [x] I have run `make lint` to verify my code passes the lint check
- [ ] I have formatted my code using `gofumpt`
<!--- Please provide details if the checkbox below is to be left
unchecked. -->
- [x] I have added tests that prove my fix is effective or that my
feature works
<!---
User-facing changes require a CHANGELOG entry.
-->
- [ ] I have run `make changelog` and committed the
`changelog/pending/<file>` documenting my change
<!--
If the change(s) in this PR is a modification of an existing call to the
Pulumi Cloud,
then the service should honor older versions of the CLI where this
change would not exist.
You must then bump the API version in
/pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrants bumping the Pulumi
Cloud API version
<!-- @Pulumi employees: If yes, you must submit corresponding changes in
the service repo. -->
2023-11-15 07:44:54 +00:00
|
|
|
Name: stackID.Stack,
|
2022-08-31 09:33:29 +00:00
|
|
|
Organization: tokens.Name(stackID.Owner),
|
|
|
|
Config: cfg,
|
|
|
|
Decrypter: dec,
|
|
|
|
Snapshot: snapshot,
|
2018-03-22 17:42:43 +00:00
|
|
|
}, nil
|
|
|
|
}
|
2018-11-09 18:01:29 +00:00
|
|
|
|
2019-06-28 16:40:21 +00:00
|
|
|
func isDebugDiagEvent(e engine.Event) bool {
|
2020-07-17 06:52:31 +00:00
|
|
|
return e.Type == engine.DiagEvent && (e.Payload().(engine.DiagEventPayload)).Severity == diag.Debug
|
2019-06-28 16:40:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// engineEventBatch is a group of consecutive engine events queued for
// transmission to the service, tagged with the sequence number of its
// first event.
type engineEventBatch struct {
	// sequenceStart is the sequence number assigned to events[0].
	sequenceStart int
	// events are the engine events in this batch, in emission order.
	events []engine.Event
}
|
|
|
|
|
|
|
|
// persistEngineEvents reads from a channel of engine events and persists them on the
// Pulumi Service. This is the data that powers the logs display. It batches events
// (by size and by elapsed time) and uploads batches from a bounded pool of worker
// goroutines; `done` is closed only after every upload has completed.
func persistEngineEvents(
	update *cloudUpdate, persistDebugEvents bool,
	events <-chan engine.Event, done chan<- bool,
) {
	// A single update can emit hundreds, if not thousands, or tens of thousands of
	// engine events. We transmit engine events in large batches to reduce the overhead
	// associated with each HTTP request to the service. We also send multiple HTTP
	// requests concurrently, as to not block processing subsequent engine events.

	// Maximum number of events to batch up before transmitting.
	const maxEventsToTransmit = 50
	// Maximum wait time before sending all batched events.
	const maxTransmissionDelay = 4 * time.Second
	// Maximum number of concurrent requests to the Pulumi Service to persist
	// engine events.
	const maxConcurrentRequests = 3

	// We don't want to indicate that we are done processing every engine event in the
	// provided channel until every HTTP request has completed. We use a wait group to
	// track all of those requests.
	var wg sync.WaitGroup

	defer func() {
		wg.Wait()
		close(done)
	}()

	// Need to filter the engine events here to exclude any internal events.
	events = channel.FilterRead(events, func(e engine.Event) bool {
		return !e.Internal()
	})

	var eventBatch []engine.Event
	maxDelayTicker := time.NewTicker(maxTransmissionDelay)

	// We maintain a sequence counter for each event to ensure that the Pulumi Service can
	// ensure events can be reconstructed in the same order they were emitted. (And not
	// out of order from parallel writes and/or network delays.)
	eventIdx := 0

	// As we identify batches of engine events to transmit, we put them into a channel.
	// This will allow us to issue HTTP requests concurrently, but also limit the maximum
	// number of requests in-flight at any one time.
	//
	// This channel isn't buffered, so adding a new batch of events to persist will block
	// until a go-routine is available to send the batch.
	batchesToTransmit := make(chan engineEventBatch)

	// transmitBatchLoop is the worker body: it drains batchesToTransmit until the
	// channel is closed, logging (not propagating) any upload failures.
	transmitBatchLoop := func() {
		defer wg.Done()

		for eventBatch := range batchesToTransmit {
			err := update.recordEngineEvents(eventBatch.sequenceStart, eventBatch.events)
			if err != nil {
				// Best-effort: a failed upload is logged and dropped rather than retried.
				logging.V(3).Infof("error recording engine events: %s", err)
			}
		}
	}
	// Start N different go-routines which will all pull from the batchesToTransmit channel
	// and persist those engine events until the channel is closed.
	for i := 0; i < maxConcurrentRequests; i++ {
		wg.Add(1)
		go transmitBatchLoop()
	}

	// transmitBatch sends off the current batch of engine events (eventIdx, eventBatch) to the
	// batchesToTransmit channel. Will mutate eventIdx, eventBatch as a side effect.
	transmitBatch := func() {
		if len(eventBatch) == 0 {
			return
		}

		batch := engineEventBatch{
			sequenceStart: eventIdx,
			events:        eventBatch,
		}
		// This will block until one of the spawned go-routines is available to read the data.
		// Effectively providing a global rate limit for how quickly we can send data to the
		// Pulumi Service, if an update is particularly chatty.
		batchesToTransmit <- batch

		// With the values of eventIdx and eventBatch copied into engineEventBatch,
		// we now modify their values for the next time transmitBatch is called.
		eventIdx += len(eventBatch)
		eventBatch = nil
	}

	var sawCancelEvent bool
	for {
		select {
		case e := <-events:
			// Ignore debug events unless asked to.
			// (Note: `break` here exits the select case, not the for loop.)
			if isDebugDiagEvent(e) && !persistDebugEvents {
				break
			}

			// Stop processing once we see the CancelEvent.
			if e.Type == engine.CancelEvent {
				sawCancelEvent = true
				break
			}

			eventBatch = append(eventBatch, e)
			if len(eventBatch) >= maxEventsToTransmit {
				transmitBatch()
			}

		case <-maxDelayTicker.C:
			// If the ticker has fired, send any batched events. This sets an upper bound for
			// the delay between the event being observed and persisted.
			transmitBatch()
		}

		if sawCancelEvent {
			break
		}
	}

	// Transmit any lingering events.
	transmitBatch()
	// Closing the batchesToTransmit channel will signal the worker persistence routines to
	// terminate, which will trigger the `wg` WaitGroup to be marked as complete, which will
	// finally close the `done` channel so the caller knows we are finished processing the
	// engine event stream.
	close(batchesToTransmit)
}
|