// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// pulumi-language-python serves as the "language host" for Pulumi programs written in Python.  It is ultimately
// responsible for spawning the language runtime that executes the program.
//
// The program being executed is executed by a shim script called `pulumi-language-python-exec`. This script is
// written in the hosted language (in this case, Python) and is responsible for initiating RPC links to the resource
// monitor and engine.
//
// It's therefore the responsibility of this program to implement the LanguageHostServer endpoint by spawning
// instances of `pulumi-language-python-exec` and forwarding the RPC request arguments to the command-line.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"math/rand"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
	"time"
	"unicode"

	"github.com/blang/semver"
	"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
	"github.com/pulumi/pulumi/sdk/v3/go/common/slice"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/fsutil"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
	"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
	"github.com/pulumi/pulumi/sdk/v3/go/common/version"
	"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
	pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
	"github.com/pulumi/pulumi/sdk/v3/python/toolchain"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/emptypb"

	hclsyntax "github.com/pulumi/pulumi/pkg/v3/codegen/hcl2/syntax"
	"github.com/pulumi/pulumi/pkg/v3/codegen/pcl"
	codegen "github.com/pulumi/pulumi/pkg/v3/codegen/python"
	"github.com/pulumi/pulumi/pkg/v3/codegen/schema"
)

const (
	// By convention, the executor is named after the current program (pulumi-language-python) with an
	// `-exec` suffix appended.
	pythonDefaultExec = "pulumi-language-python-exec" // the exec shim for Pulumi to run Python programs.

	// The runtime expects the config object to be saved to this environment variable.
	pulumiConfigVar = "PULUMI_CONFIG"

	// The runtime expects the array of secret config keys to be saved to this environment variable.
	//nolint:gosec
	pulumiConfigSecretKeysVar = "PULUMI_CONFIG_SECRET_KEYS"

	// An exit code we recognize when the python process exits.  If we see this code, there's no
	// need for us to print any additional error messages since the user already got a good
	// one they can handle.
	pythonProcessExitedAfterShowingUserActionableMessage = 32
)

var (
	// The minimum Python version that Pulumi supports.
	minimumSupportedPythonVersion = semver.MustParse("3.7.0")
	// Any version less than `eolPythonVersion` is EOL.
	eolPythonVersion = semver.MustParse("3.7.0")
	// A URL to the issue discussing EOL.
	eolPythonVersionIssue = "https://github.com/pulumi/pulumi/issues/8131"
)

// Launches the language host RPC endpoint, which in turn fires up an RPC server implementing the
// LanguageRuntimeServer RPC endpoint.
//
// The first positional argument, if present, is taken as the address of the engine. Once the server is
// listening, its port is printed on stdout so that the spawning process can connect to it.
func main() {
	var tracing string
	flag.StringVar(&tracing, "tracing", "", "Emit tracing to a Zipkin-compatible tracing endpoint")
	// The following flags are still accepted but their values are discarded; the equivalent settings are
	// read from the per-request options instead (see parseOptions).
	flag.String("virtualenv", "", "[obsolete] Virtual environment path to use")
	flag.String("root", "", "[obsolete] Project root path to use")
	flag.String("typechecker", "", "[obsolete] Use a typechecker to type check")
	flag.String("toolchain", "pip", "[obsolete] Select the package manager to use for dependency management.")

	// You can use the below flag to request that the language host load a specific executor instead of
	// looking next to the language host binary.  This can be used during testing to override the default
	// location.
	var givenExecutor string
	flag.StringVar(&givenExecutor, "use-executor", "",
		"Use the given program as the executor instead of looking for one on PATH")

	flag.Parse()
	args := flag.Args()
	logging.InitLogging(false, 0, false)
	cmdutil.InitTracing("pulumi-language-python", "pulumi-language-python", tracing)

	var pythonExec string
	if givenExecutor == "" {
		// By default, the -exec script is installed next to the language host.
		thisPath, err := os.Executable()
		if err != nil {
			err = fmt.Errorf("could not determine current executable: %w", err)
			cmdutil.Exit(err)
		}

		pathExec := filepath.Join(filepath.Dir(thisPath), pythonDefaultExec)
		// NOTE(review): only a "does not exist" error is treated as fatal here; any other Stat error
		// (e.g. permission denied) falls through and the path is used as-is.
		if _, err = os.Stat(pathExec); os.IsNotExist(err) {
			err = fmt.Errorf("missing executor %s: %w", pathExec, err)
			cmdutil.Exit(err)
		}

		logging.V(3).Infof("language host identified executor from path: `%s`", pathExec)
		pythonExec = pathExec
	} else {
		logging.V(3).Infof("language host asked to use specific executor: `%s`", givenExecutor)
		pythonExec = givenExecutor
	}

	// Optionally pluck out the engine so we can do logging, etc.
	var engineAddress string
	if len(args) > 0 {
		engineAddress = args[0]
	}

	// ctx is cancelled when the process receives an interrupt signal, or when cancel is called explicitly.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	// map the context Done channel to the rpcutil boolean cancel channel
	cancelChannel := make(chan bool)
	go func() {
		<-ctx.Done()
		cancel() // deregister signal handler
		close(cancelChannel)
	}()
	// cancel is handed to the health check, presumably so it can shut the host down when the engine
	// stops responding -- TODO confirm against rpcutil.Healthcheck.
	err := rpcutil.Healthcheck(ctx, engineAddress, 5*time.Minute, cancel)
	if err != nil {
		cmdutil.Exit(fmt.Errorf("could not start health check host RPC server: %w", err))
	}

	// Fire up a gRPC server, letting the kernel choose a free port.
	handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
		Cancel: cancelChannel,
		Init: func(srv *grpc.Server) error {
			host := newLanguageHost(pythonExec, engineAddress, tracing, false)
			pulumirpc.RegisterLanguageRuntimeServer(srv, host)
			return nil
		},
		Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
	})
	if err != nil {
		cmdutil.Exit(fmt.Errorf("could not start language host RPC server: %w", err))
	}

	// The server started successfully: print out the port so that the spawner knows how to reach us.
	fmt.Printf("%d\n", handle.Port)

	// And finally wait for the server to stop serving.
	if err := <-handle.Done; err != nil {
		cmdutil.Exit(fmt.Errorf("language host RPC stopped serving: %w", err))
	}
}

// pythonLanguageHost implements the LanguageRuntimeServer interface
// for use as an API endpoint.
type pythonLanguageHost struct {
	pulumirpc.UnimplementedLanguageRuntimeServer

	// exec is the executor script handed to Python as the first argument when running a program.
	// engineAddress is the address of the engine; it is forwarded to the executor as `--engine` and
	// dialed directly when logging virtual environment installation output.
	// tracing is the tracing endpoint, forwarded to the executor as `--tracing` when non-empty.
	exec          string
	engineAddress string
	tracing       string

	// used by language conformance tests to enable the PyProject.Enabled schema option
	useToml bool
}

// parseOptions converts the untyped runtime options of a project into toolchain.PythonOptions rooted at
// root. The recognized keys are "virtualenv", "typechecker" ("mypy" or "pyright") and "toolchain" ("pip");
// each must hold a string when present. On failure the options parsed so far are returned with the error.
func parseOptions(root string, options map[string]interface{}) (toolchain.PythonOptions, error) {
	parsed := toolchain.PythonOptions{Root: root}

	if raw, present := options["virtualenv"]; present {
		venv, isString := raw.(string)
		if !isString {
			return parsed, errors.New("virtualenv option must be a string")
		}
		parsed.Virtualenv = venv
	}

	if raw, present := options["typechecker"]; present {
		checker, isString := raw.(string)
		if !isString {
			return parsed, errors.New("typechecker option must be a string")
		}
		switch checker {
		case "mypy":
			parsed.Typechecker = toolchain.TypeCheckerMypy
		case "pyright":
			parsed.Typechecker = toolchain.TypeCheckerPyright
		default:
			return parsed, fmt.Errorf("unsupported typechecker option: %s", checker)
		}
	}

	if raw, present := options["toolchain"]; present {
		name, isString := raw.(string)
		if !isString {
			return parsed, errors.New("toolchain option must be a string")
		}
		// pip is currently the only toolchain that can be selected.
		if name != "pip" {
			return parsed, fmt.Errorf("unsupported toolchain option: %s", name)
		}
		parsed.Toolchain = toolchain.Pip
	}

	return parsed, nil
}

// newLanguageHost builds a pythonLanguageHost from the executor path, the engine address, the tracing
// endpoint and the useToml switch, and returns it as a LanguageRuntimeServer.
func newLanguageHost(exec, engineAddress, tracing string, useToml bool,
) pulumirpc.LanguageRuntimeServer {
	host := new(pythonLanguageHost)
	host.exec = exec
	host.engineAddress = engineAddress
	host.tracing = tracing
	host.useToml = useToml
	return host
}

// GetRequiredPlugins computes the complete set of anticipated plugins required by a program. It prepares
// the virtual environment when one is configured, reports on the Python version in use, and then maps
// every installed Pulumi package to the plugin it depends on.
func (host *pythonLanguageHost) GetRequiredPlugins(ctx context.Context,
	req *pulumirpc.GetRequiredPluginsRequest,
) (*pulumirpc.GetRequiredPluginsResponse, error) {
	info := req.Info

	opts, err := parseOptions(info.RootDirectory, info.Options.AsMap())
	if err != nil {
		return nil, err
	}

	// Make sure the virtual environment exists (if one is configured) before inspecting it.
	err = host.prepareVirtualEnvironment(ctx, info.RootDirectory, info.ProgramDirectory, opts.Virtualenv)
	if err != nil {
		return nil, fmt.Errorf("preparing virtual environment: %w", err)
	}

	validateVersion(ctx, opts)

	// Find the installed Pulumi packages and translate each into a plugin dependency.
	pulumiPackages, err := determinePulumiPackages(ctx, opts)
	if err != nil {
		return nil, err
	}

	required := []*pulumirpc.PluginDependency{}
	for _, pkg := range pulumiPackages {
		dep, err := determinePluginDependency(pkg)
		switch {
		case err != nil:
			return nil, err
		case dep != nil:
			// A nil dependency means the package has no associated plugin.
			required = append(required, dep)
		}
	}

	return &pulumirpc.GetRequiredPluginsResponse{Plugins: required}, nil
}

// Pack builds the Python package found in req.PackageDirectory and copies the resulting artifact into
// req.DestinationDirectory. The build is performed with `python -m build` into a temporary directory,
// after making sure the `build` module is installed and up to date. A wheel (.whl) is preferred; a source
// distribution (.tar.gz) is returned only when no wheel was produced. The response carries the path of the
// copied artifact.
func (host *pythonLanguageHost) Pack(ctx context.Context, req *pulumirpc.PackRequest) (*pulumirpc.PackResponse, error) {
	tc, err := toolchain.ResolveToolchain(toolchain.PythonOptions{
		Toolchain: toolchain.Pip,
	})
	if err != nil {
		return nil, err
	}
	// ensure build is up-to-date
	buildUpgradeCmd, err := tc.ModuleCommand(ctx, "pip", "install", "--upgrade", "build")
	if err != nil {
		return nil, err
	}
	buildUpgradeCmd.Stdout = os.Stdout
	buildUpgradeCmd.Stderr = os.Stderr
	err = buildUpgradeCmd.Run()
	if err != nil {
		return nil, fmt.Errorf("install build tools: %w", err)
	}

	tmp, err := os.MkdirTemp("", "pulumi-python-pack")
	if err != nil {
		return nil, fmt.Errorf("create temporary directory: %w", err)
	}
	// The artifact is copied out of the temporary directory before returning, so the directory can always
	// be removed. Removal is best effort: a failure to clean up must not fail the pack operation.
	defer func() {
		contract.IgnoreError(os.RemoveAll(tmp))
	}()

	buildCmd, err := tc.ModuleCommand(ctx, "build", "--outdir", tmp)
	if err != nil {
		return nil, err
	}
	buildCmd.Dir = req.PackageDirectory

	// Capture the build output so that it can be logged and attached to any error.
	var stdout, stderr bytes.Buffer
	buildCmd.Stdout = &stdout
	buildCmd.Stderr = &stderr

	err = buildCmd.Run()
	logging.V(5).Infof("Pack stdout: %s", stdout.String())
	logging.V(5).Infof("Pack stderr: %s", stderr.String())
	if err != nil {
		return nil, fmt.Errorf("run python build: %w\n%s\n%s", err, stdout.String(), stderr.String())
	}

	// prefer .whl but return .tar.gz if no .whl is found
	files, err := os.ReadDir(tmp)
	if err != nil {
		return nil, fmt.Errorf("read temporary directory: %w", err)
	}

	var found string
	for _, file := range files {
		if strings.HasSuffix(file.Name(), ".whl") {
			found = file.Name()
			break
		}
		if strings.HasSuffix(file.Name(), ".tar.gz") {
			// Remember the sdist, but keep looking in case a wheel shows up later in the listing.
			found = file.Name()
		}
	}

	// Copy the found file to the destination directory
	if found == "" {
		return nil, fmt.Errorf("no .whl or .tar.gz file found\n%s", stderr.String())
	}

	src := filepath.Join(tmp, found)
	dst := filepath.Join(req.DestinationDirectory, found)
	err = fsutil.CopyFile(dst, src, nil)
	if err != nil {
		return nil, fmt.Errorf("copy file: %w", err)
	}

	return &pulumirpc.PackResponse{
		ArtifactPath: dst,
	}, nil
}

// prepareVirtualEnvironment will create and install dependencies in the virtual environment if the
// virtualenv argument is non-empty. A relative virtualenv path is resolved against root. Dependencies are
// installed (from cwd) only when the virtual environment directory is missing or empty; in every case the
// virtual environment is validated before returning. When virtualenv is empty this is a no-op.
func (host *pythonLanguageHost) prepareVirtualEnvironment(ctx context.Context, root, cwd, virtualenv string) error {
	if virtualenv == "" {
		return nil
	}

	virtualenvAbsPath := virtualenv
	if !filepath.IsAbs(virtualenvAbsPath) {
		virtualenvAbsPath = filepath.Join(root, virtualenv)
	}

	// If the virtual environment directory doesn't exist, create it.
	var createVirtualEnv bool
	info, err := os.Stat(virtualenvAbsPath)
	if err != nil {
		if os.IsNotExist(err) {
			createVirtualEnv = true
		} else {
			return err
		}
	} else if !info.IsDir() {
		return fmt.Errorf("the 'virtualenv' option in Pulumi.yaml is set to %q but it is not a directory", virtualenvAbsPath)
	}

	// If the virtual environment directory exists, but is empty, it needs to be created.
	if !createVirtualEnv {
		empty, err := fsutil.IsDirEmpty(virtualenvAbsPath)
		if err != nil {
			return err
		}
		createVirtualEnv = empty
	}

	tc, err := toolchain.ResolveToolchain(toolchain.PythonOptions{
		Toolchain:  toolchain.Pip,
		Virtualenv: virtualenv,
		Root:       root,
	})
	if err != nil {
		return err
	}

	// Create the virtual environment and install dependencies into it, if needed.
	if createVirtualEnv {
		// Make a connection to the real engine that we will log messages to.
		conn, err := grpc.Dial(
			host.engineAddress,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			rpcutil.GrpcChannelOptions(),
		)
		if err != nil {
			return fmt.Errorf("language host could not make connection to engine: %w", err)
		}
		// The connection is only needed for the duration of this call; release it on return so that
		// repeated calls do not accumulate open connections. A close failure is not actionable here.
		defer func() {
			contract.IgnoreError(conn.Close())
		}()

		// Make a client around that connection.
		engineClient := pulumirpc.NewEngineClient(conn)

		// Create writers that log the output of the install operation as ephemeral messages. Both
		// writers share one stream ID.
		streamID := rand.Int31() //nolint:gosec

		infoWriter := &logWriter{
			ctx:          ctx,
			engineClient: engineClient,
			streamID:     streamID,
			severity:     pulumirpc.LogSeverity_INFO,
		}

		errorWriter := &logWriter{
			ctx:          ctx,
			engineClient: engineClient,
			streamID:     streamID,
			severity:     pulumirpc.LogSeverity_ERROR,
		}

		if err := tc.InstallDependencies(ctx, cwd, true /*showOutput*/, infoWriter, errorWriter); err != nil {
			return fmt.Errorf("installing dependencies: %w", err)
		}
	}

	if err := tc.ValidateVenv(ctx); err != nil {
		return err
	}

	return nil
}

// logWriter forwards everything written to it to the engine as ephemeral log messages, using a fixed
// stream ID and severity. See its Write method.
type logWriter struct {
	ctx          context.Context       // context used for every Log call made by Write
	engineClient pulumirpc.EngineClient // client the log requests are sent through
	streamID     int32                 // stream ID attached to every log request
	severity     pulumirpc.LogSeverity // severity attached to every log request
}

// Write sends p to the engine as a single ephemeral log message with the writer's stream ID and severity.
// Invalid UTF-8 sequences in p are replaced before sending. It reports len(p) bytes written on success,
// and zero bytes together with the error if the engine rejects the message.
func (w *logWriter) Write(p []byte) (n int, err error) {
	request := &pulumirpc.LogRequest{
		Message:   strings.ToValidUTF8(string(p), "�"),
		Urn:       "",
		Ephemeral: true,
		StreamId:  w.streamID,
		Severity:  w.severity,
	}
	if _, err = w.engineClient.Log(w.ctx, request); err != nil {
		return 0, err
	}
	return len(p), nil
}

// These packages are known not to have any plugins. The map is used as a set keyed by package name;
// determinePulumiPackages skips every package listed here.
// TODO[pulumi/pulumi#5863]: Remove this once the `pulumi-policy` package includes a `pulumi-plugin.json`
// file that indicates the package does not have an associated plugin, and enough time has passed.
var packagesWithoutPlugins = map[string]struct{}{
	"pulumi-policy": {},
}

// isPulumiPackage reports whether pkg is a Pulumi package.
//
// A package qualifies when either:
// 1. Its pulumi-plugin.json file can be read successfully.
// 2. Its name starts with "pulumi_" or "pulumi-". This implies a first party package.
func isPulumiPackage(pkg toolchain.PythonPackage) bool {
	if pluginJSON, err := readPulumiPluginJSON(pkg); err == nil && pluginJSON != nil {
		return true
	}

	for _, prefix := range []string{"pulumi_", "pulumi-"} {
		if strings.HasPrefix(pkg.Name, prefix) {
			return true
		}
	}
	return false
}

// readPulumiPluginJSON loads the pulumi-plugin.json file shipped inside pkg. It returns (nil, nil) when
// the file does not exist, and the load error for any other failure.
func readPulumiPluginJSON(pkg toolchain.PythonPackage) (*plugin.PulumiPluginJSON, error) {
	// A package's module may be named differently from the package itself, but by convention the two
	// match up to hyphens: the package "pulumi-aws" ships the module "pulumi_aws". The module directory
	// is therefore derived by swapping hyphens for underscores.
	moduleName := strings.ReplaceAll(pkg.Name, "-", "_")
	jsonPath := filepath.Join(pkg.Location, moduleName, "pulumi-plugin.json")
	logging.V(5).Infof("readPulumiPluginJSON: pulumi-plugin.json file path: %s", jsonPath)

	loaded, err := plugin.LoadPulumiPluginJSON(jsonPath)
	switch {
	case os.IsNotExist(err):
		// A missing file is not an error; the package simply carries no plugin metadata.
		return nil, nil
	case err != nil:
		return nil, err
	default:
		return loaded, nil
	}
}

// determinePulumiPackages lists every package (including transitive ones) visible to the toolchain
// described by options and returns those that are Pulumi packages, leaving out packages that are known
// not to have an associated plugin.
func determinePulumiPackages(ctx context.Context, options toolchain.PythonOptions) ([]toolchain.PythonPackage, error) {
	logging.V(5).Infof("GetRequiredPlugins: Determining pulumi packages")

	tc, err := toolchain.ResolveToolchain(options)
	if err != nil {
		return nil, err
	}
	installed, err := tc.ListPackages(ctx, true /* transitive */)
	if err != nil {
		return nil, err
	}

	// Keep only Pulumi packages that may have a plugin.
	selected := slice.Prealloc[toolchain.PythonPackage](len(installed))
	for _, candidate := range installed {
		_, knownPluginless := packagesWithoutPlugins[candidate.Name]
		if isPulumiPackage(candidate) && !knownPluginless {
			selected = append(selected, candidate)
		}
	}

	logging.V(5).Infof("GetRequiredPlugins: Pulumi packages: %#v", selected)

	return selected, nil
}

// determinePluginDependency works out which plugin, if any, a package depends on. The package's
// pulumi-plugin.json file is consulted first: when it sets `resource` to false the package has no plugin
// and nil is returned. A name or version missing from that file (or the whole file being absent) is
// filled in from the package's own name and version. When no plugin version can be derived from the
// package version, nil is returned. An error is returned only if pulumi-plugin.json cannot be read.
func determinePluginDependency(pkg toolchain.PythonPackage) (*pulumirpc.PluginDependency, error) {
	pluginJSON, err := readPulumiPluginJSON(pkg)
	if err != nil {
		logging.V(5).Infof("GetRequiredPlugins: err: %v", err)
		return nil, err
	}

	var pluginName, pluginVersion, pluginServer string
	if pluginJSON != nil {
		// `resource: false` is the package's way of saying that it has no associated plugin.
		if !pluginJSON.Resource {
			logging.V(5).Infof("GetRequiredPlugins: Ignoring package %s with resource set to false", pkg.Name)
			return nil, nil
		}
		pluginName, pluginVersion, pluginServer = pluginJSON.Name, pluginJSON.Version, pluginJSON.Server
	}

	if pluginName == "" {
		pluginName = strings.TrimPrefix(pkg.Name, "pulumi-")
	}

	if pluginVersion == "" {
		// Package versions may carry pre-release tags ("2.14.0a1605583329" for an alpha,
		// "2.14.0b1605583329" for a beta, "2.14.0rc1605583329" for an rc, and so on), which do not hold
		// enough information to reconstruct the plugin version: the plugin for package version
		// "3.31.0a1605189729" is "3.31.0-alpha.1605189729+42435656", and the "+42435656" suffix cannot be
		// recovered. Whenever the conversion fails, log the problem and skip the package.
		derived, convErr := determinePluginVersion(pkg.Version)
		if convErr != nil {
			logging.V(5).Infof(
				"GetRequiredPlugins: Could not determine plugin version for package %s with version %s: %s",
				pkg.Name, pkg.Version, convErr)
			return nil, nil
		}
		pluginVersion = derived
	}
	// Plugin versions always carry a leading "v".
	if !strings.HasPrefix(pluginVersion, "v") {
		pluginVersion = "v" + pluginVersion
	}

	dependency := &pulumirpc.PluginDependency{
		Name:    pluginName,
		Version: pluginVersion,
		Kind:    "resource",
		Server:  pluginServer,
	}

	logging.V(5).Infof("GetRequiredPlugins: Determining plugin dependency: %#v", dependency)
	return dependency, nil
}

// determinePluginVersion attempts to convert a PEP440 package version into a plugin version.
//
// Supported versions:
//
//	PEP440 defines a version as `[N!]N(.N)*[{a|b|rc}N][.postN][.devN]`, but
//	determinePluginVersion only supports a subset of that. Translations are provided for
//	`N(.N)*[{a|b|rc}N][.postN][.devN]`.
//
// Translations:
//
//	We ensure that there are at least 3 version segments. Missing segments are `0`
//	padded.
//	Example: 1.0 => 1.0.0
//
//	We translate a,b,rc to alpha,beta,rc respectively with a hyphen separator. A
//	pre-release marker without a numeral is given the implicit numeral 0, as PEP440
//	specifies.
//	Example: 1.2.3a4 => 1.2.3-alpha.4, 1.2.3rc4 => 1.2.3-rc.4, 1.2.3a => 1.2.3-alpha.0
//
//	We translate `.post` and `.dev` by replacing the `.` with a `+`. If both `.post`
//	and `.dev` are present, only one separator is used.
//	Example: 1.2.3.post4 => 1.2.3+post4, 1.2.3.post4.dev5 => 1.2.3+post4dev5
//
// Errors:
//
//	An error is returned for the empty string, for non-ASCII input, for versions with
//	an epoch (`N!`), for versions without any numeric release segment (e.g. `a1`), and
//	whenever part of the input is left over after parsing.
//
// Reference on PEP440: https://www.python.org/dev/peps/pep-0440/
func determinePluginVersion(packageVersion string) (string, error) {
	if len(packageVersion) == 0 {
		return "", errors.New("cannot parse empty string")
	}
	// Verify ASCII. The parsing below slices the string by byte offsets, which is only safe when every
	// character is a single byte.
	for i := 0; i < len(packageVersion); i++ {
		c := packageVersion[i]
		if c > unicode.MaxASCII {
			return "", fmt.Errorf("byte %d is not ascii", i)
		}
	}

	// parseNumber splits s into its leading run of decimal digits and the remainder.
	parseNumber := func(s string) (string, string) {
		i := 0
		for _, c := range s {
			if c > '9' || c < '0' {
				break
			}
			i++
		}
		return s[:i], s[i:]
	}

	// parsePreRelease reads the numeral that follows a pre-release marker and renders the semver
	// pre-release suffix for label. A missing numeral is implicitly 0 (PEP440), which also avoids
	// emitting an empty, and therefore invalid, semver identifier.
	parsePreRelease := func(label, s string) (string, string) {
		numeral, remainder := parseNumber(s)
		if numeral == "" {
			numeral = "0"
		}
		return "-" + label + "." + numeral, remainder
	}

	// Explicitly err on epochs
	if num, maybeEpoch := parseNumber(packageVersion); num != "" && strings.HasPrefix(maybeEpoch, "!") {
		return "", errors.New("epochs are not supported")
	}

	// Consume the dot-separated numeric release segments.
	segments := []string{}
	num, rest := "", packageVersion
	foundDot := false
	for {
		if num, rest = parseNumber(rest); num != "" {
			foundDot = false
			segments = append(segments, num)
			if strings.HasPrefix(rest, ".") {
				rest = rest[1:]
				foundDot = true
			} else {
				break
			}
		} else {
			break
		}
	}
	// A dot that was consumed but not followed by a number belongs to a `.post`/`.dev` suffix (or is
	// garbage); put it back so the suffix parsing below sees it.
	if foundDot {
		rest = "." + rest
	}

	// Remember whether the input had a release at all before padding hides that fact.
	hasRelease := len(segments) > 0

	for len(segments) < 3 {
		segments = append(segments, "0")
	}

	if rest == "" {
		r := strings.Join(segments, ".")
		return r, nil
	}

	var preRelease string

	switch {
	case rest[0] == 'a':
		preRelease, rest = parsePreRelease("alpha", rest[1:])
	case rest[0] == 'b':
		preRelease, rest = parsePreRelease("beta", rest[1:])
	case strings.HasPrefix(rest, "rc"):
		preRelease, rest = parsePreRelease("rc", rest[2:])
	}

	var postRelease string
	if strings.HasPrefix(rest, ".post") {
		postRelease, rest = parseNumber(rest[5:])
		postRelease = "+post" + postRelease
	}

	var developmentRelease string
	if strings.HasPrefix(rest, ".dev") {
		developmentRelease, rest = parseNumber(rest[4:])
		join := ""
		if postRelease == "" {
			join = "+"
		}
		developmentRelease = join + "dev" + developmentRelease
	}

	if rest != "" {
		return "", fmt.Errorf("'%s' still unparsed", rest)
	}

	// Everything was consumed, but a version made only of suffixes (e.g. "a1" or ".post1") is not a
	// version: without this check it would silently become "0.0.0-alpha.1".
	if !hasRelease {
		return "", fmt.Errorf("'%s' has no release segment", packageVersion)
	}

	result := strings.Join(segments, ".") + preRelease + postRelease + developmentRelease

	return result, nil
}

// Run is the RPC endpoint for LanguageRuntimeServer::Run. It launches the executor script under the
// Python interpreter selected by the project's options, passing the request's settings as command-line
// arguments and its configuration through environment variables. When a typechecker is configured it is
// run over the program directory first, and a typechecker failure is returned as an RPC error.
//
// A failure of the program itself is not an RPC error: it is reported through the Error field of the
// response, or through Bail when the program exited with the code signalling that it already showed the
// user an actionable message.
func (host *pythonLanguageHost) Run(ctx context.Context, req *pulumirpc.RunRequest) (*pulumirpc.RunResponse, error) {
	opts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap())
	if err != nil {
		return nil, err
	}

	args := []string{host.exec}
	args = append(args, host.constructArguments(req)...)

	config, err := host.constructConfig(req)
	if err != nil {
		err = fmt.Errorf("failed to serialize configuration: %w", err)
		return nil, err
	}
	configSecretKeys, err := host.constructConfigSecretKeys(req)
	if err != nil {
		err = fmt.Errorf("failed to serialize configuration secret keys: %w", err)
		return nil, err
	}

	if logging.V(5) {
		commandStr := strings.Join(args, " ")
		logging.V(5).Infoln("Language host launching process: ", host.exec, commandStr)
	}

	// mkCmd builds a Python command for args after checking that the virtual environment is usable.
	mkCmd := func(args []string) (*exec.Cmd, error) {
		tc, err := toolchain.ResolveToolchain(opts)
		if err != nil {
			return nil, err
		}

		if err := tc.ValidateVenv(ctx); err != nil {
			return nil, err
		}

		return tc.Command(ctx, args...)
	}

	// Now simply spawn a process to execute the requested program, wiring up stdout/stderr directly.
	cmd, err := mkCmd(args)
	if err != nil {
		return nil, err
	}
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	if config != "" || configSecretKeys != "" {
		// Extend whatever environment the toolchain prepared; a nil Env means "inherit", so start from
		// the current process environment in that case to avoid dropping it.
		env := cmd.Env
		if env == nil {
			env = os.Environ()
		}
		if config != "" {
			env = append(env, pulumiConfigVar+"="+config)
		}
		if configSecretKeys != "" {
			env = append(env, pulumiConfigSecretKeysVar+"="+configSecretKeys)
		}
		cmd.Env = env
	}

	// Before running the command, we might need to run typechecker first
	var typechecker string
	switch opts.Typechecker {
	case toolchain.TypeCheckerNone:
		break
	case toolchain.TypeCheckerMypy:
		typechecker = "mypy"
	case toolchain.TypeCheckerPyright:
		typechecker = "pyright"
	}

	if typechecker != "" {
		typecheckerCmd, err := mkCmd([]string{"-m", typechecker, req.Info.ProgramDirectory})
		if err != nil {
			return nil, err
		}
		typecheckerCmd.Stdout = os.Stdout
		typecheckerCmd.Stderr = os.Stderr
		if err := typecheckerCmd.Run(); err != nil {
			return nil, fmt.Errorf("%s failed: %w", typechecker, err)
		}
	}
	var errResult string
	if err := cmd.Run(); err != nil {
		// Python does not explicitly flush standard out or standard error when exiting abnormally. For this reason, we
		// need to explicitly flush our output streams so that, when we exit, the engine picks up the child Python
		// process's stdout and stderr writes.
		//
		// This is especially crucial for Python because it is possible for the child Python process to crash very fast
		// if Pulumi is misconfigured, so we must be sure to present a high-quality error message to the user.
		contract.IgnoreError(os.Stdout.Sync())
		contract.IgnoreError(os.Stderr.Sync())
		// errors.As (rather than a direct type assertion) keeps this in line with RunPlugin and still
		// matches should the exit error ever arrive wrapped.
		var exiterr *exec.ExitError
		if errors.As(err, &exiterr) {
			// If the program ran, but exited with a non-zero error code.  This will happen often, since user
			// errors will trigger this.  So, the error message should look as nice as possible.
			if status, stok := exiterr.Sys().(syscall.WaitStatus); stok {
				switch status.ExitStatus() {
				case 0:
					// This really shouldn't happen, but if it does, we don't want to render "non-zero exit code"
					err = fmt.Errorf("program exited unexpectedly: %w", exiterr)
				case pythonProcessExitedAfterShowingUserActionableMessage:
					return &pulumirpc.RunResponse{Error: "", Bail: true}, nil
				default:
					err = fmt.Errorf("program exited with non-zero exit code: %d", status.ExitStatus())
				}
			} else {
				err = fmt.Errorf("program exited unexpectedly: %w", exiterr)
			}
		} else {
			// Otherwise, we didn't even get to run the program.  This ought to never happen unless there's
			// a bug or system condition that prevented us from running the language exec.  Issue a scarier error.
			err = fmt.Errorf("problem executing program (could not run language executor): %w", err)
		}

		errResult = err.Error()
	}

	return &pulumirpc.RunResponse{Error: errResult}, nil
}

// constructArguments builds the command-line arguments handed to the executor for a RunRequest: one
// `--name value` pair for every setting with a non-empty value, followed by the program's entry point
// and then the program's own arguments.
func (host *pythonLanguageHost) constructArguments(req *pulumirpc.RunRequest) []string {
	flags := []struct{ name, value string }{
		{"monitor", req.GetMonitorAddress()},
		{"engine", host.engineAddress},
		{"project", req.GetProject()},
		{"stack", req.GetStack()},
		{"pwd", req.GetPwd()},
		{"dry_run", strconv.FormatBool(req.GetDryRun())},
		{"parallel", strconv.Itoa(int(req.GetParallel()))},
		{"tracing", host.tracing},
		{"organization", req.GetOrganization()},
	}

	var args []string
	for _, f := range flags {
		// Settings without a value are left off the command line entirely.
		if f.value == "" {
			continue
		}
		args = append(args, "--"+f.name, f.value)
	}

	// The engine should always pass a name for entry point, even if its just "." for the program directory.
	args = append(args, req.Info.EntryPoint)

	return append(args, req.GetArgs()...)
}

// constructConfig renders the configuration carried by a RunRequest as a JSON string. A request without
// configuration yields the empty string.
func (host *pythonLanguageHost) constructConfig(req *pulumirpc.RunRequest) (string, error) {
	if cfg := req.GetConfig(); cfg != nil {
		encoded, err := json.Marshal(cfg)
		if err != nil {
			return "", err
		}
		return string(encoded), nil
	}

	return "", nil
}

// constructConfigSecretKeys renders the list of secret configuration keys carried by a RunRequest as a
// JSON array. A request without such a list yields "[]".
func (host *pythonLanguageHost) constructConfigSecretKeys(req *pulumirpc.RunRequest) (string, error) {
	if keys := req.GetConfigSecretKeys(); keys != nil {
		encoded, err := json.Marshal(keys)
		if err != nil {
			return "", err
		}
		return string(encoded), nil
	}

	return "[]", nil
}

// GetPluginInfo reports the version of this language host.
func (host *pythonLanguageHost) GetPluginInfo(ctx context.Context, req *emptypb.Empty) (*pulumirpc.PluginInfo, error) {
	info := &pulumirpc.PluginInfo{}
	info.Version = version.Version
	return info, nil
}

// validateVersion checks that the Python interpreter selected by options is a version Pulumi supports.
// Nothing is returned: every problem, including a failure to determine the version at all, is reported
// by printing to os.Stderr, which the Pulumi CLI interprets as a diagnostic message.
func validateVersion(ctx context.Context, options toolchain.PythonOptions) {
	tc, err := toolchain.ResolveToolchain(options)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to configure python toolchain: %s\n", err)
		return
	}

	versionCmd, err := tc.Command(ctx, "--version")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create python version command: %s\n", err)
		return
	}

	out, err := versionCmd.Output()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to resolve python version command: %s\n", err)
		return
	}

	// The interpreter answers with "Python X.Y.Z"; strip the prefix and surrounding whitespace.
	reported := strings.TrimSpace(strings.TrimPrefix(string(out), "Python "))
	parsed, err := semver.Parse(reported)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to parse python version: '%s'\n", reported)
		return
	}

	switch {
	case parsed.LT(minimumSupportedPythonVersion):
		fmt.Fprintf(os.Stderr, "Pulumi does not support Python %s."+
			" Please upgrade to at least %s\n", parsed, minimumSupportedPythonVersion)
	case parsed.LT(eolPythonVersion):
		fmt.Fprintf(os.Stderr, "Python %d.%d is approaching EOL and will not be supported in Pulumi soon."+
			" Check %s for more details\n", parsed.Major,
			parsed.Minor, eolPythonVersionIssue)
	}
}

// InstallDependencies installs the dependencies of the program in req.Info.ProgramDirectory using the
// toolchain selected by the project's options, streaming progress and the installer's output back to the
// caller through server.
func (host *pythonLanguageHost) InstallDependencies(
	req *pulumirpc.InstallDependenciesRequest, server pulumirpc.LanguageRuntime_InstallDependenciesServer,
) error {
	pythonOpts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap())
	if err != nil {
		return err
	}

	closer, stdout, stderr, err := rpcutil.MakeInstallDependenciesStreams(server, req.IsTerminal)
	if err != nil {
		return err
	}
	// This deferred close is a best-effort safety net for the error paths; on success the streams are
	// closed explicitly below so that the close error can be reported.
	defer closer.Close()

	fmt.Fprint(stdout, "Installing dependencies...\n\n")

	tc, err := toolchain.ResolveToolchain(pythonOpts)
	if err != nil {
		return err
	}

	err = tc.InstallDependencies(server.Context(), req.Info.ProgramDirectory,
		true /*showOutput*/, stdout, stderr)
	if err != nil {
		return err
	}

	fmt.Fprint(stdout, "Finished installing dependencies\n\n")

	return closer.Close()
}

// About reports the executable and version of the Python interpreter as described by the pip toolchain.
func (host *pythonLanguageHost) About(ctx context.Context, req *emptypb.Empty) (*pulumirpc.AboutResponse, error) {
	pipOptions := toolchain.PythonOptions{Toolchain: toolchain.Pip}
	tc, err := toolchain.ResolveToolchain(pipOptions)
	if err != nil {
		return nil, err
	}

	about, err := tc.About(ctx)
	if err != nil {
		return nil, err
	}

	resp := &pulumirpc.AboutResponse{}
	resp.Executable = about.Executable
	resp.Version = about.Version
	return resp, nil
}

// GetProgramDependencies lists the Python packages the program depends on, by name and version, using
// the toolchain selected by the project's options. Transitive dependencies are included when the request
// asks for them.
func (host *pythonLanguageHost) GetProgramDependencies(
	ctx context.Context, req *pulumirpc.GetProgramDependenciesRequest,
) (*pulumirpc.GetProgramDependenciesResponse, error) {
	pythonOpts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap())
	if err != nil {
		return nil, err
	}

	tc, err := toolchain.ResolveToolchain(pythonOpts)
	if err != nil {
		return nil, err
	}

	packages, err := tc.ListPackages(ctx, req.TransitiveDependencies /* transitive */)
	if err != nil {
		return nil, fmt.Errorf("failed to get python dependencies: %w", err)
	}

	// Translate each package into the wire representation, preserving the listing order.
	deps := make([]*pulumirpc.DependencyInfo, 0, len(packages))
	for _, pkg := range packages {
		deps = append(deps, &pulumirpc.DependencyInfo{
			Name:    pkg.Name,
			Version: pkg.Version,
		})
	}

	return &pulumirpc.GetProgramDependenciesResponse{Dependencies: deps}, nil
}

// RunPlugin runs the plugin program found in req.Info.ProgramDirectory through the resolved Python toolchain,
// wiring the command's stdout and stderr to streams created from the server.
//
// The command runs in req.Pwd with req.Env appended to its environment. When the command fails with an
// *exec.ExitError whose status is a syscall.WaitStatus, the exit status is sent to the client as a
// RunPluginResponse and the result of that send is returned. Any other failure is returned as an error.
func (host *pythonLanguageHost) RunPlugin(
	req *pulumirpc.RunPluginRequest, server pulumirpc.LanguageRuntime_RunPluginServer,
) error {
	logging.V(5).Infof("Attempting to run python plugin in %s", req.Info.ProgramDirectory)

	opts, err := parseOptions(req.Info.RootDirectory, req.Info.Options.AsMap())
	if err != nil {
		return err
	}

	// The program directory comes first, followed by the caller-supplied arguments.
	pluginArgs := append([]string{req.Info.ProgramDirectory}, req.Args...)

	tc, err := toolchain.ResolveToolchain(opts)
	if err != nil {
		return err
	}
	pluginCmd, err := tc.Command(server.Context(), pluginArgs...)
	if err != nil {
		return err
	}

	closer, stdout, stderr, err := rpcutil.MakeRunPluginStreams(server, false)
	if err != nil {
		return err
	}
	// The deferred close is best effort; the success path below closes explicitly and reports the error.
	defer closer.Close()

	pluginCmd.Dir = req.Pwd
	pluginCmd.Env = append(pluginCmd.Env, req.Env...)
	pluginCmd.Stdout = stdout
	pluginCmd.Stderr = stderr

	runErr := pluginCmd.Run()
	if runErr == nil {
		return closer.Close()
	}

	var exitErr *exec.ExitError
	if !errors.As(runErr, &exitErr) {
		return fmt.Errorf("problem executing plugin program (could not run language executor): %w", runErr)
	}

	status, ok := exitErr.Sys().(syscall.WaitStatus)
	if !ok {
		return fmt.Errorf("program exited unexpectedly: %w", exitErr)
	}

	// Report the exit status to the client rather than failing the RPC.
	return server.Send(&pulumirpc.RunPluginResponse{
		Output: &pulumirpc.RunPluginResponse_Exitcode{Exitcode: int32(status.ExitStatus())},
	})
}

// GenerateProject binds the PCL program in req.SourceDirectory and, when binding reports no errors, generates a
// Python project for it in req.TargetDirectory.
//
// req.Project is decoded as JSON into a workspace.Project. Non-strict bind options are added unless req.Strict is
// set. The bind diagnostics are always returned in the response; if they contain errors, nothing is generated.
func (host *pythonLanguageHost) GenerateProject(
	ctx context.Context, req *pulumirpc.GenerateProjectRequest,
) (*pulumirpc.GenerateProjectResponse, error) {
	loader, err := schema.NewLoaderClient(req.LoaderTarget)
	if err != nil {
		return nil, err
	}

	var bindOpts []pcl.BindOption
	if !req.Strict {
		bindOpts = append(bindOpts, pcl.NonStrictBindOptions()...)
	}
	// for python, prefer output-versioned invokes
	bindOpts = append(bindOpts, pcl.PreferOutputVersionedInvokes)

	program, diags, err := pcl.BindDirectory(req.SourceDirectory, loader, bindOpts...)
	if err != nil {
		return nil, err
	}

	// Only generate the project when binding succeeded; otherwise just surface the diagnostics.
	if !diags.HasErrors() {
		var project workspace.Project
		if err := json.Unmarshal([]byte(req.Project), &project); err != nil {
			return nil, err
		}

		if err := codegen.GenerateProject(req.TargetDirectory, project, program, req.LocalDependencies); err != nil {
			return nil, err
		}
	}

	return &pulumirpc.GenerateProjectResponse{
		Diagnostics: plugin.HclDiagnosticsToRPCDiagnostics(diags),
	}, nil
}

// GenerateProgram parses the PCL sources in req.Source, binds them into a program and generates Python code.
//
// A parse failure is returned as an error. Bind diagnostics that contain errors are returned in the response
// without any generated source. On success the response carries the generated files together with the bind and
// code generation diagnostics.
func (host *pythonLanguageHost) GenerateProgram(
	ctx context.Context, req *pulumirpc.GenerateProgramRequest,
) (*pulumirpc.GenerateProgramResponse, error) {
	loader, err := schema.NewLoaderClient(req.LoaderTarget)
	if err != nil {
		return nil, err
	}

	// Parse every source file supplied in the request, stopping at the first failure.
	parser := hclsyntax.NewParser()
	for name, text := range req.Source {
		if err := parser.ParseFile(strings.NewReader(text), name); err != nil {
			return nil, err
		}
		if parseDiags := parser.Diagnostics; parseDiags.HasErrors() {
			return nil, parseDiags
		}
	}

	bindOpts := []pcl.BindOption{pcl.Loader(loader), pcl.PreferOutputVersionedInvokes}
	program, bindDiags, err := pcl.BindProgram(parser.Files, bindOpts...)
	if err != nil {
		return nil, err
	}

	rpcDiags := plugin.HclDiagnosticsToRPCDiagnostics(bindDiags)
	switch {
	case bindDiags.HasErrors():
		return &pulumirpc.GenerateProgramResponse{Diagnostics: rpcDiags}, nil
	case program == nil:
		return nil, errors.New("internal error program was nil")
	}

	files, genDiags, err := codegen.GenerateProgram(program)
	if err != nil {
		return nil, err
	}

	return &pulumirpc.GenerateProgramResponse{
		Source:      files,
		Diagnostics: append(rpcDiags, plugin.HclDiagnosticsToRPCDiagnostics(genDiags)...),
	}, nil
}

// GeneratePackage decodes the JSON package schema in req.Schema, binds it, generates the Python SDK files and
// writes them beneath req.Directory.
//
// Bind diagnostics are returned in the response; if they contain errors, nothing is generated or written. When
// host.useToml is set, the package's "python" language info has PyProject.Enabled switched on before generation.
// Directories are created with mode 0o700 and files are written with mode 0o600.
func (host *pythonLanguageHost) GeneratePackage(
	ctx context.Context, req *pulumirpc.GeneratePackageRequest,
) (*pulumirpc.GeneratePackageResponse, error) {
	loader, err := schema.NewLoaderClient(req.LoaderTarget)
	if err != nil {
		return nil, err
	}

	var spec schema.PackageSpec
	if err := json.Unmarshal([]byte(req.Schema), &spec); err != nil {
		return nil, err
	}

	pkg, diags, err := schema.BindSpec(spec, loader)
	if err != nil {
		return nil, err
	}

	// Both the early-out and the success path respond with the bind diagnostics only.
	response := &pulumirpc.GeneratePackageResponse{
		Diagnostics: plugin.HclDiagnosticsToRPCDiagnostics(diags),
	}
	if diags.HasErrors() {
		return response, nil
	}

	if host.useToml {
		// A missing or differently-typed entry leaves pyInfo as the zero PackageInfo.
		pyInfo, _ := pkg.Language["python"].(codegen.PackageInfo)
		pyInfo.PyProject.Enabled = true
		pkg.Language["python"] = pyInfo
	}

	files, err := codegen.GeneratePackage("pulumi-language-python", pkg, req.ExtraFiles)
	if err != nil {
		return nil, err
	}

	for name, contents := range files {
		target := filepath.Join(req.Directory, name)

		if err := os.MkdirAll(filepath.Dir(target), 0o700); err != nil {
			return nil, fmt.Errorf("could not create output directory %s: %w", filepath.Dir(name), err)
		}
		if err := os.WriteFile(target, contents, 0o600); err != nil {
			return nil, fmt.Errorf("could not write output file %s: %w", name, err)
		}
	}

	return response, nil
}