// Copyright 2016-2020, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as grpc from "@grpc/grpc-js";

import { Provider } from "./provider";

import * as log from "../log";
import { Inputs, Output, output } from "../output";
import * as resource from "../resource";
import * as config from "../runtime/config";
import * as rpc from "../runtime/rpc";
import * as settings from "../runtime/settings";
import * as localState from "../runtime/state";
import { parseArgs } from "./internals";

import * as gstruct from "google-protobuf/google/protobuf/struct_pb";
import * as anyproto from "google-protobuf/google/protobuf/any_pb";
import * as emptyproto from "google-protobuf/google/protobuf/empty_pb";
import * as structproto from "google-protobuf/google/protobuf/struct_pb";
import * as plugproto from "../proto/plugin_pb";
import * as provrpc from "../proto/provider_grpc_pb";
import * as provproto from "../proto/provider_pb";
import * as statusproto from "../proto/status_pb";

class Server implements grpc.UntypedServiceImplementation {
    engineAddr: string | undefined;
    readonly provider: Provider;
    readonly uncaughtErrors: Set<Error>;
    private readonly _callbacks = new Map<Symbol, grpc.sendUnaryData<any>>();

    constructor(engineAddr: string | undefined, provider: Provider, uncaughtErrors: Set<Error>) {
        this.engineAddr = engineAddr;
        this.provider = provider;
        this.uncaughtErrors = uncaughtErrors;

        // When we catch an uncaught error, we need to respond to the inflight call/construct gRPC requests
        // with the error to avoid a hang.
        const uncaughtHandler = (err: Error) => {
            if (!this.uncaughtErrors.has(err)) {
                this.uncaughtErrors.add(err);
            }
            // terminate the outstanding gRPC requests.
            this._callbacks.forEach((callback) => callback(err, undefined));
        };
        process.on("uncaughtException", uncaughtHandler);
        // @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so
        // just suppress the TS strictness here.
        process.on("unhandledRejection", uncaughtHandler);
    }

    // Satisfy the grpc.UntypedServiceImplementation interface.
    [name: string]: any;

    // Misc. methods

    public cancel(call: any, callback: any): void {
        callback(undefined, new emptyproto.Empty());
    }

    public attach(call: any, callback: any): void {
        const req = call.request;
        const host = req.getAddress();
        this.engineAddr = host;
        callback(undefined, new emptyproto.Empty());
    }

    public getPluginInfo(call: any, callback: any): void {
        const resp: any = new plugproto.PluginInfo();
        resp.setVersion(this.provider.version);
        callback(undefined, resp);
    }

    public getSchema(call: any, callback: any): void {
        const req: any = call.request;
        if (req.getVersion() !== 0) {
            callback(new Error(`unsupported schema version ${req.getVersion()}`), undefined);
        }
        const resp: any = new provproto.GetSchemaResponse();
        resp.setSchema(this.provider.schema || "{}");
        callback(undefined, resp);
    }

    // Config methods

    public checkConfig(call: any, callback: any): void {
        callback(
            {
                code: grpc.status.UNIMPLEMENTED,
                details: "Not yet implemented: CheckConfig",
            },
            undefined,
        );
    }

    public diffConfig(call: any, callback: any): void {
        callback(
            {
                code: grpc.status.UNIMPLEMENTED,
                details: "Not yet implemented: DiffConfig",
            },
            undefined,
        );
    }

    public configure(call: any, callback: any): void {
        const resp = new provproto.ConfigureResponse();
        resp.setAcceptsecrets(true);
        resp.setAcceptresources(true);
        resp.setAcceptoutputs(true);
        callback(undefined, resp);
    }

    // CRUD resource methods

    public async check(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.CheckResponse();

            const olds = req.getOlds().toJavaScript();
            const news = req.getNews().toJavaScript();

            let inputs: any = news;
            let failures: any[] = [];
            if (this.provider.check) {
                const result = await this.provider.check(req.getUrn(), olds, news);
                if (result.inputs) {
                    inputs = result.inputs;
                }
                if (result.failures) {
                    failures = result.failures;
                }
            } else {
                // If no check method was provided, propagate the new inputs as-is.
                inputs = news;
            }

            resp.setInputs(structproto.Struct.fromJavaScript(inputs));

            if (failures.length !== 0) {
                const failureList = [];
                for (const f of failures) {
                    const failure = new provproto.CheckFailure();
                    failure.setProperty(f.property);
                    failure.setReason(f.reason);
                    failureList.push(failure);
                }
                resp.setFailuresList(failureList);
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    public async diff(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.DiffResponse();

            const olds = req.getOlds().toJavaScript();
            const news = req.getNews().toJavaScript();
            if (this.provider.diff) {
                const result: any = await this.provider.diff(req.getId(), req.getUrn(), olds, news);

                if (result.changes === true) {
                    resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_SOME);
                } else if (result.changes === false) {
                    resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_NONE);
                } else {
                    resp.setChanges(provproto.DiffResponse.DiffChanges.DIFF_UNKNOWN);
                }

                if (result.replaces && result.replaces.length !== 0) {
                    resp.setReplacesList(result.replaces);
                }
                if (result.deleteBeforeReplace) {
                    resp.setDeletebeforereplace(result.deleteBeforeReplace);
                }
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    public async create(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            if (!this.provider.create) {
                callback(new Error(`unknown resource type ${req.getUrn()}`), undefined);
                return;
            }

            const resp = new provproto.CreateResponse();
            const props = req.getProperties().toJavaScript();
            const result = await this.provider.create(req.getUrn(), props);
            resp.setId(result.id);
            resp.setProperties(structproto.Struct.fromJavaScript(result.outs));

            callback(undefined, resp);
        } catch (e) {
            const response = grpcResponseFromError(e);
            return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
        }
    }

    public async read(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.ReadResponse();

            const id = req.getId();
            const props = req.getProperties().toJavaScript();
            if (this.provider.read) {
                const result: any = await this.provider.read(id, req.getUrn(), props);
                resp.setId(result.id);
                resp.setProperties(structproto.Struct.fromJavaScript(result.props));
                resp.setInputs(
                    result.inputs === undefined ? undefined : structproto.Struct.fromJavaScript(result.inputs),
                );
            } else {
                // In the event of a missing read, simply return back the input state.
                resp.setId(id);
                resp.setProperties(req.getProperties());
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    public async update(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const resp = new provproto.UpdateResponse();

            const olds = req.getOlds().toJavaScript();
            const news = req.getNews().toJavaScript();

            let result: any = {};
            if (this.provider.update) {
                result = (await this.provider.update(req.getId(), req.getUrn(), olds, news)) || {};
            }

            resp.setProperties(structproto.Struct.fromJavaScript(result.outs));

            callback(undefined, resp);
        } catch (e) {
            const response = grpcResponseFromError(e);
            return callback(/*err:*/ response, /*value:*/ null, /*metadata:*/ response.metadata);
        }
    }

    public async delete(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            const props: any = req.getProperties().toJavaScript();
            if (this.provider.delete) {
                await this.provider.delete(req.getId(), req.getUrn(), props);
            }
            callback(undefined, new emptyproto.Empty());
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    public async construct(call: any, callback: any): Promise<void> {
        // Setup a new async state store for this run
        const store = new localState.LocalStore();
        return localState.asyncLocalStorage.run(store, async () => {
            const callbackId = Symbol("id");
            this._callbacks.set(callbackId, callback);
            try {
                const req: any = call.request;
                const type = req.getType();
                const name = req.getName();

                if (!this.provider.construct) {
                    callback(new Error(`unknown resource type ${type}`), undefined);
                    return;
                }

                await configureRuntime(req, this.engineAddr);

                const inputs = await deserializeInputs(req.getInputs(), req.getInputdependenciesMap());

                // Rebuild the resource options.
                const dependsOn: resource.Resource[] = [];
                for (const urn of req.getDependenciesList()) {
                    dependsOn.push(new resource.DependencyResource(urn));
                }
                const providers: Record<string, resource.ProviderResource> = {};
                const rpcProviders = req.getProvidersMap();
                if (rpcProviders) {
                    for (const [pkg, ref] of rpcProviders.entries()) {
                        providers[pkg] = createProviderResource(ref);
                    }
                }
                const opts: resource.ComponentResourceOptions = {
                    aliases: req.getAliasesList(),
                    dependsOn: dependsOn,
                    protect: req.getProtect(),
                    providers: providers,
                    parent: req.getParent() ? new resource.DependencyResource(req.getParent()) : undefined,
                };

                const result = await this.provider.construct(name, type, inputs, opts);

                const resp = new provproto.ConstructResponse();

                resp.setUrn(await output(result.urn).promise());

                const [state, stateDependencies] = await rpc.serializeResourceProperties(
                    `construct(${type}, ${name})`,
                    result.state,
                );
                const stateDependenciesMap = resp.getStatedependenciesMap();
                for (const [key, resources] of stateDependencies) {
                    const deps = new provproto.ConstructResponse.PropertyDependencies();
                    deps.setUrnsList(await Promise.all(Array.from(resources).map((r) => r.urn.promise())));
                    stateDependenciesMap.set(key, deps);
                }
                resp.setState(structproto.Struct.fromJavaScript(state));

                // Wait for RPC operations to complete.
                await settings.waitForRPCs();

                callback(undefined, resp);
            } catch (e) {
                console.error(`${e}: ${e.stack}`);
                callback(e, undefined);
            } finally {
                // remove the gRPC callback context from the map of in-flight callbacks
                this._callbacks.delete(callbackId);
            }
        });
    }

    public async call(call: any, callback: any): Promise<void> {
        // Setup a new async state store for this run
        const store = new localState.LocalStore();
        return localState.asyncLocalStorage.run(store, async () => {
            const callbackId = Symbol("id");
            this._callbacks.set(callbackId, callback);
            try {
                const req: any = call.request;
                if (!this.provider.call) {
                    callback(new Error(`unknown function ${req.getTok()}`), undefined);
                    return;
                }

                await configureRuntime(req, this.engineAddr);

                const args = await deserializeInputs(req.getArgs(), req.getArgdependenciesMap());

                const result = await this.provider.call(req.getTok(), args);

                const resp = new provproto.CallResponse();

                if (result.outputs) {
                    const [ret, retDependencies] = await rpc.serializeResourceProperties(
                        `call(${req.getTok()})`,
                        result.outputs,
                    );
                    const returnDependenciesMap = resp.getReturndependenciesMap();
                    for (const [key, resources] of retDependencies) {
                        const deps = new provproto.CallResponse.ReturnDependencies();
                        deps.setUrnsList(await Promise.all(Array.from(resources).map((r) => r.urn.promise())));
                        returnDependenciesMap.set(key, deps);
                    }
                    resp.setReturn(structproto.Struct.fromJavaScript(ret));
                }

                if ((result.failures || []).length !== 0) {
                    const failureList = [];
                    for (const f of result.failures!) {
                        const failure = new provproto.CheckFailure();
                        failure.setProperty(f.property);
                        failure.setReason(f.reason);
                        failureList.push(failure);
                    }
                    resp.setFailuresList(failureList);
                }

                // Wait for RPC operations to complete.
                await settings.waitForRPCs();

                callback(undefined, resp);
            } catch (e) {
                console.error(`${e}: ${e.stack}`);
                callback(e, undefined);
            } finally {
                // remove the gRPC callback context from the map of in-flight callbacks
                this._callbacks.delete(callbackId);
            }
        });
    }

    public async invoke(call: any, callback: any): Promise<void> {
        try {
            const req: any = call.request;
            if (!this.provider.invoke) {
                callback(new Error(`unknown function ${req.getTok()}`), undefined);
                return;
            }

            const args: any = req.getArgs().toJavaScript();
            const result = await this.provider.invoke(req.getTok(), args);

            const resp = new provproto.InvokeResponse();
            resp.setReturn(structproto.Struct.fromJavaScript(result.outputs));

            if ((result.failures || []).length !== 0) {
                const failureList = [];
                for (const f of result.failures!) {
                    const failure = new provproto.CheckFailure();
                    failure.setProperty(f.property);
                    failure.setReason(f.reason);
                    failureList.push(failure);
                }
                resp.setFailuresList(failureList);
            }

            callback(undefined, resp);
        } catch (e) {
            console.error(`${e}: ${e.stack}`);
            callback(e, undefined);
        }
    }

    public async streamInvoke(call: any, callback: any): Promise<void> {
        callback(
            {
                code: grpc.status.UNIMPLEMENTED,
                details: "Not yet implemented: StreamInvoke",
            },
            undefined,
        );
    }
}

async function configureRuntime(req: any, engineAddr: string | undefined) {
    // NOTE: these are globals! We should ensure that all settings are identical between calls, and eventually
    // refactor so we can avoid the global state.
    if (engineAddr === undefined) {
        throw new Error("fatal: Missing <engine> address");
    }

    settings.resetOptions(
        req.getProject(),
        req.getStack(),
        req.getParallel(),
        engineAddr,
        req.getMonitorendpoint(),
        req.getDryrun(),
        req.getOrganization(),
    );

    // resetOptions doesn't reset the saved features
    await settings.awaitFeatureSupport();

    const pulumiConfig: { [key: string]: string } = {};
    const rpcConfig = req.getConfigMap();
    if (rpcConfig) {
        for (const [k, v] of rpcConfig.entries()) {
            pulumiConfig[k] = v;
        }
    }
    config.setAllConfig(pulumiConfig, req.getConfigsecretkeysList());
}

/**
 * Deserializes the inputs struct and applies appropriate dependencies.
 *
 * @internal
 */
export async function deserializeInputs(inputsStruct: gstruct.Struct, inputDependencies: any): Promise<Inputs> {
    const result: Inputs = {};

    const deserializedInputs = rpc.deserializeProperties(inputsStruct);
    for (const k of Object.keys(deserializedInputs)) {
        const input = deserializedInputs[k];
        const isSecret = rpc.isRpcSecret(input);
        const depsUrns: resource.URN[] = inputDependencies.get(k)?.getUrnsList() ?? [];

        if (
            !isSecret &&
            (depsUrns.length === 0 || containsOutputs(input) || (await isResourceReference(input, depsUrns)))
        ) {
            // If the input isn't a secret and either doesn't have any dependencies, already contains Outputs (from
            // deserialized output values), or is a resource reference, then we can return it directly without
            // wrapping it as an output.
            result[k] = input;
        } else {
            // Otherwise, wrap it in an output so we can handle secrets and/or track dependencies.
            // Note: If the value is or contains an unknown value, the Output will mark its value as
            // unknown automatically, so we just pass true for isKnown here.
            const deps = depsUrns.map((depUrn) => new resource.DependencyResource(depUrn));
            result[k] = new Output(
                deps,
                Promise.resolve(rpc.unwrapRpcSecret(input)),
                Promise.resolve(true),
                Promise.resolve(isSecret),
                Promise.resolve([]),
            );
        }
    }

    return result;
}

/**
 * Returns true if the input is a resource reference.
 */
async function isResourceReference(input: any, deps: string[]): Promise<boolean> {
    return resource.Resource.isInstance(input) && deps.length === 1 && deps[0] === (await input.urn.promise());
}

/**
 * Returns true if the deserialized input contains outputs (deeply), excluding
 * properties of resources.
 *
 * @internal
 */
export function containsOutputs(input: any): boolean {
    if (Array.isArray(input)) {
        for (const e of input) {
            if (containsOutputs(e)) {
                return true;
            }
        }
    } else if (typeof input === "object") {
        if (Output.isInstance(input)) {
            return true;
        } else if (resource.Resource.isInstance(input)) {
            // Do not drill into instances of Resource because they will have properties that are
            // instances of Output (e.g. urn, id, etc.) and we're only looking for instances of
            // Output that aren't associated with a Resource.
            return false;
        }

        for (const k of Object.keys(input)) {
            if (containsOutputs(input[k])) {
                return true;
            }
        }
    }
    return false;
}

/**
 * Creates a gRPC response representing an error from a dynamic provider's
 * resource. This is typically either a creation error, in which the API server
 * has (virtually) rejected the resource, or an initialization error, where the
 * API server has accepted the resource, but it failed to initialize (e.g., the
 * app code is continually crashing and the resource has failed to become
 * alive).
 */
function grpcResponseFromError(e: { id: string; properties: any; message: string; reasons?: string[] }) {
    // Create response object.
    const resp = new statusproto.Status();
    resp.setCode(grpc.status.UNKNOWN);
    resp.setMessage(e.message);

    const metadata = new grpc.Metadata();
    if (e.id) {
        // Object created successfully, but failed to initialize. Pack initialization failure into
        // details.
        const detail = new provproto.ErrorResourceInitFailed();
        detail.setId(e.id);
        detail.setProperties(structproto.Struct.fromJavaScript(e.properties || {}));
        detail.setReasonsList(e.reasons || []);

        const details = new anyproto.Any();
        details.pack(detail.serializeBinary(), "pulumirpc.ErrorResourceInitFailed");

        // Add details to metadata.
        resp.addDetails(details);
        // NOTE: `grpc-status-details-bin` is a magic field that allows us to send structured
        // protobuf data as an error back through gRPC. This notion of details is a first-class in
        // the Go gRPC implementation, and the nodejs implementation has not quite caught up to it,
        // which is why it's cumbersome here.
        metadata.add("grpc-status-details-bin", Buffer.from(resp.serializeBinary()));
    }

    return {
        code: grpc.status.UNKNOWN,
        message: e.message,
        metadata: metadata,
    };
}

export async function main(provider: Provider, args: string[]) {
    // We track all uncaught errors here.  If we have any, we will make sure we always have a non-0 exit
    // code.
    const uncaughtErrors = new Set<Error>();
    const uncaughtHandler = (err: Error) => {
        if (!uncaughtErrors.has(err)) {
            uncaughtErrors.add(err);
            // Use `pulumi.log.error` here to tell the engine there was a fatal error, which should
            // stop processing subsequent resource operations.
            log.error(err.stack || err.message || "" + err);
        }
    };

    process.on("uncaughtException", uncaughtHandler);
    // @ts-ignore 'unhandledRejection' will almost always invoke uncaughtHandler with an Error. so just
    // suppress the TS strictness here.
    process.on("unhandledRejection", uncaughtHandler);
    process.on("exit", (code: number) => {
        // If there were any uncaught errors at all, we always want to exit with an error code.
        if (code === 0 && uncaughtErrors.size > 0) {
            process.exitCode = 1;
        }
    });

    const parsedArgs = parseArgs(args);

    // Finally connect up the gRPC client/server and listen for incoming requests.
    const server = new grpc.Server({
        "grpc.max_receive_message_length": settings.maxRPCMessageSize,
    });

    // The program receives a single optional argument: the address of the RPC endpoint for the engine.  It
    // optionally also takes a second argument, a reference back to the engine, but this may be missing.

    const engineAddr = parsedArgs?.engineAddress;
    server.addService(provrpc.ResourceProviderService, new Server(engineAddr, provider, uncaughtErrors));
    const port: number = await new Promise<number>((resolve, reject) => {
        server.bindAsync(`127.0.0.1:0`, grpc.ServerCredentials.createInsecure(), (err, p) => {
            if (err) {
                reject(err);
            } else {
                resolve(p);
            }
        });
    });

    // Emit the address so the monitor can read it to connect.  The gRPC server will keep the message loop alive.
    // We explicitly convert the number to a string so that Node doesn't colorize the output.
    console.log(port.toString());
}

/**
 * Rehydrate the provider reference into a registered ProviderResource,
 * otherwise return an instance of DependencyProviderResource.
 */
function createProviderResource(ref: string): resource.ProviderResource {
    const [urn] = resource.parseResourceReference(ref);
    const urnParts = urn.split("::");
    const qualifiedType = urnParts[2];
    const urnName = urnParts[3];

    const type = qualifiedType.split("$").pop()!;
    const typeParts = type.split(":");
    const typName = typeParts.length > 2 ? typeParts[2] : "";

    const resourcePackage = rpc.getResourcePackage(typName, /*version:*/ "");
    if (resourcePackage) {
        return resourcePackage.constructProvider(urnName, type, urn);
    }
    return new resource.DependencyProviderResource(ref);
}