# Copyright 2016-2021, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple

import pulumi.output
import pytest
from google.protobuf import struct_pb2
from pulumi.provider.server import ProviderServicer
from pulumi.resource import CustomResource, ResourceOptions
from pulumi.runtime import Mocks, ResourceModule, proto, rpc, rpc_manager
from pulumi.runtime.proto.provider_pb2 import ConstructRequest
from pulumi.runtime.settings import Settings, configure
from semver import VersionInfo as Version


def pulumi_test(coro):
    wrapped = pulumi.runtime.test(coro)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        configure(Settings("project", "stack"))
        rpc._RESOURCE_PACKAGES.clear()
        rpc._RESOURCE_MODULES.clear()

        wrapped(*args, **kwargs)

    return wrapper


@pytest.mark.asyncio
async def test_construct_inputs_parses_request():
    value = "foobar"
    inputs = _as_struct({"echo": value})
    req = ConstructRequest(inputs=inputs)
    inputs = await ProviderServicer._construct_inputs(
        req.inputs, req.inputDependencies
    )  # pylint: disable=no-member
    assert len(inputs) == 1
    assert inputs["echo"] == value


@pytest.mark.asyncio
async def test_construct_inputs_preserves_unknowns():
    unknown = "04da6b54-80e4-46f7-96ec-b56ff0331ba9"
    inputs = _as_struct({"echo": unknown})
    req = ConstructRequest(inputs=inputs)
    inputs = await ProviderServicer._construct_inputs(
        req.inputs, req.inputDependencies
    )  # pylint: disable=no-member
    assert len(inputs) == 1
    assert isinstance(inputs["echo"], pulumi.output.Unknown)


def _as_struct(key_values: Dict[str, Any]) -> struct_pb2.Struct:
    the_struct = struct_pb2.Struct()
    the_struct.update(key_values)  # pylint: disable=no-member
    return the_struct


class MockResource(CustomResource):
    def __init__(self, name: str, opts: Optional[ResourceOptions] = None):
        CustomResource.__init__(self, "test:index:MockResource", name, opts=opts)


class MockInputDependencies:
    """A mock for ConstructRequest.inputDependencies

    We need only support a `get() -> T where T.urns: List[str]` operation.
    """

    def __init__(self, urns: Optional[List[str]]):
        self.urns = urns if urns else []

    def get(self, *args):
        # pylint: disable=unused-argument
        return self


class TestModule(ResourceModule):
    def construct(self, name: str, typ: str, urn: str):
        if typ == "test:index:MockResource":
            return MockResource(name, opts=ResourceOptions(urn=urn))
        raise Exception(f"unknown resource type {typ}")

    def version(self) -> Optional[Version]:
        return None


class TestMocks(Mocks):
    def call(self, args: pulumi.runtime.MockCallArgs) -> Any:
        raise Exception(f"unknown function {args.token}")

    def new_resource(
        self, args: pulumi.runtime.MockResourceArgs
    ) -> Tuple[Optional[str], dict]:
        return args.name + "_id", args.inputs


def assert_output_equal(
    value: Any, known: bool, secret: bool, deps: Optional[List[str]] = None
):
    async def check(actual: Any):
        assert isinstance(actual, pulumi.Output)

        if callable(value):
            res = value(await actual.future())
            if isinstance(res, Awaitable):
                await res
        else:
            assert (await actual.future()) == value

        assert known == await actual.is_known()
        assert secret == await actual.is_secret()

        actual_deps: Set[Optional[str]] = set()
        resources = await actual.resources()
        for r in resources:
            urn = await r.urn.future()
            actual_deps.add(urn)

        assert actual_deps == set(deps if deps else [])
        return True

    return check


def create_secret(value: Any):
    return {rpc._special_sig_key: rpc._special_secret_sig, "value": value}


def create_resource_ref(urn: str, id_: Optional[str]):
    ref = {rpc._special_sig_key: rpc._special_resource_sig, "urn": urn}
    if id_ is not None:
        ref["id"] = id_
    return ref


def create_output_value(
    value: Optional[Any] = None,
    secret: Optional[bool] = None,
    dependencies: Optional[List[str]] = None,
):
    val: Dict[str, Any] = {rpc._special_sig_key: rpc._special_output_value_sig}
    if value is not None:
        val["value"] = value
    if secret is not None:
        val["secret"] = secret
    if dependencies is not None:
        val["dependencies"] = dependencies
    return val


test_urn = "urn:pulumi:stack::project::test:index:MockResource::name"
test_id = "name_id"


class UnmarshalOutputTestCase:
    def __init__(
        self,
        name: str,
        input_: Any,
        deps: Optional[List[str]] = None,
        expected: Optional[Any] = None,
        assert_: Optional[Callable[[Any], Awaitable]] = None,
    ):
        self.name = name
        self.input_ = input_
        self.deps = deps
        self.expected = expected
        self.assert_ = assert_

    async def run(self):
        pulumi.runtime.set_mocks(TestMocks(), "project", "stack", True)
        pulumi.runtime.register_resource_module("test", "index", TestModule())
        # This registers the resource purely for the purpose of the test.
        pulumi.runtime.settings.get_monitor().resources[test_urn] = (
            pulumi.runtime.mocks.MockMonitor.ResourceRegistration(
                test_urn, test_id, dict()
            )
        )

        inputs = {"value": self.input_}
        input_struct = _as_struct(inputs)
        req = ConstructRequest(inputs=input_struct)
        result = await ProviderServicer._construct_inputs(
            req.inputs, MockInputDependencies(self.deps)
        )  # pylint: disable=no-member
        actual = result["value"]
        if self.assert_:
            await self.assert_(actual)
        else:
            assert actual == self.expected


class Assert:
    """Describes a series of asserts to be performed.

    Each assert can be:
    - An async value to be awaited and asserted.
      assert await val

    - A sync function to be called and asserted on. This will be called on the
      same set of arguments that the class was called on.
      assert fn(actual)

    - A plain value to be asserted on.
      assert val

    """

    def __init__(self, *asserts):
        self.asserts = asserts

    async def __call__(self, *args, **kargs):
        for assert_ in self.asserts:
            assert await Assert.__eval(assert_, *args, **kargs)

    @staticmethod
    async def __eval(a, *args, **kargs) -> Any:
        if isinstance(a, Awaitable):
            return await a
        elif isinstance(a, Callable):
            a_res = a(*args, **kargs)
            return await Assert.__eval(a_res, *args, **kargs)
        return a

    @staticmethod
    def async_equal(a, b):
        """Asserts that two values are equal when evaluated with async and
        given the args that `Asserts` were called on.
        """

        async def check(*args, **kargs):
            a_res = await Assert.__eval(a, *args, **kargs)
            b_res = await Assert.__eval(b, *args, **kargs)
            assert a_res == b_res
            return True

        return check


async def array_nested_resource_ref(actual):
    async def helper(v: Any):
        assert isinstance(v, list)
        assert isinstance(v[0], MockResource)
        assert await v[0].urn.future() == test_urn
        assert await v[0].id.future() == test_id

    await assert_output_equal(helper, True, False, [test_urn])(actual)


async def object_nested_resource_ref(actual):
    async def helper(v: Any):
        assert isinstance(v["foo"], MockResource)
        assert await v["foo"].urn.future() == test_urn
        assert await v["foo"].id.future() == test_id

    await assert_output_equal(helper, True, False, [test_urn])(actual)


async def object_nested_resource_ref_and_secret(actual):
    async def helper(v: Any):
        assert isinstance(v["foo"], MockResource)
        assert await v["foo"].urn.future() == test_urn
        assert await v["foo"].id.future() == test_id
        assert v["bar"] == "ssh"

    await assert_output_equal(helper, True, True, [test_urn])(actual)


deserialization_tests = [
    UnmarshalOutputTestCase(
        name="unknown",
        input_=rpc.UNKNOWN,
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown",
        input_=[rpc.UNKNOWN],
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown",
        input_={"foo": rpc.UNKNOWN},
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="unknown output value",
        input_=create_output_value(None, False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="unknown output value (no deps)",
        input_=create_output_value(),
        assert_=assert_output_equal(None, False, False),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown output value",
        input_=[create_output_value(None, False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown output value (no deps)",
        input_=[create_output_value(None, False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value",
        input_={"foo": create_output_value(None, False, ["fakeURN"])},
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value (no deps)",
        input_={"foo": create_output_value(None, False, ["fakeURN"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="string value (no deps)",
        input_="hi",
        expected="hi",
    ),
    UnmarshalOutputTestCase(
        name="array nested string value (no deps)",
        input_=["hi"],
        expected=["hi"],
    ),
    UnmarshalOutputTestCase(
        name="object nested string value (no deps)",
        input_={"foo": "hi"},
        expected={"foo": "hi"},
    ),
    UnmarshalOutputTestCase(
        name="string output value",
        input_=create_output_value("hi", False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal("hi", True, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="string output value (no deps)",
        input_=create_output_value("hi"),
        assert_=assert_output_equal("hi", True, False),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value (no deps)",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value",
        input_={"foo": create_output_value("hi", False, ["fakeURN"])},
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value (no deps)",
        input_={"foo": create_output_value("hi", False, ["fakeURN"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="string secrets (no deps)",
        input_=create_secret("shh"),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secrets (no deps)",
        input_=[create_secret("shh")],
        assert_=assert_output_equal(["shh"], True, True),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secrets (no deps)",
        input_={"foo": create_secret("shh")},
        assert_=assert_output_equal({"foo": "shh"}, True, True),
    ),
    UnmarshalOutputTestCase(
        name="string secret output value (no deps)",
        input_=create_output_value("shh", True),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value (no deps)",
        input_=[create_output_value("shh", True)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True)(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value (no deps)",
        input_={"foo": create_output_value("shh", True)},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True)(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="string secret output value",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        deps=["fakeURN1", "fakeURN2"],
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    UnmarshalOutputTestCase(
        name="string secret output value (no deps)",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value (no deps)",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value",
        input_={"foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value (no deps)",
        input_={"foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="resource ref",
        input_=create_resource_ref(test_urn, test_id),
        deps=[test_urn],
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="resource ref (no deps)",
        input_=create_resource_ref(test_urn, test_id),
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref",
        input_=[create_resource_ref(test_urn, test_id)],
        deps=[test_urn],
        assert_=array_nested_resource_ref,
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref (no deps)",
        input_=[create_resource_ref(test_urn, test_id)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: isinstance(actual[0], MockResource),
            Assert.async_equal(lambda actual: actual[0].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual[0].id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref",
        input_={"foo": create_resource_ref(test_urn, test_id)},
        deps=[test_urn],
        assert_=object_nested_resource_ref,
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref (no deps)",
        input_={"foo": create_resource_ref(test_urn, test_id)},
        assert_=Assert(
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_secret("ssh"),
        },
        deps=[test_urn],
        assert_=object_nested_resource_ref_and_secret,
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        deps=[test_urn],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value (no deps)",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
]


@pytest.mark.parametrize(
    "testcase",
    deserialization_tests,
    ids=list(map(lambda x: x.name, deserialization_tests)),
)
@pulumi_test
async def test_deserialize_correctly(testcase):
    await testcase.run()