pulumi/sdk/nodejs/runtime/invoke.ts

466 lines
18 KiB
TypeScript

// Copyright 2016-2021, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as grpc from "@grpc/grpc-js";
import { AsyncIterable } from "@pulumi/query/interfaces";
import { InvokeOptions } from "../invoke";
import * as log from "../log";
import { Inputs, Output } from "../output";
import { debuggablePromise } from "./debuggable";
import {
deserializeProperties,
isRpcSecret,
serializeProperties,
serializePropertiesReturnDeps,
unwrapRpcSecret,
} from "./rpc";
import { excessiveDebugOutput, getMonitor, rpcKeepAlive, terminateRpcs } from "./settings";
import { DependencyResource, ProviderResource, Resource } from "../resource";
import * as utils from "../utils";
import { PushableAsyncIterable } from "./asyncIterableUtil";
import * as gstruct from "google-protobuf/google/protobuf/struct_pb";
import * as resourceproto from "../proto/resource_pb";
import * as providerproto from "../proto/provider_pb";
/**
* `invoke` dynamically invokes the function, `tok`, which is offered by a provider plugin. `invoke`
* behaves differently in the case that options contains `{async:true}` or not.
*
* In the case where `{async:true}` is present in the options bag:
*
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider plugin.
* 2. the `props` inputs can be a bag of computed values (including, `T`s, `Promise<T>`s,
* `Output<T>`s etc.).
*
*
* In the case where `{async:true}` is not present in the options bag:
*
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider call.
* However, that Promise will *also* have the respective values of the Provider result exposed
* directly on it as properties.
*
* 2. The inputs must be a bag of simple values, and the result is the result that the Provider
* produced.
*
* Simple values are:
* 1. `undefined`, `null`, string, number or boolean values.
* 2. arrays of simple values.
* 3. objects containing only simple values.
*
* Importantly, simple values do *not* include:
* 1. `Promise`s
* 2. `Output`s
* 3. `Asset`s or `Archive`s
* 4. `Resource`s.
*
* All of these contain async values that would prevent `invoke` from being able to operate
* synchronously.
*/
export function invoke(tok: string, props: Inputs, opts: InvokeOptions = {}): Promise<any> {
    // Every invocation is routed through `invokeAsync`; the promise it produces is handed
    // back to the caller untouched.
    const pending = invokeAsync(tok, props, opts);
    return pending;
}
/**
 * `invokeSingle` dynamically invokes the function, `tok`, which is offered by a provider plugin.
 * Similar to `invoke`, but returns a single value instead of an object with a single key.
 *
 * @param tok The token of the provider function to invoke.
 * @param props The arguments to pass to the function.
 * @param opts Options for the invocation; defaults to an empty options bag.
 * @returns A promise for the value stored under the first key of the invoke result, or
 * `undefined` when the invoke produced no result object at all.
 */
export function invokeSingle(tok: string, props: Inputs, opts: InvokeOptions = {}): Promise<any> {
    return invokeAsync(tok, props, opts).then((outputs) => {
        // `invokeAsync` resolves to `undefined` when the response carries no return value.
        // `Object.keys(undefined)` would throw an opaque TypeError, so short-circuit instead.
        if (outputs === undefined || outputs === null) {
            return undefined;
        }
        // The outputs are assumed to hold a single key; hand back that key's value.
        const keys = Object.keys(outputs);
        return outputs[keys[0]];
    });
}
/**
 * `streamInvoke` dynamically invokes the streaming function, `tok`, through the resource
 * monitor and exposes the responses as an async iterable.
 *
 * @param tok The token of the provider function to invoke.
 * @param props The arguments to pass; they are serialized (and awaited) before the RPC is made.
 * @param opts Options for the invocation; defaults to an empty options bag.
 * @returns A `StreamInvokeResponse` that yields each deserialized message and whose `cancel()`
 * cancels the underlying call.
 */
export async function streamInvoke(
    tok: string,
    props: Inputs,
    opts: InvokeOptions = {},
): Promise<StreamInvokeResponse<any>> {
    const label = `StreamInvoking function: tok=${tok} asynchronously`;
    log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));

    // Wait for all values to be available, and then perform the RPC.
    const done = rpcKeepAlive();
    try {
        const serialized = await serializeProperties(`streamInvoke:${tok}`, props);
        // The conditional must be parenthesized: `+` binds tighter than `?:`, so without the
        // parentheses the prefix is dropped and the payload is always stringified and logged.
        log.debug(
            `StreamInvoke RPC prepared: tok=${tok}` +
                (excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``),
        );

        // Fetch the monitor and make an RPC request.
        const monitor: any = getMonitor();
        const provider = await ProviderResource.register(getProvider(tok, opts));
        const req = createInvokeRequest(tok, serialized, provider, opts);

        // Call `streamInvoke`.
        const result = monitor.streamInvoke(req, {});

        // Each message received from the stream is deserialized and pushed onto the queue
        // that backs the returned async iterable.
        const queue = new PushableAsyncIterable();
        result.on("data", function (thing: any) {
            const live = deserializeResponse(tok, thing);
            queue.push(live);
        });
        result.on("error", (err: any) => {
            // Status code 1 is CANCELLED in the gRPC status code table; that is the expected
            // outcome of cancelling the stream, so it is not treated as a failure.
            if (err.code === 1) {
                return;
            }
            // NOTE(review): this throws from inside an event listener, so it cannot reject
            // the promise returned by `streamInvoke` — confirm this is the intended behavior.
            throw err;
        });
        result.on("end", () => {
            queue.complete();
        });

        // Return a cancellable handle to the stream.
        return new StreamInvokeResponse(queue, () => result.cancel());
    } finally {
        // NOTE(review): this runs as soon as the handle is returned, not when the stream
        // ends — confirm that releasing the keep-alive this early is intended.
        done();
    }
}
/**
 * Serializes `props`, sends an invoke request for `tok` to the resource monitor and
 * deserializes the response.
 *
 * @param tok The token of the provider function to invoke.
 * @param props The arguments to pass; they are serialized (and awaited) before the RPC is made.
 * @param opts Options for the invocation (provider, parent, version).
 * @returns The deserialized return value, or `undefined` when the response has no return value.
 * The promise rejects when the RPC fails or when the response reports failures.
 */
async function invokeAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
    const label = `Invoking function: tok=${tok} asynchronously`;
    log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));

    // Wait for all values to be available, and then perform the RPC.
    const done = rpcKeepAlive();
    try {
        const serialized = await serializeProperties(`invoke:${tok}`, props);
        // The conditional must be parenthesized: `+` binds tighter than `?:`, so without the
        // parentheses the prefix is dropped and the payload is always stringified and logged.
        log.debug(
            `Invoke RPC prepared: tok=${tok}` + (excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``),
        );

        // Fetch the monitor and make an RPC request.
        const monitor: any = getMonitor();
        const provider = await ProviderResource.register(getProvider(tok, opts));
        const req = createInvokeRequest(tok, serialized, provider, opts);

        const resp: any = await debuggablePromise(
            new Promise((innerResolve, innerReject) =>
                monitor.invoke(req, (err: grpc.ServiceError, innerResponse: any) => {
                    log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
                    if (err) {
                        // If the monitor is unavailable, it is in the process of shutting down or has already
                        // shut down. Don't emit an error and don't do any more RPCs, just exit.
                        if (err.code === grpc.status.UNAVAILABLE || err.code === grpc.status.CANCELLED) {
                            terminateRpcs();
                            err.message = "Resource monitor is terminating";
                            innerReject(err);
                            return;
                        }

                        // If the RPC failed, rethrow the error with a native exception and the message that
                        // the engine provided - it's suitable for user presentation.
                        innerReject(new Error(err.details));
                    } else {
                        innerResolve(innerResponse);
                    }
                }),
            ),
            label,
        );

        // Finally propagate any other properties that were given to us as outputs.
        return deserializeResponse(tok, resp);
    } finally {
        done();
    }
}
/**
 * StreamInvokeResponse represents a (potentially infinite) streaming response to `streamInvoke`,
 * with facilities to gracefully cancel and clean up the stream.
 *
 * Iterating the response iterates the wrapped source; `cancel()` runs the cancellation
 * callback supplied at construction time.
 */
export class StreamInvokeResponse<T> implements AsyncIterable<T> {
    // The iterable that actually produces the streamed values.
    private source: AsyncIterable<T>;
    // Callback invoked each time `cancel()` is called.
    private cancelSource: () => void;

    constructor(source: AsyncIterable<T>, cancelSource: () => void) {
        this.source = source;
        this.cancelSource = cancelSource;
    }

    /** Signals that the `streamInvoke` should be cancelled and cleaned up gracefully. */
    public cancel() {
        this.cancelSource();
    }

    /** Delegates iteration to the wrapped source. */
    [Symbol.asyncIterator]() {
        const iterator = this.source[Symbol.asyncIterator]();
        return iterator;
    }
}
/**
 * Builds the `ResourceInvokeRequest` message for invoking `tok`.
 *
 * @param tok The token of the provider function to invoke.
 * @param serialized The already-serialized arguments; converted to a protobuf `Struct`.
 * @param provider The provider reference string, or `undefined`; sent as "" when absent.
 * @param opts Invoke options; only `opts.version` is read here (sent as "" when absent).
 * @returns The populated request.
 * @throws Error if `provider` is neither `undefined` nor a string.
 */
function createInvokeRequest(tok: string, serialized: any, provider: string | undefined, opts: InvokeOptions) {
    const providerIsValid = provider === undefined || typeof provider === "string";
    if (!providerIsValid) {
        throw new Error("Incorrect provider type.");
    }

    const args = gstruct.Struct.fromJavaScript(serialized);

    const request = new resourceproto.ResourceInvokeRequest();
    request.setTok(tok);
    request.setArgs(args);
    request.setProvider(provider || "");
    request.setVersion(opts.version || "");
    // Resource references are accepted unless they have been disabled.
    request.setAcceptresources(!utils.disableResourceReferences);
    return request;
}
/**
 * Picks the provider to use for invoking `tok`.
 *
 * An explicitly supplied `opts.provider` wins; otherwise the parent (if any) is asked for
 * its provider for `tok`; otherwise there is no provider and `undefined` is returned.
 */
function getProvider(tok: string, opts: InvokeOptions) {
    if (opts.provider) {
        return opts.provider;
    }
    if (opts.parent) {
        return opts.parent.getProvider(tok);
    }
    return undefined;
}
/**
 * Turns a raw invoke/call response into deserialized properties.
 *
 * @param tok The token of the function that was invoked; used only in the error message.
 * @param resp The response, exposing its failure list and its (optional) return struct.
 * @returns The deserialized return value, or `undefined` when the response has no return value.
 * @throws Error listing every failure as `reason (property)`, joined by "; ", when the
 * response reports at least one failure.
 */
function deserializeResponse(
    tok: string,
    resp: { getFailuresList(): Array<providerproto.CheckFailure>; getReturn(): gstruct.Struct | undefined },
): Inputs | undefined {
    const failureList = resp.getFailuresList();
    if (failureList?.length) {
        const summary = failureList
            .map((failure) => `${failure.getReason()} (${failure.getProperty()})`)
            .join("; ");
        throw new Error(`Invoke of '${tok}' failed: ${summary}`);
    }

    const returned = resp.getReturn();
    if (returned === undefined) {
        return returned;
    }
    return deserializeProperties(returned);
}
/**
 * `call` dynamically calls the function, `tok`, which is offered by a provider plugin.
 *
 * @param tok The token of the provider function to call.
 * @param props The arguments to pass; output values are kept when they are serialized.
 * @param res Optional resource; when given, its `__prov`, `__version` and `__pluginDownloadURL`
 * are forwarded with the request.
 * @returns An `Output` that is resolved with the deserialized response (marked secret if any
 * top-level value was a secret, and depending on the resources named in the response), or
 * rejected with whatever error occurred while preparing or performing the call.
 */
export function call<T>(tok: string, props: Inputs, res?: Resource): Output<T> {
    const label = `Calling function: tok=${tok}`;
    log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));

    const [out, resolver] = createOutput<T>(`call(${tok})`);

    debuggablePromise(
        Promise.resolve().then(async () => {
            const done = rpcKeepAlive();
            try {
                // Construct a provider reference from the given provider, if one is available on the resource.
                let provider: string | undefined = undefined;
                let version: string | undefined = undefined;
                let pluginDownloadURL: string | undefined = undefined;
                if (res) {
                    if (res.__prov) {
                        provider = await ProviderResource.register(res.__prov);
                    }
                    version = res.__version;
                    pluginDownloadURL = res.__pluginDownloadURL;
                }

                // We keep output values when serializing inputs for call.
                const [serialized, propertyDepsResources] = await serializePropertiesReturnDeps(`call:${tok}`, props, {
                    keepOutputValues: true,
                });
                // The conditional must be parenthesized: `+` binds tighter than `?:`, so without the
                // parentheses the prefix is dropped and the payload is always stringified and logged.
                log.debug(
                    `Call RPC prepared: tok=${tok}` + (excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``),
                );

                const req = await createCallRequest(
                    tok,
                    serialized,
                    propertyDepsResources,
                    provider,
                    version,
                    pluginDownloadURL,
                );

                const monitor = getMonitor();
                const resp = await debuggablePromise(
                    new Promise<providerproto.CallResponse>((innerResolve, innerReject) => {
                        // Throwing inside the executor rejects this promise, which lands in the
                        // `catch` below and rejects the output.
                        if (monitor === undefined) {
                            throw new Error("No monitor available");
                        }
                        monitor.call(
                            req,
                            (err: grpc.ServiceError | null, innerResponse: providerproto.CallResponse) => {
                                log.debug(`Call RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
                                if (err) {
                                    // If the monitor is unavailable, it is in the process of shutting down or has already
                                    // shut down. Don't emit an error and don't do any more RPCs, just exit.
                                    if (err.code === grpc.status.UNAVAILABLE || err.code === grpc.status.CANCELLED) {
                                        terminateRpcs();
                                        err.message = "Resource monitor is terminating";
                                        innerReject(err);
                                        return;
                                    }

                                    // If the RPC failed, rethrow the error with a native exception and the message that
                                    // the engine provided - it's suitable for user presentation.
                                    innerReject(new Error(err.details));
                                } else {
                                    innerResolve(innerResponse);
                                }
                            },
                        );
                    }),
                    label,
                );

                // Deserialize the response and resolve the output.
                const deserialized = deserializeResponse(tok, resp);
                let isSecret = false;
                const deps: Resource[] = [];

                // Keep track of whether we need to mark the resulting output a secret,
                // and unwrap each individual value.
                if (deserialized !== undefined) {
                    for (const k of Object.keys(deserialized)) {
                        const v = deserialized[k];
                        if (isRpcSecret(v)) {
                            isSecret = true;
                            deserialized[k] = unwrapRpcSecret(v);
                        }
                    }
                }

                // Combine the individual dependencies into a single set of dependency resources.
                const rpcDeps = resp.getReturndependenciesMap();
                if (rpcDeps) {
                    const urns = new Set<string>();
                    for (const [, returnDeps] of rpcDeps.entries()) {
                        for (const urn of returnDeps.getUrnsList()) {
                            urns.add(urn);
                        }
                    }
                    for (const urn of urns) {
                        deps.push(new DependencyResource(urn));
                    }
                }

                // If the value the engine handed back is or contains an unknown value, the resolver will mark its value as
                // unknown automatically, so we just pass true for isKnown here. Note that unknown values will only be
                // present during previews (i.e. isDryRun() will be true).
                resolver(<any>deserialized, true, isSecret, deps);
            } catch (e) {
                // Any failure above rejects the output instead of escaping this promise chain.
                resolver(<any>undefined, true, false, undefined, e);
            } finally {
                done();
            }
        }),
        label,
    );

    return out;
}
/**
 * Creates an `Output` whose value, known-ness, secret-ness and dependencies are all settled
 * later through the returned resolver function.
 *
 * @param label Prefix for the debug labels of the four underlying promises
 * (`Value`, `IsKnown`, `IsSecret` and `Deps` are appended to it).
 * @returns A tuple of the output and its resolver. Calling the resolver with `err` set rejects
 * all four promises with that error; otherwise each is resolved with the matching argument
 * (`deps` defaults to an empty array).
 */
function createOutput<T>(
    label: string,
): [Output<T>, (v: T, isKnown: boolean, isSecret: boolean, deps?: Resource[], err?: Error | undefined) => void] {
    // A promise bundled with the callbacks that settle it from the outside.
    interface Deferred<V> {
        promise: Promise<V>;
        resolve: (v: V) => void;
        reject: (err: Error) => void;
    }

    // Builds one externally settled promise, labelled `${label}${suffix}` for debugging.
    function defer<V>(suffix: string): Deferred<V> {
        // The promise executor runs synchronously, so both callbacks are assigned before use.
        let resolve!: (v: V) => void;
        let reject!: (err: Error) => void;
        const promise = debuggablePromise(
            new Promise<V>((res, rej) => {
                resolve = res;
                reject = rej;
            }),
            `${label}${suffix}`,
        );
        return { promise, resolve, reject };
    }

    const value = defer<T>("Value");
    const known = defer<boolean>("IsKnown");
    const secret = defer<boolean>("IsSecret");
    const dependencies = defer<Resource[]>("Deps");

    const out = new Output([], value.promise, known.promise, secret.promise, dependencies.promise);

    const resolver = (v: T, isKnown: boolean, isSecret: boolean, deps: Resource[] = [], err?: Error) => {
        if (err) {
            value.reject(err);
            known.reject(err);
            secret.reject(err);
            dependencies.reject(err);
            return;
        }
        value.resolve(v);
        known.resolve(isKnown);
        secret.resolve(isSecret);
        dependencies.resolve(deps);
    };

    return [out, resolver];
}
/**
 * Builds the `ResourceCallRequest` message for calling `tok`.
 *
 * @param tok The token of the provider function to call.
 * @param serialized The already-serialized arguments; converted to a protobuf `Struct`.
 * @param serializedDeps For each argument key, the resources that argument depends on.
 * @param provider The provider reference string, if any; sent as "" when absent.
 * @param version The version, if any; sent as "" when absent.
 * @param pluginDownloadURL The plugin download URL, if any; sent as "" when absent.
 * @returns The populated request, including the de-duplicated URNs of each argument's dependencies.
 * @throws Error if `provider` is neither `undefined` nor a string.
 */
async function createCallRequest(
    tok: string,
    serialized: Record<string, any>,
    serializedDeps: Map<string, Set<Resource>>,
    provider?: string,
    version?: string,
    pluginDownloadURL?: string,
) {
    const providerIsValid = provider === undefined || typeof provider === "string";
    if (!providerIsValid) {
        throw new Error("Incorrect provider type.");
    }

    const args = gstruct.Struct.fromJavaScript(serialized);

    const request = new resourceproto.ResourceCallRequest();
    request.setTok(tok);
    request.setArgs(args);
    request.setProvider(provider || "");
    request.setVersion(version || "");
    request.setPlugindownloadurl(pluginDownloadURL || "");

    // Record, per argument, the URNs of the resources it depends on.
    const dependencyMap = request.getArgdependenciesMap();
    for (const [argName, resources] of serializedDeps) {
        // A set removes duplicate URNs; each URN is awaited one at a time, in iteration order.
        const uniqueUrns = new Set<string>();
        for (const resource of resources) {
            uniqueUrns.add(await resource.urn.promise());
        }

        const entry = new resourceproto.ResourceCallRequest.ArgumentDependencies();
        entry.setUrnsList(Array.from(uniqueUrns));
        dependencyMap.set(argName, entry);
    }

    return request;
}