mirror of https://github.com/pulumi/pulumi.git
2015 lines
71 KiB
Go
2015 lines
71 KiB
Go
// Copyright 2016-2018, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package plugin
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/blang/semver"
|
|
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/opentracing/opentracing-go"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
"google.golang.org/protobuf/types/known/structpb"
|
|
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/promise"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/archive"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/asset"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/slice"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror"
|
|
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
|
|
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
|
|
)
|
|
|
|
// The `Type()` for the NodeJS dynamic provider. Logically, this is the same as calling
|
|
// providers.MakeProviderType(tokens.Package("pulumi-nodejs")), but does not depend on the providers package
|
|
// (a direct dependency would cause a cyclic import issue.
|
|
//
|
|
// This is needed because we have to handle some buggy behavior that previous versions of this provider implemented.
|
|
const nodejsDynamicProviderType = "pulumi:providers:pulumi-nodejs"
|
|
|
|
// The `Type()` for the Kubernetes provider. Logically, this is the same as calling
|
|
// providers.MakeProviderType(tokens.Package("kubernetes")), but does not depend on the providers package
|
|
// (a direct dependency would cause a cyclic import issue.
|
|
//
|
|
// This is needed because we have to handle some buggy behavior that previous versions of this provider implemented.
|
|
const kubernetesProviderType = "pulumi:providers:kubernetes"
|
|
|
|
// provider reflects a resource plugin, loaded dynamically for a single package.
|
|
type provider struct {
|
|
NotForwardCompatibleProvider
|
|
|
|
ctx *Context // a plugin context for caching, etc.
|
|
pkg tokens.Package // the Pulumi package containing this provider's resources.
|
|
plug *plugin // the actual plugin process wrapper.
|
|
clientRaw pulumirpc.ResourceProviderClient // the raw provider client; usually unsafe to use directly.
|
|
disableProviderPreview bool // true if previews for Create and Update are disabled.
|
|
legacyPreview bool // enables legacy behavior for unconfigured provider previews.
|
|
|
|
configSource *promise.CompletionSource[pluginConfig] // the source for the provider's configuration.
|
|
}
|
|
|
|
// pluginConfig holds the configuration of the provider
|
|
// as specified by the Configure call.
|
|
type pluginConfig struct {
|
|
known bool // true if all configuration values are known.
|
|
|
|
acceptSecrets bool // true if this plugin accepts strongly-typed secrets.
|
|
acceptResources bool // true if this plugin accepts strongly-typed resource refs.
|
|
acceptOutputs bool // true if this plugin accepts output values.
|
|
supportsPreview bool // true if this plugin supports previews for Create and Update.
|
|
}
|
|
|
|
// Checks PULUMI_DEBUG_PROVIDERS environment variable for any overrides for the provider identified
|
|
// by pkg. If the user has requested to attach to a live provider, returns the port number from the
|
|
// env var. For example, `PULUMI_DEBUG_PROVIDERS=aws:12345,gcp:678` will result in 12345 for aws.
|
|
func GetProviderAttachPort(pkg tokens.Package) (*int, error) {
|
|
var optAttach string
|
|
|
|
if providersEnvVar, has := os.LookupEnv("PULUMI_DEBUG_PROVIDERS"); has {
|
|
for _, provider := range strings.Split(providersEnvVar, ",") {
|
|
parts := strings.SplitN(provider, ":", 2)
|
|
|
|
if parts[0] == pkg.String() {
|
|
optAttach = parts[1]
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if optAttach == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
port, err := strconv.Atoi(optAttach)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Expected a numeric port, got %s in PULUMI_DEBUG_PROVIDERS: %w",
|
|
optAttach, err)
|
|
}
|
|
return &port, nil
|
|
}
|
|
|
|
// NewProvider attempts to bind to a given package's resource plugin and then creates a gRPC connection to it. If the
|
|
// plugin could not be found, or an error occurs while creating the child process, an error is returned.
|
|
func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Version,
|
|
options map[string]interface{}, disableProviderPreview bool, jsonConfig string,
|
|
) (Provider, error) {
|
|
// See if this is a provider we just want to attach to
|
|
var plug *plugin
|
|
|
|
attachPort, err := GetProviderAttachPort(pkg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
prefix := fmt.Sprintf("%v (resource)", pkg)
|
|
|
|
if attachPort != nil {
|
|
port := *attachPort
|
|
|
|
conn, err := dialPlugin(port, pkg.String(), prefix, providerPluginDialOptions(ctx, pkg, ""))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Done; store the connection and return the plugin info.
|
|
plug = &plugin{
|
|
Conn: conn,
|
|
// Nothing to kill
|
|
Kill: func() error { return nil },
|
|
}
|
|
} else {
|
|
// Load the plugin's path by using the standard workspace logic.
|
|
path, err := workspace.GetPluginPath(ctx.Diag,
|
|
apitype.ResourcePlugin, strings.ReplaceAll(string(pkg), tokens.QNameDelimiter, "_"),
|
|
version, host.GetProjectPlugins())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
contract.Assertf(path != "", "unexpected empty path for plugin %s", pkg)
|
|
|
|
// Runtime options are passed as environment variables to the provider, this is _currently_ used by
|
|
// dynamic providers to do things like lookup the virtual environment to use.
|
|
env := os.Environ()
|
|
for k, v := range options {
|
|
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
|
|
}
|
|
if jsonConfig != "" {
|
|
env = append(env, "PULUMI_CONFIG="+jsonConfig)
|
|
}
|
|
plug, err = newPlugin(ctx, ctx.Pwd, path, prefix,
|
|
apitype.ResourcePlugin, []string{host.ServerAddr()}, env, providerPluginDialOptions(ctx, pkg, ""))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
contract.Assertf(plug != nil, "unexpected nil resource plugin for %s", pkg)
|
|
|
|
legacyPreview := cmdutil.IsTruthy(os.Getenv("PULUMI_LEGACY_PROVIDER_PREVIEW"))
|
|
|
|
p := &provider{
|
|
ctx: ctx,
|
|
pkg: pkg,
|
|
plug: plug,
|
|
clientRaw: pulumirpc.NewResourceProviderClient(plug.Conn),
|
|
disableProviderPreview: disableProviderPreview,
|
|
legacyPreview: legacyPreview,
|
|
configSource: &promise.CompletionSource[pluginConfig]{},
|
|
}
|
|
|
|
// If we just attached (i.e. plugin bin is nil) we need to call attach
|
|
if plug.Bin == "" {
|
|
err := p.Attach(host.ServerAddr())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func providerPluginDialOptions(ctx *Context, pkg tokens.Package, path string) []grpc.DialOption {
|
|
dialOpts := append(
|
|
rpcutil.OpenTracingInterceptorDialOptions(otgrpc.SpanDecorator(decorateProviderSpans)),
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
rpcutil.GrpcChannelOptions(),
|
|
)
|
|
|
|
if ctx.DialOptions != nil {
|
|
metadata := map[string]interface{}{
|
|
"mode": "client",
|
|
"kind": "resource",
|
|
}
|
|
if pkg != "" {
|
|
metadata["name"] = pkg.String()
|
|
}
|
|
if path != "" {
|
|
metadata["path"] = path
|
|
}
|
|
dialOpts = append(dialOpts, ctx.DialOptions(metadata)...)
|
|
}
|
|
|
|
return dialOpts
|
|
}
|
|
|
|
// NewProviderFromPath creates a new provider by loading the plugin binary located at `path`.
|
|
func NewProviderFromPath(host Host, ctx *Context, path string) (Provider, error) {
|
|
env := os.Environ()
|
|
|
|
plug, err := newPlugin(ctx, ctx.Pwd, path, "",
|
|
apitype.ResourcePlugin, []string{host.ServerAddr()}, env, providerPluginDialOptions(ctx, "", path))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
contract.Assertf(plug != nil, "unexpected nil resource plugin at %q", path)
|
|
|
|
legacyPreview := cmdutil.IsTruthy(os.Getenv("PULUMI_LEGACY_PROVIDER_PREVIEW"))
|
|
|
|
p := &provider{
|
|
ctx: ctx,
|
|
plug: plug,
|
|
clientRaw: pulumirpc.NewResourceProviderClient(plug.Conn),
|
|
legacyPreview: legacyPreview,
|
|
configSource: &promise.CompletionSource[pluginConfig]{},
|
|
}
|
|
|
|
// If we just attached (i.e. plugin bin is nil) we need to call attach
|
|
if plug.Bin == "" {
|
|
err := p.Attach(host.ServerAddr())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
func NewProviderWithClient(ctx *Context, pkg tokens.Package, client pulumirpc.ResourceProviderClient,
|
|
disableProviderPreview bool,
|
|
) Provider {
|
|
return &provider{
|
|
ctx: ctx,
|
|
pkg: pkg,
|
|
clientRaw: client,
|
|
disableProviderPreview: disableProviderPreview,
|
|
configSource: &promise.CompletionSource[pluginConfig]{},
|
|
}
|
|
}
|
|
|
|
func (p *provider) Pkg() tokens.Package { return p.pkg }
|
|
|
|
// label returns a base label for tracing functions.
|
|
func (p *provider) label() string {
|
|
return fmt.Sprintf("Provider[%s, %p]", p.pkg, p)
|
|
}
|
|
|
|
func (p *provider) requestContext() context.Context {
|
|
if p.ctx == nil {
|
|
return context.Background()
|
|
}
|
|
return p.ctx.Request()
|
|
}
|
|
|
|
// isDiffCheckConfigLogicallyUnimplemented returns true when an rpcerror.Error should be treated as if it was an error
|
|
// due to a rpc being unimplemented. Due to past mistakes, different providers returned "Unimplemented" in a variaity of
|
|
// different ways that don't always result in an Uimplemented error code.
|
|
func isDiffCheckConfigLogicallyUnimplemented(err *rpcerror.Error, providerType tokens.Type) bool {
|
|
switch string(providerType) {
|
|
// The NodeJS dynamic provider implementation incorrectly returned an empty message instead of properly implementing
|
|
// Diff/CheckConfig. This gets turned into a error with type: "Internal".
|
|
case nodejsDynamicProviderType:
|
|
if err.Code() == codes.Internal {
|
|
logging.V(8).Infof("treating error %s as unimplemented error", err)
|
|
return true
|
|
}
|
|
|
|
// The Kubernetes provider returned an "Unimplmeneted" message, but it did so by returning a status from a different
|
|
// package that the provider was expected. That caused the error to be wrapped with an "Unknown" error.
|
|
case kubernetesProviderType:
|
|
if err.Code() == codes.Unknown && strings.Contains(err.Message(), "Unimplemented") {
|
|
logging.V(8).Infof("treating error %s as unimplemented error", err)
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (p *provider) Parameterize(ctx context.Context, request ParameterizeRequest) (ParameterizeResponse, error) {
|
|
var params pulumirpc.ParameterizeRequest
|
|
switch p := request.Parameters.(type) {
|
|
case *ParameterizeArgs:
|
|
params.Parameters = &pulumirpc.ParameterizeRequest_Args{
|
|
Args: &pulumirpc.ParameterizeRequest_ParametersArgs{
|
|
Args: p.Args,
|
|
},
|
|
}
|
|
case *ParameterizeValue:
|
|
params.Parameters = &pulumirpc.ParameterizeRequest_Value{
|
|
Value: &pulumirpc.ParameterizeRequest_ParametersValue{
|
|
Name: p.Name,
|
|
Version: p.Version.String(),
|
|
Value: p.Value,
|
|
},
|
|
}
|
|
case nil:
|
|
// No args present. That should be Ok.
|
|
default:
|
|
panic(fmt.Sprintf("Impossible - type is constrained to ParameterizeArgs or ParameterizeValue, found %T", p))
|
|
}
|
|
resp, err := p.clientRaw.Parameterize(p.requestContext(), ¶ms)
|
|
if err != nil {
|
|
return ParameterizeResponse{}, err
|
|
}
|
|
version, err := semver.Parse(resp.Version)
|
|
if err != nil {
|
|
return ParameterizeResponse{}, err
|
|
}
|
|
return ParameterizeResponse{Name: resp.Name, Version: version}, err
|
|
}
|
|
|
|
// GetSchema fetches the schema for this resource provider, if any.
|
|
func (p *provider) GetSchema(ctx context.Context, req GetSchemaRequest) (GetSchemaResponse, error) {
|
|
var subpackageVersion string
|
|
if req.SubpackageVersion != nil {
|
|
subpackageVersion = req.SubpackageVersion.String()
|
|
}
|
|
|
|
resp, err := p.clientRaw.GetSchema(p.requestContext(), &pulumirpc.GetSchemaRequest{
|
|
Version: int32(req.Version),
|
|
SubpackageName: req.SubpackageName,
|
|
SubpackageVersion: subpackageVersion,
|
|
})
|
|
if err != nil {
|
|
return GetSchemaResponse{}, err
|
|
}
|
|
return GetSchemaResponse{[]byte(resp.GetSchema())}, nil
|
|
}
|
|
|
|
// CheckConfig validates the configuration for this resource provider.
|
|
func (p *provider) CheckConfig(ctx context.Context, req CheckConfigRequest) (CheckConfigResponse, error) {
|
|
label := fmt.Sprintf("%s.CheckConfig(%s)", p.label(), req.URN)
|
|
logging.V(7).Infof("%s executing (#olds=%d,#news=%d)", label, len(req.Olds), len(req.News))
|
|
|
|
molds, err := MarshalProperties(req.Olds, MarshalOptions{
|
|
Label: label + ".olds",
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
})
|
|
if err != nil {
|
|
return CheckConfigResponse{}, err
|
|
}
|
|
|
|
mnews, err := MarshalProperties(req.News, MarshalOptions{
|
|
Label: label + ".news",
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
})
|
|
if err != nil {
|
|
return CheckConfigResponse{}, err
|
|
}
|
|
|
|
resp, err := p.clientRaw.CheckConfig(p.requestContext(), &pulumirpc.CheckRequest{
|
|
Urn: string(req.URN),
|
|
Olds: molds,
|
|
News: mnews,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
code := rpcError.Code()
|
|
if code == codes.Unimplemented || isDiffCheckConfigLogicallyUnimplemented(rpcError, req.URN.Type()) {
|
|
// For backwards compatibility, just return the news as if the provider was okay with them.
|
|
logging.V(7).Infof("%s unimplemented rpc: returning news as is", label)
|
|
return CheckConfigResponse{Properties: req.News}, nil
|
|
}
|
|
logging.V(8).Infof("%s provider received rpc error `%s`: `%s`", label, rpcError.Code(),
|
|
rpcError.Message())
|
|
return CheckConfigResponse{}, err
|
|
}
|
|
|
|
// Unmarshal the provider inputs.
|
|
var inputs resource.PropertyMap
|
|
if ins := resp.GetInputs(); ins != nil {
|
|
inputs, err = UnmarshalProperties(ins, MarshalOptions{
|
|
Label: label + ".inputs",
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
RejectUnknowns: !req.AllowUnknowns,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return CheckConfigResponse{}, err
|
|
}
|
|
}
|
|
|
|
// And now any properties that failed verification.
|
|
failures := slice.Prealloc[CheckFailure](len(resp.GetFailures()))
|
|
for _, failure := range resp.GetFailures() {
|
|
failures = append(failures, CheckFailure{resource.PropertyKey(failure.Property), failure.Reason})
|
|
}
|
|
|
|
// Copy over any secret annotations, since we could not pass any to the provider, and return.
|
|
annotateSecrets(inputs, req.News)
|
|
logging.V(7).Infof("%s success: inputs=#%d failures=#%d", label, len(inputs), len(failures))
|
|
return CheckConfigResponse{Properties: inputs, Failures: failures}, nil
|
|
}
|
|
|
|
func decodeDetailedDiff(resp *pulumirpc.DiffResponse) map[string]PropertyDiff {
|
|
if !resp.GetHasDetailedDiff() {
|
|
return nil
|
|
}
|
|
|
|
detailedDiff := make(map[string]PropertyDiff)
|
|
for k, v := range resp.GetDetailedDiff() {
|
|
var d DiffKind
|
|
switch v.GetKind() {
|
|
case pulumirpc.PropertyDiff_ADD:
|
|
d = DiffAdd
|
|
case pulumirpc.PropertyDiff_ADD_REPLACE:
|
|
d = DiffAddReplace
|
|
case pulumirpc.PropertyDiff_DELETE:
|
|
d = DiffDelete
|
|
case pulumirpc.PropertyDiff_DELETE_REPLACE:
|
|
d = DiffDeleteReplace
|
|
case pulumirpc.PropertyDiff_UPDATE:
|
|
d = DiffUpdate
|
|
case pulumirpc.PropertyDiff_UPDATE_REPLACE:
|
|
d = DiffUpdateReplace
|
|
default:
|
|
// Consider unknown diff kinds to be simple updates.
|
|
d = DiffUpdate
|
|
}
|
|
detailedDiff[k] = PropertyDiff{
|
|
Kind: d,
|
|
InputDiff: v.GetInputDiff(),
|
|
}
|
|
}
|
|
|
|
return detailedDiff
|
|
}
|
|
|
|
// DiffConfig checks what impacts a hypothetical change to this provider's configuration will have on the provider.
|
|
func (p *provider) DiffConfig(ctx context.Context, req DiffConfigRequest) (DiffConfigResponse, error) {
|
|
label := fmt.Sprintf("%s.DiffConfig(%s)", p.label(), req.URN)
|
|
logging.V(7).Infof("%s: executing (#oldInputs=%d#oldOutputs=%d,#newInputs=%d)",
|
|
label, len(req.OldInputs), len(req.OldOutputs), len(req.NewInputs))
|
|
|
|
mOldInputs, err := MarshalProperties(req.OldInputs, MarshalOptions{
|
|
Label: label + ".oldInputs",
|
|
KeepUnknowns: true,
|
|
})
|
|
if err != nil {
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
mOldOutputs, err := MarshalProperties(req.OldOutputs, MarshalOptions{
|
|
Label: label + ".oldOutputs",
|
|
KeepUnknowns: true,
|
|
})
|
|
if err != nil {
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
mNewInputs, err := MarshalProperties(req.NewInputs, MarshalOptions{
|
|
Label: label + ".newInputs",
|
|
KeepUnknowns: true,
|
|
})
|
|
if err != nil {
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
resp, err := p.clientRaw.DiffConfig(p.requestContext(), &pulumirpc.DiffRequest{
|
|
Urn: string(req.URN),
|
|
OldInputs: mOldInputs,
|
|
Olds: mOldOutputs,
|
|
News: mNewInputs,
|
|
IgnoreChanges: req.IgnoreChanges,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
code := rpcError.Code()
|
|
if code == codes.Unimplemented || isDiffCheckConfigLogicallyUnimplemented(rpcError, req.URN.Type()) {
|
|
logging.V(7).Infof("%s unimplemented rpc: returning DiffUnknown with no replaces", label)
|
|
// In this case, the provider plugin did not implement this and we have to provide some answer:
|
|
//
|
|
// There are two interesting scenarios with the present gRPC interface:
|
|
// 1. Configuration differences in which all properties are known
|
|
// 2. Configuration differences in which some new property is unknown.
|
|
//
|
|
// In both cases, we return a diff result that indicates that the provider _should not_ be replaced.
|
|
// Although this decision is not conservative--indeed, the conservative decision would be to always require
|
|
// replacement of a provider if any input has changed--we believe that it results in the best possible user
|
|
// experience for providers that do not implement DiffConfig functionality. If we took the conservative
|
|
// route here, any change to a provider's configuration (no matter how inconsequential) would cause all of
|
|
// its resources to be replaced. This is clearly a bad experience, and differs from how things worked prior
|
|
// to first-class providers.
|
|
return DiffResult{Changes: DiffUnknown, ReplaceKeys: nil}, nil
|
|
}
|
|
logging.V(8).Infof("%s provider received rpc error `%s`: `%s`", label, rpcError.Code(),
|
|
rpcError.Message())
|
|
// https://github.com/pulumi/pulumi/issues/14529: Old versions of kubernetes would error on this
|
|
// call if "kubeconfig" was set to a file. This didn't cause issues later when the same config was
|
|
// passed to Configure, and for many years silently "worked".
|
|
// https://github.com/pulumi/pulumi/pull/14436 fixed this method to start returning errors which
|
|
// exposed this issue with the kubernetes provider, new versions will be fixed to not error on
|
|
// this (https://github.com/pulumi/pulumi-kubernetes/issues/2663) but so that the CLI continues to
|
|
// work for old versions we have an explicit ignore for this one error here.
|
|
if p.pkg == "kubernetes" &&
|
|
strings.Contains(rpcError.Error(), "cannot unmarshal string into Go value of type struct") {
|
|
logging.V(8).Infof("%s ignoring error from kubernetes provider", label)
|
|
return DiffResult{Changes: DiffUnknown}, nil
|
|
}
|
|
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
replaces := slice.Prealloc[resource.PropertyKey](len(resp.GetReplaces()))
|
|
for _, replace := range resp.GetReplaces() {
|
|
replaces = append(replaces, resource.PropertyKey(replace))
|
|
}
|
|
stables := slice.Prealloc[resource.PropertyKey](len(resp.GetStables()))
|
|
for _, stable := range resp.GetStables() {
|
|
stables = append(stables, resource.PropertyKey(stable))
|
|
}
|
|
diffs := slice.Prealloc[resource.PropertyKey](len(resp.GetDiffs()))
|
|
for _, diff := range resp.GetDiffs() {
|
|
diffs = append(diffs, resource.PropertyKey(diff))
|
|
}
|
|
|
|
changes := resp.GetChanges()
|
|
deleteBeforeReplace := resp.GetDeleteBeforeReplace()
|
|
logging.V(7).Infof("%s success: changes=%d #replaces=%v #stables=%v delbefrepl=%v, diffs=#%v",
|
|
label, changes, replaces, stables, deleteBeforeReplace, diffs)
|
|
|
|
return DiffResult{
|
|
Changes: DiffChanges(changes),
|
|
ReplaceKeys: replaces,
|
|
StableKeys: stables,
|
|
ChangedKeys: diffs,
|
|
DetailedDiff: decodeDetailedDiff(resp),
|
|
DeleteBeforeReplace: deleteBeforeReplace,
|
|
}, nil
|
|
}
|
|
|
|
// annotateSecrets copies the "secretness" from the ins to the outs. If there are values with the same keys for the
|
|
// outs and the ins, if they are both objects, they are transformed recursively. Otherwise, if the value in the ins
|
|
// contains a secret, the entire out value is marked as a secret. This is very close to how we project secrets
|
|
// in the programming model, with one small difference, which is how we treat the case where both are objects. In the
|
|
// programming model, we would say the entire output object is a secret. Here, we actually recur in. We do this because
|
|
// we don't want a single secret value in a rich structure to taint the entire object. Doing so would mean things like
|
|
// the entire value in the deployment would be encrypted instead of a small chunk. It also means the entire property
|
|
// would be displayed as `[secret]` in the CLI instead of a small part.
|
|
//
|
|
// NOTE: This means that for an array, if any value in the input version is a secret, the entire output array is
|
|
// marked as a secret. This is actually a very nice result, because often arrays are treated like sets by providers
|
|
// and the order may not be preserved across an operation. This means we do end up encrypting the entire array
|
|
// but that's better than accidentally leaking a value which just moved to a different location.
|
|
func annotateSecrets(outs, ins resource.PropertyMap) {
|
|
if outs == nil || ins == nil {
|
|
return
|
|
}
|
|
|
|
for key, inValue := range ins {
|
|
outValue, has := outs[key]
|
|
if !has {
|
|
continue
|
|
}
|
|
if outValue.IsObject() && inValue.IsObject() {
|
|
annotateSecrets(outValue.ObjectValue(), inValue.ObjectValue())
|
|
} else if !outValue.IsSecret() && inValue.ContainsSecrets() {
|
|
outs[key] = resource.MakeSecret(outValue)
|
|
}
|
|
}
|
|
}
|
|
|
|
func removeSecrets(v resource.PropertyValue) interface{} {
|
|
switch {
|
|
case v.IsNull():
|
|
return nil
|
|
case v.IsBool():
|
|
return v.BoolValue()
|
|
case v.IsNumber():
|
|
return v.NumberValue()
|
|
case v.IsString():
|
|
return v.StringValue()
|
|
case v.IsArray():
|
|
arr := []interface{}{}
|
|
for _, v := range v.ArrayValue() {
|
|
arr = append(arr, removeSecrets(v))
|
|
}
|
|
return arr
|
|
case v.IsAsset():
|
|
return v.AssetValue()
|
|
case v.IsArchive():
|
|
return v.ArchiveValue()
|
|
case v.IsComputed():
|
|
return v.Input()
|
|
case v.IsOutput():
|
|
return v.OutputValue()
|
|
case v.IsSecret():
|
|
return removeSecrets(v.SecretValue().Element)
|
|
default:
|
|
contract.Assertf(v.IsObject(), "v is not Object '%v' instead", v.TypeString())
|
|
obj := map[string]interface{}{}
|
|
for k, v := range v.ObjectValue() {
|
|
obj[string(k)] = removeSecrets(v)
|
|
}
|
|
return obj
|
|
}
|
|
}
|
|
|
|
func traverseProperty(element resource.PropertyValue, f func(resource.PropertyValue)) {
|
|
f(element)
|
|
if element.IsSecret() {
|
|
traverseSecret(element.SecretValue(), f)
|
|
} else if element.IsObject() {
|
|
traverseMap(element.ObjectValue(), f)
|
|
} else if element.IsArray() {
|
|
traverseArray(element.ArrayValue(), f)
|
|
}
|
|
}
|
|
|
|
func traverseArray(elements []resource.PropertyValue, f func(resource.PropertyValue)) {
|
|
for _, element := range elements {
|
|
traverseProperty(element, f)
|
|
}
|
|
}
|
|
|
|
func traverseSecret(v *resource.Secret, f func(resource.PropertyValue)) {
|
|
traverseProperty(v.Element, f)
|
|
}
|
|
|
|
func traverseMap(m resource.PropertyMap, f func(resource.PropertyValue)) {
|
|
for _, value := range m {
|
|
traverseProperty(value, f)
|
|
}
|
|
}
|
|
|
|
// restoreElidedAssetContents is used to restore contents of assets inside resource property maps after
|
|
// we have skipped serializing contents of assets in order to avoid sending them over the wire to resource
|
|
// providers. Mainly used in `Read` operations after we receive the live inputs from the resource provider plugin.
|
|
// Those inputs may echo back the input assets and the engine writes them out to the state. We need to make sure that
|
|
// we don't write out empty assets to the state, so we restore the asset contents from the original inputs.
|
|
func restoreElidedAssetContents(original resource.PropertyMap, transformed resource.PropertyMap) {
|
|
isEmptyAsset := func(v *asset.Asset) bool {
|
|
return v.Text == "" && v.Path == "" && v.URI == ""
|
|
}
|
|
|
|
isEmptyArchive := func(v *archive.Archive) bool {
|
|
return v.Path == "" && v.URI == "" && v.Assets == nil
|
|
}
|
|
|
|
originalAssets := map[string]*asset.Asset{}
|
|
originalArchives := map[string]*archive.Archive{}
|
|
|
|
traverseMap(original, func(value resource.PropertyValue) {
|
|
if value.IsAsset() {
|
|
originalAsset := value.AssetValue()
|
|
originalAssets[originalAsset.Hash] = originalAsset
|
|
}
|
|
|
|
if value.IsArchive() {
|
|
originalArchive := value.ArchiveValue()
|
|
originalArchives[originalArchive.Hash] = originalArchive
|
|
}
|
|
})
|
|
|
|
traverseMap(transformed, func(value resource.PropertyValue) {
|
|
if value.IsAsset() {
|
|
transformedAsset := value.AssetValue()
|
|
originalAsset, has := originalAssets[transformedAsset.Hash]
|
|
if has && isEmptyAsset(transformedAsset) {
|
|
transformedAsset.Sig = originalAsset.Sig
|
|
transformedAsset.Text = originalAsset.Text
|
|
transformedAsset.Path = originalAsset.Path
|
|
transformedAsset.URI = originalAsset.URI
|
|
}
|
|
}
|
|
|
|
if value.IsArchive() {
|
|
transformedArchive := value.ArchiveValue()
|
|
originalArchive, has := originalArchives[transformedArchive.Hash]
|
|
if has && isEmptyArchive(transformedArchive) {
|
|
transformedArchive.Sig = originalArchive.Sig
|
|
transformedArchive.URI = originalArchive.URI
|
|
transformedArchive.Path = originalArchive.Path
|
|
transformedArchive.Assets = originalArchive.Assets
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// Configure configures the resource provider with "globals" that control its behavior.
|
|
func (p *provider) Configure(ctx context.Context, req ConfigureRequest) (ConfigureResponse, error) {
|
|
label := p.label() + ".Configure()"
|
|
logging.V(7).Infof("%s executing (#vars=%d)", label, len(req.Inputs))
|
|
|
|
// Convert the inputs to a config map. If any are unknown, do not configure the underlying plugin: instead, leave
|
|
// the cfgknown bit unset and carry on.
|
|
config := make(map[string]string)
|
|
for k, v := range req.Inputs {
|
|
if k == "version" {
|
|
continue
|
|
}
|
|
|
|
if v.ContainsUnknowns() {
|
|
p.configSource.MustFulfill(pluginConfig{
|
|
known: false,
|
|
acceptSecrets: false,
|
|
acceptResources: false,
|
|
})
|
|
return ConfigureResponse{}, nil
|
|
}
|
|
|
|
mapped := removeSecrets(v)
|
|
if _, isString := mapped.(string); !isString {
|
|
marshalled, err := json.Marshal(mapped)
|
|
if err != nil {
|
|
err := fmt.Errorf("marshaling configuration property '%v': %w", k, err)
|
|
p.configSource.MustReject(err)
|
|
return ConfigureResponse{}, err
|
|
}
|
|
mapped = string(marshalled)
|
|
}
|
|
|
|
// Pass the older spelling of a configuration key across the RPC interface, for now, to support
|
|
// providers which are on the older plan.
|
|
config[string(p.Pkg())+":config:"+string(k)] = mapped.(string)
|
|
}
|
|
|
|
minputs, err := MarshalProperties(req.Inputs, MarshalOptions{
|
|
Label: label + ".inputs",
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
err := fmt.Errorf("marshaling provider inputs: %w", err)
|
|
p.configSource.MustReject(err)
|
|
return ConfigureResponse{}, err
|
|
}
|
|
|
|
// Spawn the configure to happen in parallel. This ensures that we remain responsive elsewhere that might
|
|
// want to make forward progress, even as the configure call is happening.
|
|
go func() {
|
|
resp, err := p.clientRaw.Configure(p.requestContext(), &pulumirpc.ConfigureRequest{
|
|
AcceptSecrets: true,
|
|
AcceptResources: true,
|
|
SendsOldInputs: true,
|
|
SendsOldInputsToDelete: true,
|
|
Variables: config,
|
|
Args: minputs,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: err=%v", label, rpcError.Message())
|
|
err = createConfigureError(rpcError)
|
|
p.configSource.MustReject(err)
|
|
return
|
|
}
|
|
|
|
p.configSource.MustFulfill(pluginConfig{
|
|
known: true,
|
|
acceptSecrets: resp.GetAcceptSecrets(),
|
|
acceptResources: resp.GetAcceptResources(),
|
|
supportsPreview: resp.GetSupportsPreview(),
|
|
acceptOutputs: resp.GetAcceptOutputs(),
|
|
})
|
|
}()
|
|
|
|
return ConfigureResponse{}, nil
|
|
}
|
|
|
|
// Check validates that the given property bag is valid for a resource of the given type.
|
|
func (p *provider) Check(ctx context.Context, req CheckRequest) (CheckResponse, error) {
|
|
label := fmt.Sprintf("%s.Check(%s)", p.label(), req.URN)
|
|
logging.V(7).Infof("%s executing (#olds=%d,#news=%d)", label, len(req.Olds), len(req.News))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return CheckResponse{}, err
|
|
}
|
|
|
|
// If the configuration for this provider was not fully known--e.g. if we are doing a preview and some input
|
|
// property was sourced from another resource's output properties--don't call into the underlying provider.
|
|
if !pcfg.known {
|
|
return CheckResponse{Properties: req.News}, nil
|
|
}
|
|
|
|
molds, err := MarshalProperties(req.Olds, MarshalOptions{
|
|
Label: label + ".olds",
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return CheckResponse{}, err
|
|
}
|
|
mnews, err := MarshalProperties(req.News, MarshalOptions{
|
|
Label: label + ".news",
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return CheckResponse{}, err
|
|
}
|
|
|
|
resp, err := client.Check(p.requestContext(), &pulumirpc.CheckRequest{
|
|
Urn: string(req.URN),
|
|
Olds: molds,
|
|
News: mnews,
|
|
RandomSeed: req.RandomSeed,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: err=%v", label, rpcError.Message())
|
|
return CheckResponse{}, rpcError
|
|
}
|
|
|
|
// Unmarshal the provider inputs.
|
|
var inputs resource.PropertyMap
|
|
if ins := resp.GetInputs(); ins != nil {
|
|
inputs, err = UnmarshalProperties(ins, MarshalOptions{
|
|
Label: label + ".inputs",
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
RejectUnknowns: !req.AllowUnknowns,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return CheckResponse{}, err
|
|
}
|
|
}
|
|
|
|
// If we could not pass secrets to the provider, retain the secret bit on any property with the same name. This
|
|
// allows us to retain metadata about secrets in many cases, even for providers that do not understand secrets
|
|
// natively.
|
|
if !pcfg.acceptSecrets {
|
|
annotateSecrets(inputs, req.News)
|
|
}
|
|
|
|
// And now any properties that failed verification.
|
|
failures := slice.Prealloc[CheckFailure](len(resp.GetFailures()))
|
|
for _, failure := range resp.GetFailures() {
|
|
failures = append(failures, CheckFailure{resource.PropertyKey(failure.Property), failure.Reason})
|
|
}
|
|
|
|
logging.V(7).Infof("%s success: inputs=#%d failures=#%d", label, len(inputs), len(failures))
|
|
return CheckResponse{Properties: inputs, Failures: failures}, nil
|
|
}
|
|
|
|
// Diff checks what impacts a hypothetical update will have on the resource's properties.
|
|
func (p *provider) Diff(ctx context.Context, req DiffRequest) (DiffResponse, error) {
|
|
contract.Assertf(req.URN != "", "Diff requires a URN")
|
|
contract.Assertf(req.ID != "", "Diff requires an ID")
|
|
contract.Assertf(req.OldInputs != nil, "Diff requires old input properties")
|
|
contract.Assertf(req.NewInputs != nil, "Diff requires new input properties")
|
|
contract.Assertf(req.OldOutputs != nil, "Diff requires old output properties")
|
|
|
|
label := fmt.Sprintf("%s.Diff(%s,%s)", p.label(), req.URN, req.ID)
|
|
logging.V(7).Infof("%s: executing (#oldInputs=%d#oldOutputs=%d,#newInputs=%d)",
|
|
label, len(req.OldInputs), len(req.OldOutputs), len(req.NewInputs))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
// If the configuration for this provider was not fully known--e.g. if we are doing a preview and some input
|
|
// property was sourced from another resource's output properties--don't call into the underlying provider.
|
|
// Instead, indicate that the diff is unavailable and write a message
|
|
if !pcfg.known {
|
|
logging.V(7).Infof("%s: cannot diff due to unknown config", label)
|
|
const message = "The provider for this resource has inputs that are not known during preview.\n" +
|
|
"This preview may not correctly represent the changes that will be applied during an update."
|
|
return DiffResult{}, DiffUnavailable(message)
|
|
}
|
|
|
|
mOldInputs, err := MarshalProperties(req.OldInputs, MarshalOptions{
|
|
Label: label + ".oldInputs",
|
|
ElideAssetContents: true,
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
mOldOutputs, err := MarshalProperties(req.OldOutputs, MarshalOptions{
|
|
Label: label + ".oldOutputs",
|
|
ElideAssetContents: true,
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
mNewInputs, err := MarshalProperties(req.NewInputs, MarshalOptions{
|
|
Label: label + ".newInputs",
|
|
ElideAssetContents: true,
|
|
KeepUnknowns: req.AllowUnknowns,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return DiffResult{}, err
|
|
}
|
|
|
|
resp, err := client.Diff(p.requestContext(), &pulumirpc.DiffRequest{
|
|
Id: string(req.ID),
|
|
Urn: string(req.URN),
|
|
OldInputs: mOldInputs,
|
|
Olds: mOldOutputs,
|
|
News: mNewInputs,
|
|
IgnoreChanges: req.IgnoreChanges,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: %v", label, rpcError.Message())
|
|
return DiffResult{}, rpcError
|
|
}
|
|
|
|
// nil is semantically important to a lot of the pulumi system so we only pre-allocate if we have non-zero length.
|
|
replaces := slice.Prealloc[resource.PropertyKey](len(resp.GetReplaces()))
|
|
for _, replace := range resp.GetReplaces() {
|
|
replaces = append(replaces, resource.PropertyKey(replace))
|
|
}
|
|
stables := slice.Prealloc[resource.PropertyKey](len(resp.GetStables()))
|
|
for _, stable := range resp.GetStables() {
|
|
stables = append(stables, resource.PropertyKey(stable))
|
|
}
|
|
diffs := slice.Prealloc[resource.PropertyKey](len(resp.GetDiffs()))
|
|
for _, diff := range resp.GetDiffs() {
|
|
diffs = append(diffs, resource.PropertyKey(diff))
|
|
}
|
|
|
|
changes := resp.GetChanges()
|
|
deleteBeforeReplace := resp.GetDeleteBeforeReplace()
|
|
logging.V(7).Infof("%s success: changes=%d #replaces=%v #stables=%v delbefrepl=%v, diffs=#%v, detaileddiff=%v",
|
|
label, changes, replaces, stables, deleteBeforeReplace, diffs, resp.GetDetailedDiff())
|
|
|
|
return DiffResult{
|
|
Changes: DiffChanges(changes),
|
|
ReplaceKeys: replaces,
|
|
StableKeys: stables,
|
|
ChangedKeys: diffs,
|
|
DetailedDiff: decodeDetailedDiff(resp),
|
|
DeleteBeforeReplace: deleteBeforeReplace,
|
|
}, nil
|
|
}
|
|
|
|
// Create allocates a new instance of the provided resource and assigns its unique resource.ID and outputs afterwards.
|
|
func (p *provider) Create(ctx context.Context, req CreateRequest) (CreateResponse, error) {
|
|
contract.Assertf(req.URN != "", "Create requires a URN")
|
|
contract.Assertf(req.Properties != nil, "Create requires properties")
|
|
|
|
label := fmt.Sprintf("%s.Create(%s)", p.label(), req.URN)
|
|
logging.V(7).Infof("%s executing (#props=%v)", label, len(req.Properties))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return CreateResponse{}, err
|
|
}
|
|
|
|
// If this is a preview and the plugin does not support provider previews, or if the configuration for the provider
|
|
// is not fully known, hand back an empty property map. This will force the language SDK will to treat all properties
|
|
// as unknown, which is conservatively correct.
|
|
//
|
|
// If the provider does not support previews, return the inputs as the state. Note that this can cause problems for
|
|
// the language SDKs if there are input and state properties that share a name but expect differently-shaped values.
|
|
if req.Preview {
|
|
// TODO: it would be great to swap the order of these if statements. This would prevent a behavioral change for
|
|
// providers that do not support provider previews, which will always return the inputs as state regardless of
|
|
// whether or not the config is known. Unfortunately, we can't, since the `supportsPreview` bit depends on the
|
|
// result of `Configure`, which we won't call if the `cfgknown` is false. It may be worth fixing this catch-22
|
|
// by extending the provider gRPC interface with a `SupportsFeature` API similar to the language monitor.
|
|
if !pcfg.known {
|
|
if p.legacyPreview {
|
|
return CreateResponse{Properties: req.Properties}, nil
|
|
}
|
|
return CreateResponse{}, nil
|
|
}
|
|
if !pcfg.supportsPreview || p.disableProviderPreview {
|
|
return CreateResponse{Properties: req.Properties}, nil
|
|
}
|
|
}
|
|
|
|
// We should only be calling {Create,Update,Delete} if the provider is fully configured.
|
|
contract.Assertf(pcfg.known, "Create cannot be called if the configuration is unknown")
|
|
|
|
mprops, err := MarshalProperties(req.Properties, MarshalOptions{
|
|
Label: label + ".inputs",
|
|
KeepUnknowns: req.Preview,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return CreateResponse{}, err
|
|
}
|
|
|
|
var id resource.ID
|
|
var liveObject *structpb.Struct
|
|
var resourceError error
|
|
resourceStatus := resource.StatusOK
|
|
resp, err := client.Create(p.requestContext(), &pulumirpc.CreateRequest{
|
|
Urn: string(req.URN),
|
|
Properties: mprops,
|
|
Timeout: req.Timeout,
|
|
Preview: req.Preview,
|
|
})
|
|
if err != nil {
|
|
resourceStatus, id, liveObject, _, resourceError = parseError(err)
|
|
logging.V(7).Infof("%s failed: %v", label, resourceError)
|
|
|
|
if resourceStatus != resource.StatusPartialFailure {
|
|
return CreateResponse{}, resourceError
|
|
}
|
|
// Else it's a `StatusPartialFailure`.
|
|
} else {
|
|
id = resource.ID(resp.GetId())
|
|
liveObject = resp.GetProperties()
|
|
}
|
|
|
|
if id == "" && !req.Preview {
|
|
return CreateResponse{Status: resource.StatusUnknown},
|
|
fmt.Errorf("plugin for package '%v' returned empty resource.ID from create '%v'", p.pkg, req.URN)
|
|
}
|
|
|
|
outs, err := UnmarshalProperties(liveObject, MarshalOptions{
|
|
Label: label + ".outputs",
|
|
RejectUnknowns: !req.Preview,
|
|
KeepUnknowns: req.Preview,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return CreateResponse{Status: resourceStatus}, err
|
|
}
|
|
|
|
// If we could not pass secrets to the provider, retain the secret bit on any property with the same name. This
|
|
// allows us to retain metadata about secrets in many cases, even for providers that do not understand secrets
|
|
// natively.
|
|
if !pcfg.acceptSecrets {
|
|
annotateSecrets(outs, req.Properties)
|
|
}
|
|
|
|
logging.V(7).Infof("%s success: id=%s; #outs=%d", label, id, len(outs))
|
|
return CreateResponse{
|
|
ID: id,
|
|
Properties: outs,
|
|
Status: resourceStatus,
|
|
}, resourceError
|
|
}
|
|
|
|
// read the current live state associated with a resource. enough state must be include in the inputs to uniquely
|
|
// identify the resource; this is typically just the resource id, but may also include some properties.
|
|
func (p *provider) Read(ctx context.Context, req ReadRequest) (ReadResponse, error) {
|
|
contract.Assertf(req.URN != "", "Read URN was empty")
|
|
contract.Assertf(req.ID != "", "Read ID was empty")
|
|
|
|
label := fmt.Sprintf("%s.Read(%s,%s)", p.label(), req.ID, req.URN)
|
|
logging.V(7).Infof("%s executing (#inputs=%v, #state=%v)", label, len(req.Inputs), len(req.State))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return ReadResponse{Status: resource.StatusUnknown}, err
|
|
}
|
|
|
|
// If the provider is not fully configured, return an empty bag.
|
|
if !pcfg.known {
|
|
return ReadResponse{ReadResult{
|
|
Outputs: resource.PropertyMap{},
|
|
Inputs: resource.PropertyMap{},
|
|
}, resource.StatusUnknown}, nil
|
|
}
|
|
|
|
// Marshal the resource inputs and state so we can perform the RPC.
|
|
var minputs *structpb.Struct
|
|
if req.Inputs != nil {
|
|
m, err := MarshalProperties(req.Inputs, MarshalOptions{
|
|
Label: label,
|
|
ElideAssetContents: true,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return ReadResponse{Status: resource.StatusUnknown}, err
|
|
}
|
|
minputs = m
|
|
}
|
|
mstate, err := MarshalProperties(req.State, MarshalOptions{
|
|
Label: label,
|
|
ElideAssetContents: true,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return ReadResponse{Status: resource.StatusUnknown}, err
|
|
}
|
|
|
|
// Now issue the read request over RPC, blocking until it finished.
|
|
var readID resource.ID
|
|
var liveObject *structpb.Struct
|
|
var liveInputs *structpb.Struct
|
|
var resourceError error
|
|
resourceStatus := resource.StatusOK
|
|
resp, err := client.Read(p.requestContext(), &pulumirpc.ReadRequest{
|
|
Id: string(req.ID),
|
|
Urn: string(req.URN),
|
|
Properties: mstate,
|
|
Inputs: minputs,
|
|
})
|
|
if err != nil {
|
|
resourceStatus, readID, liveObject, liveInputs, resourceError = parseError(err)
|
|
logging.V(7).Infof("%s failed: %v", label, err)
|
|
|
|
if resourceStatus != resource.StatusPartialFailure {
|
|
return ReadResponse{Status: resourceStatus}, resourceError
|
|
}
|
|
// Else it's a `StatusPartialFailure`.
|
|
} else {
|
|
readID = resource.ID(resp.GetId())
|
|
liveObject = resp.GetProperties()
|
|
liveInputs = resp.GetInputs()
|
|
}
|
|
|
|
// If the resource was missing, simply return a nil property map.
|
|
if string(readID) == "" {
|
|
return ReadResponse{Status: resourceStatus}, nil
|
|
}
|
|
|
|
// Finally, unmarshal the resulting state properties and return them.
|
|
newState, err := UnmarshalProperties(liveObject, MarshalOptions{
|
|
Label: label + ".outputs",
|
|
RejectUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return ReadResponse{Status: resourceStatus}, err
|
|
}
|
|
|
|
var newInputs resource.PropertyMap
|
|
if liveInputs != nil {
|
|
newInputs, err = UnmarshalProperties(liveInputs, MarshalOptions{
|
|
Label: label + ".inputs",
|
|
RejectUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return ReadResponse{Status: resourceStatus}, err
|
|
}
|
|
}
|
|
|
|
// If we could not pass secrets to the provider, retain the secret bit on any property with the same name. This
|
|
// allows us to retain metadata about secrets in many cases, even for providers that do not understand secrets
|
|
// natively.
|
|
if !pcfg.acceptSecrets {
|
|
annotateSecrets(newInputs, req.Inputs)
|
|
annotateSecrets(newState, req.State)
|
|
}
|
|
|
|
// make sure any echoed properties restore their original asset contents if they have not changed
|
|
restoreElidedAssetContents(req.Inputs, newInputs)
|
|
restoreElidedAssetContents(req.Inputs, newState)
|
|
|
|
logging.V(7).Infof("%s success; #outs=%d, #inputs=%d", label, len(newState), len(newInputs))
|
|
return ReadResponse{ReadResult{
|
|
ID: readID,
|
|
Outputs: newState,
|
|
Inputs: newInputs,
|
|
}, resourceStatus}, resourceError
|
|
}
|
|
|
|
// Update updates an existing resource with new values.
|
|
func (p *provider) Update(ctx context.Context, req UpdateRequest) (UpdateResponse, error) {
|
|
contract.Assertf(req.URN != "", "Update requires a URN")
|
|
contract.Assertf(req.ID != "", "Update requires an ID")
|
|
contract.Assertf(req.OldInputs != nil, "Update requires old inputs")
|
|
contract.Assertf(req.OldOutputs != nil, "Update requires old outputs")
|
|
contract.Assertf(req.NewInputs != nil, "Update requires new properties")
|
|
|
|
label := fmt.Sprintf("%s.Update(%s,%s)", p.label(), req.ID, req.URN)
|
|
logging.V(7).Infof("%s executing (#oldInputs=%v,#oldOutputs=%v,#newInputs=%v)",
|
|
label, len(req.OldInputs), len(req.OldOutputs), len(req.NewInputs))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return UpdateResponse{Properties: req.NewInputs, Status: resource.StatusOK}, err
|
|
}
|
|
|
|
// If this is a preview and the plugin does not support provider previews, or if the configuration for the provider
|
|
// is not fully known, hand back an empty property map. This will force the language SDK to treat all properties
|
|
// as unknown, which is conservatively correct.
|
|
//
|
|
// If the provider does not support previews, return the inputs as the state. Note that this can cause problems for
|
|
// the language SDKs if there are input and state properties that share a name but expect differently-shaped values.
|
|
if req.Preview {
|
|
// TODO: it would be great to swap the order of these if statements. This would prevent a behavioral change for
|
|
// providers that do not support provider previews, which will always return the inputs as state regardless of
|
|
// whether or not the config is known. Unfortunately, we can't, since the `supportsPreview` bit depends on the
|
|
// result of `Configure`, which we won't call if the `cfgknown` is false. It may be worth fixing this catch-22
|
|
// by extending the provider gRPC interface with a `SupportsFeature` API similar to the language monitor.
|
|
if !pcfg.known {
|
|
if p.legacyPreview {
|
|
return UpdateResponse{Properties: req.NewInputs, Status: resource.StatusOK}, nil
|
|
}
|
|
return UpdateResponse{Properties: resource.PropertyMap{}, Status: resource.StatusOK}, nil
|
|
}
|
|
if !pcfg.supportsPreview || p.disableProviderPreview {
|
|
return UpdateResponse{Properties: req.NewInputs, Status: resource.StatusOK}, nil
|
|
}
|
|
}
|
|
|
|
// We should only be calling {Create,Update,Delete} if the provider is fully configured.
|
|
contract.Assertf(pcfg.known, "Update cannot be called if the configuration is unknown")
|
|
|
|
mOldInputs, err := MarshalProperties(req.OldInputs, MarshalOptions{
|
|
Label: label + ".oldInputs",
|
|
ElideAssetContents: true,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return UpdateResponse{Status: resource.StatusOK}, err
|
|
}
|
|
mOldOutputs, err := MarshalProperties(req.OldOutputs, MarshalOptions{
|
|
Label: label + ".oldOutputs",
|
|
ElideAssetContents: true,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return UpdateResponse{Status: resource.StatusOK}, err
|
|
}
|
|
mNewInputs, err := MarshalProperties(req.NewInputs, MarshalOptions{
|
|
Label: label + ".newInputs",
|
|
KeepUnknowns: req.Preview,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return UpdateResponse{Status: resource.StatusOK}, err
|
|
}
|
|
|
|
var liveObject *structpb.Struct
|
|
var resourceError error
|
|
resourceStatus := resource.StatusOK
|
|
resp, err := client.Update(p.requestContext(), &pulumirpc.UpdateRequest{
|
|
Id: string(req.ID),
|
|
Urn: string(req.URN),
|
|
Olds: mOldOutputs,
|
|
News: mNewInputs,
|
|
Timeout: req.Timeout,
|
|
IgnoreChanges: req.IgnoreChanges,
|
|
Preview: req.Preview,
|
|
OldInputs: mOldInputs,
|
|
})
|
|
if err != nil {
|
|
resourceStatus, _, liveObject, _, resourceError = parseError(err)
|
|
logging.V(7).Infof("%s failed: %v", label, resourceError)
|
|
|
|
if resourceStatus != resource.StatusPartialFailure {
|
|
return UpdateResponse{Status: resourceStatus}, resourceError
|
|
}
|
|
// Else it's a `StatusPartialFailure`.
|
|
} else {
|
|
liveObject = resp.GetProperties()
|
|
}
|
|
|
|
outs, err := UnmarshalProperties(liveObject, MarshalOptions{
|
|
Label: label + ".outputs",
|
|
RejectUnknowns: !req.Preview,
|
|
KeepUnknowns: req.Preview,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return UpdateResponse{Status: resourceStatus}, err
|
|
}
|
|
|
|
// If we could not pass secrets to the provider, retain the secret bit on any property with the same name. This
|
|
// allows us to retain metadata about secrets in many cases, even for providers that do not understand secrets
|
|
// natively.
|
|
if !pcfg.acceptSecrets {
|
|
annotateSecrets(outs, req.NewInputs)
|
|
}
|
|
logging.V(7).Infof("%s success; #outs=%d", label, len(outs))
|
|
|
|
return UpdateResponse{Properties: outs, Status: resourceStatus}, resourceError
|
|
}
|
|
|
|
// Delete tears down an existing resource.
|
|
func (p *provider) Delete(ctx context.Context, req DeleteRequest) (DeleteResponse, error) {
|
|
contract.Assertf(req.URN != "", "Delete requires a URN")
|
|
contract.Assertf(req.ID != "", "Delete requires an ID")
|
|
|
|
label := fmt.Sprintf("%s.Delete(%s,%s)", p.label(), req.URN, req.ID)
|
|
logging.V(7).Infof("%s executing (#inputs=%d, #outputs=%d)", label, len(req.Inputs), len(req.Outputs))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return DeleteResponse{}, err
|
|
}
|
|
|
|
// We should never call delete at preview time, so we should never see unknowns here
|
|
contract.Assertf(pcfg.known, "Delete cannot be called if the configuration is unknown")
|
|
|
|
minputs, err := MarshalProperties(req.Inputs, MarshalOptions{
|
|
Label: label,
|
|
ElideAssetContents: true,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return DeleteResponse{}, err
|
|
}
|
|
|
|
moutputs, err := MarshalProperties(req.Outputs, MarshalOptions{
|
|
Label: label,
|
|
ElideAssetContents: true,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return DeleteResponse{}, err
|
|
}
|
|
|
|
// We should only be calling {Create,Update,Delete} if the provider is fully configured.
|
|
contract.Assertf(pcfg.known, "Delete cannot be called if the configuration is unknown")
|
|
|
|
if _, err := client.Delete(p.requestContext(), &pulumirpc.DeleteRequest{
|
|
Id: string(req.ID),
|
|
Urn: string(req.URN),
|
|
Properties: moutputs,
|
|
Timeout: req.Timeout,
|
|
OldInputs: minputs,
|
|
}); err != nil {
|
|
resourceStatus, rpcErr := resourceStateAndError(err)
|
|
logging.V(7).Infof("%s failed: %v", label, rpcErr)
|
|
return DeleteResponse{Status: resourceStatus}, rpcErr
|
|
}
|
|
|
|
logging.V(7).Infof("%s success", label)
|
|
return DeleteResponse{Status: resource.StatusOK}, err
|
|
}
|
|
|
|
// Construct creates a new component resource from the given type, name, parent, options, and inputs, and returns
|
|
// its URN and outputs.
|
|
func (p *provider) Construct(ctx context.Context, req ConstructRequest) (ConstructResponse, error) {
|
|
contract.Assertf(req.Type != "", "Construct requires a type")
|
|
contract.Assertf(req.Name != "", "Construct requires a name")
|
|
contract.Assertf(req.Inputs != nil, "Construct requires input properties")
|
|
|
|
label := fmt.Sprintf("%s.Construct(%s, %s, %s)", p.label(), req.Type, req.Name, req.Parent)
|
|
logging.V(7).Infof("%s executing (#inputs=%v)", label, len(req.Inputs))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(ctx)
|
|
if err != nil {
|
|
return ConstructResult{}, err
|
|
}
|
|
|
|
// If the provider is not fully configured, we need to error. We can't support unknown URNs but if the
|
|
// provider isn't configured we can't call into it to get the URN.
|
|
if !pcfg.known {
|
|
return ConstructResult{}, errors.New("cannot construct components if the provider is configured with unknown values")
|
|
}
|
|
|
|
if !pcfg.acceptSecrets {
|
|
return ConstructResult{}, errors.New("plugins that can construct components must support secrets")
|
|
}
|
|
|
|
// Marshal the input properties.
|
|
minputs, err := MarshalProperties(req.Inputs, MarshalOptions{
|
|
Label: label + ".inputs",
|
|
KeepUnknowns: true,
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
// To initially scope the use of this new feature, we only keep output values for
|
|
// Construct and Call (when the client accepts them).
|
|
KeepOutputValues: pcfg.acceptOutputs,
|
|
})
|
|
if err != nil {
|
|
return ConstructResult{}, err
|
|
}
|
|
|
|
// Marshal the aliases.
|
|
aliasURNs := make([]string, len(req.Options.Aliases))
|
|
for i, alias := range req.Options.Aliases {
|
|
aliasURNs[i] = string(alias.URN)
|
|
}
|
|
|
|
// Marshal the dependencies.
|
|
dependencies := make([]string, len(req.Options.Dependencies))
|
|
for i, dep := range req.Options.Dependencies {
|
|
dependencies[i] = string(dep)
|
|
}
|
|
|
|
// Marshal the property dependencies.
|
|
inputDependencies := map[string]*pulumirpc.ConstructRequest_PropertyDependencies{}
|
|
for name, dependencies := range req.Options.PropertyDependencies {
|
|
urns := make([]string, len(dependencies))
|
|
for i, urn := range dependencies {
|
|
urns[i] = string(urn)
|
|
}
|
|
inputDependencies[string(name)] = &pulumirpc.ConstructRequest_PropertyDependencies{Urns: urns}
|
|
}
|
|
|
|
// Marshal the config.
|
|
config := map[string]string{}
|
|
for k, v := range req.Info.Config {
|
|
config[k.String()] = v
|
|
}
|
|
configSecretKeys := []string{}
|
|
for _, k := range req.Info.ConfigSecretKeys {
|
|
configSecretKeys = append(configSecretKeys, k.String())
|
|
}
|
|
|
|
rpcReq := &pulumirpc.ConstructRequest{
|
|
Project: req.Info.Project,
|
|
Stack: req.Info.Stack,
|
|
Config: config,
|
|
ConfigSecretKeys: configSecretKeys,
|
|
DryRun: req.Info.DryRun,
|
|
Parallel: int32(req.Info.Parallel),
|
|
MonitorEndpoint: req.Info.MonitorAddress,
|
|
Type: string(req.Type),
|
|
Name: req.Name,
|
|
Parent: string(req.Parent),
|
|
Inputs: minputs,
|
|
Protect: req.Options.Protect,
|
|
Providers: req.Options.Providers,
|
|
InputDependencies: inputDependencies,
|
|
Aliases: aliasURNs,
|
|
Dependencies: dependencies,
|
|
AdditionalSecretOutputs: req.Options.AdditionalSecretOutputs,
|
|
DeletedWith: string(req.Options.DeletedWith),
|
|
DeleteBeforeReplace: req.Options.DeleteBeforeReplace,
|
|
IgnoreChanges: req.Options.IgnoreChanges,
|
|
ReplaceOnChanges: req.Options.ReplaceOnChanges,
|
|
RetainOnDelete: req.Options.RetainOnDelete,
|
|
AcceptsOutputValues: true,
|
|
}
|
|
if ct := req.Options.CustomTimeouts; ct != nil {
|
|
rpcReq.CustomTimeouts = &pulumirpc.ConstructRequest_CustomTimeouts{
|
|
Create: ct.Create,
|
|
Update: ct.Update,
|
|
Delete: ct.Delete,
|
|
}
|
|
}
|
|
|
|
resp, err := client.Construct(p.requestContext(), rpcReq)
|
|
if err != nil {
|
|
return ConstructResult{}, err
|
|
}
|
|
|
|
outputs, err := UnmarshalProperties(resp.GetState(), MarshalOptions{
|
|
Label: label + ".outputs",
|
|
KeepUnknowns: req.Info.DryRun,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
KeepOutputValues: true,
|
|
})
|
|
if err != nil {
|
|
return ConstructResult{}, err
|
|
}
|
|
|
|
outputDependencies := map[resource.PropertyKey][]resource.URN{}
|
|
for k, rpcDeps := range resp.GetStateDependencies() {
|
|
urns := make([]resource.URN, len(rpcDeps.Urns))
|
|
for i, d := range rpcDeps.Urns {
|
|
urns[i] = resource.URN(d)
|
|
}
|
|
outputDependencies[resource.PropertyKey(k)] = urns
|
|
}
|
|
|
|
logging.V(7).Infof("%s success: #outputs=%d", label, len(outputs))
|
|
return ConstructResponse{
|
|
URN: resource.URN(resp.GetUrn()),
|
|
Outputs: outputs,
|
|
OutputDependencies: outputDependencies,
|
|
}, nil
|
|
}
|
|
|
|
// Invoke dynamically executes a built-in function in the provider.
|
|
func (p *provider) Invoke(ctx context.Context, req InvokeRequest) (InvokeResponse, error) {
|
|
contract.Assertf(req.Tok != "", "Invoke requires a token")
|
|
|
|
label := fmt.Sprintf("%s.Invoke(%s)", p.label(), req.Tok)
|
|
logging.V(7).Infof("%s executing (#args=%d)", label, len(req.Args))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(ctx)
|
|
if err != nil {
|
|
return InvokeResponse{}, err
|
|
}
|
|
|
|
// If the provider is not fully configured, return an empty property map.
|
|
if !pcfg.known {
|
|
return InvokeResponse{Properties: resource.PropertyMap{}}, nil
|
|
}
|
|
|
|
margs, err := MarshalProperties(req.Args, MarshalOptions{
|
|
Label: label + ".args",
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return InvokeResponse{}, err
|
|
}
|
|
|
|
resp, err := client.Invoke(p.requestContext(), &pulumirpc.InvokeRequest{
|
|
Tok: string(req.Tok),
|
|
Args: margs,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: %v", label, rpcError.Message())
|
|
return InvokeResponse{}, rpcError
|
|
}
|
|
|
|
// Unmarshal any return values.
|
|
ret, err := UnmarshalProperties(resp.GetReturn(), MarshalOptions{
|
|
Label: label + ".returns",
|
|
RejectUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return InvokeResponse{}, err
|
|
}
|
|
|
|
// And now any properties that failed verification.
|
|
failures := slice.Prealloc[CheckFailure](len(resp.GetFailures()))
|
|
for _, failure := range resp.GetFailures() {
|
|
failures = append(failures, CheckFailure{resource.PropertyKey(failure.Property), failure.Reason})
|
|
}
|
|
|
|
logging.V(7).Infof("%s success (#ret=%d,#failures=%d) success", label, len(ret), len(failures))
|
|
return InvokeResponse{
|
|
Properties: ret,
|
|
Failures: failures,
|
|
}, nil
|
|
}
|
|
|
|
// StreamInvoke dynamically executes a built-in function in the provider, which returns a stream of
|
|
// responses.
|
|
func (p *provider) StreamInvoke(ctx context.Context, req StreamInvokeRequest) (StreamInvokeResponse, error) {
|
|
contract.Assertf(req.Tok != "", "StreamInvoke requires a token")
|
|
|
|
label := fmt.Sprintf("%s.StreamInvoke(%s)", p.label(), req.Tok)
|
|
logging.V(7).Infof("%s executing (#args=%d)", label, len(req.Args))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return StreamInvokeResponse{}, err
|
|
}
|
|
|
|
// If the provider is not fully configured, return an empty property map.
|
|
if !pcfg.known {
|
|
return StreamInvokeResponse{}, req.OnNext(resource.PropertyMap{})
|
|
}
|
|
|
|
margs, err := MarshalProperties(req.Args, MarshalOptions{
|
|
Label: label + ".args",
|
|
KeepSecrets: pcfg.acceptSecrets,
|
|
KeepResources: pcfg.acceptResources,
|
|
})
|
|
if err != nil {
|
|
return StreamInvokeResponse{}, err
|
|
}
|
|
|
|
streamClient, err := client.StreamInvoke(p.requestContext(), &pulumirpc.InvokeRequest{
|
|
Tok: string(req.Tok),
|
|
Args: margs,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: %v", label, rpcError.Message())
|
|
return StreamInvokeResponse{}, rpcError
|
|
}
|
|
|
|
for {
|
|
in, err := streamClient.Recv()
|
|
if err == io.EOF {
|
|
return StreamInvokeResponse{}, nil
|
|
}
|
|
if err != nil {
|
|
return StreamInvokeResponse{}, err
|
|
}
|
|
|
|
// Unmarshal response.
|
|
ret, err := UnmarshalProperties(in.GetReturn(), MarshalOptions{
|
|
Label: label + ".returns",
|
|
RejectUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
})
|
|
if err != nil {
|
|
return StreamInvokeResponse{}, err
|
|
}
|
|
|
|
// Check properties that failed verification.
|
|
var failures []CheckFailure
|
|
for _, failure := range in.GetFailures() {
|
|
failures = append(failures, CheckFailure{resource.PropertyKey(failure.Property), failure.Reason})
|
|
}
|
|
|
|
if len(failures) > 0 {
|
|
return StreamInvokeResponse{Failures: failures}, nil
|
|
}
|
|
|
|
// Send stream message back to whoever is consuming the stream.
|
|
if err := req.OnNext(ret); err != nil {
|
|
return StreamInvokeResponse{}, err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Call dynamically executes a method in the provider associated with a component resource.
|
|
func (p *provider) Call(_ context.Context, req CallRequest) (CallResponse, error) {
|
|
contract.Assertf(req.Tok != "", "Call requires a token")
|
|
|
|
label := fmt.Sprintf("%s.Call(%s)", p.label(), req.Tok)
|
|
logging.V(7).Infof("%s executing (#args=%d)", label, len(req.Args))
|
|
|
|
// Ensure that the plugin is configured.
|
|
client := p.clientRaw
|
|
pcfg, err := p.configSource.Promise().Result(context.Background())
|
|
if err != nil {
|
|
return CallResult{}, err
|
|
}
|
|
|
|
// If the provider is not fully configured, return an empty property map.
|
|
if !pcfg.known {
|
|
return CallResult{}, nil
|
|
}
|
|
|
|
margs, err := MarshalProperties(req.Args, MarshalOptions{
|
|
Label: label + ".args",
|
|
KeepUnknowns: true,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
// To initially scope the use of this new feature, we only keep output values for
|
|
// Construct and Call (when the client accepts them).
|
|
KeepOutputValues: pcfg.acceptOutputs,
|
|
})
|
|
if err != nil {
|
|
return CallResult{}, err
|
|
}
|
|
|
|
// Marshal the arg dependencies.
|
|
argDependencies := map[string]*pulumirpc.CallRequest_ArgumentDependencies{}
|
|
for name, dependencies := range req.Options.ArgDependencies {
|
|
urns := make([]string, len(dependencies))
|
|
for i, urn := range dependencies {
|
|
urns[i] = string(urn)
|
|
}
|
|
argDependencies[string(name)] = &pulumirpc.CallRequest_ArgumentDependencies{Urns: urns}
|
|
}
|
|
|
|
// Marshal the config.
|
|
config := map[string]string{}
|
|
for k, v := range req.Info.Config {
|
|
config[k.String()] = v
|
|
}
|
|
|
|
resp, err := client.Call(p.requestContext(), &pulumirpc.CallRequest{
|
|
Tok: string(req.Tok),
|
|
Args: margs,
|
|
ArgDependencies: argDependencies,
|
|
Project: req.Info.Project,
|
|
Stack: req.Info.Stack,
|
|
Config: config,
|
|
DryRun: req.Info.DryRun,
|
|
Parallel: int32(req.Info.Parallel),
|
|
MonitorEndpoint: req.Info.MonitorAddress,
|
|
AcceptsOutputValues: true,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: %v", label, rpcError.Message())
|
|
return CallResult{}, rpcError
|
|
}
|
|
|
|
// Unmarshal any return values.
|
|
ret, err := UnmarshalProperties(resp.GetReturn(), MarshalOptions{
|
|
Label: label + ".returns",
|
|
KeepUnknowns: req.Info.DryRun,
|
|
KeepSecrets: true,
|
|
KeepResources: true,
|
|
KeepOutputValues: true,
|
|
})
|
|
if err != nil {
|
|
return CallResult{}, err
|
|
}
|
|
|
|
returnDependencies := map[resource.PropertyKey][]resource.URN{}
|
|
for k, rpcDeps := range resp.GetReturnDependencies() {
|
|
urns := make([]resource.URN, len(rpcDeps.Urns))
|
|
for i, d := range rpcDeps.Urns {
|
|
urns[i] = resource.URN(d)
|
|
}
|
|
returnDependencies[resource.PropertyKey(k)] = urns
|
|
}
|
|
|
|
// And now any properties that failed verification.
|
|
failures := slice.Prealloc[CheckFailure](len(resp.GetFailures()))
|
|
for _, failure := range resp.GetFailures() {
|
|
failures = append(failures, CheckFailure{resource.PropertyKey(failure.Property), failure.Reason})
|
|
}
|
|
|
|
logging.V(7).Infof("%s success (#ret=%d,#failures=%d) success", label, len(ret), len(failures))
|
|
return CallResult{Return: ret, ReturnDependencies: returnDependencies, Failures: failures}, nil
|
|
}
|
|
|
|
// GetPluginInfo returns this plugin's information.
|
|
func (p *provider) GetPluginInfo(ctx context.Context) (workspace.PluginInfo, error) {
|
|
label := p.label() + ".GetPluginInfo()"
|
|
logging.V(7).Infof("%s executing", label)
|
|
|
|
// Calling GetPluginInfo happens immediately after loading, and does not require configuration to proceed.
|
|
// Thus, we access the clientRaw property, rather than calling getClient.
|
|
resp, err := p.clientRaw.GetPluginInfo(p.requestContext(), &emptypb.Empty{})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: err=%v", label, rpcError.Message())
|
|
return workspace.PluginInfo{}, rpcError
|
|
}
|
|
|
|
var version *semver.Version
|
|
if v := resp.Version; v != "" {
|
|
sv, err := semver.ParseTolerant(v)
|
|
if err != nil {
|
|
return workspace.PluginInfo{}, err
|
|
}
|
|
version = &sv
|
|
}
|
|
|
|
path := ""
|
|
if p.plug != nil {
|
|
path = p.plug.Bin
|
|
}
|
|
|
|
logging.V(7).Infof("%s success (#version=%v) success", label, version)
|
|
return workspace.PluginInfo{
|
|
Name: string(p.pkg),
|
|
Path: path,
|
|
Kind: apitype.ResourcePlugin,
|
|
Version: version,
|
|
}, nil
|
|
}
|
|
|
|
// Attach attaches this plugin to the engine
|
|
func (p *provider) Attach(address string) error {
|
|
label := p.label() + ".Attach()"
|
|
logging.V(7).Infof("%s executing", label)
|
|
|
|
// Calling Attach happens immediately after loading, and does not require configuration to proceed.
|
|
// Thus, we access the clientRaw property, rather than calling getClient.
|
|
_, err := p.clientRaw.Attach(p.requestContext(), &pulumirpc.PluginAttach{Address: address})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(7).Infof("%s failed: err=%v", label, rpcError.Message())
|
|
return rpcError
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *provider) SignalCancellation(ctx context.Context) error {
|
|
_, err := p.clientRaw.Cancel(p.requestContext(), &emptypb.Empty{})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(8).Infof("provider received rpc error `%s`: `%s`", rpcError.Code(),
|
|
rpcError.Message())
|
|
if rpcError.Code() == codes.Unimplemented {
|
|
// For backwards compatibility, do nothing if it's not implemented.
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Close tears down the underlying plugin RPC connection and process.
|
|
func (p *provider) Close() error {
|
|
if p.plug == nil {
|
|
return nil
|
|
}
|
|
return p.plug.Close()
|
|
}
|
|
|
|
// createConfigureError creates a nice error message from an RPC error that
|
|
// originated from `Configure`.
|
|
//
|
|
// If we requested that a resource configure itself but omitted required configuration
|
|
// variables, resource providers will respond with a list of missing variables and their descriptions.
|
|
// If that is what occurred, we'll use that information here to construct a nice error message.
|
|
func createConfigureError(rpcerr *rpcerror.Error) error {
|
|
var err error
|
|
for _, detail := range rpcerr.Details() {
|
|
if missingKeys, ok := detail.(*pulumirpc.ConfigureErrorMissingKeys); ok {
|
|
for _, missingKey := range missingKeys.MissingKeys {
|
|
singleError := fmt.Errorf("missing required configuration key \"%s\": %s\n"+
|
|
"Set a value using the command `pulumi config set %s <value>`.",
|
|
missingKey.Name, missingKey.Description, missingKey.Name)
|
|
err = multierror.Append(err, singleError)
|
|
}
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return rpcerr
|
|
}
|
|
|
|
// resourceStateAndError interprets an error obtained from a gRPC endpoint.
|
|
//
|
|
// gRPC gives us a `status.Status` structure as an `error` whenever our
|
|
// gRPC servers serve up an error. Each `status.Status` contains a code
|
|
// and a message. Based on the error code given to us, we can understand
|
|
// the state of our system and if our resource status is truly unknown.
|
|
//
|
|
// In general, our resource state is only really unknown if the server
|
|
// had an internal error, in which case it will serve one of `codes.Internal`,
|
|
// `codes.DataLoss`, or `codes.Unknown` to us.
|
|
func resourceStateAndError(err error) (resource.Status, *rpcerror.Error) {
|
|
rpcError := rpcerror.Convert(err)
|
|
logging.V(8).Infof("provider received rpc error `%s`: `%s`", rpcError.Code(), rpcError.Message())
|
|
//nolint:exhaustive // We want to handle only some error codes specially
|
|
switch rpcError.Code() {
|
|
case codes.Internal, codes.DataLoss, codes.Unknown:
|
|
logging.V(8).Infof("rpc error kind `%s` may not be recoverable", rpcError.Code())
|
|
return resource.StatusUnknown, rpcError
|
|
}
|
|
|
|
logging.V(8).Infof("rpc error kind `%s` is well-understood and recoverable", rpcError.Code())
|
|
return resource.StatusOK, rpcError
|
|
}
|
|
|
|
// parseError parses a gRPC error into a set of values that represent the state of a resource. They
|
|
// are: (1) the `resourceStatus`, indicating the last known state (e.g., `StatusOK`, representing
|
|
// success, `StatusUnknown`, representing internal failure); (2) the `*rpcerror.Error`, our internal
|
|
// representation for RPC errors; and optionally (3) `liveObject`, containing the last known live
|
|
// version of the object that has successfully created but failed to initialize (e.g., because the
|
|
// object was created, but app code is continually crashing and the resource never achieves
|
|
// liveness).
|
|
func parseError(err error) (
|
|
resourceStatus resource.Status, id resource.ID, liveInputs, liveObject *structpb.Struct, resourceErr error,
|
|
) {
|
|
var responseErr *rpcerror.Error
|
|
resourceStatus, responseErr = resourceStateAndError(err)
|
|
contract.Assertf(responseErr != nil, "resourceStateAndError must never return a nil error")
|
|
|
|
// If resource was successfully created but failed to initialize, the error will be packed
|
|
// with the live properties of the object.
|
|
resourceErr = responseErr
|
|
for _, detail := range responseErr.Details() {
|
|
if initErr, ok := detail.(*pulumirpc.ErrorResourceInitFailed); ok {
|
|
id = resource.ID(initErr.GetId())
|
|
liveObject = initErr.GetProperties()
|
|
liveInputs = initErr.GetInputs()
|
|
resourceStatus = resource.StatusPartialFailure
|
|
resourceErr = &InitError{Reasons: initErr.Reasons}
|
|
break
|
|
}
|
|
}
|
|
|
|
return resourceStatus, id, liveObject, liveInputs, resourceErr
|
|
}
|
|
|
|
// InitError represents a failure to initialize a resource, i.e., the resource has been successfully
|
|
// created, but it has failed to initialize.
|
|
type InitError struct {
|
|
Reasons []string
|
|
}
|
|
|
|
var _ error = (*InitError)(nil)
|
|
|
|
func (ie *InitError) Error() string {
|
|
var err error
|
|
for _, reason := range ie.Reasons {
|
|
err = multierror.Append(err, errors.New(reason))
|
|
}
|
|
if err == nil {
|
|
return "resource init failed"
|
|
}
|
|
return err.Error()
|
|
}
|
|
|
|
func decorateSpanWithType(span opentracing.Span, urn string) {
|
|
if urn := resource.URN(urn); urn.IsValid() {
|
|
span.SetTag("pulumi-decorator", urn.Type())
|
|
}
|
|
}
|
|
|
|
func decorateProviderSpans(span opentracing.Span, method string, req, resp interface{}, grpcError error) {
|
|
if req == nil {
|
|
return
|
|
}
|
|
|
|
switch method {
|
|
case "/pulumirpc.ResourceProvider/Check", "/pulumirpc.ResourceProvider/CheckConfig":
|
|
decorateSpanWithType(span, req.(*pulumirpc.CheckRequest).Urn)
|
|
case "/pulumirpc.ResourceProvider/Diff", "/pulumirpc.ResourceProvider/DiffConfig":
|
|
decorateSpanWithType(span, req.(*pulumirpc.DiffRequest).Urn)
|
|
case "/pulumirpc.ResourceProvider/Create":
|
|
decorateSpanWithType(span, req.(*pulumirpc.CreateRequest).Urn)
|
|
case "/pulumirpc.ResourceProvider/Update":
|
|
decorateSpanWithType(span, req.(*pulumirpc.UpdateRequest).Urn)
|
|
case "/pulumirpc.ResourceProvider/Delete":
|
|
decorateSpanWithType(span, req.(*pulumirpc.DeleteRequest).Urn)
|
|
case "/pulumirpc.ResourceProvider/Invoke":
|
|
span.SetTag("pulumi-decorator", req.(*pulumirpc.InvokeRequest).Tok)
|
|
}
|
|
}
|
|
|
|
// GetMapping fetches the conversion mapping (if any) for this resource provider.
|
|
func (p *provider) GetMapping(ctx context.Context, req GetMappingRequest) (GetMappingResponse, error) {
|
|
label := p.label() + ".GetMapping"
|
|
logging.V(7).Infof("%s executing: key=%s, provider=%s", label, req.Key, req.Provider)
|
|
|
|
resp, err := p.clientRaw.GetMapping(p.requestContext(), &pulumirpc.GetMappingRequest{
|
|
Key: req.Key,
|
|
Provider: req.Provider,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
code := rpcError.Code()
|
|
if code == codes.Unimplemented {
|
|
// For backwards compatibility, just return nothing as if the provider didn't have a mapping for
|
|
// the given key
|
|
logging.V(7).Infof("%s unimplemented", label)
|
|
return GetMappingResponse{}, nil
|
|
}
|
|
logging.V(7).Infof("%s failed: %v", label, rpcError)
|
|
return GetMappingResponse{}, err
|
|
}
|
|
|
|
logging.V(7).Infof("%s success: data=#%d provider=%s", label, len(resp.Data), resp.Provider)
|
|
return GetMappingResponse{
|
|
Data: resp.Data,
|
|
Provider: resp.Provider,
|
|
}, nil
|
|
}
|
|
|
|
func (p *provider) GetMappings(ctx context.Context, req GetMappingsRequest) (GetMappingsResponse, error) {
|
|
label := p.label() + ".GetMappings"
|
|
logging.V(7).Infof("%s executing: key=%s", label, req.Key)
|
|
|
|
resp, err := p.clientRaw.GetMappings(p.requestContext(), &pulumirpc.GetMappingsRequest{
|
|
Key: req.Key,
|
|
})
|
|
if err != nil {
|
|
rpcError := rpcerror.Convert(err)
|
|
code := rpcError.Code()
|
|
if code == codes.Unimplemented {
|
|
// For backwards compatibility just return nil to indicate unimplemented.
|
|
logging.V(7).Infof("%s unimplemented", label)
|
|
return GetMappingsResponse{}, nil
|
|
}
|
|
logging.V(7).Infof("%s failed: %v", label, rpcError)
|
|
return GetMappingsResponse{}, err
|
|
}
|
|
|
|
logging.V(7).Infof("%s success: providers=%v", label, resp.Providers)
|
|
// Ensure we don't return nil here because we use it as an "unimplemented" flag elsewhere in the system
|
|
if resp.Providers == nil {
|
|
resp.Providers = []string{}
|
|
}
|
|
return GetMappingsResponse{resp.Providers}, nil
|
|
}
|