mirror of https://github.com/pulumi/pulumi.git
709 lines
28 KiB
TypeScript
709 lines
28 KiB
TypeScript
// Copyright 2016-2020, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
import * as grpc from "@grpc/grpc-js";
|
|
|
|
import { Provider } from "./provider";
|
|
|
|
import * as log from "../log";
|
|
import { Inputs, Output, output } from "../output";
|
|
import * as resource from "../resource";
|
|
import * as config from "../runtime/config";
|
|
import * as rpc from "../runtime/rpc";
|
|
import * as settings from "../runtime/settings";
|
|
import { parseArgs } from "./internals";
|
|
|
|
import * as gstruct from "google-protobuf/google/protobuf/struct_pb";
|
|
import * as anyproto from "google-protobuf/google/protobuf/any_pb";
|
|
import * as emptyproto from "google-protobuf/google/protobuf/empty_pb";
|
|
import * as structproto from "google-protobuf/google/protobuf/struct_pb";
|
|
import * as plugproto from "../../../proto/pulumi/plugin_pb";
|
|
import * as provrpc from "../../../proto/pulumi/provider_grpc_pb";
|
|
import * as provproto from "../../../proto/pulumi/provider_pb";
|
|
import * as statusproto from "../../../proto/google/protobuf/status_pb";
|
|
|
|
class Server implements grpc.UntypedServiceImplementation {
|
|
engineAddr: string | undefined;
|
|
readonly provider: Provider;
|
|
readonly uncaughtErrors: Set<Error>;
|
|
|
|
/** Queue of construct calls. */
|
|
constructCallQueue = Promise.resolve();
|
|
|
|
constructor(engineAddr: string | undefined, provider: Provider, uncaughtErrors: Set<Error>) {
|
|
this.engineAddr = engineAddr;
|
|
this.provider = provider;
|
|
this.uncaughtErrors = uncaughtErrors;
|
|
}
|
|
|
|
// Satisfy the grpc.UntypedServiceImplementation interface.
|
|
[name: string]: any;
|
|
|
|
// Misc. methods
|
|
|
|
public cancel(call: any, callback: any): void {
|
|
callback(undefined, new emptyproto.Empty());
|
|
}
|
|
|
|
public attach(call: any, callback: any): void {
|
|
const req = call.request;
|
|
const host = req.getAddress();
|
|
this.engineAddr = host;
|
|
callback(undefined, new emptyproto.Empty());
|
|
}
|
|
|
|
public getPluginInfo(call: any, callback: any): void {
|
|
const resp: any = new plugproto.PluginInfo();
|
|
resp.setVersion(this.provider.version);
|
|
callback(undefined, resp);
|
|
}
|
|
|
|
public getSchema(call: any, callback: any): void {
|
|
const req: any = call.request;
|
|
if (req.getVersion() !== 0) {
|
|
callback(new Error(`unsupported schema version ${req.getVersion()}`), undefined);
|
|
}
|
|
const resp: any = new provproto.GetSchemaResponse();
|
|
resp.setSchema(this.provider.schema || "{}");
|
|
callback(undefined, resp);
|
|
}
|
|
|
|
// Config methods
|
|
|
|
public checkConfig(call: any, callback: any): void {
|
|
callback(
|
|
{
|
|
code: grpc.status.UNIMPLEMENTED,
|
|
details: "Not yet implemented: CheckConfig",
|
|
},
|
|
undefined,
|
|
);
|
|
}
|
|
|
|
public diffConfig(call: any, callback: any): void {
|
|
callback(
|
|
{
|
|
code: grpc.status.UNIMPLEMENTED,
|
|
details: "Not yet implemented: DiffConfig",
|
|
},
|
|
undefined,
|
|
);
|
|
}
|
|
|
|
public configure(call: any, callback: any): void {
|
|
const resp = new provproto.ConfigureResponse();
|
|
resp.setAcceptsecrets(true);
|
|
resp.setAcceptresources(true);
|
|
resp.setAcceptoutputs(true);
|
|
callback(undefined, resp);
|
|
}
|
|
|
|
// CRUD resource methods
|
|
|
|
public async check(call: any, callback: any): Promise<void> {
|
|
try {
|
|
const req: any = call.request;
|
|
const resp = new provproto.CheckResponse();
|
|
|
|
const olds = req.getOlds().toJavaScript();
|
|
const news = req.getNews().toJavaScript();
|
|
|
|
let inputs: any = news;
|
|
let failures: any[] = [];
|
|
if (this.provider.check) {
|
|
const result = await this.provider.check(req.getUrn(), olds, news);
|
|
if (result.inputs) {
|
|
inputs = result.inputs;
|
|
}
|
|
if (result.failures) {
|
|
failures = result.failures;
|
|
}
|
|
} else {
|
|
// If no check method was provided, propagate the new inputs as-is.
|
|
inputs = news;
|
|
}
|
|
|
|
resp.setInputs(structproto.Struct.fromJavaScript(inputs));
|
|
|
|
if (failures.length !== 0) {
|
|
const failureList = [];
|
|
for (const f of failures) {
|
|
const failure = new provproto.CheckFailure();
|
|
failure.setProperty(f.property);
|
|
failure.setReason(f.reason);
|
|
failureList.push(failure);
|
|
}
|
|
resp.setFailuresList(failureList);
|
|
}
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
console.error(`${e}: ${e.stack}`);
|
|
callback(e, undefined);
|
|
}
|
|
}
|
|
|
|
public async diff(call: any, callback: any): Promise<void> {
|
|
try {
|
|
const req: any = call.request;
|
|
const resp = new provproto.DiffResponse();
|
|
|
|
const olds = req.getOlds().toJavaScript();
|
|
const news = req.getNews().toJavaScript();
|
|
if (this.provider.diff) {
|
|
const result: any = await this.provider.diff(req.getId(), req.getUrn(), olds, news);
|
|
|
|
if (result.changes === true) {
|
|
resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_SOME);
|
|
} else if (result.changes === false) {
|
|
resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_NONE);
|
|
} else {
|
|
resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_UNKNOWN);
|
|
}
|
|
|
|
if (result.replaces && result.replaces.length !== 0) {
|
|
resp.setReplacesList(result.replaces);
|
|
}
|
|
if (result.deleteBeforeReplace) {
|
|
resp.setDeletebeforereplace(result.deleteBeforeReplace);
|
|
}
|
|
}
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
console.error(`${e}: ${e.stack}`);
|
|
callback(e, undefined);
|
|
}
|
|
}
|
|
|
|
public async create(call: any, callback: any): Promise<void> {
|
|
try {
|
|
const req: any = call.request;
|
|
if (!this.provider.create) {
|
|
callback(new Error(`unknown resource type ${req.getUrn()}`), undefined);
|
|
return;
|
|
}
|
|
|
|
const resp = new provproto.CreateResponse();
|
|
const props = req.getProperties().toJavaScript();
|
|
const result = await this.provider.create(req.getUrn(), props);
|
|
resp.setId(result.id);
|
|
resp.setProperties(structproto.Struct.fromJavaScript(result.outs));
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
const response = grpcResponseFromError(e);
|
|
return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
|
|
}
|
|
}
|
|
|
|
public async read(call: any, callback: any): Promise<void> {
|
|
try {
|
|
const req: any = call.request;
|
|
const resp = new provproto.ReadResponse();
|
|
|
|
const id = req.getId();
|
|
const props = req.getProperties().toJavaScript();
|
|
if (this.provider.read) {
|
|
const result: any = await this.provider.read(id, req.getUrn(), props);
|
|
resp.setId(result.id);
|
|
resp.setProperties(structproto.Struct.fromJavaScript(result.props));
|
|
resp.setInputs(
|
|
result.inputs === undefined ? undefined : structproto.Struct.fromJavaScript(result.inputs),
|
|
);
|
|
} else {
|
|
// In the event of a missing read, simply return back the input state.
|
|
resp.setId(id);
|
|
resp.setProperties(req.getProperties());
|
|
}
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
console.error(`${e}: ${e.stack}`);
|
|
callback(e, undefined);
|
|
}
|
|
}
|
|
|
|
public async update(call: any, callback: any): Promise<void> {
|
|
try {
|
|
const req: any = call.request;
|
|
const resp = new provproto.UpdateResponse();
|
|
|
|
const olds = req.getOlds().toJavaScript();
|
|
const news = req.getNews().toJavaScript();
|
|
|
|
let result: any = {};
|
|
if (this.provider.update) {
|
|
result = (await this.provider.update(req.getId(), req.getUrn(), olds, news)) || {};
|
|
}
|
|
|
|
resp.setProperties(structproto.Struct.fromJavaScript(result.outs));
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
const response = grpcResponseFromError(e);
|
|
return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
|
|
}
|
|
}
|
|
|
|
public async delete(call: any, callback: any): Promise<void> {
|
|
try {
|
|
const req: any = call.request;
|
|
const props: any = req.getProperties().toJavaScript();
|
|
if (this.provider.delete) {
|
|
await this.provider.delete(req.getId(), req.getUrn(), props);
|
|
}
|
|
callback(undefined, new emptyproto.Empty());
|
|
} catch (e) {
|
|
console.error(`${e}: ${e.stack}`);
|
|
callback(e, undefined);
|
|
}
|
|
}
|
|
|
|
public async construct(call: any, callback: any): Promise<void> {
|
|
// Serialize invocations of `construct` and `call` so that each call runs one after another, avoiding concurrent
|
|
// runs. We do this because `construct` and `call` modify global state to reset the SDK's runtime options.
|
|
// This is a short-term workaround to provide correctness, but likely isn't sustainable long-term due to the
|
|
// limits it places on parallelism. We will likely want to investigate if it's possible to run each invocation
|
|
// in its own context, possibly using Node's `createContext` API to avoid modifying global state:
|
|
// https://nodejs.org/api/vm.html#vm_vm_createcontext_contextobject_options
|
|
const res = this.constructCallQueue.then(() => this.constructImpl(call, callback));
|
|
/* eslint-disable no-empty,no-empty-function,@typescript-eslint/no-empty-function */
|
|
this.constructCallQueue = res.catch(() => {});
|
|
return res;
|
|
}
|
|
|
|
async constructImpl(call: any, callback: any): Promise<void> {
|
|
// given that construct calls are serialized, we can attach an uncaught handler to pick up exceptions
|
|
// in underlying user code. When we catch the error, we need to respond to the gRPC request with the error
|
|
// to avoid a hang.
|
|
const uncaughtHandler = (err: Error) => {
|
|
if (!this.uncaughtErrors.has(err)) {
|
|
this.uncaughtErrors.add(err);
|
|
}
|
|
// bubble the uncaught error in the user code back and terminate the outstanding gRPC request.
|
|
callback(err, undefined);
|
|
};
|
|
process.on("uncaughtException", uncaughtHandler);
|
|
// @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so
|
|
// just suppress the TS strictness here.
|
|
process.on("unhandledRejection", uncaughtHandler);
|
|
try {
|
|
const req: any = call.request;
|
|
const type = req.getType();
|
|
const name = req.getName();
|
|
|
|
if (!this.provider.construct) {
|
|
callback(new Error(`unknown resource type ${type}`), undefined);
|
|
return;
|
|
}
|
|
|
|
await configureRuntime(req, this.engineAddr);
|
|
|
|
const inputs = await deserializeInputs(req.getInputs(), req.getInputdependenciesMap());
|
|
|
|
// Rebuild the resource options.
|
|
const dependsOn: resource.Resource[] = [];
|
|
for (const urn of req.getDependenciesList()) {
|
|
dependsOn.push(new resource.DependencyResource(urn));
|
|
}
|
|
const providers: Record<string, resource.ProviderResource> = {};
|
|
const rpcProviders = req.getProvidersMap();
|
|
if (rpcProviders) {
|
|
for (const [pkg, ref] of rpcProviders.entries()) {
|
|
providers[pkg] = createProviderResource(ref);
|
|
}
|
|
}
|
|
const opts: resource.ComponentResourceOptions = {
|
|
aliases: req.getAliasesList(),
|
|
dependsOn: dependsOn,
|
|
protect: req.getProtect(),
|
|
providers: providers,
|
|
parent: req.getParent() ? new resource.DependencyResource(req.getParent()) : undefined,
|
|
};
|
|
|
|
const result = await this.provider.construct(name, type, inputs, opts);
|
|
|
|
const resp = new provproto.ConstructResponse();
|
|
|
|
resp.setUrn(await output(result.urn).promise());
|
|
|
|
const [state, stateDependencies] = await rpc.serializeResourceProperties(
|
|
`construct(${type}, ${name})`,
|
|
result.state,
|
|
);
|
|
const stateDependenciesMap = resp.getStatedependenciesMap();
|
|
for (const [key, resources] of stateDependencies) {
|
|
const deps = new provproto.ConstructResponse.PropertyDependencies();
|
|
deps.setUrnsList(await Promise.all(Array.from(resources).map((r) => r.urn.promise())));
|
|
stateDependenciesMap.set(key, deps);
|
|
}
|
|
resp.setState(structproto.Struct.fromJavaScript(state));
|
|
|
|
// Wait for RPC operations to complete.
|
|
await settings.waitForRPCs();
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
console.error(`${e}: ${e.stack}`);
|
|
callback(e, undefined);
|
|
} finally {
|
|
// remove these uncaught handlers that are specific to this gRPC callback context
|
|
process.off("uncaughtException", uncaughtHandler);
|
|
process.off("unhandledRejection", uncaughtHandler);
|
|
}
|
|
}
|
|
|
|
public async call(call: any, callback: any): Promise<void> {
|
|
// Serialize invocations of `construct` and `call` so that each call runs one after another, avoiding concurrent
|
|
// runs. We do this because `construct` and `call` modify global state to reset the SDK's runtime options.
|
|
// This is a short-term workaround to provide correctness, but likely isn't sustainable long-term due to the
|
|
// limits it places on parallelism. We will likely want to investigate if it's possible to run each invocation
|
|
// in its own context, possibly using Node's `createContext` API to avoid modifying global state:
|
|
// https://nodejs.org/api/vm.html#vm_vm_createcontext_contextobject_options
|
|
const res = this.constructCallQueue.then(() => this.callImpl(call, callback));
|
|
/* eslint-disable no-empty, no-empty-function, @typescript-eslint/no-empty-function */
|
|
this.constructCallQueue = res.catch(() => {});
|
|
return res;
|
|
}
|
|
|
|
async callImpl(call: any, callback: any): Promise<void> {
|
|
// given that call calls are serialized, we can attach an uncaught handler to pick up exceptions
|
|
// in underlying user code. When we catch the error, we need to respond to the gRPC request with the error
|
|
// to avoid a hang.
|
|
const uncaughtHandler = (err: Error) => {
|
|
if (!this.uncaughtErrors.has(err)) {
|
|
this.uncaughtErrors.add(err);
|
|
}
|
|
// bubble the uncaught error in the user code back and terminate the outstanding gRPC request.
|
|
callback(err, undefined);
|
|
};
|
|
process.on("uncaughtException", uncaughtHandler);
|
|
// @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so
|
|
// just suppress the TS strictness here.
|
|
process.on("unhandledRejection", uncaughtHandler);
|
|
try {
|
|
const req: any = call.request;
|
|
if (!this.provider.call) {
|
|
callback(new Error(`unknown function ${req.getTok()}`), undefined);
|
|
return;
|
|
}
|
|
|
|
await configureRuntime(req, this.engineAddr);
|
|
|
|
const args = await deserializeInputs(req.getArgs(), req.getArgdependenciesMap());
|
|
|
|
const result = await this.provider.call(req.getTok(), args);
|
|
|
|
const resp = new provproto.CallResponse();
|
|
|
|
if (result.outputs) {
|
|
const [ret, retDependencies] = await rpc.serializeResourceProperties(
|
|
`call(${req.getTok()})`,
|
|
result.outputs,
|
|
);
|
|
const returnDependenciesMap = resp.getReturndependenciesMap();
|
|
for (const [key, resources] of retDependencies) {
|
|
const deps = new provproto.CallResponse.ReturnDependencies();
|
|
deps.setUrnsList(await Promise.all(Array.from(resources).map((r) => r.urn.promise())));
|
|
returnDependenciesMap.set(key, deps);
|
|
}
|
|
resp.setReturn(structproto.Struct.fromJavaScript(ret));
|
|
}
|
|
|
|
if ((result.failures || []).length !== 0) {
|
|
const failureList = [];
|
|
for (const f of result.failures!) {
|
|
const failure = new provproto.CheckFailure();
|
|
failure.setProperty(f.property);
|
|
failure.setReason(f.reason);
|
|
failureList.push(failure);
|
|
}
|
|
resp.setFailuresList(failureList);
|
|
}
|
|
|
|
// Wait for RPC operations to complete.
|
|
await settings.waitForRPCs();
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
console.error(`${e}: ${e.stack}`);
|
|
callback(e, undefined);
|
|
} finally {
|
|
// remove these uncaught handlers that are specific to this gRPC callback context
|
|
process.off("uncaughtException", uncaughtHandler);
|
|
process.off("unhandledRejection", uncaughtHandler);
|
|
}
|
|
}
|
|
|
|
public async invoke(call: any, callback: any): Promise<void> {
|
|
try {
|
|
const req: any = call.request;
|
|
if (!this.provider.invoke) {
|
|
callback(new Error(`unknown function ${req.getTok()}`), undefined);
|
|
return;
|
|
}
|
|
|
|
const args: any = req.getArgs().toJavaScript();
|
|
const result = await this.provider.invoke(req.getTok(), args);
|
|
|
|
const resp = new provproto.InvokeResponse();
|
|
resp.setReturn(structproto.Struct.fromJavaScript(result.outputs));
|
|
|
|
if ((result.failures || []).length !== 0) {
|
|
const failureList = [];
|
|
for (const f of result.failures!) {
|
|
const failure = new provproto.CheckFailure();
|
|
failure.setProperty(f.property);
|
|
failure.setReason(f.reason);
|
|
failureList.push(failure);
|
|
}
|
|
resp.setFailuresList(failureList);
|
|
}
|
|
|
|
callback(undefined, resp);
|
|
} catch (e) {
|
|
console.error(`${e}: ${e.stack}`);
|
|
callback(e, undefined);
|
|
}
|
|
}
|
|
|
|
public async streamInvoke(call: any, callback: any): Promise<void> {
|
|
callback(
|
|
{
|
|
code: grpc.status.UNIMPLEMENTED,
|
|
details: "Not yet implemented: StreamInvoke",
|
|
},
|
|
undefined,
|
|
);
|
|
}
|
|
}
|
|
|
|
async function configureRuntime(req: any, engineAddr: string | undefined) {
|
|
// NOTE: these are globals! We should ensure that all settings are identical between calls, and eventually
|
|
// refactor so we can avoid the global state.
|
|
if (engineAddr === undefined) {
|
|
throw new Error("fatal: Missing <engine> address");
|
|
}
|
|
|
|
settings.resetOptions(
|
|
req.getProject(),
|
|
req.getStack(),
|
|
req.getParallel(),
|
|
engineAddr,
|
|
req.getMonitorendpoint(),
|
|
req.getDryrun(),
|
|
req.getOrganization(),
|
|
);
|
|
|
|
// resetOptions doesn't reset the saved features
|
|
await settings.awaitFeatureSupport();
|
|
|
|
const pulumiConfig: { [key: string]: string } = {};
|
|
const rpcConfig = req.getConfigMap();
|
|
if (rpcConfig) {
|
|
for (const [k, v] of rpcConfig.entries()) {
|
|
pulumiConfig[k] = v;
|
|
}
|
|
}
|
|
config.setAllConfig(pulumiConfig, req.getConfigsecretkeysList());
|
|
}
|
|
|
|
/**
|
|
* deserializeInputs deserializes the inputs struct and applies appropriate dependencies.
|
|
* @internal
|
|
*/
|
|
export async function deserializeInputs(inputsStruct: gstruct.Struct, inputDependencies: any): Promise<Inputs> {
|
|
const result: Inputs = {};
|
|
|
|
const deserializedInputs = rpc.deserializeProperties(inputsStruct);
|
|
for (const k of Object.keys(deserializedInputs)) {
|
|
const input = deserializedInputs[k];
|
|
const isSecret = rpc.isRpcSecret(input);
|
|
const depsUrns: resource.URN[] = inputDependencies.get(k)?.getUrnsList() ?? [];
|
|
|
|
if (
|
|
!isSecret &&
|
|
(depsUrns.length === 0 || containsOutputs(input) || (await isResourceReference(input, depsUrns)))
|
|
) {
|
|
// If the input isn't a secret and either doesn't have any dependencies, already contains Outputs (from
|
|
// deserialized output values), or is a resource reference, then we can return it directly without
|
|
// wrapping it as an output.
|
|
result[k] = input;
|
|
} else {
|
|
// Otherwise, wrap it in an output so we can handle secrets and/or track dependencies.
|
|
// Note: If the value is or contains an unknown value, the Output will mark its value as
|
|
// unknown automatically, so we just pass true for isKnown here.
|
|
const deps = depsUrns.map((depUrn) => new resource.DependencyResource(depUrn));
|
|
result[k] = new Output(
|
|
deps,
|
|
Promise.resolve(rpc.unwrapRpcSecret(input)),
|
|
Promise.resolve(true),
|
|
Promise.resolve(isSecret),
|
|
Promise.resolve([]),
|
|
);
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Returns true if the input is a resource reference.
|
|
*/
|
|
async function isResourceReference(input: any, deps: string[]): Promise<boolean> {
|
|
return resource.Resource.isInstance(input) && deps.length === 1 && deps[0] === (await input.urn.promise());
|
|
}
|
|
|
|
/**
|
|
* Returns true if the deserialized input contains Outputs (deeply), excluding properties of Resources.
|
|
* @internal
|
|
*/
|
|
export function containsOutputs(input: any): boolean {
|
|
if (Array.isArray(input)) {
|
|
for (const e of input) {
|
|
if (containsOutputs(e)) {
|
|
return true;
|
|
}
|
|
}
|
|
} else if (typeof input === "object") {
|
|
if (Output.isInstance(input)) {
|
|
return true;
|
|
} else if (resource.Resource.isInstance(input)) {
|
|
// Do not drill into instances of Resource because they will have properties that are
|
|
// instances of Output (e.g. urn, id, etc.) and we're only looking for instances of
|
|
// Output that aren't associated with a Resource.
|
|
return false;
|
|
}
|
|
|
|
for (const k of Object.keys(input)) {
|
|
if (containsOutputs(input[k])) {
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// grpcResponseFromError creates a gRPC response representing an error from a dynamic provider's
|
|
// resource. This is typically either a creation error, in which the API server has (virtually)
|
|
// rejected the resource, or an initialization error, where the API server has accepted the
|
|
// resource, but it failed to initialize (e.g., the app code is continually crashing and the
|
|
// resource has failed to become alive).
|
|
function grpcResponseFromError(e: { id: string; properties: any; message: string; reasons?: string[] }) {
|
|
// Create response object.
|
|
const resp = new statusproto.Status();
|
|
resp.setCode(grpc.status.UNKNOWN);
|
|
resp.setMessage(e.message);
|
|
|
|
const metadata = new grpc.Metadata();
|
|
if (e.id) {
|
|
// Object created successfully, but failed to initialize. Pack initialization failure into
|
|
// details.
|
|
const detail = new provproto.ErrorResourceInitFailed();
|
|
detail.setId(e.id);
|
|
detail.setProperties(structproto.Struct.fromJavaScript(e.properties || {}));
|
|
detail.setReasonsList(e.reasons || []);
|
|
|
|
const details = new anyproto.Any();
|
|
details.pack(detail.serializeBinary(), "pulumirpc.ErrorResourceInitFailed");
|
|
|
|
// Add details to metadata.
|
|
resp.addDetails(details);
|
|
// NOTE: `grpc-status-details-bin` is a magic field that allows us to send structured
|
|
// protobuf data as an error back through gRPC. This notion of details is a first-class in
|
|
// the Go gRPC implementation, and the nodejs implementation has not quite caught up to it,
|
|
// which is why it's cumbersome here.
|
|
metadata.add("grpc-status-details-bin", Buffer.from(resp.serializeBinary()));
|
|
}
|
|
|
|
return {
|
|
code: grpc.status.UNKNOWN,
|
|
message: e.message,
|
|
metadata: metadata,
|
|
};
|
|
}
|
|
|
|
export async function main(provider: Provider, args: string[]) {
|
|
// We track all uncaught errors here. If we have any, we will make sure we always have a non-0 exit
|
|
// code.
|
|
const uncaughtErrors = new Set<Error>();
|
|
const uncaughtHandler = (err: Error) => {
|
|
if (!uncaughtErrors.has(err)) {
|
|
uncaughtErrors.add(err);
|
|
// Use `pulumi.log.error` here to tell the engine there was a fatal error, which should
|
|
// stop processing subsequent resource operations.
|
|
log.error(err.stack || err.message || "" + err);
|
|
}
|
|
};
|
|
|
|
process.on("uncaughtException", uncaughtHandler);
|
|
// @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so just
|
|
// suppress the TS strictness here.
|
|
process.on("unhandledRejection", uncaughtHandler);
|
|
process.on("exit", (code: number) => {
|
|
// If there were any uncaught errors at all, we always want to exit with an error code.
|
|
if (code === 0 && uncaughtErrors.size > 0) {
|
|
process.exitCode = 1;
|
|
}
|
|
});
|
|
|
|
const parsedArgs = parseArgs(args);
|
|
|
|
// Finally connect up the gRPC client/server and listen for incoming requests.
|
|
const server = new grpc.Server({
|
|
"grpc.max_receive_message_length": settings.maxRPCMessageSize,
|
|
});
|
|
|
|
// The program receives a single optional argument: the address of the RPC endpoint for the engine. It
|
|
// optionally also takes a second argument, a reference back to the engine, but this may be missing.
|
|
|
|
const engineAddr = parsedArgs?.engineAddress;
|
|
server.addService(provrpc.ResourceProviderService, new Server(engineAddr, provider, uncaughtErrors));
|
|
const port: number = await new Promise<number>((resolve, reject) => {
|
|
server.bindAsync(`127.0.0.1:0`, grpc.ServerCredentials.createInsecure(), (err, p) => {
|
|
if (err) {
|
|
reject(err);
|
|
} else {
|
|
resolve(p);
|
|
}
|
|
});
|
|
});
|
|
|
|
// Emit the address so the monitor can read it to connect. The gRPC server will keep the message loop alive.
|
|
// We explicitly convert the number to a string so that Node doesn't colorize the output.
|
|
console.log(port.toString());
|
|
}
|
|
|
|
/**
|
|
* Rehydrate the provider reference into a registered ProviderResource,
|
|
* otherwise return an instance of DependencyProviderResource.
|
|
*/
|
|
function createProviderResource(ref: string): resource.ProviderResource {
|
|
const [urn] = resource.parseResourceReference(ref);
|
|
const urnParts = urn.split("::");
|
|
const qualifiedType = urnParts[2];
|
|
const urnName = urnParts[3];
|
|
|
|
const type = qualifiedType.split("$").pop()!;
|
|
const typeParts = type.split(":");
|
|
const typName = typeParts.length > 2 ? typeParts[2] : "";
|
|
|
|
const resourcePackage = rpc.getResourcePackage(typName, /*version:*/ "");
|
|
if (resourcePackage) {
|
|
return resourcePackage.constructProvider(urnName, type, urn);
|
|
}
|
|
return new resource.DependencyProviderResource(ref);
|
|
}
|