// Copyright 2016-2021, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as grpc from "@grpc/grpc-js";

import { AsyncIterable } from "@pulumi/query/interfaces";

import { InvokeOptions } from "../invoke";
import * as log from "../log";
import { Inputs, Output } from "../output";
import { debuggablePromise } from "./debuggable";
import {
    deserializeProperties,
    isRpcSecret,
    serializeProperties,
    serializePropertiesReturnDeps,
    unwrapRpcSecret,
    unwrapSecretValues,
    containsUnknownValues,
} from "./rpc";
import { awaitStackRegistrations, excessiveDebugOutput, getMonitor, rpcKeepAlive, terminateRpcs } from "./settings";

import { DependencyResource, ProviderResource, Resource } from "../resource";
import * as utils from "../utils";
import { PushableAsyncIterable } from "./asyncIterableUtil";

import * as gstruct from "google-protobuf/google/protobuf/struct_pb";
import * as resourceproto from "../proto/resource_pb";
import * as providerproto from "../proto/provider_pb";

/**
 * Dynamically invokes the function `tok`, which is offered by a provider
 * plugin. `invoke` behaves differently in the case that options contains
 * `{async:true}` or not.
 *
 * In the case where `{async:true}` is present in the options bag:
 *
 * 1. the result of `invoke` will be a Promise resolved to the result value of
 *    the provider plugin.
 *
 * 2. the `props` inputs can be a bag of computed values (including, `T`s,
 *   `Promise<T>`s, `Output<T>`s etc.).
 *
 * In the case where `{ async:true }` is not present in the options bag:
 *
 * 1. the result of `invoke` will be a Promise resolved to the result value of
 *    the provider call. However, that Promise will *also* have the respective
 *    values of the Provider result exposed directly on it as properties.
 *
 * 2. The inputs must be a bag of simple values, and the result is the result
 *    that the Provider produced.
 *
 * Simple values are:
 *
 *  1. `undefined`, `null`, string, number or boolean values.
 *  2. arrays of simple values.
 *  3. objects containing only simple values.
 *
 * Importantly, simple values do *not* include:
 *
 *  1. `Promise`s
 *  2. `Output`s
 *  3. `Asset`s or `Archive`s
 *  4. `Resource`s.
 *
 * All of these contain async values that would prevent `invoke from being able
 * to operate synchronously.
 */
export function invoke(
    tok: string,
    props: Inputs,
    opts: InvokeOptions = {},
    packageRef?: Promise<string | undefined>,
): Promise<any> {
    return invokeAsync(tok, props, opts, packageRef).then((response) => {
        // ignore secrets for plain invoke
        const { result } = response;
        return result;
    });
}

/**
 * Similar to the plain `invoke` but returns the response as an output, maintaining
 * secrets of the response, if any.
 */
export function invokeOutput<T>(
    tok: string,
    props: Inputs,
    opts: InvokeOptions = {},
    packageRef?: Promise<string | undefined>,
): Output<T> {
    const [output, resolve] = createOutput<T>(`invoke(${tok})`);
    invokeAsync(tok, props, opts, packageRef)
        .then((response) => {
            const { result, isKnown, containsSecrets, dependencies } = response;
            resolve(<T>result, isKnown, containsSecrets, dependencies, undefined);
        })
        .catch((err) => {
            resolve(<any>undefined, true, false, [], err);
        });

    return output;
}

function extractSingleValue(result: Inputs | undefined): any {
    if (result === undefined) {
        return result;
    }
    // assume outputs has at least one key
    const keys = Object.keys(result);
    // return the first key's value from the outputs
    return result[keys[0]];
}

/*
 * Dynamically invokes the function `tok`, which is offered by a
 * provider plugin. Similar to `invoke`, but returns a single value instead of
 * an object with a single key.
 */
export function invokeSingle(
    tok: string,
    props: Inputs,
    opts: InvokeOptions = {},
    packageRef?: Promise<string | undefined>,
): Promise<any> {
    return invokeAsync(tok, props, opts, packageRef).then((response) => {
        // ignore secrets for plain invoke
        const { result } = response;
        return extractSingleValue(result);
    });
}

/**
 * Similar to the plain `invokeSingle` but returns the response as an output, maintaining
 * secrets of the response, if any.
 */
export function invokeSingleOutput<T>(
    tok: string,
    props: Inputs,
    opts: InvokeOptions = {},
    packageRef?: Promise<string | undefined>,
): Output<T> {
    const [output, resolve] = createOutput<T>(`invokeSingleOutput(${tok})`);
    invokeAsync(tok, props, opts, packageRef)
        .then((response) => {
            const { result, isKnown, containsSecrets, dependencies } = response;
            const value = extractSingleValue(result);
            resolve(<T>value, isKnown, containsSecrets, dependencies, undefined);
        })
        .catch((err) => {
            resolve(<any>undefined, true, false, [], err);
        });

    return output;
}

export async function streamInvoke(
    tok: string,
    props: Inputs,
    opts: InvokeOptions = {},
): Promise<StreamInvokeResponse<any>> {
    const label = `StreamInvoking function: tok=${tok} asynchronously`;
    log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));

    // Wait for all values to be available, and then perform the RPC.
    const done = rpcKeepAlive();
    try {
        const serialized = await serializeProperties(`streamInvoke:${tok}`, props);
        log.debug(
            `StreamInvoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``,
        );

        // Fetch the monitor and make an RPC request.
        const monitor: any = getMonitor();

        const provider = await ProviderResource.register(getProvider(tok, opts));
        const req = await createInvokeRequest(tok, serialized, provider, opts);

        // Call `streamInvoke`.
        const result = monitor.streamInvoke(req, {});

        const queue = new PushableAsyncIterable();
        result.on("data", function (thing: any) {
            const live = deserializeResponse(tok, thing);
            queue.push(live);
        });
        result.on("error", (err: any) => {
            if (err.code === 1) {
                return;
            }
            throw err;
        });
        result.on("end", () => {
            queue.complete();
        });

        // Return a cancellable handle to the stream.
        return new StreamInvokeResponse(queue, () => result.cancel());
    } finally {
        done();
    }
}

async function invokeAsync(
    tok: string,
    props: Inputs,
    opts: InvokeOptions,
    packageRef?: Promise<string | undefined>,
): Promise<{
    result: Inputs | undefined;
    isKnown: boolean;
    containsSecrets: boolean;
    dependencies: Resource[];
}> {
    const label = `Invoking function: tok=${tok} asynchronously`;
    log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));

    await awaitStackRegistrations();

    // Wait for all values to be available, and then perform the RPC.
    const done = rpcKeepAlive();
    try {
        const [serialized, deps] = await serializePropertiesReturnDeps(`invoke:${tok}`, props);
        if (containsUnknownValues(serialized)) {
            // if any of the input properties are unknown,
            // make sure the entire response is marked as unknown
            return {
                result: {},
                isKnown: false,
                containsSecrets: false,
                dependencies: [],
            };
        }

        log.debug(
            `Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``,
        );

        // Fetch the monitor and make an RPC request.
        const monitor: any = getMonitor();

        const provider = await ProviderResource.register(getProvider(tok, opts));
        // keep track of the the secretness of the inputs
        // if any of the inputs are secret, the invoke response should be marked as secret
        const [plainInputs, inputsContainSecrets] = unwrapSecretValues(serialized);
        const req = await createInvokeRequest(tok, plainInputs, provider, opts, packageRef);

        const resp: any = await debuggablePromise(
            new Promise((innerResolve, innerReject) =>
                monitor.invoke(req, (err: grpc.ServiceError, innerResponse: any) => {
                    log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
                    if (err) {
                        // If the monitor is unavailable, it is in the process of shutting down or has already
                        // shut down. Don't emit an error and don't do any more RPCs, just exit.
                        if (err.code === grpc.status.UNAVAILABLE || err.code === grpc.status.CANCELLED) {
                            terminateRpcs();
                            err.message = "Resource monitor is terminating";
                            innerReject(err);
                            return;
                        }

                        // If the RPC failed, rethrow the error with a native exception and the message that
                        // the engine provided - it's suitable for user presentation.
                        innerReject(new Error(err.details));
                    } else {
                        innerResolve(innerResponse);
                    }
                }),
            ),
            label,
        );

        const flatDependencies: Resource[] = [];
        for (const dep of deps.values()) {
            for (const d of dep) {
                flatDependencies.push(d);
            }
        }

        // Finally propagate any other properties that were given to us as outputs.
        const deserialized = deserializeResponse(tok, resp);
        return {
            result: deserialized.result,
            containsSecrets: deserialized.containsSecrets || inputsContainSecrets,
            dependencies: flatDependencies,
            isKnown: true,
        };
    } finally {
        done();
    }
}

/**
 * {@link StreamInvokeResponse} represents a (potentially infinite) streaming
 * response to `streamInvoke`, with facilities to gracefully cancel and clean up
 * the stream.
 */
export class StreamInvokeResponse<T> implements AsyncIterable<T> {
    constructor(
        private source: AsyncIterable<T>,
        private cancelSource: () => void,
    ) {}

    // cancel signals the `streamInvoke` should be cancelled and cleaned up gracefully.
    public cancel() {
        this.cancelSource();
    }

    [Symbol.asyncIterator]() {
        return this.source[Symbol.asyncIterator]();
    }
}

async function createInvokeRequest(
    tok: string,
    serialized: any,
    provider: string | undefined,
    opts: InvokeOptions,
    packageRef?: Promise<string | undefined>,
) {
    if (provider !== undefined && typeof provider !== "string") {
        throw new Error("Incorrect provider type.");
    }

    const obj = gstruct.Struct.fromJavaScript(serialized);
    let packageRefStr = undefined;
    if (packageRef !== undefined) {
        packageRefStr = await packageRef;
        if (packageRefStr !== undefined) {
            // If we have a package reference we can clear some of the resource options
            opts.version = undefined;
            opts.pluginDownloadURL = undefined;
        }
    }
    const req = new resourceproto.ResourceInvokeRequest();
    req.setTok(tok);
    req.setArgs(obj);
    req.setProvider(provider || "");
    req.setVersion(opts.version || "");
    req.setPlugindownloadurl(opts.pluginDownloadURL || "");
    req.setAcceptresources(!utils.disableResourceReferences);
    req.setPackageref(packageRefStr || "");
    return req;
}

function getProvider(tok: string, opts: InvokeOptions) {
    return opts.provider ? opts.provider : opts.parent ? opts.parent.getProvider(tok) : undefined;
}

function deserializeResponse(
    tok: string,
    resp: { getFailuresList(): Array<providerproto.CheckFailure>; getReturn(): gstruct.Struct | undefined },
): {
    result: Inputs | undefined;
    containsSecrets: boolean;
} {
    const failures = resp.getFailuresList();
    if (failures?.length) {
        let reasons = "";
        for (let i = 0; i < failures.length; i++) {
            if (reasons !== "") {
                reasons += "; ";
            }

            reasons += `${failures[i].getReason()} (${failures[i].getProperty()})`;
        }

        throw new Error(`Invoke of '${tok}' failed: ${reasons}`);
    }

    let containsSecrets = false;
    const result = resp.getReturn();
    if (result === undefined) {
        return {
            result,
            containsSecrets,
        };
    }

    const properties = deserializeProperties(result);
    // Keep track of whether we need to mark the resulting output a secret.
    // and unwrap each individual value if it is a secret.
    for (const key of Object.keys(properties)) {
        containsSecrets = containsSecrets || isRpcSecret(properties[key]);
        properties[key] = unwrapRpcSecret(properties[key]);
    }

    return {
        result: properties,
        containsSecrets,
    };
}

/**
 * Dynamically calls the function `tok`, which is offered by a provider plugin.
 */
export function call<T>(
    tok: string,
    props: Inputs,
    res?: Resource,
    packageRef?: Promise<string | undefined>,
): Output<T> {
    const label = `Calling function: tok=${tok}`;
    log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));

    const [out, resolver] = createOutput<T>(`call(${tok})`);

    debuggablePromise(
        Promise.resolve().then(async () => {
            const done = rpcKeepAlive();
            try {
                // Construct a provider reference from the given provider, if one is available on the resource.
                let provider: string | undefined = undefined;
                let version: string | undefined = undefined;
                let pluginDownloadURL: string | undefined = undefined;
                if (res) {
                    if (res.__prov) {
                        provider = await ProviderResource.register(res.__prov);
                    }
                    version = res.__version;
                    pluginDownloadURL = res.__pluginDownloadURL;
                }

                // We keep output values when serializing inputs for call.
                const [serialized, propertyDepsResources] = await serializePropertiesReturnDeps(`call:${tok}`, props, {
                    keepOutputValues: true,
                });
                log.debug(
                    `Call RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``,
                );

                const req = await createCallRequest(
                    tok,
                    serialized,
                    propertyDepsResources,
                    provider,
                    version,
                    pluginDownloadURL,
                    packageRef,
                );

                const monitor = getMonitor();
                const resp = await debuggablePromise(
                    new Promise<providerproto.CallResponse>((innerResolve, innerReject) => {
                        if (monitor === undefined) {
                            throw new Error("No monitor available");
                        }

                        monitor.call(
                            req,
                            (err: grpc.ServiceError | null, innerResponse: providerproto.CallResponse) => {
                                log.debug(`Call RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
                                if (err) {
                                    // If the monitor is unavailable, it is in the process of shutting down or has already
                                    // shut down. Don't emit an error and don't do any more RPCs, just exit.
                                    if (err.code === grpc.status.UNAVAILABLE || err.code === grpc.status.CANCELLED) {
                                        terminateRpcs();
                                        err.message = "Resource monitor is terminating";
                                        innerReject(err);
                                        return;
                                    }

                                    // If the RPC failed, rethrow the error with a native exception and the message that
                                    // the engine provided - it's suitable for user presentation.
                                    innerReject(new Error(err.details));
                                } else {
                                    innerResolve(innerResponse);
                                }
                            },
                        );
                    }),
                    label,
                );

                // Deserialize the response and resolve the output.
                const { result, containsSecrets } = deserializeResponse(tok, resp);
                const deps: Resource[] = [];

                // Combine the individual dependencies into a single set of dependency resources.
                const rpcDeps = resp.getReturndependenciesMap();
                if (rpcDeps) {
                    const urns = new Set<string>();
                    for (const [, returnDeps] of rpcDeps.entries()) {
                        for (const urn of returnDeps.getUrnsList()) {
                            urns.add(urn);
                        }
                    }
                    for (const urn of urns) {
                        deps.push(new DependencyResource(urn));
                    }
                }

                // If the value the engine handed back is or contains an unknown value, the resolver will mark its value as
                // unknown automatically, so we just pass true for isKnown here. Note that unknown values will only be
                // present during previews (i.e. isDryRun() will be true).
                resolver(<any>result, true, containsSecrets, deps);
            } catch (e) {
                resolver(<any>undefined, true, false, undefined, e);
            } finally {
                done();
            }
        }),
        label,
    );

    return out;
}

function createOutput<T>(
    label: string,
): [Output<T>, (v: T, isKnown: boolean, isSecret: boolean, deps?: Resource[], err?: Error | undefined) => void] {
    let resolveValue: (v: T) => void;
    let rejectValue: (err: Error) => void;
    let resolveIsKnown: (v: boolean) => void;
    let rejectIsKnown: (err: Error) => void;
    let resolveIsSecret: (v: boolean) => void;
    let rejectIsSecret: (err: Error) => void;
    let resolveDeps: (v: Resource[]) => void;
    let rejectDeps: (err: Error) => void;

    const resolver = (v: T, isKnown: boolean, isSecret: boolean, deps: Resource[] = [], err?: Error) => {
        if (err) {
            rejectValue(err);
            rejectIsKnown(err);
            rejectIsSecret(err);
            rejectDeps(err);
        } else {
            resolveValue(v);
            resolveIsKnown(isKnown);
            resolveIsSecret(isSecret);
            resolveDeps(deps);
        }
    };

    const out = new Output(
        [],
        debuggablePromise(
            new Promise<T>((resolve, reject) => {
                resolveValue = resolve;
                rejectValue = reject;
            }),
            `${label}Value`,
        ),
        debuggablePromise(
            new Promise<boolean>((resolve, reject) => {
                resolveIsKnown = resolve;
                rejectIsKnown = reject;
            }),
            `${label}IsKnown`,
        ),
        debuggablePromise(
            new Promise<boolean>((resolve, reject) => {
                resolveIsSecret = resolve;
                rejectIsSecret = reject;
            }),
            `${label}IsSecret`,
        ),
        debuggablePromise(
            new Promise<Resource[]>((resolve, reject) => {
                resolveDeps = resolve;
                rejectDeps = reject;
            }),
            `${label}Deps`,
        ),
    );

    return [out, resolver];
}

async function createCallRequest(
    tok: string,
    serialized: Record<string, any>,
    serializedDeps: Map<string, Set<Resource>>,
    provider?: string,
    version?: string,
    pluginDownloadURL?: string,
    packageRef?: Promise<string | undefined>,
) {
    if (provider !== undefined && typeof provider !== "string") {
        throw new Error("Incorrect provider type.");
    }

    const obj = gstruct.Struct.fromJavaScript(serialized);
    let packageRefStr = undefined;
    if (packageRef !== undefined) {
        packageRefStr = await packageRef;
        if (packageRefStr !== undefined) {
            // If we have a package reference we can clear some of the resource options
            version = undefined;
            pluginDownloadURL = undefined;
        }
    }

    const req = new resourceproto.ResourceCallRequest();
    req.setTok(tok);
    req.setArgs(obj);
    req.setProvider(provider || "");
    req.setVersion(version || "");
    req.setPlugindownloadurl(pluginDownloadURL || "");
    req.setPackageref(packageRefStr || "");

    const argDependencies = req.getArgdependenciesMap();
    for (const [key, propertyDeps] of serializedDeps) {
        const urns = new Set<string>();
        for (const dep of propertyDeps) {
            const urn = await dep.urn.promise();
            urns.add(urn);
        }
        const deps = new resourceproto.ResourceCallRequest.ArgumentDependencies();
        deps.setUrnsList(Array.from(urns));
        argDependencies.set(key, deps);
    }

    return req;
}