mirror of https://github.com/pulumi/pulumi.git
183 lines
5.6 KiB
Go
183 lines
5.6 KiB
Go
// Copyright 2016-2023, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/opentracing/opentracing-go"
|
|
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/fsutil"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/result"
|
|
)
|
|
|
|
type QueryOptions struct {
|
|
Events eventEmitter // the channel to write events from the engine to.
|
|
Diag diag.Sink // the sink to use for diag'ing.
|
|
StatusDiag diag.Sink // the sink to use for diag'ing status messages.
|
|
host plugin.Host // the plugin host to use for this query.
|
|
pwd, main string
|
|
plugctx *plugin.Context
|
|
tracingSpan opentracing.Span
|
|
}
|
|
|
|
func Query(ctx *Context, q QueryInfo, opts UpdateOptions) error {
|
|
contract.Requiref(q != nil, "update", "cannot be nil")
|
|
contract.Requiref(ctx != nil, "ctx", "cannot be nil")
|
|
|
|
defer func() { ctx.Events <- NewCancelEvent() }()
|
|
|
|
tracingSpan := func(opName string, parentSpan opentracing.SpanContext) opentracing.Span {
|
|
// Create a root span for the operation
|
|
opts := []opentracing.StartSpanOption{}
|
|
if opName != "" {
|
|
opts = append(opts, opentracing.Tag{Key: "operation", Value: opName})
|
|
}
|
|
if parentSpan != nil {
|
|
opts = append(opts, opentracing.ChildOf(parentSpan))
|
|
}
|
|
return opentracing.StartSpan("pulumi-query", opts...)
|
|
}("query", ctx.ParentSpan)
|
|
defer tracingSpan.Finish()
|
|
|
|
emitter, err := makeQueryEventEmitter(ctx.Events)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer emitter.Close()
|
|
|
|
// First, load the package metadata and the deployment target in preparation for executing the package's program
|
|
// and creating resources. This includes fetching its pwd and main overrides.
|
|
diag := newEventSink(emitter, false)
|
|
statusDiag := newEventSink(emitter, true)
|
|
|
|
proj := q.GetProject()
|
|
contract.Assertf(proj != nil, "query project cannot be nil")
|
|
|
|
pwd, main, plugctx, err := ProjectInfoContext(&Projinfo{Proj: proj, Root: q.GetRoot()},
|
|
opts.Host, diag, statusDiag, nil, false, tracingSpan, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer plugctx.Close()
|
|
|
|
return query(ctx, q, QueryOptions{
|
|
Events: emitter,
|
|
Diag: diag,
|
|
StatusDiag: statusDiag,
|
|
host: opts.Host,
|
|
pwd: pwd,
|
|
main: main,
|
|
plugctx: plugctx,
|
|
tracingSpan: tracingSpan,
|
|
})
|
|
}
|
|
|
|
func newQuerySource(cancel context.Context, client deploy.BackendClient, q QueryInfo,
|
|
opts QueryOptions,
|
|
) (deploy.QuerySource, error) {
|
|
allPlugins, defaultProviderVersions, err := installPlugins(
|
|
cancel,
|
|
q.GetProject(),
|
|
opts.pwd,
|
|
opts.main,
|
|
nil, /*target*/
|
|
nil, /*opts*/
|
|
opts.plugctx,
|
|
false, /*returnInstallErrors*/
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Once we've installed all of the plugins we need, make sure that all analyzers and language plugins are
|
|
// loaded up and ready to go. Provider plugins are loaded lazily by the provider registry and thus don't
|
|
// need to be loaded here.
|
|
const kinds = plugin.LanguagePlugins
|
|
if err := ensurePluginsAreLoaded(opts.plugctx, allPlugins, kinds); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if opts.tracingSpan != nil {
|
|
cancel = opentracing.ContextWithSpan(cancel, opts.tracingSpan)
|
|
}
|
|
|
|
// If that succeeded, create a new source that will perform interpretation of the compiled program.
|
|
// TODO[pulumi/pulumi#88]: we are passing `nil` as the arguments map; we need to allow a way to pass these.
|
|
return deploy.NewQuerySource(cancel, opts.plugctx, client, &deploy.EvalRunInfo{
|
|
ProjectRoot: q.GetRoot(),
|
|
Proj: q.GetProject(),
|
|
Pwd: opts.pwd,
|
|
Program: opts.main,
|
|
}, defaultProviderVersions, nil)
|
|
}
|
|
|
|
func query(ctx *Context, q QueryInfo, opts QueryOptions) error {
|
|
// Make the current working directory the same as the program's, and restore it upon exit.
|
|
done, err := fsutil.Chdir(opts.plugctx.Pwd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer done()
|
|
|
|
if err := runQuery(ctx, q, opts); err != nil {
|
|
if result.IsBail(err) {
|
|
return err
|
|
}
|
|
return fmt.Errorf("an error occurred while running the query: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func runQuery(cancelCtx *Context, q QueryInfo, opts QueryOptions) error {
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
|
// Set up a goroutine that will signal cancellation to the plan's plugins if the caller context
|
|
// is cancelled.
|
|
go func() {
|
|
<-cancelCtx.Cancel.Canceled()
|
|
|
|
logging.V(4).Infof("engine.runQuery(...): signalling cancellation to providers...")
|
|
cancelFunc()
|
|
cancelErr := opts.plugctx.Host.SignalCancellation()
|
|
if cancelErr != nil {
|
|
logging.V(4).Infof("engine.runQuery(...): failed to signal cancellation to providers: %v", cancelErr)
|
|
}
|
|
}()
|
|
|
|
src, err := newQuerySource(ctx, cancelCtx.BackendClient, q, opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
done := make(chan error)
|
|
go func() {
|
|
done <- src.Wait()
|
|
}()
|
|
|
|
// Block until query completes.
|
|
select {
|
|
case <-cancelCtx.Cancel.Terminated():
|
|
return cancelCtx.Cancel.TerminateErr()
|
|
case err := <-done:
|
|
return err
|
|
}
|
|
}
|