// Copyright 2016-2023, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // pulumi-language-python serves as the "language host" for Pulumi programs written in Python. It is ultimately // responsible for spawning the language runtime that executes the program. // // The program being executed is executed by a shim script called `pulumi-language-python-exec`. This script is // written in the hosted language (in this case, Python) and is responsible for initiating RPC links to the resource // monitor and engine. // // It's therefore the responsibility of this program to implement the LanguageHostServer endpoint by spawning // instances of `pulumi-language-python-exec` and forwarding the RPC request arguments to the command-line. 
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
	"time"
	"unicode"

	"github.com/blang/semver"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
	"github.com/pulumi/pulumi/sdk/v3/go/common/slice"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/fsutil"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
	"github.com/pulumi/pulumi/sdk/v3/go/common/version"
	"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
	pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
	"github.com/pulumi/pulumi/sdk/v3/python/toolchain"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/emptypb"

	hclsyntax "github.com/pulumi/pulumi/pkg/v3/codegen/hcl2/syntax"
	"github.com/pulumi/pulumi/pkg/v3/codegen/pcl"
	codegen "github.com/pulumi/pulumi/pkg/v3/codegen/python"
	"github.com/pulumi/pulumi/pkg/v3/codegen/schema"
)

const (
	// By convention, the executor is the name of the current program (pulumi-language-python) plus this suffix.
	pythonDefaultExec = "pulumi-language-python-exec" // the exec shim for Pulumi to run Python programs.

	// The runtime expects the config object to be saved to this environment variable.
	pulumiConfigVar = "PULUMI_CONFIG"

	// The runtime expects the array of secret config keys to be saved to this environment variable.
	//
	//nolint:gosec
	pulumiConfigSecretKeysVar = "PULUMI_CONFIG_SECRET_KEYS"

	// An exit code we recognize when the python process exits. If we see this code, there's no
	// need for us to print any additional error messages since the user already got a good
	// one they can handle.
	pythonProcessExitedAfterShowingUserActionableMessage = 32
)

var (
	// The minimum python version that Pulumi supports.
	minimumSupportedPythonVersion = semver.MustParse("3.7.0")
	// Any version less than `eolPythonVersion` is EOL.
	eolPythonVersion = semver.MustParse("3.7.0")
	// A URL to the issue discussing EOL.
	eolPythonVersionIssue = "https://github.com/pulumi/pulumi/issues/8131"
)

// main launches the language host RPC endpoint, which in turn fires up an RPC server implementing the
// LanguageRuntimeServer RPC endpoint.
//
// The first positional argument, when present, is taken as the engine address. Once the server is
// listening, its port is printed on stdout, and main then blocks until the server stops serving.
func main() {
	var (
		tracing       string
		givenExecutor string
	)
	flag.StringVar(&tracing, "tracing", "", "Emit tracing to a Zipkin-compatible tracing endpoint")
	flag.String("virtualenv", "", "[obsolete] Virtual environment path to use")
	flag.String("root", "", "[obsolete] Project root path to use")
	flag.String("typechecker", "", "[obsolete] Use a typechecker to type check")
	flag.String("toolchain", "pip", "[obsolete] Select the package manager to use for dependency management.")
	// -use-executor requests that the language host load a specific executor instead of looking next to
	// its own binary. This can be used during testing to override the default location.
	flag.StringVar(&givenExecutor, "use-executor", "",
		"Use the given program as the executor instead of looking for one on PATH")
	flag.Parse()
	args := flag.Args()

	logging.InitLogging(false, 0, false)
	cmdutil.InitTracing("pulumi-language-python", "pulumi-language-python", tracing)

	// Resolve the executor: an explicit -use-executor wins, otherwise the -exec script is expected to be
	// installed next to the language host binary.
	pythonExec := givenExecutor
	if pythonExec != "" {
		logging.V(3).Infof("language host asked to use specific executor: `%s`", pythonExec)
	} else {
		self, err := os.Executable()
		if err != nil {
			cmdutil.Exit(fmt.Errorf("could not determine current executable: %w", err))
		}
		pythonExec = filepath.Join(filepath.Dir(self), pythonDefaultExec)
		if _, statErr := os.Stat(pythonExec); os.IsNotExist(statErr) {
			cmdutil.Exit(fmt.Errorf("missing executor %s: %w", pythonExec, statErr))
		}
		logging.V(3).Infof("language host identified executor from path: `%s`", pythonExec)
	}

	// Optionally pluck out the engine so we can do logging, etc.
	var engineAddress string
	if len(args) > 0 {
		engineAddress = args[0]
	}

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	// Bridge the context's Done channel onto the boolean cancel channel that rpcutil expects.
	cancelChannel := make(chan bool)
	go func() {
		<-ctx.Done()
		cancel() // deregister signal handler
		close(cancelChannel)
	}()

	if err := rpcutil.Healthcheck(ctx, engineAddress, 5*time.Minute, cancel); err != nil {
		cmdutil.Exit(fmt.Errorf("could not start health check host RPC server: %w", err))
	}

	// Fire up a gRPC server, letting the kernel choose a free port.
	registerHost := func(srv *grpc.Server) error {
		pulumirpc.RegisterLanguageRuntimeServer(srv, newLanguageHost(pythonExec, engineAddress, tracing, false))
		return nil
	}
	handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
		Cancel:  cancelChannel,
		Init:    registerHost,
		Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
	})
	if err != nil {
		cmdutil.Exit(fmt.Errorf("could not start language host RPC server: %w", err))
	}

	// Print out the port so that the spawner knows how to reach us.
	fmt.Printf("%d\n", handle.Port)

	// And finally wait for the server to stop serving.
	if err := <-handle.Done; err != nil {
		cmdutil.Exit(fmt.Errorf("language host RPC stopped serving: %w", err))
	}
}

// pythonLanguageHost implements the LanguageRuntimeServer interface
// for use as an API endpoint.
type pythonLanguageHost struct { pulumirpc.UnimplementedLanguageRuntimeServer exec string engineAddress string tracing string // used by language conformance tests to enable the PyProject.Enabled schema option useToml bool } func parseOptions(root string, options map[string]interface{}) (toolchain.PythonOptions, error) { pythonOptions := toolchain.PythonOptions{ Root: root, } if virtualenv, ok := options["virtualenv"]; ok { if virtualenv, ok := virtualenv.(string); ok { pythonOptions.Virtualenv = virtualenv } else { return pythonOptions, errors.New("virtualenv option must be a string") } } if typechecker, ok := options["typechecker"]; ok { if typechecker, ok := typechecker.(string); ok { switch typechecker { case "mypy": pythonOptions.Typechecker = toolchain.TypeCheckerMypy case "pyright": pythonOptions.Typechecker = toolchain.TypeCheckerPyright default: return pythonOptions, fmt.Errorf("unsupported typechecker option: %s", typechecker) } } else { return pythonOptions, errors.New("typechecker option must be a string") } } if tc, ok := options["toolchain"]; ok { if tc, ok := tc.(string); ok { switch tc { case "pip": pythonOptions.Toolchain = toolchain.Pip default: return pythonOptions, fmt.Errorf("unsupported toolchain option: %s", tc) } } else { return pythonOptions, errors.New("toolchain option must be a string") } } return pythonOptions, nil } func newLanguageHost(exec, engineAddress, tracing string, useToml bool, ) pulumirpc.LanguageRuntimeServer { return &pythonLanguageHost{ exec: exec, engineAddress: engineAddress, tracing: tracing, useToml: useToml, } } // GetRequiredPlugins computes the complete set of anticipated plugins required by a program. 
func (host *pythonLanguageHost) GetRequiredPlugins(ctx context.Context, req *pulumirpc.GetRequiredPluginsRequest, ) (*pulumirpc.GetRequiredPluginsResponse, error) { opts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap()) if err != nil { return nil, err } // Prepare the virtual environment (if needed). err = host.prepareVirtualEnvironment(ctx, req.Info.RootDirectory, req.Info.ProgramDirectory, opts.Virtualenv) if err != nil { return nil, fmt.Errorf("preparing virtual environment: %w", err) } validateVersion(ctx, opts) // Now, determine which Pulumi packages are installed. pulumiPackages, err := determinePulumiPackages(ctx, opts) if err != nil { return nil, err } plugins := []*pulumirpc.PluginDependency{} for _, pkg := range pulumiPackages { plugin, err := determinePluginDependency(pkg) if err != nil { return nil, err } if plugin != nil { plugins = append(plugins, plugin) } } return &pulumirpc.GetRequiredPluginsResponse{Plugins: plugins}, nil } func (host *pythonLanguageHost) Pack(ctx context.Context, req *pulumirpc.PackRequest) (*pulumirpc.PackResponse, error) { tc, err := toolchain.ResolveToolchain(toolchain.PythonOptions{ Toolchain: toolchain.Pip, }) if err != nil { return nil, err } // ensure build is up-to-date buildUpgradeCmd, err := tc.ModuleCommand(ctx, "pip", "install", "--upgrade", "build") if err != nil { return nil, err } buildUpgradeCmd.Stdout = os.Stdout buildUpgradeCmd.Stderr = os.Stderr err = buildUpgradeCmd.Run() if err != nil { return nil, fmt.Errorf("install build tools: %w", err) } tmp, err := os.MkdirTemp("", "pulumi-python-pack") if err != nil { return nil, fmt.Errorf("create temporary directory: %w", err) } buildCmd, err := tc.ModuleCommand(ctx, "build", "--outdir", tmp) if err != nil { return nil, err } buildCmd.Dir = req.PackageDirectory var stdout, stderr bytes.Buffer buildCmd.Stdout = &stdout buildCmd.Stderr = &stderr err = buildCmd.Run() logging.V(5).Infof("Pack stdout: %s", stdout.String()) logging.V(5).Infof("Pack 
stderr: %s", stderr.String()) if err != nil { return nil, fmt.Errorf("run python build: %w\n%s\n%s", err, stdout.String(), stderr.String()) } // prefer .whl but return .tar.gz if no .whl is found files, err := os.ReadDir(tmp) if err != nil { return nil, fmt.Errorf("read temporary directory: %w", err) } var found string for _, file := range files { if strings.HasSuffix(file.Name(), ".whl") { found = file.Name() break } if strings.HasSuffix(file.Name(), ".tar.gz") { found = file.Name() } } // Copy the found file to the destination directory if found == "" { return nil, fmt.Errorf("no .whl or .tar.gz file found\n%s", stderr.String()) } src := filepath.Join(tmp, found) dst := filepath.Join(req.DestinationDirectory, found) err = fsutil.CopyFile(dst, src, nil) if err != nil { return nil, fmt.Errorf("copy file: %w", err) } return &pulumirpc.PackResponse{ ArtifactPath: dst, }, nil } // prepareVirtualEnvironment will create and install dependencies in the virtual environment if host.virtualenv is set. func (host *pythonLanguageHost) prepareVirtualEnvironment(ctx context.Context, root, cwd, virtualenv string) error { if virtualenv == "" { return nil } virtualenvAbsPath := virtualenv if !filepath.IsAbs(virtualenvAbsPath) { virtualenvAbsPath = filepath.Join(root, virtualenv) } // If the virtual environment directory doesn't exist, create it. var createVirtualEnv bool info, err := os.Stat(virtualenvAbsPath) if err != nil { if os.IsNotExist(err) { createVirtualEnv = true } else { return err } } else if !info.IsDir() { return fmt.Errorf("the 'virtualenv' option in Pulumi.yaml is set to %q but it is not a directory", virtualenvAbsPath) } // If the virtual environment directory exists, but is empty, it needs to be created. 
if !createVirtualEnv { empty, err := fsutil.IsDirEmpty(virtualenvAbsPath) if err != nil { return err } createVirtualEnv = empty } tc, err := toolchain.ResolveToolchain(toolchain.PythonOptions{ Toolchain: toolchain.Pip, Virtualenv: virtualenv, Root: root, }) if err != nil { return err } // Create the virtual environment and install dependencies into it, if needed. if createVirtualEnv { // Make a connection to the real engine that we will log messages to. conn, err := grpc.Dial( host.engineAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), rpcutil.GrpcChannelOptions(), ) if err != nil { return fmt.Errorf("language host could not make connection to engine: %w", err) } // Make a client around that connection. engineClient := pulumirpc.NewEngineClient(conn) // Create writers that log the output of the install operation as ephemeral messages. streamID := rand.Int31() //nolint:gosec infoWriter := &logWriter{ ctx: ctx, engineClient: engineClient, streamID: streamID, severity: pulumirpc.LogSeverity_INFO, } errorWriter := &logWriter{ ctx: ctx, engineClient: engineClient, streamID: streamID, severity: pulumirpc.LogSeverity_ERROR, } if err := tc.InstallDependencies(ctx, cwd, true /*showOutput*/, infoWriter, errorWriter); err != nil { return fmt.Errorf("installing dependencies: %w", err) } } if err := tc.ValidateVenv(ctx); err != nil { return err } return nil } type logWriter struct { ctx context.Context engineClient pulumirpc.EngineClient streamID int32 severity pulumirpc.LogSeverity } func (w *logWriter) Write(p []byte) (n int, err error) { val := string(p) if _, err := w.engineClient.Log(w.ctx, &pulumirpc.LogRequest{ Message: strings.ToValidUTF8(val, "�"), Urn: "", Ephemeral: true, StreamId: w.streamID, Severity: w.severity, }); err != nil { return 0, err } return len(val), nil } // These packages are known not to have any plugins. 
// TODO[pulumi/pulumi#5863]: Remove this once the `pulumi-policy` package includes a `pulumi-plugin.json` // file that indicates the package does not have an associated plugin, and enough time has passed. var packagesWithoutPlugins = map[string]struct{}{ "pulumi-policy": {}, } // Returns if pkg is a pulumi package. // // We check: // 1. If there is a pulumi-plugin.json file. // 2. If the first segment is "pulumi". This implies a first party package. func isPulumiPackage(pkg toolchain.PythonPackage) bool { plugin, err := readPulumiPluginJSON(pkg) if err == nil && plugin != nil { return true } return strings.HasPrefix(pkg.Name, "pulumi_") || strings.HasPrefix(pkg.Name, "pulumi-") } func readPulumiPluginJSON(pkg toolchain.PythonPackage) (*plugin.PulumiPluginJSON, error) { // The name of the module inside the package can be different from the package name. // However, our convention is to always use the same name, e.g. a package name of // "pulumi-aws" will have a module named "pulumi_aws", so we can determine the module // by replacing hyphens with underscores. packageModuleName := strings.ReplaceAll(pkg.Name, "-", "_") pulumiPluginFilePath := filepath.Join(pkg.Location, packageModuleName, "pulumi-plugin.json") logging.V(5).Infof("readPulumiPluginJSON: pulumi-plugin.json file path: %s", pulumiPluginFilePath) plugin, err := plugin.LoadPulumiPluginJSON(pulumiPluginFilePath) if os.IsNotExist(err) { return nil, nil } else if err != nil { return nil, err } return plugin, nil } func determinePulumiPackages(ctx context.Context, options toolchain.PythonOptions) ([]toolchain.PythonPackage, error) { logging.V(5).Infof("GetRequiredPlugins: Determining pulumi packages") tc, err := toolchain.ResolveToolchain(options) if err != nil { return nil, err } packages, err := tc.ListPackages(ctx, true /* transitive */) if err != nil { return nil, err } // Only return Pulumi packages. 
pulumiPackages := slice.Prealloc[toolchain.PythonPackage](len(packages)) for _, pkg := range packages { if !isPulumiPackage(pkg) { continue } // Skip packages that are known not have an associated plugin. if _, ok := packagesWithoutPlugins[pkg.Name]; ok { continue } pulumiPackages = append(pulumiPackages, pkg) } logging.V(5).Infof("GetRequiredPlugins: Pulumi packages: %#v", pulumiPackages) return pulumiPackages, nil } // determinePluginDependency attempts to determine a plugin associated with a package. It checks to see if the package // contains a pulumi-plugin.json file and uses the information in that file to determine the plugin. If `resource` in // pulumi-plugin.json is set to false, nil is returned. If the name or version aren't specified in the file, these // values are derived from the package name and version. If the plugin version cannot be determined from the package // version, nil is returned. func determinePluginDependency(pkg toolchain.PythonPackage) (*pulumirpc.PluginDependency, error) { var name, version, server string plugin, err := readPulumiPluginJSON(pkg) if plugin != nil && err == nil { // If `resource` is set to false, the Pulumi package has indicated that there is no associated plugin. // Ignore it. if !plugin.Resource { logging.V(5).Infof("GetRequiredPlugins: Ignoring package %s with resource set to false", pkg.Name) return nil, nil } name, version, server = plugin.Name, plugin.Version, plugin.Server } else if err != nil { logging.V(5).Infof("GetRequiredPlugins: err: %v", err) return nil, err } if name == "" { name = strings.TrimPrefix(pkg.Name, "pulumi-") } if version == "" { // The packageVersion may include additional pre-release tags (e.g. "2.14.0a1605583329" for an alpha // release, "2.14.0b1605583329" for a beta release, "2.14.0rc1605583329" for an rc release, etc.). // Unfortunately, this is not enough information to determine the plugin version. 
A package version of // "3.31.0a1605189729" will have an associated plugin with a version of "3.31.0-alpha.1605189729+42435656". // The "+42435656" suffix cannot be determined so the plugin version cannot be determined. In such cases, // log the issue and skip the package. version, err = determinePluginVersion(pkg.Version) if err != nil { logging.V(5).Infof( "GetRequiredPlugins: Could not determine plugin version for package %s with version %s: %s", pkg.Name, pkg.Version, err) return nil, nil } } if !strings.HasPrefix(version, "v") { // Add "v" prefix if not already present. version = "v" + version } result := &pulumirpc.PluginDependency{ Name: name, Version: version, Kind: "resource", Server: server, } logging.V(5).Infof("GetRequiredPlugins: Determining plugin dependency: %#v", result) return result, nil } // determinePluginVersion attempts to convert a PEP440 package version into a plugin version. // // Supported versions: // // PEP440 defines a version as `[N!]N(.N)*[{a|b|rc}N][.postN][.devN]`, but // determinePluginVersion only supports a subset of that. Translations are provided for // `N(.N)*[{a|b|rc}N][.postN][.devN]`. // // Translations: // // We ensure that there are at least 3 version segments. Missing segments are `0` // padded. // Example: 1.0 => 1.0.0 // // We translate a,b,rc to alpha,beta,rc respectively with a hyphen separator. // Example: 1.2.3a4 => 1.2.3-alpha.4, 1.2.3rc4 => 1.2.3-rc.4 // // We translate `.post` and `.dev` by replacing the `.` with a `+`. If both `.post` // and `.dev` are present, only one separator is used. 
// Example: 1.2.3.post4 => 1.2.3+post4, 1.2.3.post4.dev5 => 1.2.3+post4dev5 // // Reference on PEP440: https://www.python.org/dev/peps/pep-0440/ func determinePluginVersion(packageVersion string) (string, error) { if len(packageVersion) == 0 { return "", errors.New("cannot parse empty string") } // Verify ASCII for i := 0; i < len(packageVersion); i++ { c := packageVersion[i] if c > unicode.MaxASCII { return "", fmt.Errorf("byte %d is not ascii", i) } } parseNumber := func(s string) (string, string) { i := 0 for _, c := range s { if c > '9' || c < '0' { break } i++ } return s[:i], s[i:] } // Explicitly err on epochs if num, maybeEpoch := parseNumber(packageVersion); num != "" && strings.HasPrefix(maybeEpoch, "!") { return "", errors.New("epochs are not supported") } segments := []string{} num, rest := "", packageVersion foundDot := false for { if num, rest = parseNumber(rest); num != "" { foundDot = false segments = append(segments, num) if strings.HasPrefix(rest, ".") { rest = rest[1:] foundDot = true } else { break } } else { break } } if foundDot { rest = "." + rest } for len(segments) < 3 { segments = append(segments, "0") } if rest == "" { r := strings.Join(segments, ".") return r, nil } var preRelease string switch { case rest[0] == 'a': preRelease, rest = parseNumber(rest[1:]) preRelease = "-alpha." + preRelease case rest[0] == 'b': preRelease, rest = parseNumber(rest[1:]) preRelease = "-beta." + preRelease case strings.HasPrefix(rest, "rc"): preRelease, rest = parseNumber(rest[2:]) preRelease = "-rc." 
+ preRelease } var postRelease string if strings.HasPrefix(rest, ".post") { postRelease, rest = parseNumber(rest[5:]) postRelease = "+post" + postRelease } var developmentRelease string if strings.HasPrefix(rest, ".dev") { developmentRelease, rest = parseNumber(rest[4:]) join := "" if postRelease == "" { join = "+" } developmentRelease = join + "dev" + developmentRelease } if rest != "" { return "", fmt.Errorf("'%s' still unparsed", rest) } result := strings.Join(segments, ".") + preRelease + postRelease + developmentRelease return result, nil } // Run is RPC endpoint for LanguageRuntimeServer::Run func (host *pythonLanguageHost) Run(ctx context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) { opts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap()) if err != nil { return nil, err } args := []string{host.exec} args = append(args, host.constructArguments(req)...) config, err := host.constructConfig(req) if err != nil { err = fmt.Errorf("failed to serialize configuration: %w", err) return nil, err } configSecretKeys, err := host.constructConfigSecretKeys(req) if err != nil { err = fmt.Errorf("failed to serialize configuration secret keys: %w", err) return nil, err } if logging.V(5) { commandStr := strings.Join(args, " ") logging.V(5).Infoln("Language host launching process: ", host.exec, commandStr) } // Now simply spawn a process to execute the requested program, wiring up stdout/stderr directly. mkCmd := func(args []string) (*exec.Cmd, error) { tc, err := toolchain.ResolveToolchain(opts) if err != nil { return nil, err } if err := tc.ValidateVenv(ctx); err != nil { return nil, err } return tc.Command(ctx, args...) 
} cmd, err := mkCmd(args) if err != nil { return nil, err } cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if config != "" || configSecretKeys != "" { env := cmd.Env if env == nil { env = os.Environ() } if config != "" { env = append(env, pulumiConfigVar+"="+config) } if configSecretKeys != "" { env = append(env, pulumiConfigSecretKeysVar+"="+configSecretKeys) } cmd.Env = env } // Before running the command, we might need to run typechecker first var typechecker string switch opts.Typechecker { case toolchain.TypeCheckerNone: break case toolchain.TypeCheckerMypy: typechecker = "mypy" case toolchain.TypeCheckerPyright: typechecker = "pyright" } if typechecker != "" { typecheckerCmd, err := mkCmd([]string{"-m", typechecker, req.Info.ProgramDirectory}) if err != nil { return nil, err } typecheckerCmd.Stdout = os.Stdout typecheckerCmd.Stderr = os.Stderr if err := typecheckerCmd.Run(); err != nil { return nil, fmt.Errorf("%s failed: %w", typechecker, err) } } var errResult string if err := cmd.Run(); err != nil { // Python does not explicitly flush standard out or standard error when exiting abnormally. For this reason, we // need to explicitly flush our output streams so that, when we exit, the engine picks up the child Python // process's stdout and stderr writes. // // This is especially crucial for Python because it is possible for the child Python process to crash very fast // if Pulumi is misconfigured, so we must be sure to present a high-quality error message to the user. contract.IgnoreError(os.Stdout.Sync()) contract.IgnoreError(os.Stderr.Sync()) if exiterr, ok := err.(*exec.ExitError); ok { // If the program ran, but exited with a non-zero error code. This will happen often, since user // errors will trigger this. So, the error message should look as nice as possible. 
if status, stok := exiterr.Sys().(syscall.WaitStatus); stok { switch status.ExitStatus() { case 0: // This really shouldn't happen, but if it does, we don't want to render "non-zero exit code" err = fmt.Errorf("program exited unexpectedly: %w", exiterr) case pythonProcessExitedAfterShowingUserActionableMessage: return &pulumirpc.RunResponse{Error: "", Bail: true}, nil default: err = fmt.Errorf("program exited with non-zero exit code: %d", status.ExitStatus()) } } else { err = fmt.Errorf("program exited unexpectedly: %w", exiterr) } } else { // Otherwise, we didn't even get to run the program. This ought to never happen unless there's // a bug or system condition that prevented us from running the language exec. Issue a scarier error. err = fmt.Errorf("problem executing program (could not run language executor): %w", err) } errResult = err.Error() } return &pulumirpc.RunResponse{Error: errResult}, nil } // constructArguments constructs a command-line for `pulumi-language-python` // by enumerating all of the optional and non-optional arguments present // in a RunRequest. func (host *pythonLanguageHost) constructArguments(req *pulumirpc.RunRequest) []string { var args []string maybeAppendArg := func(k, v string) { if v != "" { args = append(args, "--"+k, v) } } maybeAppendArg("monitor", req.GetMonitorAddress()) maybeAppendArg("engine", host.engineAddress) maybeAppendArg("project", req.GetProject()) maybeAppendArg("stack", req.GetStack()) maybeAppendArg("pwd", req.GetPwd()) maybeAppendArg("dry_run", strconv.FormatBool(req.GetDryRun())) maybeAppendArg("parallel", strconv.Itoa(int(req.GetParallel()))) maybeAppendArg("tracing", host.tracing) maybeAppendArg("organization", req.GetOrganization()) // The engine should always pass a name for entry point, even if its just "." for the program directory. args = append(args, req.Info.EntryPoint) args = append(args, req.GetArgs()...) 
return args } // constructConfig json-serializes the configuration data given as part of a RunRequest. func (host *pythonLanguageHost) constructConfig(req *pulumirpc.RunRequest) (string, error) { configMap := req.GetConfig() if configMap == nil { return "", nil } configJSON, err := json.Marshal(configMap) if err != nil { return "", err } return string(configJSON), nil } // constructConfigSecretKeys JSON-serializes the list of keys that contain secret values given as part of // a RunRequest. func (host *pythonLanguageHost) constructConfigSecretKeys(req *pulumirpc.RunRequest) (string, error) { configSecretKeys := req.GetConfigSecretKeys() if configSecretKeys == nil { return "[]", nil } configSecretKeysJSON, err := json.Marshal(configSecretKeys) if err != nil { return "", err } return string(configSecretKeysJSON), nil } func (host *pythonLanguageHost) GetPluginInfo(ctx context.Context, req *emptypb.Empty) (*pulumirpc.PluginInfo, error) { return &pulumirpc.PluginInfo{ Version: version.Version, }, nil } // validateVersion checks that python is running a valid version. If a version // is invalid, it prints to os.Stderr. This is interpreted as diagnostic message // by the Pulumi CLI program. func validateVersion(ctx context.Context, options toolchain.PythonOptions) { var versionCmd *exec.Cmd var err error versionArgs := []string{"--version"} tc, err := toolchain.ResolveToolchain(options) if err != nil { fmt.Fprintf(os.Stderr, "Failed to configure python toolchain: %s\n", err) return } versionCmd, err = tc.Command(ctx, versionArgs...) 
if err != nil { fmt.Fprintf(os.Stderr, "Failed to create python version command: %s\n", err) return } var out []byte if out, err = versionCmd.Output(); err != nil { fmt.Fprintf(os.Stderr, "Failed to resolve python version command: %s\n", err) return } version := strings.TrimSpace(strings.TrimPrefix(string(out), "Python ")) parsed, err := semver.Parse(version) if err != nil { fmt.Fprintf(os.Stderr, "Failed to parse python version: '%s'\n", version) return } if parsed.LT(minimumSupportedPythonVersion) { fmt.Fprintf(os.Stderr, "Pulumi does not support Python %s."+ " Please upgrade to at least %s\n", parsed, minimumSupportedPythonVersion) } else if parsed.LT(eolPythonVersion) { fmt.Fprintf(os.Stderr, "Python %d.%d is approaching EOL and will not be supported in Pulumi soon."+ " Check %s for more details\n", parsed.Major, parsed.Minor, eolPythonVersionIssue) } } func (host *pythonLanguageHost) InstallDependencies( req *pulumirpc.InstallDependenciesRequest, server pulumirpc.LanguageRuntime_InstallDependenciesServer, ) error { opts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap()) if err != nil { return err } closer, stdout, stderr, err := rpcutil.MakeInstallDependenciesStreams(server, req.IsTerminal) if err != nil { return err } // best effort close, but we try an explicit close and error check at the end as well defer closer.Close() stdout.Write([]byte("Installing dependencies...\n\n")) tc, err := toolchain.ResolveToolchain(opts) if err != nil { return err } if err := tc.InstallDependencies(server.Context(), req.Info.ProgramDirectory, true /*showOutput*/, stdout, stderr); err != nil { return err } stdout.Write([]byte("Finished installing dependencies\n\n")) return closer.Close() } func (host *pythonLanguageHost) About(ctx context.Context, req *emptypb.Empty) (*pulumirpc.AboutResponse, error) { tc, err := toolchain.ResolveToolchain(toolchain.PythonOptions{ Toolchain: toolchain.Pip, }) if err != nil { return nil, err } info, err := tc.About(ctx) if 
err != nil { return nil, err } return &pulumirpc.AboutResponse{ Executable: info.Executable, Version: info.Version, }, nil } func (host *pythonLanguageHost) GetProgramDependencies( ctx context.Context, req *pulumirpc.GetProgramDependenciesRequest, ) (*pulumirpc.GetProgramDependenciesResponse, error) { opts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap()) if err != nil { return nil, err } tc, err := toolchain.ResolveToolchain(opts) if err != nil { return nil, err } result, err := tc.ListPackages(ctx, req.TransitiveDependencies /* transitive */) if err != nil { return nil, fmt.Errorf("failed to get python dependencies: %w", err) } dependencies := make([]*pulumirpc.DependencyInfo, len(result)) for i, dep := range result { dependencies[i] = &pulumirpc.DependencyInfo{ Name: dep.Name, Version: dep.Version, } } return &pulumirpc.GetProgramDependenciesResponse{ Dependencies: dependencies, }, nil } func (host *pythonLanguageHost) RunPlugin( req *pulumirpc.RunPluginRequest, server pulumirpc.LanguageRuntime_RunPluginServer, ) error { logging.V(5).Infof("Attempting to run python plugin in %s", req.Info.ProgramDirectory) opts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap()) if err != nil { return err } args := []string{req.Info.ProgramDirectory} args = append(args, req.Args...) tc, err := toolchain.ResolveToolchain(opts) if err != nil { return err } cmd, err := tc.Command(server.Context(), args...) if err != nil { return err } closer, stdout, stderr, err := rpcutil.MakeRunPluginStreams(server, false) if err != nil { return err } // best effort close, but we try an explicit close and error check at the end as well defer closer.Close() cmd.Dir = req.Pwd cmd.Env = append(cmd.Env, req.Env...) 
cmd.Stdout, cmd.Stderr = stdout, stderr if err = cmd.Run(); err != nil { var exiterr *exec.ExitError if errors.As(err, &exiterr) { if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { return server.Send(&pulumirpc.RunPluginResponse{ Output: &pulumirpc.RunPluginResponse_Exitcode{Exitcode: int32(status.ExitStatus())}, }) } return fmt.Errorf("program exited unexpectedly: %w", exiterr) } return fmt.Errorf("problem executing plugin program (could not run language executor): %w", err) } return closer.Close() } func (host *pythonLanguageHost) GenerateProject( ctx context.Context, req *pulumirpc.GenerateProjectRequest, ) (*pulumirpc.GenerateProjectResponse, error) { loader, err := schema.NewLoaderClient(req.LoaderTarget) if err != nil { return nil, err } var extraOptions []pcl.BindOption if !req.Strict { extraOptions = append(extraOptions, pcl.NonStrictBindOptions()...) } // for python, prefer output-versioned invokes extraOptions = append(extraOptions, pcl.PreferOutputVersionedInvokes) program, diags, err := pcl.BindDirectory(req.SourceDirectory, loader, extraOptions...) 
if err != nil { return nil, err } if diags.HasErrors() { rpcDiagnostics := plugin.HclDiagnosticsToRPCDiagnostics(diags) return &pulumirpc.GenerateProjectResponse{ Diagnostics: rpcDiagnostics, }, nil } var project workspace.Project if err := json.Unmarshal([]byte(req.Project), &project); err != nil { return nil, err } err = codegen.GenerateProject(req.TargetDirectory, project, program, req.LocalDependencies) if err != nil { return nil, err } rpcDiagnostics := plugin.HclDiagnosticsToRPCDiagnostics(diags) return &pulumirpc.GenerateProjectResponse{ Diagnostics: rpcDiagnostics, }, nil } func (host *pythonLanguageHost) GenerateProgram( ctx context.Context, req *pulumirpc.GenerateProgramRequest, ) (*pulumirpc.GenerateProgramResponse, error) { loader, err := schema.NewLoaderClient(req.LoaderTarget) if err != nil { return nil, err } parser := hclsyntax.NewParser() // Load all .pp files in the directory for path, contents := range req.Source { err = parser.ParseFile(strings.NewReader(contents), path) if err != nil { return nil, err } diags := parser.Diagnostics if diags.HasErrors() { return nil, diags } } program, diags, err := pcl.BindProgram(parser.Files, pcl.Loader(loader), pcl.PreferOutputVersionedInvokes) if err != nil { return nil, err } rpcDiagnostics := plugin.HclDiagnosticsToRPCDiagnostics(diags) if diags.HasErrors() { return &pulumirpc.GenerateProgramResponse{ Diagnostics: rpcDiagnostics, }, nil } if program == nil { return nil, errors.New("internal error program was nil") } files, diags, err := codegen.GenerateProgram(program) if err != nil { return nil, err } rpcDiagnostics = append(rpcDiagnostics, plugin.HclDiagnosticsToRPCDiagnostics(diags)...) 
return &pulumirpc.GenerateProgramResponse{ Source: files, Diagnostics: rpcDiagnostics, }, nil } func (host *pythonLanguageHost) GeneratePackage( ctx context.Context, req *pulumirpc.GeneratePackageRequest, ) (*pulumirpc.GeneratePackageResponse, error) { loader, err := schema.NewLoaderClient(req.LoaderTarget) if err != nil { return nil, err } var spec schema.PackageSpec err = json.Unmarshal([]byte(req.Schema), &spec) if err != nil { return nil, err } pkg, diags, err := schema.BindSpec(spec, loader) if err != nil { return nil, err } rpcDiagnostics := plugin.HclDiagnosticsToRPCDiagnostics(diags) if diags.HasErrors() { return &pulumirpc.GeneratePackageResponse{ Diagnostics: rpcDiagnostics, }, nil } if host.useToml { var info codegen.PackageInfo var ok bool if info, ok = pkg.Language["python"].(codegen.PackageInfo); !ok { info = codegen.PackageInfo{} } info.PyProject.Enabled = true pkg.Language["python"] = info } files, err := codegen.GeneratePackage("pulumi-language-python", pkg, req.ExtraFiles) if err != nil { return nil, err } for filename, data := range files { outPath := filepath.Join(req.Directory, filename) err := os.MkdirAll(filepath.Dir(outPath), 0o700) if err != nil { return nil, fmt.Errorf("could not create output directory %s: %w", filepath.Dir(filename), err) } err = os.WriteFile(outPath, data, 0o600) if err != nil { return nil, fmt.Errorf("could not write output file %s: %w", filename, err) } } return &pulumirpc.GeneratePackageResponse{ Diagnostics: rpcDiagnostics, }, nil }