mirror of https://github.com/pulumi/pulumi.git
550 lines
18 KiB
TypeScript
550 lines
18 KiB
TypeScript
// Copyright 2016-2021, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
import * as grpc from "@grpc/grpc-js";
|
|
import * as fs from "fs";
|
|
import * as path from "path";
|
|
import { ComponentResource } from "../resource";
|
|
import { CallbackServer, ICallbackServer } from "./callbacks";
|
|
import { debuggablePromise } from "./debuggable";
|
|
import { getLocalStore, getStore } from "./state";
|
|
|
|
console.log(process.cwd())
|
|
|
|
import * as engrpc from "../../../proto/pulumi/engine_grpc_pb";
|
|
import * as engproto from "../../../proto/pulumi/engine_pb";
|
|
import * as resrpc from "../../../proto/pulumi/resource_grpc_pb";
|
|
import * as resproto from "../../../proto/pulumi/resource_pb";
|
|
|
|
// maxRPCMessageSize raises the gRPC Max Message size from `4194304` (4mb) to `419430400` (400mb)
|
|
/** @internal */
|
|
export const maxRPCMessageSize: number = 1024 * 1024 * 400;
|
|
const grpcChannelOptions = { "grpc.max_receive_message_length": maxRPCMessageSize };
|
|
|
|
/**
|
|
* excessiveDebugOutput enables, well, pretty excessive debug output pertaining to resources and properties.
|
|
*/
|
|
export const excessiveDebugOutput: boolean = false;
|
|
|
|
/**
|
|
* Options is a bag of settings that controls the behavior of previews and deployments
|
|
*/
|
|
export interface Options {
|
|
readonly project?: string; // the name of the current project.
|
|
readonly stack?: string; // the name of the current stack being deployed into.
|
|
readonly parallel?: number; // the degree of parallelism for resource operations (default is serial).
|
|
readonly engineAddr?: string; // a connection string to the engine's RPC, in case we need to reestablish.
|
|
readonly monitorAddr?: string; // a connection string to the monitor's RPC, in case we need to reestablish.
|
|
readonly dryRun?: boolean; // whether we are performing a preview (true) or a real deployment (false).
|
|
readonly testModeEnabled?: boolean; // true if we're in testing mode (allows execution without the CLI).
|
|
readonly queryMode?: boolean; // true if we're in query mode (does not allow resource registration).
|
|
readonly legacyApply?: boolean; // true if we will resolve missing outputs to inputs during preview.
|
|
readonly cacheDynamicProviders?: boolean; // true if we will cache serialized dynamic providers on the program side.
|
|
readonly organization?: string; // the name of the current organization.
|
|
|
|
/**
|
|
* Directory containing the send/receive files for making synchronous invokes to the engine.
|
|
*/
|
|
readonly syncDir?: string;
|
|
}
|
|
|
|
let monitor: resrpc.ResourceMonitorClient | undefined;
|
|
let engine: engrpc.EngineClient | undefined;
|
|
|
|
// reset options resets nodejs runtime global state (such as rpc clients),
|
|
// and sets nodejs runtime option env vars to the specified values.
|
|
export function resetOptions(
|
|
project: string,
|
|
stack: string,
|
|
parallel: number,
|
|
engineAddr: string,
|
|
monitorAddr: string,
|
|
preview: boolean,
|
|
organization: string,
|
|
) {
|
|
const store = getStore();
|
|
|
|
monitor = undefined;
|
|
engine = undefined;
|
|
|
|
store.settings.monitor = undefined;
|
|
store.settings.engine = undefined;
|
|
store.settings.rpcDone = Promise.resolve();
|
|
store.settings.featureSupport = {};
|
|
|
|
// reset node specific environment variables in the process
|
|
store.settings.options.project = project;
|
|
store.settings.options.stack = stack;
|
|
store.settings.options.dryRun = preview;
|
|
store.settings.options.queryMode = isQueryMode();
|
|
store.settings.options.parallel = parallel;
|
|
store.settings.options.monitorAddr = monitorAddr;
|
|
store.settings.options.engineAddr = engineAddr;
|
|
store.settings.options.organization = organization;
|
|
|
|
store.leakCandidates = new Set<Promise<any>>();
|
|
store.logErrorCount = 0;
|
|
store.stackResource = undefined;
|
|
store.supportsSecrets = false;
|
|
store.supportsResourceReferences = false;
|
|
store.supportsOutputValues = false;
|
|
store.supportsDeletedWith = false;
|
|
store.supportsAliasSpecs = false;
|
|
store.supportsTransforms = false;
|
|
store.callbacks = undefined;
|
|
}
|
|
|
|
export function setMockOptions(
|
|
mockMonitor: any,
|
|
project?: string,
|
|
stack?: string,
|
|
preview?: boolean,
|
|
organization?: string,
|
|
) {
|
|
const opts = options();
|
|
resetOptions(
|
|
project || opts.project || "project",
|
|
stack || opts.stack || "stack",
|
|
opts.parallel || -1,
|
|
opts.engineAddr || "",
|
|
opts.monitorAddr || "",
|
|
preview || false,
|
|
organization || "",
|
|
);
|
|
|
|
monitor = mockMonitor;
|
|
}
|
|
|
|
/** @internal Used only for testing purposes. */
|
|
export function _setIsDryRun(val: boolean) {
|
|
const { settings } = getStore();
|
|
settings.options.dryRun = val;
|
|
}
|
|
|
|
/**
|
|
* Returns whether or not we are currently doing a preview.
|
|
*
|
|
* When writing unit tests, you can set this flag via either `setMocks` or `_setIsDryRun`.
|
|
*/
|
|
export function isDryRun(): boolean {
|
|
return options().dryRun === true;
|
|
}
|
|
|
|
/**
|
|
* monitorSupportsFeature returns a promise that when resolved tells you if the resource monitor we are connected
|
|
* to is able to support a particular feature.
|
|
*
|
|
* @internal
|
|
*/
|
|
async function monitorSupportsFeature(monitorClient: resrpc.IResourceMonitorClient, feature: string): Promise<boolean> {
|
|
const req = new resproto.SupportsFeatureRequest();
|
|
req.setId(feature);
|
|
|
|
const result = await new Promise<boolean>((resolve, reject) => {
|
|
monitorClient.supportsFeature(
|
|
req,
|
|
(err: grpc.ServiceError | null, resp: resproto.SupportsFeatureResponse | undefined) => {
|
|
// Back-compat case - if the monitor doesn't let us ask if it supports a feature, it doesn't support
|
|
// any features.
|
|
if (err && err.code === grpc.status.UNIMPLEMENTED) {
|
|
return resolve(false);
|
|
}
|
|
|
|
if (err) {
|
|
return reject(err);
|
|
}
|
|
|
|
if (resp === undefined) {
|
|
return reject(new Error("No response from resource monitor"));
|
|
}
|
|
|
|
return resolve(resp.getHassupport());
|
|
},
|
|
);
|
|
});
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Queries the resource monitor for its capabilities and sets the appropriate flags in the store.
|
|
*
|
|
* @internal
|
|
**/
|
|
export async function awaitFeatureSupport(): Promise<void> {
|
|
const monitorRef = getMonitor();
|
|
if (monitorRef !== undefined) {
|
|
const store = getStore();
|
|
store.supportsSecrets = await monitorSupportsFeature(monitorRef, "secrets");
|
|
store.supportsResourceReferences = await monitorSupportsFeature(monitorRef, "resourceReferences");
|
|
store.supportsOutputValues = await monitorSupportsFeature(monitorRef, "outputValues");
|
|
store.supportsDeletedWith = await monitorSupportsFeature(monitorRef, "deletedWith");
|
|
store.supportsAliasSpecs = await monitorSupportsFeature(monitorRef, "aliasSpecs");
|
|
store.supportsTransforms = await monitorSupportsFeature(monitorRef, "transforms");
|
|
}
|
|
}
|
|
|
|
/** @internal Used only for testing purposes. */
|
|
export function _setQueryMode(val: boolean) {
|
|
const { settings } = getStore();
|
|
settings.options.queryMode = val;
|
|
}
|
|
|
|
/** @internal Used only for testing purposes */
|
|
export function _reset(): void {
|
|
resetOptions("", "", -1, "", "", false, "");
|
|
}
|
|
|
|
/**
|
|
* Returns true if query mode is enabled.
|
|
*/
|
|
export function isQueryMode(): boolean {
|
|
return options().queryMode === true;
|
|
}
|
|
|
|
/**
|
|
* Returns true if we will resolve missing outputs to inputs during preview (PULUMI_ENABLE_LEGACY_APPLY).
|
|
*/
|
|
export function isLegacyApplyEnabled(): boolean {
|
|
return options().legacyApply === true;
|
|
}
|
|
|
|
/**
|
|
* Returns true (default) if we will cache serialized dynamic providers on the program side
|
|
*/
|
|
export function cacheDynamicProviders(): boolean {
|
|
return options().cacheDynamicProviders === true;
|
|
}
|
|
|
|
/**
|
|
* Get the organization being run by the current update.
|
|
*/
|
|
export function getOrganization(): string {
|
|
const organization = options().organization;
|
|
if (organization) {
|
|
return organization;
|
|
}
|
|
|
|
// If the organization is missing, specialize the error.
|
|
// Throw an error if test mode is enabled, instructing how to manually configure the organization:
|
|
throw new Error("Missing organization name; for test mode, please call `pulumi.runtime.setMocks`");
|
|
}
|
|
|
|
/** @internal Used only for testing purposes. */
|
|
export function _setOrganization(val: string | undefined) {
|
|
const { settings } = getStore();
|
|
settings.options.organization = val;
|
|
return settings.options.organization;
|
|
}
|
|
|
|
/**
|
|
* Get the project being run by the current update.
|
|
*/
|
|
export function getProject(): string {
|
|
const { project } = options();
|
|
return project || "";
|
|
}
|
|
|
|
/** @internal Used only for testing purposes. */
|
|
export function _setProject(val: string | undefined) {
|
|
const { settings } = getStore();
|
|
settings.options.project = val;
|
|
return settings.options.project;
|
|
}
|
|
|
|
/**
|
|
* Get the stack being targeted by the current update.
|
|
*/
|
|
export function getStack(): string {
|
|
const { stack } = options();
|
|
return stack || "";
|
|
}
|
|
|
|
/** @internal Used only for testing purposes. */
|
|
export function _setStack(val: string | undefined) {
|
|
const { settings } = getStore();
|
|
settings.options.stack = val;
|
|
return settings.options.stack;
|
|
}
|
|
|
|
/**
|
|
* hasMonitor returns true if we are currently connected to a resource monitoring service.
|
|
*/
|
|
export function hasMonitor(): boolean {
|
|
return !!monitor && !!options().monitorAddr;
|
|
}
|
|
|
|
/**
|
|
* getMonitor returns the current resource monitoring service client for RPC communications.
|
|
*/
|
|
export function getMonitor(): resrpc.IResourceMonitorClient | undefined {
|
|
const { settings } = getStore();
|
|
const addr = options().monitorAddr;
|
|
if (getLocalStore() === undefined) {
|
|
if (monitor === undefined) {
|
|
if (addr) {
|
|
// Lazily initialize the RPC connection to the monitor.
|
|
monitor = new resrpc.ResourceMonitorClient(addr, grpc.credentials.createInsecure(), grpcChannelOptions);
|
|
settings.options.monitorAddr = addr;
|
|
}
|
|
}
|
|
return monitor;
|
|
} else {
|
|
if (settings.monitor === undefined) {
|
|
if (addr) {
|
|
// Lazily initialize the RPC connection to the monitor.
|
|
settings.monitor = new resrpc.ResourceMonitorClient(
|
|
addr,
|
|
grpc.credentials.createInsecure(),
|
|
grpcChannelOptions,
|
|
);
|
|
settings.options.monitorAddr = addr;
|
|
}
|
|
}
|
|
return settings.monitor;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Waits for any pending stack transforms to register.
|
|
*/
|
|
export async function awaitStackRegistrations(): Promise<void> {
|
|
const store = getStore();
|
|
const callbacks = store.callbacks;
|
|
if (callbacks === undefined) {
|
|
return;
|
|
}
|
|
return await callbacks.awaitStackRegistrations();
|
|
}
|
|
|
|
/**
|
|
* getCallbacks returns the current callbacks for RPC communications.
|
|
*/
|
|
export function getCallbacks(): ICallbackServer | undefined {
|
|
const store = getStore();
|
|
const callbacks = store.callbacks;
|
|
if (callbacks !== undefined) {
|
|
return callbacks;
|
|
}
|
|
|
|
const monitorRef = getMonitor();
|
|
if (monitorRef === undefined) {
|
|
return undefined;
|
|
}
|
|
|
|
const callbackServer = new CallbackServer(monitorRef);
|
|
store.callbacks = callbackServer;
|
|
return callbackServer;
|
|
}
|
|
|
|
/** @internal */
|
|
export interface SyncInvokes {
|
|
requests: number;
|
|
responses: number;
|
|
}
|
|
|
|
let syncInvokes: SyncInvokes | undefined;
|
|
|
|
/** @internal */
|
|
export function tryGetSyncInvokes(): SyncInvokes | undefined {
|
|
const syncDir = options().syncDir;
|
|
if (syncInvokes === undefined && syncDir) {
|
|
const requests = fs.openSync(path.join(syncDir, "invoke_req"), fs.constants.O_WRONLY | fs.constants.O_SYNC);
|
|
const responses = fs.openSync(path.join(syncDir, "invoke_res"), fs.constants.O_RDONLY | fs.constants.O_SYNC);
|
|
syncInvokes = { requests, responses };
|
|
}
|
|
|
|
return syncInvokes;
|
|
}
|
|
|
|
/**
|
|
* hasEngine returns true if we are currently connected to an engine.
|
|
*/
|
|
export function hasEngine(): boolean {
|
|
return !!engine && !!options().engineAddr;
|
|
}
|
|
|
|
/**
|
|
* getEngine returns the current engine, if any, for RPC communications back to the resource engine.
|
|
*/
|
|
export function getEngine(): engrpc.IEngineClient | undefined {
|
|
const { settings } = getStore();
|
|
if (getLocalStore() === undefined) {
|
|
if (engine === undefined) {
|
|
const addr = options().engineAddr;
|
|
if (addr) {
|
|
// Lazily initialize the RPC connection to the engine.
|
|
engine = new engrpc.EngineClient(addr, grpc.credentials.createInsecure(), grpcChannelOptions);
|
|
}
|
|
}
|
|
return engine;
|
|
} else {
|
|
if (settings.engine === undefined) {
|
|
const addr = options().engineAddr;
|
|
if (addr) {
|
|
// Lazily initialize the RPC connection to the engine.
|
|
settings.engine = new engrpc.EngineClient(addr, grpc.credentials.createInsecure(), grpcChannelOptions);
|
|
}
|
|
}
|
|
return settings.engine;
|
|
}
|
|
}
|
|
|
|
export function terminateRpcs() {
|
|
disconnectSync();
|
|
}
|
|
|
|
/**
|
|
* serialize returns true if resource operations should be serialized.
|
|
*/
|
|
export function serialize(): boolean {
|
|
return options().parallel === 1;
|
|
}
|
|
|
|
/**
|
|
* options returns the options from the environment, which is the source of truth. Options are global per process.
|
|
* For CLI driven programs, pulumi-language-nodejs sets environment variables prior to the user program loading,
|
|
* meaning that options could be loaded up front and cached.
|
|
* Automation API and multi-language components introduced more complex lifecycles for runtime options().
|
|
* These language hosts manage the lifecycle of options manually throughout the lifetime of the nodejs process.
|
|
* In addition, node module resolution can lead to duplicate copies of @pulumi/pulumi and thus duplicate options
|
|
* objects that may not be synced if options are cached upfront. Mutating options must write to the environment
|
|
* and reading options must always read directly from the environment.
|
|
|
|
*/
|
|
function options(): Options {
|
|
const { settings } = getStore();
|
|
|
|
return settings.options;
|
|
}
|
|
|
|
/**
|
|
* disconnect permanently disconnects from the server, closing the connections. It waits for the existing RPC
|
|
* queue to drain. If any RPCs come in afterwards, however, they will crash the process.
|
|
*/
|
|
export function disconnect(): Promise<void> {
|
|
return waitForRPCs(/*disconnectFromServers*/ true);
|
|
}
|
|
|
|
/** @internal */
|
|
export function waitForRPCs(disconnectFromServers = false): Promise<void> {
|
|
const localStore = getStore();
|
|
let done: Promise<any> | undefined;
|
|
const closeCallback: () => Promise<void> = () => {
|
|
if (done !== localStore.settings.rpcDone) {
|
|
// If the done promise has changed, some activity occurred in between callbacks. Wait again.
|
|
done = localStore.settings.rpcDone;
|
|
return debuggablePromise(done.then(closeCallback), "disconnect");
|
|
}
|
|
if (disconnectFromServers) {
|
|
disconnectSync();
|
|
}
|
|
return Promise.resolve();
|
|
};
|
|
return closeCallback();
|
|
}
|
|
|
|
/**
|
|
* getMaximumListeners returns the configured number of process listeners available
|
|
*/
|
|
export function getMaximumListeners(): number {
|
|
const { settings } = getStore();
|
|
return settings.options.maximumProcessListeners;
|
|
}
|
|
|
|
/**
|
|
* disconnectSync permanently disconnects from the server, closing the connections. Unlike `disconnect`. it does not
|
|
* wait for the existing RPC queue to drain. Any RPCs that come in after this call will crash the process.
|
|
*/
|
|
export function disconnectSync(): void {
|
|
// Otherwise, actually perform the close activities (ignoring errors and crashes).
|
|
const store = getStore();
|
|
if (store.callbacks) {
|
|
store.callbacks.shutdown();
|
|
store.callbacks = undefined;
|
|
}
|
|
|
|
if (monitor) {
|
|
try {
|
|
monitor.close();
|
|
} catch (err) {
|
|
// ignore.
|
|
}
|
|
monitor = undefined;
|
|
}
|
|
if (engine) {
|
|
try {
|
|
engine.close();
|
|
} catch (err) {
|
|
// ignore.
|
|
}
|
|
engine = undefined;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* rpcKeepAlive registers a pending call to ensure that we don't prematurely disconnect from the server. It returns
|
|
* a function that, when invoked, signals that the RPC has completed.
|
|
*/
|
|
export function rpcKeepAlive(): () => void {
|
|
const localStore = getStore();
|
|
let done: (() => void) | undefined = undefined;
|
|
const donePromise = debuggablePromise(
|
|
new Promise<void>((resolve) => {
|
|
done = resolve;
|
|
return done;
|
|
}),
|
|
"rpcKeepAlive",
|
|
);
|
|
localStore.settings.rpcDone = localStore.settings.rpcDone.then(() => donePromise);
|
|
return done!;
|
|
}
|
|
|
|
/**
|
|
* setRootResource registers a resource that will become the default parent for all resources without explicit parents.
|
|
*/
|
|
export async function setRootResource(res: ComponentResource): Promise<void> {
|
|
// This is the first async point of program startup where we can query the resource monitor for its capabilities.
|
|
await awaitFeatureSupport();
|
|
|
|
const engineRef = getEngine();
|
|
if (!engineRef) {
|
|
return Promise.resolve();
|
|
}
|
|
|
|
// Back-compat case - Try to set the root URN for SxS old SDKs that expect the engine to roundtrip the
|
|
// stack URN.
|
|
const req = new engproto.SetRootResourceRequest();
|
|
const urn = await res.urn.promise();
|
|
req.setUrn(urn);
|
|
return new Promise<void>((resolve, reject) => {
|
|
engineRef.setRootResource(
|
|
req,
|
|
(err: grpc.ServiceError | null, resp: engproto.SetRootResourceResponse | undefined) => {
|
|
// Back-compat case - if the engine we're speaking to isn't aware that it can save and load root
|
|
// resources, just ignore there's nothing we can do.
|
|
if (err && err.code === grpc.status.UNIMPLEMENTED) {
|
|
return resolve();
|
|
}
|
|
|
|
if (err) {
|
|
return reject(err);
|
|
}
|
|
|
|
return resolve();
|
|
},
|
|
);
|
|
});
|
|
}
|