// Copyright 2016-2024, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import * as grpc from "@grpc/grpc-js"; import { randomUUID } from "crypto"; import * as jspb from "google-protobuf"; import * as gstruct from "google-protobuf/google/protobuf/struct_pb"; import * as log from "../log"; import { output } from "../output"; import * as callrpc from "../proto/callback_grpc_pb"; import * as callproto from "../proto/callback_pb"; import { Callback, CallbackInvokeRequest, CallbackInvokeResponse } from "../proto/callback_pb"; import * as resrpc from "../proto/resource_grpc_pb"; import * as resproto from "../proto/resource_pb"; import { Alias, ComponentResourceOptions, CustomResourceOptions, DependencyProviderResource, DependencyResource, ProviderResource, Resource, ResourceOptions, ResourceTransform, ResourceTransformArgs, URN, rootStackResource, } from "../resource"; import { mapAliasesForRequest } from "./resource"; import { deserializeProperties, serializeProperties, unknownValue } from "./rpc"; // maxRPCMessageSize raises the gRPC Max Message size from `4194304` (4mb) to `419430400` (400mb) /** @internal */ const maxRPCMessageSize: number = 1024 * 1024 * 400; type CallbackFunction = (args: Uint8Array) => Promise<jspb.Message>; export interface ICallbackServer { registerTransform(callback: ResourceTransform): Promise<callproto.Callback>; registerStackTransform(callback: ResourceTransform): void; shutdown(): void; // Wait for any pendind registerStackTransform calls to complete. awaitStackRegistrations(): Promise<void>; } export class CallbackServer implements ICallbackServer { private readonly _callbacks = new Map<string, CallbackFunction>(); private readonly _monitor: resrpc.IResourceMonitorClient; private readonly _server: grpc.Server; private readonly _target: Promise<string>; private _pendingRegistrations: number = 0; private _awaitQueue: ((reason?: any) => void)[] = []; constructor(monitor: resrpc.IResourceMonitorClient) { this._monitor = monitor; this._server = new grpc.Server({ "grpc.max_receive_message_length": maxRPCMessageSize, }); const implementation: callrpc.ICallbacksServer = { invoke: this.invoke.bind(this), }; this._server.addService(callrpc.CallbacksService, implementation); const self = this; this._target = new Promise<string>((resolve, reject) => { self._server.bindAsync(`127.0.0.1:0`, grpc.ServerCredentials.createInsecure(), (err, port) => { if (err !== null) { reject(err); return; } // The server takes a while to _actually_ startup so we need to keep trying to send an invoke // to ourselves before we resolve the address to tell the engine about it. const target = `127.0.0.1:${port}`; const client = new callrpc.CallbacksClient(target, grpc.credentials.createInsecure()); const connect = () => { client.invoke(new CallbackInvokeRequest(), (error, _) => { if (error?.code === grpc.status.UNAVAILABLE) { setTimeout(connect, 1000); return; } // The expected error given we didn't give a token to the invoke. if (error?.details === "callback not found: ") { resolve(target); return; } reject(error); }); }; connect(); }); }); } awaitStackRegistrations(): Promise<void> { if (this._pendingRegistrations === 0) { return Promise.resolve(); } return new Promise<void>((resolve, reject) => { this._awaitQueue.push((reason?: any) => { if (reason !== undefined) { reject(reason); } else { resolve(); } }); }); } shutdown(): void { this._server.forceShutdown(); } private async invoke( call: grpc.ServerUnaryCall<CallbackInvokeRequest, CallbackInvokeResponse>, callback: grpc.sendUnaryData<CallbackInvokeResponse>, ) { const req = call.request; const cb = this._callbacks.get(req.getToken()); if (cb === undefined) { const err = new grpc.StatusBuilder(); err.withCode(grpc.status.INVALID_ARGUMENT); err.withDetails("callback not found: " + req.getToken()); callback(err.build()); return; } try { const response = await cb(req.getRequest_asU8()); const resp = new CallbackInvokeResponse(); resp.setResponse(response.serializeBinary()); callback(null, resp); } catch (e) { const err = new grpc.StatusBuilder(); err.withCode(grpc.status.UNKNOWN); if (e instanceof Error) { err.withDetails(e.message); } else { err.withDetails(JSON.stringify(e)); } callback(err.build()); } } async registerTransform(transform: ResourceTransform): Promise<callproto.Callback> { const cb = async (bytes: Uint8Array): Promise<jspb.Message> => { const request = resproto.TransformRequest.deserializeBinary(bytes); let opts = request.getOptions() || new resproto.TransformResourceOptions(); let ropts: ResourceOptions; if (request.getCustom()) { ropts = { deleteBeforeReplace: opts.getDeleteBeforeReplace(), additionalSecretOutputs: opts.getAdditionalSecretOutputsList(), } as CustomResourceOptions; } else { const providers: Record<string, ProviderResource> = {}; for (const [key, value] of opts.getProvidersMap().entries()) { providers[key] = new DependencyProviderResource(value); } ropts = { providers: providers, } as ComponentResourceOptions; } ropts.aliases = opts.getAliasesList().map((alias): string | Alias => { if (alias.hasUrn()) { return alias.getUrn(); } else { const spec = alias.getSpec(); if (spec === undefined) { throw new Error("alias must have either a urn or a spec"); } const nodeAlias: Alias = { name: spec.getName(), type: spec.getType(), project: spec.getProject(), stack: spec.getStack(), parent: spec.getParenturn() !== "" ? new DependencyResource(spec.getParenturn()) : undefined, }; if (spec.getNoparent()) { nodeAlias.parent = rootStackResource; } return nodeAlias; } }); const timeouts = opts.getCustomTimeouts(); if (timeouts !== undefined) { ropts.customTimeouts = { create: timeouts.getCreate(), update: timeouts.getUpdate(), delete: timeouts.getDelete(), }; } ropts.deletedWith = opts.getDeletedWith() !== "" ? new DependencyResource(opts.getDeletedWith()) : undefined; ropts.dependsOn = opts.getDependsOnList().map((dep) => new DependencyResource(dep)); ropts.ignoreChanges = opts.getIgnoreChangesList(); ropts.parent = request.getParent() !== "" ? new DependencyResource(request.getParent()) : undefined; ropts.pluginDownloadURL = opts.getPluginDownloadUrl() !== "" ? opts.getPluginDownloadUrl() : undefined; ropts.protect = opts.getProtect(); ropts.provider = opts.getProvider() !== "" ? new DependencyProviderResource(opts.getProvider()) : undefined; ropts.replaceOnChanges = opts.getReplaceOnChangesList(); ropts.retainOnDelete = opts.getRetainOnDelete(); ropts.version = opts.getVersion() !== "" ? opts.getVersion() : undefined; const props = request.getProperties(); const args: ResourceTransformArgs = { custom: request.getCustom(), type: request.getType(), name: request.getName(), props: props === undefined ? {} : deserializeProperties(props), opts: ropts, }; const result = await transform(args); const response = new resproto.TransformResponse(); if (result === undefined) { response.setProperties(request.getProperties()); response.setOptions(request.getOptions()); } else { const mprops = await serializeProperties("props", result.props); response.setProperties(gstruct.Struct.fromJavaScript(mprops)); // Copy the options over. if (result.opts !== undefined) { opts = new resproto.TransformResourceOptions(); if (result.opts.aliases !== undefined) { const aliases = []; const uniqueAliases = new Set<Alias | URN>(); for (const alias of result.opts.aliases || []) { const aliasVal = await output(alias).promise(); if (!uniqueAliases.has(aliasVal)) { uniqueAliases.add(aliasVal); aliases.push(aliasVal); } } opts.setAliasesList(await mapAliasesForRequest(aliases, request.getParent())); } if (result.opts.customTimeouts !== undefined) { const customTimeouts = new resproto.RegisterResourceRequest.CustomTimeouts(); if (result.opts.customTimeouts.create !== undefined) { customTimeouts.setCreate(result.opts.customTimeouts.create); } if (result.opts.customTimeouts.update !== undefined) { customTimeouts.setUpdate(result.opts.customTimeouts.update); } if (result.opts.customTimeouts.delete !== undefined) { customTimeouts.setDelete(result.opts.customTimeouts.delete); } opts.setCustomTimeouts(customTimeouts); } if (result.opts.deletedWith !== undefined) { opts.setDeletedWith(await result.opts.deletedWith.urn.promise()); } if (result.opts.dependsOn !== undefined) { const resolvedDeps = await output(result.opts.dependsOn).promise(); const deps = []; if (Resource.isInstance(resolvedDeps)) { deps.push(await resolvedDeps.urn.promise()); } else { for (const dep of resolvedDeps) { deps.push(await dep.urn.promise()); } } opts.setDependsOnList(deps); } if (result.opts.ignoreChanges !== undefined) { opts.setIgnoreChangesList(result.opts.ignoreChanges); } if (result.opts.pluginDownloadURL !== undefined) { opts.setPluginDownloadUrl(result.opts.pluginDownloadURL); } if (result.opts.protect !== undefined) { opts.setProtect(result.opts.protect); } if (result.opts.provider !== undefined) { const providerURN = await result.opts.provider.urn.promise(); const providerID = (await result.opts.provider.id.promise()) || unknownValue; opts.setProvider(`${providerURN}::${providerID}`); } if (result.opts.replaceOnChanges !== undefined) { opts.setReplaceOnChangesList(result.opts.replaceOnChanges); } if (result.opts.retainOnDelete !== undefined) { opts.setRetainOnDelete(result.opts.retainOnDelete); } if (result.opts.version !== undefined) { opts.setVersion(result.opts.version); } if (request.getCustom()) { const copts = result.opts as CustomResourceOptions; if (copts.deleteBeforeReplace !== undefined) { opts.setDeleteBeforeReplace(copts.deleteBeforeReplace); } if (copts.additionalSecretOutputs !== undefined) { opts.setAdditionalSecretOutputsList(copts.additionalSecretOutputs); } } else { const copts = result.opts as ComponentResourceOptions; if (copts.providers !== undefined) { const providers = opts.getProvidersMap(); if (copts.providers && !Array.isArray(copts.providers)) { for (const k in copts.providers) { if (Object.prototype.hasOwnProperty.call(copts.providers, k)) { const v = copts.providers[k]; if (k !== v.getPackage()) { const message = `provider resource map where key ${k} doesn't match provider ${v.getPackage()}`; log.warn(message); } } } } const provs = Object.values(copts.providers); for (const prov of provs) { const providerURN = await prov.urn.promise(); const providerID = (await prov.id.promise()) || unknownValue; providers.set(prov.getPackage(), `${providerURN}::${providerID}`); } opts.clearProvidersMap(); } } } response.setOptions(opts); } return response; }; const tryCb = async (bytes: Uint8Array): Promise<jspb.Message> => { try { return await cb(bytes); } catch (e) { throw new Error(`transform failed: ${e}`); } }; const uuid = randomUUID(); this._callbacks.set(uuid, tryCb); const req = new Callback(); req.setToken(uuid); req.setTarget(await this._target); return req; } registerStackTransform(transform: ResourceTransform): void { this._pendingRegistrations++; this.registerTransform(transform) .then( (req) => { this._monitor.registerStackTransform(req, (err, _) => { if (err !== null) { // Remove this from the list of callbacks given we didn't manage to actually register it. this._callbacks.delete(req.getToken()); return; } }); }, (err) => log.error(`failed to register stack transform: ${err}`), ) .finally(() => { this._pendingRegistrations--; if (this._pendingRegistrations === 0) { const queue = this._awaitQueue; this._awaitQueue = []; for (const waiter of queue) { waiter(); } } }); } }