// Copyright 2016-2022, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package plugin import ( "bufio" "encoding/json" "errors" "fmt" "io" "os" "os/exec" "path/filepath" "strconv" "strings" "sync/atomic" "syscall" "time" multierror "github.com/hashicorp/go-multierror" opentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/status" "github.com/pulumi/pulumi/sdk/v3/go/common/apitype" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" "github.com/pulumi/pulumi/sdk/v3/go/common/workspace" ) // PulumiPluginJSON represents additional information about a package's associated Pulumi plugin. // For Python, the content is inside a pulumi-plugin.json file inside the package. // For Node.js, the content is within the package.json file, under the "pulumi" node. // For .NET, the content is inside a pulumi-plugin.json file inside the NuGet package. // For Go, the content is inside a pulumi-plugin.json file inside the module. type PulumiPluginJSON struct { // Indicates whether the package has an associated resource plugin. Set to false to indicate no plugin. Resource bool `json:"resource"` // Optional plugin name. If not set, the plugin name is derived from the package name. Name string `json:"name,omitempty"` // Optional plugin version. If not set, the version is derived from the package version (if possible). Version string `json:"version,omitempty"` // Optional plugin server. If not set, the default server is used when installing the plugin. Server string `json:"server,omitempty"` } func (plugin *PulumiPluginJSON) JSON() ([]byte, error) { json, err := json.MarshalIndent(plugin, "", " ") if err != nil { return nil, err } return append(json, '\n'), nil } func LoadPulumiPluginJSON(path string) (*PulumiPluginJSON, error) { b, err := os.ReadFile(path) if err != nil { // Deliberately not wrapping the error here so that os.IsNotExist checks can be used to determine // if the file could not be opened due to it not existing. return nil, err } plugin := &PulumiPluginJSON{} if err := json.Unmarshal(b, plugin); err != nil { return nil, err } return plugin, nil } type plugin struct { stdoutDone <-chan bool stderrDone <-chan bool Bin string Args []string // Env specifies the environment of the plugin in the same format as go's os/exec.Cmd.Env // https://golang.org/pkg/os/exec/#Cmd (each entry is of the form "key=value"). Env []string Conn *grpc.ClientConn // Function to trigger the plugin to be killed. If the plugin is a process this will just SIGKILL it. Kill func() error Stdin io.WriteCloser Stdout io.ReadCloser Stderr io.ReadCloser } // pluginRPCConnectionTimeout dictates how long we wait for the plugin's RPC to become available. var pluginRPCConnectionTimeout = time.Second * 10 // A unique ID provided to the output stream of each plugin. This allows the output of the plugin // to be streamed to the display, while still allowing that output to be sent a small piece at a // time. var nextStreamID int32 // errRunPolicyModuleNotFound is returned when we determine that the plugin failed to load because // the stack's Pulumi SDK did not have the required modules. i.e. is too old. var errRunPolicyModuleNotFound = errors.New("pulumi SDK does not support policy as code") // errPluginNotFound is returned when we try to execute a plugin but it is not found on disk. var errPluginNotFound = errors.New("plugin not found") func dialPlugin(portNum int, bin, prefix string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) { port := strconv.Itoa(portNum) // Now that we have the port, go ahead and create a gRPC client connection to it. conn, err := grpc.Dial("127.0.0.1:"+port, dialOptions...) if err != nil { return nil, fmt.Errorf("could not dial plugin [%v] over RPC: %w", bin, err) } // Now wait for the gRPC connection to the plugin to become ready. // TODO[pulumi/pulumi#337]: in theory, this should be unnecessary. gRPC's default WaitForReady behavior // should auto-retry appropriately. On Linux, however, we are observing different behavior. In the meantime // while this bug exists, we'll simply do a bit of waiting of our own up front. timeout, _ := context.WithTimeout(context.Background(), pluginRPCConnectionTimeout) for { s := conn.GetState() if s == connectivity.Ready { // The connection is supposedly ready; but we will make sure it is *actually* ready by sending a dummy // method invocation to the server. Until it responds successfully, we can't safely proceed. outer: for { err = grpc.Invoke(timeout, "", nil, nil, conn) if err == nil { break // successful connect } // We have an error; see if it's a known status and, if so, react appropriately. status, ok := status.FromError(err) if ok { //nolint:exhaustive // we have a default case for other statuses switch status.Code() { case codes.Unavailable: // The server is unavailable. This is the Linux bug. Wait a little and retry. time.Sleep(time.Millisecond * 10) continue // keep retrying default: // Since we sent "" as the method above, this is the expected response. Ready to go. break outer } } // Unexpected error; get outta dodge. return nil, fmt.Errorf("%v plugin [%v] did not come alive: %w", prefix, bin, err) } break } // Not ready yet; ask the gRPC client APIs to block until the state transitions again so we can retry. if !conn.WaitForStateChange(timeout, s) { return nil, fmt.Errorf("%v plugin [%v] did not begin responding to RPC connections", prefix, bin) } } return conn, nil } func newPlugin(ctx *Context, pwd, bin, prefix string, kind apitype.PluginKind, args, env []string, dialOptions []grpc.DialOption, ) (*plugin, error) { if logging.V(9) { var argstr string for i, arg := range args { if i > 0 { argstr += "," } argstr += arg } logging.V(9).Infof("newPlugin(): Launching plugin '%v' from '%v' with args: %v", prefix, bin, argstr) } // Create a span for the plugin initialization opts := []opentracing.StartSpanOption{ opentracing.Tag{Key: "prefix", Value: prefix}, opentracing.Tag{Key: "bin", Value: bin}, opentracing.Tag{Key: "pulumi-decorator", Value: prefix + ":" + bin}, } if ctx != nil && ctx.tracingSpan != nil { opts = append(opts, opentracing.ChildOf(ctx.tracingSpan.Context())) } tracingSpan := opentracing.StartSpan("newPlugin", opts...) defer tracingSpan.Finish() // Try to execute the binary. plug, err := execPlugin(ctx, bin, prefix, kind, args, pwd, env) if err != nil { return nil, fmt.Errorf("failed to load plugin %s: %w", bin, err) } contract.Assertf(plug != nil, "plugin %v canot be nil", bin) // If we did not successfully launch the plugin, we still need to wait for stderr and stdout to drain. defer func() { if plug.Conn == nil { contract.IgnoreError(plug.Close()) } }() outStreamID := atomic.AddInt32(&nextStreamID, 1) errStreamID := atomic.AddInt32(&nextStreamID, 1) // For now, we will spawn goroutines that will spew STDOUT/STDERR to the relevant diag streams. var sawPolicyModuleNotFoundErr bool runtrace := func(t io.Reader, stderr bool, done chan<- bool) { reader := bufio.NewReader(t) for { msg, readerr := reader.ReadString('\n') // Even if we've hit the end of the stream, we want to check for non-empty content. // The reason is that if the last line is missing a \n, we still want to include it. if strings.TrimSpace(msg) != "" { // We may be trying to run a plugin that isn't present in the SDK installed with the Policy Pack. // e.g. the stack's package.json does not contain a recent enough @pulumi/pulumi. // // Rather than fail with an opaque error because we didn't get the gRPC port, inspect if it // is a well-known problem and return a better error as appropriate. if strings.Contains(msg, "Cannot find module '@pulumi/pulumi/cmd/run-policy-pack'") { sawPolicyModuleNotFoundErr = true } if stderr { ctx.Diag.Infoerrf(diag.StreamMessage("" /*urn*/, msg, errStreamID)) } else { ctx.Diag.Infof(diag.StreamMessage("" /*urn*/, msg, outStreamID)) } } // If we've hit the end of the stream, break out and close the channel. if readerr != nil { break } } close(done) } // Set up a tracer on stderr before going any further, since important errors might get communicated this way. stderrDone := make(chan bool) plug.stderrDone = stderrDone go runtrace(plug.Stderr, true, stderrDone) // Now that we have a process, we expect it to write a single line to STDOUT: the port it's listening on. We only // read a byte at a time so that STDOUT contains everything after the first newline. var portString string b := make([]byte, 1) for { n, readerr := plug.Stdout.Read(b) if readerr != nil { killerr := plug.Kill() contract.IgnoreError(killerr) // We are ignoring because the readerr trumps it. // If from the output we have seen, return a specific error if possible. if sawPolicyModuleNotFoundErr { return nil, errRunPolicyModuleNotFound } // Fall back to a generic, opaque error. if portString == "" { return nil, fmt.Errorf("could not read plugin [%v] stdout: %w", bin, readerr) } return nil, fmt.Errorf("failure reading plugin [%v] stdout (read '%v'): %w", bin, portString, readerr) } if n > 0 && b[0] == '\n' { break } portString += string(b[:n]) } // Trim any whitespace from the first line (this is to handle things like windows that will write // "1234\r\n", or slightly odd providers that might add whitespace like "1234 ") portString = strings.TrimSpace(portString) // Parse the output line (minus the '\n') to ensure it's a numeric port. var port int if port, err = strconv.Atoi(portString); err != nil { killerr := plug.Kill() contract.IgnoreError(killerr) // ignoring the error because the existing one trumps it. return nil, fmt.Errorf( "%v plugin [%v] wrote a non-numeric port to stdout ('%v'): %w", prefix, bin, port, err) } // After reading the port number, set up a tracer on stdout just so other output doesn't disappear. stdoutDone := make(chan bool) plug.stdoutDone = stdoutDone go runtrace(plug.Stdout, false, stdoutDone) conn, err := dialPlugin(port, bin, prefix, dialOptions) if err != nil { return nil, err } // Done; store the connection and return the plugin info. plug.Conn = conn return plug, nil } // execPlugin starts the plugin executable. func execPlugin(ctx *Context, bin, prefix string, kind apitype.PluginKind, pluginArgs []string, pwd string, env []string, ) (*plugin, error) { args := buildPluginArguments(pluginArgumentOptions{ pluginArgs: pluginArgs, tracingEndpoint: cmdutil.TracingEndpoint, logFlow: logging.LogFlow, logToStderr: logging.LogToStderr, verbose: logging.Verbose, }) // Check to see if we have a binary we can invoke directly if _, err := os.Stat(bin); os.IsNotExist(err) { // If we don't have the expected binary, see if we have a "PulumiPlugin.yaml" or "PulumiPolicy.yaml" pluginDir := filepath.Dir(bin) var runtimeInfo workspace.ProjectRuntimeInfo if kind == apitype.ResourcePlugin || kind == apitype.ConverterPlugin { proj, err := workspace.LoadPluginProject(filepath.Join(pluginDir, "PulumiPlugin.yaml")) if err != nil { return nil, fmt.Errorf("loading PulumiPlugin.yaml: %w", err) } runtimeInfo = proj.Runtime } else if kind == apitype.AnalyzerPlugin { proj, err := workspace.LoadPluginProject(filepath.Join(pluginDir, "PulumiPolicy.yaml")) if err != nil { return nil, fmt.Errorf("loading PulumiPolicy.yaml: %w", err) } runtimeInfo = proj.Runtime } else { return nil, errors.New("language plugins must be executable binaries") } logging.V(9).Infof("Launching plugin '%v' from '%v' via runtime '%s'", prefix, pluginDir, runtimeInfo.Name()) info := NewProgramInfo(pluginDir, pluginDir, ".", runtimeInfo.Options()) runtime, err := ctx.Host.LanguageRuntime(runtimeInfo.Name(), info) if err != nil { return nil, fmt.Errorf("loading runtime: %w", err) } stdout, stderr, kill, err := runtime.RunPlugin(RunPluginInfo{ Info: info, WorkingDirectory: ctx.Pwd, Args: pluginArgs, Env: env, }) if err != nil { return nil, err } return &plugin{ Bin: bin, Args: args, Env: env, Kill: func() error { kill(); return nil }, Stdout: io.NopCloser(stdout), Stderr: io.NopCloser(stderr), }, nil } cmd := exec.Command(bin, args...) cmdutil.RegisterProcessGroup(cmd) cmd.Dir = pwd if len(env) > 0 { cmd.Env = env } in, _ := cmd.StdinPipe() out, _ := cmd.StdoutPipe() err, _ := cmd.StderrPipe() if err := cmd.Start(); err != nil { // If we try to run a plugin that isn't found, intercept the error // and instead return a custom one so we can more easily check for // it upstream // // In the case of PAC, note that the plugin usually _does_ exist. // It is a shell script like "pulumi-analyzer-policy". But during // the execution of that script, it fails with the ENOENT error. if pathErr, ok := err.(*os.PathError); ok { syscallErr, ok := pathErr.Err.(syscall.Errno) if ok && syscallErr == syscall.ENOENT { return nil, errPluginNotFound } } return nil, err } kill := func() error { var result *multierror.Error // On each platform, plugins are not loaded directly, instead a shell launches each plugin as a child process, so // instead we need to kill all the children of the PID we have recorded, as well. Otherwise we will block waiting // for the child processes to close. if err := cmdutil.KillChildren(cmd.Process.Pid); err != nil { result = multierror.Append(result, err) } // IDEA: consider a more graceful termination than just SIGKILL. if err := cmd.Process.Kill(); err != nil { result = multierror.Append(result, err) } return result.ErrorOrNil() } return &plugin{ Bin: bin, Args: args, Env: env, Kill: kill, Stdin: in, Stdout: out, Stderr: err, }, nil } type pluginArgumentOptions struct { pluginArgs []string tracingEndpoint string logFlow, logToStderr bool verbose int } func buildPluginArguments(opts pluginArgumentOptions) []string { var args []string // Flow the logging information if set. if opts.logFlow { if opts.logToStderr { args = append(args, "--logtostderr") } if opts.verbose > 0 { args = append(args, "-v="+strconv.Itoa(opts.verbose)) } } if opts.tracingEndpoint != "" { args = append(args, "--tracing", opts.tracingEndpoint) } args = append(args, opts.pluginArgs...) return args } func (p *plugin) Close() error { if p.Conn != nil { contract.IgnoreClose(p.Conn) } result := p.Kill() // Wait for stdout and stderr to drain if we attached to the plugin we won't have a stdout/err if p.stdoutDone != nil { <-p.stdoutDone } if p.stderrDone != nil { <-p.stderrDone } return result }