// Copyright 2016-2021, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as grpc from "@grpc/grpc-js";
import * as fs from "fs";
import * as path from "path";
import { ComponentResource } from "../resource";
import { CallbackServer, ICallbackServer } from "./callbacks";
import { debuggablePromise } from "./debuggable";
import { getLocalStore, getStore } from "./state";

import * as engrpc from "../proto/engine_grpc_pb";
import * as engproto from "../proto/engine_pb";
import * as resrpc from "../proto/resource_grpc_pb";
import * as resproto from "../proto/resource_pb";

/**
 * Upper bound, in bytes, for gRPC messages received by the clients created in this module:
 * `419430400` (400mb) rather than the `4194304` (4mb) gRPC default.
 *
 * @internal
 */
export const maxRPCMessageSize: number = 400 * 1024 * 1024;

// Channel options passed to every gRPC client constructed in this module.
const grpcChannelOptions = { "grpc.max_receive_message_length": maxRPCMessageSize };

/**
 * excessiveDebugOutput switches on very verbose debug output about resources and properties.
 * It is hard-coded to `false`.
 */
export const excessiveDebugOutput: boolean = false;

/**
 * Options is a bag of settings that controls the behavior of previews and deployments.
 * Every field is optional and read-only through this interface.
 */
export interface Options {
    /** The name of the current project. */
    readonly project?: string;
    /** The name of the current stack being deployed into. */
    readonly stack?: string;
    /** The degree of parallelism for resource operations (default is serial). */
    readonly parallel?: number;
    /** A connection string to the engine's RPC, in case we need to reestablish. */
    readonly engineAddr?: string;
    /** A connection string to the monitor's RPC, in case we need to reestablish. */
    readonly monitorAddr?: string;
    /** Whether we are performing a preview (true) or a real deployment (false). */
    readonly dryRun?: boolean;
    /** True if we're in testing mode (allows execution without the CLI). */
    readonly testModeEnabled?: boolean;
    /** True if we're in query mode (does not allow resource registration). */
    readonly queryMode?: boolean;
    /** True if we will resolve missing outputs to inputs during preview. */
    readonly legacyApply?: boolean;
    /** True if we will cache serialized dynamic providers on the program side. */
    readonly cacheDynamicProviders?: boolean;
    /** The name of the current organization. */
    readonly organization?: string;

    /**
     * Directory containing the send/receive files for making synchronous invokes to the engine.
     */
    readonly syncDir?: string;
}

// Module-level gRPC clients. getMonitor/getEngine create these lazily when no local store is active;
// resetOptions clears them and disconnectSync closes and clears them.
let monitor: resrpc.ResourceMonitorClient | undefined;
let engine: engrpc.EngineClient | undefined;

/**
 * resetOptions resets nodejs runtime global state (RPC clients, pending-RPC tracking, feature flags,
 * callbacks) and writes the supplied option values onto the current store's settings.
 *
 * The cached clients are dropped without being closed.
 *
 * @param project name of the current project.
 * @param stack name of the current stack.
 * @param parallel degree of parallelism for resource operations.
 * @param engineAddr connection string for the engine's RPC endpoint.
 * @param monitorAddr connection string for the resource monitor's RPC endpoint.
 * @param preview stored as the `dryRun` option.
 * @param organization name of the current organization.
 */
export function resetOptions(
    project: string,
    stack: string,
    parallel: number,
    engineAddr: string,
    monitorAddr: string,
    preview: boolean,
    organization: string,
) {
    const store = getStore();
    const { settings } = store;

    // Forget both the module-level clients and the ones cached on the store.
    monitor = undefined;
    engine = undefined;
    settings.monitor = undefined;
    settings.engine = undefined;

    // Nothing is in flight any more, and no feature support has been recorded.
    settings.rpcDone = Promise.resolve();
    settings.featureSupport = {};

    // Overwrite the runtime options; queryMode is re-stored as a strict boolean of its current value.
    const opts = settings.options;
    opts.project = project;
    opts.stack = stack;
    opts.organization = organization;
    opts.dryRun = preview;
    opts.queryMode = isQueryMode();
    opts.parallel = parallel;
    opts.engineAddr = engineAddr;
    opts.monitorAddr = monitorAddr;

    // Reset the remaining per-run bookkeeping on the store.
    store.leakCandidates = new Set<Promise<any>>();
    store.logErrorCount = 0;
    store.stackResource = undefined;
    store.callbacks = undefined;

    // All monitor capabilities are unknown until awaitFeatureSupport runs again.
    store.supportsSecrets = false;
    store.supportsResourceReferences = false;
    store.supportsOutputValues = false;
    store.supportsDeletedWith = false;
    store.supportsAliasSpecs = false;
    store.supportsTransforms = false;
}

/**
 * setMockOptions resets the runtime state via resetOptions and then installs `mockMonitor` as the
 * module-level resource monitor.
 *
 * Arguments that are missing (or falsy) fall back to the current option value where one is consulted, and
 * then to a fixed default: "project", "stack", -1 for parallel, "" for the addresses and organization,
 * and false for preview.
 */
export function setMockOptions(
    mockMonitor: any,
    project?: string,
    stack?: string,
    preview?: boolean,
    organization?: string,
) {
    const current = options();

    const projectName = project || current.project || "project";
    const stackName = stack || current.stack || "stack";
    const parallelism = current.parallel || -1;
    const engineAddress = current.engineAddr || "";
    const monitorAddress = current.monitorAddr || "";
    const isPreview = preview || false;
    const organizationName = organization || "";

    resetOptions(
        projectName,
        stackName,
        parallelism,
        engineAddress,
        monitorAddress,
        isPreview,
        organizationName,
    );

    // Must happen after resetOptions, which clears the module-level monitor.
    monitor = mockMonitor;
}

/** @internal Used only for testing purposes: overwrites the `dryRun` option on the current store. */
export function _setIsDryRun(val: boolean) {
    getStore().settings.options.dryRun = val;
}

/**
 * Reports whether a preview is currently being performed, i.e. whether the `dryRun` option is
 * exactly `true`.
 *
 * When writing unit tests, you can set this flag via either `setMocks` or `_setIsDryRun`.
 */
export function isDryRun(): boolean {
    const { dryRun } = options();
    return dryRun === true;
}

/**
 * monitorSupportsFeature asks the given resource monitor whether it supports `feature` and resolves with
 * the answer.
 *
 * Resolves `false` when the monitor answers UNIMPLEMENTED. Rejects with the gRPC error for any other
 * failure, and with an Error when the call succeeds but carries no response.
 *
 * @internal
 */
async function monitorSupportsFeature(monitorClient: resrpc.IResourceMonitorClient, feature: string): Promise<boolean> {
    const request = new resproto.SupportsFeatureRequest();
    request.setId(feature);

    return await new Promise<boolean>((resolve, reject) => {
        const onReply = (err: grpc.ServiceError | null, resp: resproto.SupportsFeatureResponse | undefined) => {
            if (err) {
                if (err.code === grpc.status.UNIMPLEMENTED) {
                    // Back-compat: a monitor that cannot be asked about features supports none of them.
                    resolve(false);
                } else {
                    reject(err);
                }
                return;
            }

            if (resp === undefined) {
                reject(new Error("No response from resource monitor"));
                return;
            }

            resolve(resp.getHassupport());
        };

        monitorClient.supportsFeature(request, onReply);
    });
}

/**
 * Asks the resource monitor, one feature at a time, which capabilities it supports and records each
 * answer on the store. Does nothing when no monitor is available.
 *
 * A failing query rejects the returned promise; flags stored before the failure stay set.
 *
 * @internal
 **/
export async function awaitFeatureSupport(): Promise<void> {
    const monitorRef = getMonitor();
    if (monitorRef === undefined) {
        return;
    }

    const store = getStore();
    const supports = (feature: string): Promise<boolean> => monitorSupportsFeature(monitorRef, feature);

    store.supportsSecrets = await supports("secrets");
    store.supportsResourceReferences = await supports("resourceReferences");
    store.supportsOutputValues = await supports("outputValues");
    store.supportsDeletedWith = await supports("deletedWith");
    store.supportsAliasSpecs = await supports("aliasSpecs");
    store.supportsTransforms = await supports("transforms");
}

/** @internal Used only for testing purposes: overwrites the `queryMode` option on the current store. */
export function _setQueryMode(val: boolean) {
    getStore().settings.options.queryMode = val;
}

/**
 * @internal Used only for testing purposes
 *
 * Resets the runtime state by calling resetOptions with empty strings, a parallelism of -1 and
 * preview disabled.
 */
export function _reset(): void {
    // Arguments, in order: project, stack, parallel, engineAddr, monitorAddr, preview, organization.
    resetOptions("", "", -1, "", "", false, "");
}

/**
 * Reports whether query mode is enabled, i.e. whether the `queryMode` option is exactly `true`.
 */
export function isQueryMode(): boolean {
    const { queryMode } = options();
    return queryMode === true;
}

/**
 * Reports whether missing outputs will be resolved to inputs during preview
 * (PULUMI_ENABLE_LEGACY_APPLY), i.e. whether the `legacyApply` option is exactly `true`.
 */
export function isLegacyApplyEnabled(): boolean {
    const { legacyApply } = options();
    return legacyApply === true;
}

/**
 * Reports whether serialized dynamic providers will be cached on the program side, i.e. whether the
 * `cacheDynamicProviders` option is exactly `true`.
 *
 * NOTE(review): earlier documentation called `true` the default; the default is not set in this
 * function, so confirm where the option is initialised.
 */
export function cacheDynamicProviders(): boolean {
    const { cacheDynamicProviders: enabled } = options();
    return enabled === true;
}

/**
 * Get the organization being run by the current update.
 *
 * @throws Error when no organization is configured (unset or empty). The message points test authors
 * at `pulumi.runtime.setMocks`.
 */
export function getOrganization(): string {
    const { organization } = options();
    if (!organization) {
        // The error is raised unconditionally; its wording explains how to configure the value in tests.
        throw new Error("Missing organization name; for test mode, please call `pulumi.runtime.setMocks`");
    }

    return organization;
}

/**
 * @internal Used only for testing purposes.
 *
 * Overwrites the `organization` option on the current store and returns the stored value.
 */
export function _setOrganization(val: string | undefined) {
    const { options: opts } = getStore().settings;
    opts.organization = val;
    return opts.organization;
}

/**
 * Get the project being run by the current update, or the empty string when none is configured.
 */
export function getProject(): string {
    const current = options().project;
    return current ? current : "";
}

/**
 * @internal Used only for testing purposes.
 *
 * Overwrites the `project` option on the current store and returns the stored value.
 */
export function _setProject(val: string | undefined) {
    const { options: opts } = getStore().settings;
    opts.project = val;
    return opts.project;
}

/**
 * Get the stack being targeted by the current update, or the empty string when none is configured.
 */
export function getStack(): string {
    const current = options().stack;
    return current ? current : "";
}

/**
 * @internal Used only for testing purposes.
 *
 * Overwrites the `stack` option on the current store and returns the stored value.
 */
export function _setStack(val: string | undefined) {
    const { options: opts } = getStore().settings;
    opts.stack = val;
    return opts.stack;
}

/**
 * hasMonitor returns true if we are currently connected to a resource monitoring service: the
 * module-level monitor client exists and a monitor address is configured.
 *
 * Only the module-level client is consulted, not a client cached on the store's settings.
 */
export function hasMonitor(): boolean {
    if (!monitor) {
        return false;
    }
    return Boolean(options().monitorAddr);
}

/**
 * getMonitor returns the current resource monitoring service client for RPC communications, creating
 * it on first use from the configured monitor address.
 *
 * When no local store is active the module-level client is used; otherwise the client cached on the
 * store's settings is used. Returns `undefined` when no client exists yet and no address is configured.
 */
export function getMonitor(): resrpc.IResourceMonitorClient | undefined {
    const { settings } = getStore();
    const addr = options().monitorAddr;
    const connect = (target: string): resrpc.ResourceMonitorClient =>
        new resrpc.ResourceMonitorClient(target, grpc.credentials.createInsecure(), grpcChannelOptions);

    if (getLocalStore() !== undefined) {
        // A local store is active: keep the client on that store's settings.
        if (settings.monitor === undefined && addr) {
            settings.monitor = connect(addr);
            settings.options.monitorAddr = addr;
        }
        return settings.monitor;
    }

    // No local store: fall back to the module-level client.
    if (monitor === undefined && addr) {
        monitor = connect(addr);
        settings.options.monitorAddr = addr;
    }
    return monitor;
}

/**
 * Waits for any pending stack transforms to register. Resolves immediately when the store has no
 * callbacks server.
 */
export async function awaitStackRegistrations(): Promise<void> {
    const { callbacks: callbackServer } = getStore();
    if (callbackServer !== undefined) {
        return await callbackServer.awaitStackRegistrations();
    }
    return;
}

/**
 * getCallbacks returns the current callbacks server for RPC communications.
 *
 * The server cached on the store is reused when present. Otherwise a new CallbackServer is created for
 * the current monitor, cached on the store and returned. Returns `undefined` when there is no monitor.
 */
export function getCallbacks(): ICallbackServer | undefined {
    const store = getStore();

    const existing = store.callbacks;
    if (existing !== undefined) {
        return existing;
    }

    const monitorRef = getMonitor();
    if (monitorRef !== undefined) {
        const created = new CallbackServer(monitorRef);
        store.callbacks = created;
        return created;
    }

    return undefined;
}

/**
 * Pair of file descriptors used for synchronous invokes. tryGetSyncInvokes fills both fields with
 * values returned by `fs.openSync`.
 *
 * @internal
 */
export interface SyncInvokes {
    // Descriptor of the write-only "invoke_req" file.
    requests: number;
    // Descriptor of the read-only "invoke_res" file.
    responses: number;
}

// Cache for tryGetSyncInvokes; set once both files have been opened.
let syncInvokes: SyncInvokes | undefined;

/**
 * Returns the cached pair of file descriptors used for synchronous invokes, opening them on first use.
 *
 * When the `syncDir` option is set and nothing is cached yet, "invoke_req" is opened write-only and
 * "invoke_res" read-only (both with O_SYNC) inside that directory. Returns `undefined` when nothing is
 * cached and `syncDir` is not configured.
 *
 * @throws whatever `fs.openSync` throws when either file cannot be opened. If the second open fails,
 * the descriptor from the first is closed before rethrowing, so a failed attempt leaks nothing.
 *
 * @internal
 */
export function tryGetSyncInvokes(): SyncInvokes | undefined {
    const syncDir = options().syncDir;
    if (syncInvokes === undefined && syncDir) {
        const requests = fs.openSync(path.join(syncDir, "invoke_req"), fs.constants.O_WRONLY | fs.constants.O_SYNC);

        let responses: number;
        try {
            responses = fs.openSync(path.join(syncDir, "invoke_res"), fs.constants.O_RDONLY | fs.constants.O_SYNC);
        } catch (err) {
            // The cache stays unset on failure, so without this close every retry would leak another
            // descriptor for "invoke_req".
            try {
                fs.closeSync(requests);
            } catch (closeErr) {
                // ignore: the original open failure is the error worth reporting.
            }
            throw err;
        }

        syncInvokes = { requests, responses };
    }

    return syncInvokes;
}

/**
 * hasEngine returns true if we are currently connected to an engine: the module-level engine client
 * exists and an engine address is configured.
 *
 * Only the module-level client is consulted, not a client cached on the store's settings.
 */
export function hasEngine(): boolean {
    if (!engine) {
        return false;
    }
    return Boolean(options().engineAddr);
}

/**
 * getEngine returns the current engine client, if any, for RPC communications back to the resource
 * engine, creating it on first use from the configured engine address.
 *
 * When no local store is active the module-level client is used; otherwise the client cached on the
 * store's settings is used. Returns `undefined` when no client exists yet and no address is configured.
 */
export function getEngine(): engrpc.IEngineClient | undefined {
    const { settings } = getStore();
    const connect = (target: string): engrpc.EngineClient =>
        new engrpc.EngineClient(target, grpc.credentials.createInsecure(), grpcChannelOptions);

    if (getLocalStore() !== undefined) {
        // A local store is active: keep the client on that store's settings.
        if (settings.engine === undefined) {
            const localAddr = options().engineAddr;
            if (localAddr) {
                settings.engine = connect(localAddr);
            }
        }
        return settings.engine;
    }

    // No local store: fall back to the module-level client.
    if (engine === undefined) {
        const globalAddr = options().engineAddr;
        if (globalAddr) {
            engine = connect(globalAddr);
        }
    }
    return engine;
}

/**
 * terminateRpcs tears down the RPC connections immediately by delegating to disconnectSync.
 */
export function terminateRpcs() {
    disconnectSync();
}

/**
 * serialize returns true if resource operations should be serialized, i.e. when the `parallel` option
 * is exactly 1.
 */
export function serialize(): boolean {
    const { parallel } = options();
    return parallel === 1;
}

/**
 * options returns the runtime options held on the current store's settings.
 *
 * Each call reads through to the store and nothing is cached at module level, so values written by
 * resetOptions or the `_set*` test helpers are visible to every later reader.
 *
 * NOTE(review): earlier documentation described the process environment as the source of truth for
 * options. This function only reads the store; confirm how the store is populated from the environment.
 */
function options(): Options {
    return getStore().settings.options;
}

/**
 * disconnect waits for the existing RPC queue to drain and then disconnects from the servers, closing
 * the connections. It is waitForRPCs with disconnection enabled.
 */
export function disconnect(): Promise<void> {
    const disconnectFromServers = true;
    return waitForRPCs(disconnectFromServers);
}

/**
 * Returns a promise that resolves once the store's `rpcDone` promise has settled without having been
 * replaced in the meantime. When `disconnectFromServers` is true, disconnectSync is called just before
 * resolving.
 *
 * @internal
 */
export function waitForRPCs(disconnectFromServers = false): Promise<void> {
    const store = getStore();
    // The rpcDone promise that was most recently waited on.
    let lastAwaited: Promise<any> | undefined;

    const drain = (): Promise<void> => {
        const current = store.settings.rpcDone;
        if (lastAwaited === current) {
            // Nothing new was registered while we waited, so the queue is drained.
            if (disconnectFromServers) {
                disconnectSync();
            }
            return Promise.resolve();
        }

        // rpcDone changed since the last check, so more activity occurred. Wait on the new promise.
        lastAwaited = current;
        return debuggablePromise(current.then(drain), "disconnect");
    };

    return drain();
}

/**
 * getMaximumListeners returns the configured number of process listeners available, read from the
 * `maximumProcessListeners` option on the current store.
 */
export function getMaximumListeners(): number {
    return getStore().settings.options.maximumProcessListeners;
}

/**
 * disconnectSync disconnects from the servers, closing the connections. Unlike `disconnect`, it does
 * not wait for the existing RPC queue to drain.
 *
 * It shuts down and clears the store's callbacks server, then closes and clears the module-level
 * monitor and engine clients. Errors thrown while closing a client are ignored.
 *
 * NOTE(review): clients cached on the store's settings (created by getMonitor/getEngine while a local
 * store is active) are not closed here. Confirm whether that is intended.
 */
export function disconnectSync(): void {
    const store = getStore();
    const callbackServer = store.callbacks;
    if (callbackServer) {
        callbackServer.shutdown();
        store.callbacks = undefined;
    }

    // Best-effort close: a failure to close must not prevent the remaining cleanup.
    const closeQuietly = (client: { close(): void }): void => {
        try {
            client.close();
        } catch (err) {
            // ignore.
        }
    };

    if (monitor) {
        closeQuietly(monitor);
        monitor = undefined;
    }

    if (engine) {
        closeQuietly(engine);
        engine = undefined;
    }
}

/**
 * rpcKeepAlive registers a pending call to ensure that we don't prematurely disconnect from the server.
 * It returns a function that, when invoked, signals that the RPC has completed.
 *
 * The pending call is chained onto the store's `rpcDone` promise, which waitForRPCs waits on.
 */
export function rpcKeepAlive(): () => void {
    const { settings } = getStore();

    // The promise executor runs synchronously, so this is assigned before it is returned below.
    let signalDone!: () => void;
    const pending = debuggablePromise(
        new Promise<void>((resolve) => {
            signalDone = resolve;
        }),
        "rpcKeepAlive",
    );

    const previous = settings.rpcDone;
    settings.rpcDone = previous.then(() => pending);
    return signalDone;
}

/**
 * setRootResource registers a resource that will become the default parent for all resources without
 * explicit parents.
 *
 * It first waits for the monitor's feature support to be queried. If an engine client is available, the
 * resource's URN is then sent to the engine. An UNIMPLEMENTED reply from the engine is treated as
 * success; any other RPC error rejects the returned promise.
 */
export async function setRootResource(res: ComponentResource): Promise<void> {
    // This is the first async point of program startup where the monitor's capabilities can be queried.
    await awaitFeatureSupport();

    const engineRef = getEngine();
    if (!engineRef) {
        return Promise.resolve();
    }

    // Back-compat case - side-by-side old SDKs expect the engine to roundtrip the stack URN, so try to
    // set the root URN on the engine.
    const request = new engproto.SetRootResourceRequest();
    request.setUrn(await res.urn.promise());

    return new Promise<void>((resolve, reject) => {
        const onReply = (err: grpc.ServiceError | null, resp: engproto.SetRootResourceResponse | undefined) => {
            // Back-compat case - an engine that cannot save and load root resources answers
            // UNIMPLEMENTED; there is nothing we can do, so that counts as success.
            const isRealFailure = err !== null && err !== undefined && err.code !== grpc.status.UNIMPLEMENTED;
            if (isRealFailure) {
                reject(err);
                return;
            }
            resolve();
        };

        engineRef.setRootResource(request, onReply);
    });
}