// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as grpc from "@grpc/grpc-js";

import * as dynamic from "../../dynamic";
import * as rpc from "../../runtime/rpc";
import { version } from "../../version";

import * as anyproto from "google-protobuf/google/protobuf/any_pb";
import * as emptyproto from "google-protobuf/google/protobuf/empty_pb";
import * as structproto from "google-protobuf/google/protobuf/struct_pb";
import * as plugproto from "../../proto/plugin_pb";
import * as provrpc from "../../proto/provider_grpc_pb";
import * as provproto from "../../proto/provider_pb";
import * as statusproto from "../../proto/status_pb";

const requireFromString = require("require-from-string");

// Name of the resource property that carries the serialized dynamic provider.
const providerKey: string = "__provider";

// maxRPCMessageSize raises the gRPC Max Message size from `4194304` (4mb) to `419430400` (400mb)
const maxRPCMessageSize: number = 1024 * 1024 * 400;

// We track all uncaught errors here.  If we have any, we will make sure we always have a non-0 exit
// code.
const uncaughtErrors = new Set<unknown>();

// Records and logs an uncaught error exactly once. The handler is shared between 'uncaughtException'
// (always an Error) and 'unhandledRejection' (whose reason may be *any* value, including
// `undefined`), so it must not assume that `err` is an object.
const uncaughtHandler = (err: unknown) => {
    if (uncaughtErrors.has(err)) {
        return;
    }
    uncaughtErrors.add(err);

    const details: any = err;
    console.error((details && (details.stack || details.message)) || String(err));
};

process.on("uncaughtException", uncaughtHandler);
process.on("unhandledRejection", uncaughtHandler);

process.on("exit", (code: number) => {
    // If there were any uncaught errors at all, we always want to exit with an error code.
    if (code === 0 && uncaughtErrors.size > 0) {
        process.exitCode = 1;
    }
});

// Cache of deserialized providers, keyed by their serialized source text. A Map is used rather than
// a plain object so that keys can never collide with names inherited from Object.prototype.
const providerCache = new Map<string, dynamic.ResourceProvider>();

/**
 * Returns the dynamic provider serialized in `props[providerKey]`, deserializing it on first use
 * and serving it from `providerCache` afterwards.
 *
 * @param props The resource properties holding the serialized provider.
 * @returns The provider produced by the serialized module's `handler()` export.
 * @throws Error if `props` does not carry a serialized provider string, or if loading it fails.
 */
function getProvider(props: any): dynamic.ResourceProvider {
    const providerString = props[providerKey];
    if (typeof providerString !== "string") {
        throw new Error(`resource properties do not contain a serialized dynamic provider ("${providerKey}")`);
    }

    const cached = providerCache.get(providerString);
    if (cached) {
        return cached;
    }

    // TODO[pulumi/pulumi#414]: investigate replacing requireFromString with eval
    const provider: dynamic.ResourceProvider = requireFromString(providerString).handler();
    providerCache.set(providerString, provider);
    return provider;
}

// Each of the *RPC functions below implements a single method of the resource provider gRPC interface. The CRUD
// functions--checkRPC, diffRPC, createRPC, readRPC, updateRPC, and deleteRPC--all operate in a similar fashion:
//     1. Deserialize the dynamic provider for the resource on which the function is operating
//     2. Call the dynamic provider's corresponding {check,diff,create,read,update,delete} method
//     3. Convert and return the results
// In all cases, the dynamic provider is available in its serialized form as a property of the resource;
// `getProvider` is responsible for handling its deserialization. Note that diffRPC takes no special action when
// the provider itself has changed: it simply delegates to the new provider's `diff` method, so a change to the
// provider's implementation does not by itself force the resource to be replaced.
/**
 * Converts a thrown value into something that is safe to inspect and to hand to a gRPC callback.
 * Objects (including Errors and error-like plain objects) are returned unchanged; primitives such
 * as strings, `null` and `undefined` are wrapped in a new Error whose message is `String(e)`.
 */
function normalizeError(e: any): any {
    return typeof e === "object" && e !== null ? e : new Error(String(e));
}

/**
 * Logs a failure thrown while servicing an RPC and completes the RPC with that failure.
 *
 * @param e The thrown value; it is normalized first so that non-Error throws cannot break logging.
 * @param callback The gRPC completion callback of the RPC being serviced.
 */
function failRPC(e: any, callback: any): void {
    const err = normalizeError(e);
    console.error(`${err}: ${err.stack}`);
    callback(err, undefined);
}

/** Cancel: nothing to cancel, so immediately reply with an empty message. */
function cancelRPC(call: any, callback: any): void {
    callback(undefined, new emptyproto.Empty());
}

/** Configure: replies with a ConfigureResponse whose `acceptSecrets` flag is false. */
function configureRPC(call: any, callback: any): void {
    const resp = new provproto.ConfigureResponse();
    resp.setAcceptsecrets(false);
    callback(undefined, resp);
}

/** Invoke: not implemented; always fails with "unknown function <token>". */
async function invokeRPC(call: any, callback: any): Promise<void> {
    const req: any = call.request;

    // TODO[pulumi/pulumi#406]: implement this.
    callback(new Error(`unknown function ${req.getTok()}`), undefined);
}

/**
 * StreamInvoke: not implemented; always fails with "unknown function <token>".
 *
 * NOTE(review): grpc-js invokes server-streaming handlers with the call only (no callback), and
 * StreamInvoke is presumably declared as server-streaming in provider.proto -- confirm. When no
 * callback is supplied the error is reported by emitting it on the call stream instead.
 */
async function streamInvokeRPC(call: any, callback: any): Promise<void> {
    const req: any = call.request;

    // TODO[pulumi/pulumi#406]: implement this.
    const err = new Error(`unknown function ${req.getTok()}`);
    if (typeof callback === "function") {
        callback(err, undefined);
    } else {
        call.emit("error", err);
    }
}

/**
 * Check: runs the provider's optional `check` method and replies with the resulting inputs and
 * any check failures. The provider is taken from the old properties when the new `__provider`
 * value is still unknown.
 */
async function checkRPC(call: any, callback: any): Promise<void> {
    try {
        const req: any = call.request;
        const resp = new provproto.CheckResponse();

        const olds = req.getOlds().toJavaScript();
        const news = req.getNews().toJavaScript();
        const provider = getProvider(news[providerKey] === rpc.unknownValue ? olds : news);

        // If no check method was provided, the new inputs are propagated as-is.
        let inputs: any = news;
        let failures: any[] = [];
        if (provider.check) {
            const result = await provider.check(olds, news);
            if (result.inputs) {
                inputs = result.inputs;
            }
            if (result.failures) {
                failures = result.failures;
            }
        }

        // Always carry the serialized provider forward, even if `check` dropped it.
        inputs[providerKey] = news[providerKey];
        resp.setInputs(structproto.Struct.fromJavaScript(inputs));

        if (failures.length !== 0) {
            const failureList = [];
            for (const f of failures) {
                const failure = new provproto.CheckFailure();
                failure.setProperty(f.property);
                failure.setReason(f.reason);

                failureList.push(failure);
            }
            resp.setFailuresList(failureList);
        }

        callback(undefined, resp);
    } catch (e) {
        failRPC(e, callback);
    }
}

/** CheckConfig: not implemented by the dynamic provider. */
function checkConfigRPC(call: any, callback: any): void {
    callback(
        {
            code: grpc.status.UNIMPLEMENTED,
            details: "CheckConfig is not implemented by the dynamic provider",
        },
        undefined,
    );
}

/**
 * Diff: runs the provider's optional `diff` method and translates its result into a DiffResponse.
 * `changes === true` maps to DIFF_SOME, `changes === false` to DIFF_NONE and anything else to
 * DIFF_UNKNOWN. Without a `diff` method an empty DiffResponse is returned.
 */
async function diffRPC(call: any, callback: any): Promise<void> {
    try {
        const req: any = call.request;
        const resp = new provproto.DiffResponse();

        // Note that we do not take any special action if the provider has changed. This allows a user to iterate on a
        // dynamic provider's implementation. This does require some care on the part of the user: each iteration of a
        // dynamic provider's implementation must be able to handle all state produced by prior iterations.
        //
        // Prior versions of the dynamic provider required that a dynamic resource be replaced any time its provider
        // implementation changed. This made iteration painful, especially if the dynamic resource was managing a
        // physical resource--in this case, the physical resource would be unnecessarily deleted and recreated each
        // time the provider was updated.
        const olds = req.getOlds().toJavaScript();
        const news = req.getNews().toJavaScript();
        const provider = getProvider(news[providerKey] === rpc.unknownValue ? olds : news);
        if (provider.diff) {
            const result: any = await provider.diff(req.getId(), olds, news);

            if (result.changes === true) {
                resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_SOME);
            } else if (result.changes === false) {
                resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_NONE);
            } else {
                resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_UNKNOWN);
            }

            if (result.replaces && result.replaces.length !== 0) {
                resp.setReplacesList(result.replaces);
            }
            if (result.deleteBeforeReplace) {
                resp.setDeletebeforereplace(result.deleteBeforeReplace);
            }
        }

        callback(undefined, resp);
    } catch (e) {
        failRPC(e, callback);
    }
}

/** DiffConfig: not implemented by the dynamic provider. */
function diffConfigRPC(call: any, callback: any): void {
    callback(
        {
            code: grpc.status.UNIMPLEMENTED,
            details: "DiffConfig is not implemented by the dynamic provider",
        },
        undefined,
    );
}

/**
 * Create: calls the provider's `create` method and replies with the new ID and output properties
 * (plus the serialized provider). Failures are converted by `grpcResponseFromError`, and the
 * resulting metadata is passed as the callback's third argument.
 */
async function createRPC(call: any, callback: any): Promise<void> {
    try {
        const req: any = call.request;
        const resp = new provproto.CreateResponse();

        const props = req.getProperties().toJavaScript();
        const provider = getProvider(props);
        const result = await provider.create(props);
        const resultProps = resultIncludingProvider(result.outs, props);
        resp.setId(result.id);
        resp.setProperties(structproto.Struct.fromJavaScript(resultProps));

        callback(undefined, resp);
    } catch (e) {
        const response = grpcResponseFromError(e);
        return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
    }
}

/**
 * Read: calls the provider's optional `read` method and replies with the ID and properties it
 * reports. Without a `read` method the incoming ID and properties are echoed back unchanged.
 */
async function readRPC(call: any, callback: any): Promise<void> {
    try {
        const req: any = call.request;
        const resp = new provproto.ReadResponse();

        const id = req.getId();
        const props = req.getProperties().toJavaScript();
        const provider = getProvider(props);
        if (provider.read) {
            // If there's a read function, consult the provider. Ensure to propagate the special __provider
            // value too, so that the provider's CRUD operations continue to function after a refresh.
            const result: any = await provider.read(id, props);
            resp.setId(result.id);
            const resultProps = resultIncludingProvider(result.props, props);
            resp.setProperties(structproto.Struct.fromJavaScript(resultProps));
        } else {
            // In the event of a missing read, simply return back the input state.
            resp.setId(id);
            resp.setProperties(req.getProperties());
        }

        callback(undefined, resp);
    } catch (e) {
        failRPC(e, callback);
    }
}

/**
 * Update: calls the provider's optional `update` method and replies with its output properties
 * (plus the serialized provider). A missing method or an empty result yields just the provider
 * property. Failures are converted by `grpcResponseFromError`.
 */
async function updateRPC(call: any, callback: any): Promise<void> {
    try {
        const req: any = call.request;
        const resp = new provproto.UpdateResponse();

        const olds = req.getOlds().toJavaScript();
        const news = req.getNews().toJavaScript();

        let result: any = {};
        const provider = getProvider(news);
        if (provider.update) {
            result = (await provider.update(req.getId(), olds, news)) || {};
        }

        const resultProps = resultIncludingProvider(result.outs, news);
        resp.setProperties(structproto.Struct.fromJavaScript(resultProps));

        callback(undefined, resp);
    } catch (e) {
        const response = grpcResponseFromError(e);
        return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
    }
}

/** Delete: calls the provider's optional `delete` method and replies with an empty message. */
async function deleteRPC(call: any, callback: any): Promise<void> {
    try {
        const req: any = call.request;
        const props: any = req.getProperties().toJavaScript();
        // getProvider is synchronous, so there is nothing to await here.
        const provider: any = getProvider(props);
        if (provider.delete) {
            await provider.delete(req.getId(), props);
        }
        callback(undefined, new emptyproto.Empty());
    } catch (e) {
        failRPC(e, callback);
    }
}

/** GetPluginInfo: replies with the SDK version as the plugin version. */
async function getPluginInfoRPC(call: any, callback: any): Promise<void> {
    const resp: any = new plugproto.PluginInfo();
    resp.setVersion(version);
    callback(undefined, resp);
}

/** GetSchema: not implemented by the dynamic provider. */
function getSchemaRPC(call: any, callback: any): void {
    callback(
        {
            code: grpc.status.UNIMPLEMENTED,
            details: "GetSchema is not implemented by the dynamic provider",
        },
        undefined,
    );
}

/** Construct: not implemented by the dynamic provider. */
function constructRPC(call: any, callback: any): void {
    callback(
        {
            code: grpc.status.UNIMPLEMENTED,
            details: "Construct is not implemented by the dynamic provider",
        },
        undefined,
    );
}

/**
 * Returns `result` (or a fresh object when `result` is falsy) with the serialized provider from
 * `props` stored under `providerKey`. Note that a supplied `result` object is modified in place.
 */
function resultIncludingProvider(result: any, props: any): any {
    return Object.assign(result || {}, {
        [providerKey]: props[providerKey],
    });
}

// grpcResponseFromError creates a gRPC response representing an error from a dynamic provider's
// resource. This is typically either a creation error, in which the API server has (virtually)
// rejected the resource, or an initialization error, where the API server has accepted the
// resource, but it failed to initialize (e.g., the app code is continually crashing and the
// resource has failed to become alive).
//
// Values thrown by user code are not guaranteed to be Error objects, so the input is normalized
// first: a thrown primitive (e.g. a string) is reported using its string form as the message.
function grpcResponseFromError(e: { id: string; properties: any; message: string; reasons?: string[] }) {
    const err: any = normalizeError(e);

    // Create response object.
    const resp = new statusproto.Status();
    resp.setCode(grpc.status.UNKNOWN);
    resp.setMessage(err.message);

    const metadata = new grpc.Metadata();
    if (err.id) {
        // Object created successfully, but failed to initialize. Pack initialization failure into
        // details.
        const detail = new provproto.ErrorResourceInitFailed();
        detail.setId(err.id);
        detail.setProperties(structproto.Struct.fromJavaScript(err.properties || {}));
        detail.setReasonsList(err.reasons || []);

        const details = new anyproto.Any();
        details.pack(detail.serializeBinary(), "pulumirpc.ErrorResourceInitFailed");

        // Add details to metadata.
        resp.addDetails(details);
        // NOTE: `grpc-status-details-bin` is a magic field that allows us to send structured
        // protobuf data as an error back through gRPC. This notion of details is a first-class in
        // the Go gRPC implementation, and the nodejs implementation has not quite caught up to it,
        // which is why it's cumbersome here.
        metadata.add("grpc-status-details-bin", Buffer.from(resp.serializeBinary()));
    }

    return {
        code: grpc.status.UNKNOWN,
        message: err.message,
        metadata: metadata,
    };
}

/**
 * Entry point of the dynamic provider plugin: registers every RPC handler, binds the gRPC server
 * to an ephemeral port on 127.0.0.1, starts it and prints the chosen port to stdout.
 *
 * @param args Command-line arguments; the first one is the address of the engine's RPC endpoint.
 * @internal
 */
export async function main(args: string[]) {
    // The program requires a single argument: the address of the RPC endpoint for the engine.  It
    // optionally also takes a second argument, a reference back to the engine, but this may be missing.
    if (args.length === 0) {
        console.error("fatal: Missing <engine> address");
        process.exit(-1);
    }

    // Finally connect up the gRPC client/server and listen for incoming requests.
    const server = new grpc.Server({
        "grpc.max_receive_message_length": maxRPCMessageSize,
    });
    server.addService(provrpc.ResourceProviderService, {
        cancel: cancelRPC,
        configure: configureRPC,
        invoke: invokeRPC,
        streamInvoke: streamInvokeRPC,
        check: checkRPC,
        checkConfig: checkConfigRPC,
        diff: diffRPC,
        diffConfig: diffConfigRPC,
        create: createRPC,
        read: readRPC,
        update: updateRPC,
        delete: deleteRPC,
        getPluginInfo: getPluginInfoRPC,
        getSchema: getSchemaRPC,
        construct: constructRPC,
    });
    const port: number = await new Promise<number>((resolve, reject) => {
        server.bindAsync(`127.0.0.1:0`, grpc.ServerCredentials.createInsecure(), (err, p) => {
            if (err) {
                reject(err);
            } else {
                resolve(p);
            }
        });
    });
    server.start();

    // Emit the address so the monitor can read it to connect.  The gRPC server will keep the message loop alive.
    // We explicitly convert the number to a string so that Node doesn't colorize the output.
    console.log(port.toString());
}

main(process.argv.slice(2));