pulumi/pkg/engine/query.go

174 lines
5.5 KiB
Go

// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package engine
import (
"context"
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/fsutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/result"
)
type QueryOptions struct {
Events eventEmitter // the channel to write events from the engine to.
Diag diag.Sink // the sink to use for diag'ing.
StatusDiag diag.Sink // the sink to use for diag'ing status messages.
host plugin.Host // the plugin host to use for this query.
pwd, main string
plugctx *plugin.Context
tracingSpan opentracing.Span
}
func Query(ctx *Context, q QueryInfo, opts UpdateOptions) error {
contract.Requiref(q != nil, "update", "cannot be nil")
contract.Requiref(ctx != nil, "ctx", "cannot be nil")
defer func() { ctx.Events <- cancelEvent() }()
tracingSpan := func(opName string, parentSpan opentracing.SpanContext) opentracing.Span {
// Create a root span for the operation
opts := []opentracing.StartSpanOption{}
if opName != "" {
opts = append(opts, opentracing.Tag{Key: "operation", Value: opName})
}
if parentSpan != nil {
opts = append(opts, opentracing.ChildOf(parentSpan))
}
return opentracing.StartSpan("pulumi-query", opts...)
}("query", ctx.ParentSpan)
defer tracingSpan.Finish()
emitter, err := makeQueryEventEmitter(ctx.Events)
if err != nil {
return err
}
defer emitter.Close()
// First, load the package metadata and the deployment target in preparation for executing the package's program
// and creating resources. This includes fetching its pwd and main overrides.
diag := newEventSink(emitter, false)
statusDiag := newEventSink(emitter, true)
proj := q.GetProject()
contract.Assertf(proj != nil, "query project cannot be nil")
pwd, main, plugctx, err := ProjectInfoContext(&Projinfo{Proj: proj, Root: q.GetRoot()},
opts.Host, diag, statusDiag, false, tracingSpan, nil)
if err != nil {
return err
}
defer plugctx.Close()
return query(ctx, q, QueryOptions{
Events: emitter,
Diag: diag,
StatusDiag: statusDiag,
host: opts.Host,
pwd: pwd,
main: main,
plugctx: plugctx,
tracingSpan: tracingSpan,
})
}
func newQuerySource(cancel context.Context, client deploy.BackendClient, q QueryInfo,
opts QueryOptions,
) (deploy.QuerySource, error) {
allPlugins, defaultProviderVersions, err := installPlugins(cancel, q.GetProject(), opts.pwd, opts.main,
nil, opts.plugctx, false /*returnInstallErrors*/)
if err != nil {
return nil, err
}
// Once we've installed all of the plugins we need, make sure that all analyzers and language plugins are
// loaded up and ready to go. Provider plugins are loaded lazily by the provider registry and thus don't
// need to be loaded here.
const kinds = plugin.LanguagePlugins
if err := ensurePluginsAreLoaded(opts.plugctx, allPlugins, kinds); err != nil {
return nil, err
}
if opts.tracingSpan != nil {
cancel = opentracing.ContextWithSpan(cancel, opts.tracingSpan)
}
// If that succeeded, create a new source that will perform interpretation of the compiled program.
// TODO[pulumi/pulumi#88]: we are passing `nil` as the arguments map; we need to allow a way to pass these.
return deploy.NewQuerySource(cancel, opts.plugctx, client, &deploy.EvalRunInfo{
Proj: q.GetProject(),
Pwd: opts.pwd,
Program: opts.main,
}, defaultProviderVersions, nil)
}
func query(ctx *Context, q QueryInfo, opts QueryOptions) error {
// Make the current working directory the same as the program's, and restore it upon exit.
done, err := fsutil.Chdir(opts.plugctx.Pwd)
if err != nil {
return err
}
defer done()
if err := runQuery(ctx, q, opts); err != nil {
if result.IsBail(err) {
return err
}
return fmt.Errorf("an error occurred while running the query: %w", err)
}
return nil
}
func runQuery(cancelCtx *Context, q QueryInfo, opts QueryOptions) error {
ctx, cancelFunc := context.WithCancel(context.Background())
// Set up a goroutine that will signal cancellation to the plan's plugins if the caller context
// is cancelled.
go func() {
<-cancelCtx.Cancel.Canceled()
logging.V(4).Infof("engine.runQuery(...): signalling cancellation to providers...")
cancelFunc()
cancelErr := opts.plugctx.Host.SignalCancellation()
if cancelErr != nil {
logging.V(4).Infof("engine.runQuery(...): failed to signal cancellation to providers: %v", cancelErr)
}
}()
src, err := newQuerySource(ctx, cancelCtx.BackendClient, q, opts)
if err != nil {
return err
}
done := make(chan error)
go func() {
done <- src.Wait()
}()
// Block until query completes.
select {
case <-cancelCtx.Cancel.Terminated():
return cancelCtx.Cancel.TerminateErr()
case err := <-done:
return err
}
}