// pulumi/sdk/nodejs/provider/server.ts
//
// 678 lines
// 26 KiB
// TypeScript

// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as grpc from "@grpc/grpc-js";
import { Provider } from "./provider";
import * as log from "../log";
import { Inputs, Output, output } from "../output";
import * as resource from "../resource";
import * as config from "../runtime/config";
import * as rpc from "../runtime/rpc";
import * as settings from "../runtime/settings";
import * as localState from "../runtime/state";
import { parseArgs } from "./internals";
import * as gstruct from "google-protobuf/google/protobuf/struct_pb";
import * as anyproto from "google-protobuf/google/protobuf/any_pb";
import * as emptyproto from "google-protobuf/google/protobuf/empty_pb";
import * as structproto from "google-protobuf/google/protobuf/struct_pb";
import * as plugproto from "../proto/plugin_pb";
import * as provrpc from "../proto/provider_grpc_pb";
import * as provproto from "../proto/provider_pb";
import * as statusproto from "../proto/status_pb";
/**
 * Server exposes a `Provider` implementation through the gRPC resource provider service.
 *
 * Each public method is a gRPC handler that receives the incoming `call` and a `callback` used to
 * send the single response (or error). The `construct` and `call` handlers additionally register
 * their callback in `_callbacks` while they are in flight, so that an uncaught exception or
 * unhandled rejection can be reported to those requests instead of leaving them hanging.
 */
class Server implements grpc.UntypedServiceImplementation {
    /** Address of the engine; may be supplied at construction or later through `attach`. */
    engineAddr: string | undefined;
    /** The user-supplied provider implementation that requests are delegated to. */
    readonly provider: Provider;
    /** Set of uncaught errors shared with the caller that created this server. */
    readonly uncaughtErrors: Set<Error>;
    /** Callbacks of in-flight `construct`/`call` requests, keyed by a per-request symbol. */
    private readonly _callbacks = new Map<symbol, grpc.sendUnaryData<any>>();

    /**
     * @param engineAddr Address of the engine, if known at startup.
     * @param provider The provider implementation to serve.
     * @param uncaughtErrors Set into which uncaught errors are recorded.
     */
    constructor(engineAddr: string | undefined, provider: Provider, uncaughtErrors: Set<Error>) {
        this.engineAddr = engineAddr;
        this.provider = provider;
        this.uncaughtErrors = uncaughtErrors;

        // When we catch an uncaught error, we need to respond to the inflight call/construct gRPC requests
        // with the error to avoid a hang.
        const uncaughtHandler = (err: Error) => {
            if (!this.uncaughtErrors.has(err)) {
                this.uncaughtErrors.add(err);
            }
            // terminate the outstanding gRPC requests.
            this._callbacks.forEach((callback) => callback(err, undefined));
        };
        process.on("uncaughtException", uncaughtHandler);
        // @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so
        // just suppress the TS strictness here.
        process.on("unhandledRejection", uncaughtHandler);
    }

    // Satisfy the grpc.UntypedServiceImplementation interface.
    [name: string]: any;

    // Misc. methods

    /** Handles Cancel by immediately replying with an empty message. */
    public cancel(call: any, callback: any): void {
        callback(undefined, new emptyproto.Empty());
    }

    /** Handles Attach: records the address carried by the request as the engine address. */
    public attach(call: any, callback: any): void {
        const req = call.request;
        const host = req.getAddress();
        this.engineAddr = host;
        callback(undefined, new emptyproto.Empty());
    }

    /** Handles GetPluginInfo by replying with the provider's version. */
    public getPluginInfo(call: any, callback: any): void {
        const resp: any = new plugproto.PluginInfo();
        resp.setVersion(this.provider.version);
        callback(undefined, resp);
    }

    /**
     * Handles GetSchema. Only schema version 0 is supported; any other version is answered with an
     * error. When the provider has no schema, the empty JSON object `"{}"` is returned.
     */
    public getSchema(call: any, callback: any): void {
        const req: any = call.request;
        if (req.getVersion() !== 0) {
            callback(new Error(`unsupported schema version ${req.getVersion()}`), undefined);
            // Stop here: the callback must be invoked exactly once per request, so we must not fall
            // through and also send a success response.
            return;
        }
        const resp: any = new provproto.GetSchemaResponse();
        resp.setSchema(this.provider.schema || "{}");
        callback(undefined, resp);
    }

    // Config methods

    /** CheckConfig is not implemented; always replies with an UNIMPLEMENTED status. */
    public checkConfig(call: any, callback: any): void {
        callback(
            {
                code: grpc.status.UNIMPLEMENTED,
                details: "Not yet implemented: CheckConfig",
            },
            undefined,
        );
    }

    /** DiffConfig is not implemented; always replies with an UNIMPLEMENTED status. */
    public diffConfig(call: any, callback: any): void {
        callback(
            {
                code: grpc.status.UNIMPLEMENTED,
                details: "Not yet implemented: DiffConfig",
            },
            undefined,
        );
    }

    /** Handles Configure by advertising support for secrets, resources and outputs. */
    public configure(call: any, callback: any): void {
        const resp = new provproto.ConfigureResponse();
        resp.setAcceptsecrets(true);
        resp.setAcceptresources(true);
        resp.setAcceptoutputs(true);
        callback(undefined, resp);
    }

    // CRUD resource methods

    /**
     * Handles Check. Delegates to `provider.check` when it exists; otherwise the new inputs are
     * propagated as-is. Any failures reported by the provider are copied into the response.
     */
    public async check(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.CheckResponse();

            const olds = req.getOlds().toJavaScript();
            const news = req.getNews().toJavaScript();

            // Default to the new inputs; they are used unchanged when the provider has no check
            // method or when its result carries no inputs.
            let inputs: any = news;
            let failures: any[] = [];
            if (this.provider.check) {
                const result = await this.provider.check(req.getUrn(), olds, news);
                if (result.inputs) {
                    inputs = result.inputs;
                }
                if (result.failures) {
                    failures = result.failures;
                }
            }

            resp.setInputs(structproto.Struct.fromJavaScript(inputs));

            if (failures.length !== 0) {
                const failureList = [];
                for (const f of failures) {
                    const failure = new provproto.CheckFailure();
                    failure.setProperty(f.property);
                    failure.setReason(f.reason);
                    failureList.push(failure);
                }
                resp.setFailuresList(failureList);
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    /**
     * Handles Diff. When the provider implements `diff`, its `changes` flag is mapped to DIFF_SOME
     * (true), DIFF_NONE (false) or DIFF_UNKNOWN (anything else), and `replaces` /
     * `deleteBeforeReplace` are forwarded when set. Without a `diff` method an empty response is sent.
     */
    public async diff(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.DiffResponse();

            const olds = req.getOlds().toJavaScript();
            const news = req.getNews().toJavaScript();
            if (this.provider.diff) {
                const result: any = await this.provider.diff(req.getId(), req.getUrn(), olds, news);

                if (result.changes === true) {
                    resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_SOME);
                } else if (result.changes === false) {
                    resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_NONE);
                } else {
                    resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_UNKNOWN);
                }

                if (result.replaces && result.replaces.length !== 0) {
                    resp.setReplacesList(result.replaces);
                }
                if (result.deleteBeforeReplace) {
                    resp.setDeletebeforereplace(result.deleteBeforeReplace);
                }
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    /**
     * Handles Create. Replies with an "unknown resource type" error when the provider has no
     * `create` method. Errors thrown by the provider are converted with `grpcResponseFromError`
     * and sent together with the metadata that conversion produces.
     */
    public async create(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            if (!this.provider.create) {
                callback(new Error(`unknown resource type ${req.getUrn()}`), undefined);
                return;
            }

            const resp = new provproto.CreateResponse();
            const props = req.getProperties().toJavaScript();
            const result = await this.provider.create(req.getUrn(), props);
            resp.setId(result.id);
            resp.setProperties(structproto.Struct.fromJavaScript(result.outs));

            callback(undefined, resp);
        } catch (e) {
            const response = grpcResponseFromError(e);
            return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
        }
    }

    /**
     * Handles Read. Delegates to `provider.read` when it exists; otherwise the id and properties
     * from the request are returned unchanged.
     */
    public async read(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.ReadResponse();

            const id = req.getId();
            const props = req.getProperties().toJavaScript();
            if (this.provider.read) {
                const result: any = await this.provider.read(id, req.getUrn(), props);
                resp.setId(result.id);
                resp.setProperties(structproto.Struct.fromJavaScript(result.props));
                resp.setInputs(
                    result.inputs === undefined ? undefined : structproto.Struct.fromJavaScript(result.inputs),
                );
            } else {
                // In the event of a missing read, simply return back the input state.
                resp.setId(id);
                resp.setProperties(req.getProperties());
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    /**
     * Handles Update. Delegates to `provider.update` when it exists; a missing method or an empty
     * result is treated as `{}`. Errors are converted with `grpcResponseFromError`, as in `create`.
     */
    public async update(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.UpdateResponse();

            const olds = req.getOlds().toJavaScript();
            const news = req.getNews().toJavaScript();

            let result: any = {};
            if (this.provider.update) {
                result = (await this.provider.update(req.getId(), req.getUrn(), olds, news)) || {};
            }

            resp.setProperties(structproto.Struct.fromJavaScript(result.outs));

            callback(undefined, resp);
        } catch (e) {
            const response = grpcResponseFromError(e);
            return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
        }
    }

    /** Handles Delete. Delegates to `provider.delete` when it exists, then replies with an empty message. */
    public async delete(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const props: any = req.getProperties().toJavaScript();
            if (this.provider.delete) {
                await this.provider.delete(req.getId(), req.getUrn(), props);
            }
            callback(undefined, new emptyproto.Empty());
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    /**
     * Handles Construct. Runs inside a fresh local state store, configures the runtime from the
     * request, rebuilds the resource options, invokes `provider.construct`, and replies with the
     * resulting URN, serialized state and per-property state dependencies.
     */
    public async construct(call: any, callback: any): Promise<void> {
        // Setup a new async state store for this run
        const store = new localState.LocalStore();
        return localState.asyncLocalStorage.run(store, async () => {
            // Track this request so the uncaught-error handler can answer it.
            const callbackId = Symbol("id");
            this._callbacks.set(callbackId, callback);
            try {
                const req: any = call.request;
                const type = req.getType();
                const name = req.getName();

                if (!this.provider.construct) {
                    callback(new Error(`unknown resource type ${type}`), undefined);
                    return;
                }

                await configureRuntime(req, this.engineAddr);

                const inputs = await deserializeInputs(req.getInputs(), req.getInputdependenciesMap());

                // Rebuild the resource options.
                const dependsOn: resource.Resource[] = [];
                for (const urn of req.getDependenciesList()) {
                    dependsOn.push(new resource.DependencyResource(urn));
                }
                const providers: Record<string, resource.ProviderResource> = {};
                const rpcProviders = req.getProvidersMap();
                if (rpcProviders) {
                    for (const [pkg, ref] of rpcProviders.entries()) {
                        providers[pkg] = createProviderResource(ref);
                    }
                }
                const opts: resource.ComponentResourceOptions = {
                    aliases: req.getAliasesList(),
                    dependsOn: dependsOn,
                    protect: req.getProtect(),
                    providers: providers,
                    parent: req.getParent() ? new resource.DependencyResource(req.getParent()) : undefined,
                };

                const result = await this.provider.construct(name, type, inputs, opts);

                const resp = new provproto.ConstructResponse();

                resp.setUrn(await output(result.urn).promise());

                const [state, stateDependencies] = await rpc.serializeResourceProperties(
                    `construct(${type}, ${name})`,
                    result.state,
                );

                const stateDependenciesMap = resp.getStatedependenciesMap();
                for (const [key, resources] of stateDependencies) {
                    const deps = new provproto.ConstructResponse.PropertyDependencies();
                    deps.setUrnsList(await Promise.all(Array.from(resources).map((r) => r.urn.promise())));
                    stateDependenciesMap.set(key, deps);
                }

                resp.setState(structproto.Struct.fromJavaScript(state));

                // Wait for RPC operations to complete.
                await settings.waitForRPCs();

                callback(undefined, resp);
            } catch (e) {
                console.error(`${e}: ${e.stack}`);
                callback(e, undefined);
            } finally {
                // remove the gRPC callback context from the map of in-flight callbacks
                this._callbacks.delete(callbackId);
            }
        });
    }

    /**
     * Handles Call. Runs inside a fresh local state store, configures the runtime from the request,
     * invokes `provider.call`, and replies with the serialized outputs, their dependencies and any
     * failures reported by the provider.
     */
    public async call(call: any, callback: any): Promise<void> {
        // Setup a new async state store for this run
        const store = new localState.LocalStore();
        return localState.asyncLocalStorage.run(store, async () => {
            // Track this request so the uncaught-error handler can answer it.
            const callbackId = Symbol("id");
            this._callbacks.set(callbackId, callback);
            try {
                const req: any = call.request;
                if (!this.provider.call) {
                    callback(new Error(`unknown function ${req.getTok()}`), undefined);
                    return;
                }

                await configureRuntime(req, this.engineAddr);

                const args = await deserializeInputs(req.getArgs(), req.getArgdependenciesMap());

                const result = await this.provider.call(req.getTok(), args);

                const resp = new provproto.CallResponse();

                if (result.outputs) {
                    const [ret, retDependencies] = await rpc.serializeResourceProperties(
                        `call(${req.getTok()})`,
                        result.outputs,
                    );

                    const returnDependenciesMap = resp.getReturndependenciesMap();
                    for (const [key, resources] of retDependencies) {
                        const deps = new provproto.CallResponse.ReturnDependencies();
                        deps.setUrnsList(await Promise.all(Array.from(resources).map((r) => r.urn.promise())));
                        returnDependenciesMap.set(key, deps);
                    }

                    resp.setReturn(structproto.Struct.fromJavaScript(ret));
                }

                if ((result.failures || []).length !== 0) {
                    const failureList = [];
                    for (const f of result.failures!) {
                        const failure = new provproto.CheckFailure();
                        failure.setProperty(f.property);
                        failure.setReason(f.reason);
                        failureList.push(failure);
                    }
                    resp.setFailuresList(failureList);
                }

                // Wait for RPC operations to complete.
                await settings.waitForRPCs();

                callback(undefined, resp);
            } catch (e) {
                console.error(`${e}: ${e.stack}`);
                callback(e, undefined);
            } finally {
                // remove the gRPC callback context from the map of in-flight callbacks
                this._callbacks.delete(callbackId);
            }
        });
    }

    /**
     * Handles Invoke. Replies with an "unknown function" error when the provider has no `invoke`
     * method; otherwise forwards the provider's outputs and failures.
     */
    public async invoke(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            if (!this.provider.invoke) {
                callback(new Error(`unknown function ${req.getTok()}`), undefined);
                return;
            }

            const args: any = req.getArgs().toJavaScript();
            const result = await this.provider.invoke(req.getTok(), args);

            const resp = new provproto.InvokeResponse();
            resp.setReturn(structproto.Struct.fromJavaScript(result.outputs));

            if ((result.failures || []).length !== 0) {
                const failureList = [];
                for (const f of result.failures!) {
                    const failure = new provproto.CheckFailure();
                    failure.setProperty(f.property);
                    failure.setReason(f.reason);
                    failureList.push(failure);
                }
                resp.setFailuresList(failureList);
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    /** StreamInvoke is not implemented; always replies with an UNIMPLEMENTED status. */
    public async streamInvoke(call: any, callback: any): Promise<void> {
        callback(
            {
                code: grpc.status.UNIMPLEMENTED,
                details: "Not yet implemented: StreamInvoke",
            },
            undefined,
        );
    }
}
/**
 * Applies the settings and configuration carried by a construct/call request to the runtime.
 *
 * @param req The incoming request message.
 * @param engineAddr Address of the engine; an error is thrown when it is undefined.
 */
async function configureRuntime(req: any, engineAddr: string | undefined) {
    // NOTE: these are globals! We should ensure that all settings are identical between calls, and eventually
    // refactor so we can avoid the global state.
    if (engineAddr === undefined) {
        throw new Error("fatal: Missing <engine> address");
    }

    const project = req.getProject();
    const stack = req.getStack();
    const parallel = req.getParallel();
    const monitorEndpoint = req.getMonitorendpoint();
    const dryRun = req.getDryrun();
    const organization = req.getOrganization();
    settings.resetOptions(project, stack, parallel, engineAddr, monitorEndpoint, dryRun, organization);

    // resetOptions doesn't reset the saved features
    await settings.awaitFeatureSupport();

    // Copy the request's config map into a plain object.
    const allConfig: { [key: string]: string } = {};
    const configMap = req.getConfigMap();
    if (configMap) {
        for (const entry of configMap.entries()) {
            allConfig[entry[0]] = entry[1];
        }
    }
    config.setAllConfig(allConfig, req.getConfigsecretkeysList());
}
/**
 * Deserializes the inputs struct of a request and attaches the dependencies listed for each
 * property.
 *
 * A property is returned as-is when it is not a secret and either has no dependencies, already
 * contains Outputs, or is a resource reference; every other property is wrapped in an `Output`.
 * @internal
 */
export async function deserializeInputs(inputsStruct: gstruct.Struct, inputDependencies: any): Promise<Inputs> {
    const props = rpc.deserializeProperties(inputsStruct);
    const inputs: Inputs = {};

    for (const key of Object.keys(props)) {
        const value = props[key];
        const secret = rpc.isRpcSecret(value);
        const urns: resource.URN[] = inputDependencies.get(key)?.getUrnsList() ?? [];

        // If the input isn't a secret and either doesn't have any dependencies, already contains
        // Outputs (from deserialized output values), or is a resource reference, then we can return
        // it directly without wrapping it as an output.
        let passThrough = false;
        if (!secret) {
            passThrough =
                urns.length === 0 || containsOutputs(value) || (await isResourceReference(value, urns));
        }
        if (passThrough) {
            inputs[key] = value;
            continue;
        }

        // Otherwise, wrap it in an output so we can handle secrets and/or track dependencies.
        // Note: If the value is or contains an unknown value, the Output will mark its value as
        // unknown automatically, so we just pass true for isKnown here.
        const dependencies = urns.map((urn) => new resource.DependencyResource(urn));
        inputs[key] = new Output(
            dependencies,
            Promise.resolve(rpc.unwrapRpcSecret(value)),
            Promise.resolve(true),
            Promise.resolve(secret),
            Promise.resolve([]),
        );
    }

    return inputs;
}
/**
 * Reports whether `input` is a resource whose URN is the one and only entry in `deps`.
 */
async function isResourceReference(input: any, deps: string[]): Promise<boolean> {
    if (!resource.Resource.isInstance(input)) {
        return false;
    }
    if (deps.length !== 1) {
        return false;
    }
    const [onlyDep] = deps;
    const inputUrn = await input.urn.promise();
    return onlyDep === inputUrn;
}
/**
 * Returns true if the deserialized input contains Outputs (deeply), excluding properties of Resources.
 *
 * Arrays and plain objects are searched recursively. Primitives, `undefined` and `null` never
 * contain Outputs and yield false.
 * @internal
 */
export function containsOutputs(input: any): boolean {
    if (Array.isArray(input)) {
        return input.some((element) => containsOutputs(element));
    }

    // `typeof null` is "object", so null must be excluded explicitly; otherwise it would reach
    // Object.keys(null) below, which throws a TypeError.
    if (input === null || typeof input !== "object") {
        return false;
    }

    if (Output.isInstance(input)) {
        return true;
    }
    if (resource.Resource.isInstance(input)) {
        // Do not drill into instances of Resource because they will have properties that are
        // instances of Output (e.g. urn, id, etc.) and we're only looking for instances of
        // Output that aren't associated with a Resource.
        return false;
    }

    return Object.keys(input).some((key) => containsOutputs(input[key]));
}
// grpcResponseFromError builds the gRPC error response for a failure raised by a dynamic
// provider's resource. Two situations are distinguished: a creation error, where the API server
// has (virtually) rejected the resource, and an initialization error, where the resource was
// accepted but failed to initialize (e.g., the app code is continually crashing and the resource
// never became alive). The latter is recognized by the error carrying an `id`.
function grpcResponseFromError(e: { id: string; properties: any; message: string; reasons?: string[] }) {
    // Status message mirroring the error; it is only sent when init-failure details are attached.
    const status = new statusproto.Status();
    status.setCode(grpc.status.UNKNOWN);
    status.setMessage(e.message);

    const metadata = new grpc.Metadata();

    if (e.id) {
        // Object created successfully, but failed to initialize. Pack initialization failure into
        // details.
        const initFailure = new provproto.ErrorResourceInitFailed();
        initFailure.setId(e.id);
        initFailure.setProperties(structproto.Struct.fromJavaScript(e.properties || {}));
        initFailure.setReasonsList(e.reasons || []);

        const packed = new anyproto.Any();
        packed.pack(initFailure.serializeBinary(), "pulumirpc.ErrorResourceInitFailed");

        // Add details to metadata.
        status.addDetails(packed);
        // NOTE: `grpc-status-details-bin` is a magic field that allows us to send structured
        // protobuf data as an error back through gRPC. This notion of details is a first-class in
        // the Go gRPC implementation, and the nodejs implementation has not quite caught up to it,
        // which is why it's cumbersome here.
        const serializedStatus = Buffer.from(status.serializeBinary());
        metadata.add("grpc-status-details-bin", serializedStatus);
    }

    return {
        code: grpc.status.UNKNOWN,
        message: e.message,
        metadata,
    };
}
/**
 * Entry point for serving `provider` over gRPC.
 *
 * Installs process-level handlers that record and log uncaught errors, starts a gRPC server bound
 * to a port on 127.0.0.1 and prints that port to stdout.
 *
 * @param provider The provider implementation to serve.
 * @param args Command-line arguments; parsed for the optional engine address.
 */
export async function main(provider: Provider, args: string[]) {
    // We track all uncaught errors here. If we have any, we will make sure we always have a non-0 exit
    // code.
    const uncaughtErrors = new Set<Error>();

    function recordUncaught(err: Error) {
        if (uncaughtErrors.has(err)) {
            return;
        }
        uncaughtErrors.add(err);
        // Use `pulumi.log.error` here to tell the engine there was a fatal error, which should
        // stop processing subsequent resource operations.
        log.error(err.stack || err.message || "" + err);
    }

    process.on("uncaughtException", recordUncaught);
    // @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so just
    // suppress the TS strictness here.
    process.on("unhandledRejection", recordUncaught);
    process.on("exit", (code: number) => {
        // If there were any uncaught errors at all, we always want to exit with an error code.
        const cleanExitDespiteErrors = code === 0 && uncaughtErrors.size > 0;
        if (cleanExitDespiteErrors) {
            process.exitCode = 1;
        }
    });

    const parsedArgs = parseArgs(args);

    // Finally connect up the gRPC client/server and listen for incoming requests.
    const grpcServer = new grpc.Server({
        "grpc.max_receive_message_length": settings.maxRPCMessageSize,
    });

    // The program receives a single optional argument: the address of the RPC endpoint for the engine. It
    // optionally also takes a second argument, a reference back to the engine, but this may be missing.
    const engineAddr = parsedArgs?.engineAddress;
    const implementation = new Server(engineAddr, provider, uncaughtErrors);
    grpcServer.addService(provrpc.ResourceProviderService, implementation);

    const port: number = await new Promise<number>((resolve, reject) => {
        grpcServer.bindAsync(`127.0.0.1:0`, grpc.ServerCredentials.createInsecure(), (err, boundPort) => {
            if (err) {
                reject(err);
                return;
            }
            resolve(boundPort);
        });
    });

    // Emit the address so the monitor can read it to connect. The gRPC server will keep the message loop alive.
    // We explicitly convert the number to a string so that Node doesn't colorize the output.
    console.log(port.toString());
}
/**
 * Turns a provider reference into a ProviderResource. When a resource package is registered for
 * the provider's type name, that package constructs the provider; otherwise a
 * DependencyProviderResource wrapping the reference is returned.
 */
function createProviderResource(ref: string): resource.ProviderResource {
    const [urn] = resource.parseResourceReference(ref);

    // Split the URN on "::"; the third segment is the qualified type and the fourth is the name.
    const segments = urn.split("::");
    const qualifiedType = segments[2];
    const urnName = segments[3];

    // The resource's own type is the last "$"-separated component of the qualified type.
    const type = qualifiedType.split("$").pop()!;
    const typeSegments = type.split(":");
    let typName = "";
    if (typeSegments.length > 2) {
        typName = typeSegments[2];
    }

    const resourcePackage = rpc.getResourcePackage(typName, /*version:*/ "");
    if (!resourcePackage) {
        return new resource.DependencyProviderResource(ref);
    }
    return resourcePackage.constructProvider(urnName, type, urn);
}