# Copyright 2016-2021, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple

import pulumi.output
import pytest
from google.protobuf import struct_pb2
from pulumi.provider.server import ProviderServicer
from pulumi.resource import CustomResource, ResourceOptions
from pulumi.runtime import Mocks, ResourceModule, proto, rpc, rpc_manager
from pulumi.runtime.proto.provider_pb2 import ConstructRequest
from pulumi.runtime.settings import Settings, configure
from semver import VersionInfo as Version


def pulumi_test(coro):
    wrapped = pulumi.runtime.test(coro)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        configure(Settings("project", "stack"))
        rpc._RESOURCE_PACKAGES.clear()
        rpc._RESOURCE_MODULES.clear()

        wrapped(*args, **kwargs)

    return wrapper


@pytest.mark.asyncio
async def test_construct_inputs_parses_request():
    value = 'foobar'
    inputs = _as_struct({'echo': value})
    req = ConstructRequest(inputs=inputs)
    inputs = await ProviderServicer._construct_inputs(req.inputs, req.inputDependencies) # pylint: disable=no-member
    assert len(inputs) == 1
    assert inputs['echo'] == value


@pytest.mark.asyncio
async def test_construct_inputs_preserves_unknowns():
    unknown = '04da6b54-80e4-46f7-96ec-b56ff0331ba9'
    inputs = _as_struct({'echo': unknown})
    req = ConstructRequest(inputs=inputs)
    inputs = await ProviderServicer._construct_inputs(req.inputs, req.inputDependencies) # pylint: disable=no-member
    assert len(inputs) == 1
    assert isinstance(inputs['echo'], pulumi.output.Unknown)


def _as_struct(key_values: Dict[str, Any]) -> struct_pb2.Struct:
    the_struct = struct_pb2.Struct()
    the_struct.update(key_values)  # pylint: disable=no-member
    return the_struct

class MockResource(CustomResource):
    def __init__(self, name: str, opts: Optional[ResourceOptions] = None):
        CustomResource.__init__(self, "test:index:MockResource", name, opts=opts)

class MockInputDependencies:
    """ A mock for ConstructRequest.inputDependencies

    We need only support a `get() -> T where T.urns: List[str]` operation.
    """
    def __init__(self, urns: Optional[List[str]]):
        self.urns = urns if urns else []

    def get(self, *args):
        #pylint: disable=unused-argument
        return self

class TestModule(ResourceModule):
    def construct(self, name: str, typ: str, urn: str):
        if typ == "test:index:MockResource":
            return MockResource(name, opts=ResourceOptions(urn=urn))
        raise Exception(f"unknown resource type {typ}")

    def version(self) -> Optional[Version]:
        return None

class TestMocks(Mocks):
    def call(self, args: pulumi.runtime.MockCallArgs) -> Any:
        raise Exception(f"unknown function {args.token}")

    def new_resource(self, args: pulumi.runtime.MockResourceArgs) -> Tuple[Optional[str], dict]:
        return args.name+"_id", args.inputs

def assert_output_equal(value: Any,
                              known: bool, secret: bool,
                              deps: Optional[List[str]] = None):
    async def check(actual: Any):
        assert isinstance(actual, pulumi.Output)

        if callable(value):
            res = value(await actual.future())
            if isinstance(res, Awaitable):
                await res
        else:
            assert (await actual.future()) == value

        assert known == await actual.is_known()
        assert secret == await actual.is_secret()

        actual_deps: Set[Optional[str]] = set()
        resources = await actual.resources()
        for r in resources:
            urn = await r.urn.future()
            actual_deps.add(urn)

        assert actual_deps == set(deps if deps else [])
        return True
    return check


def create_secret(value: Any):
    return {rpc._special_sig_key: rpc._special_secret_sig, "value": value}

def create_resource_ref(urn: str, id_: Optional[str]):
    ref = {rpc._special_sig_key: rpc._special_resource_sig, "urn": urn}
    if id_ is not None:
        ref["id"] = id_
    return ref

def create_output_value(value: Optional[Any] = None,
                        secret: Optional[bool] = None,
                        dependencies: Optional[List[str]] = None):
    val: Dict[str, Any] = {rpc._special_sig_key: rpc._special_output_value_sig}
    if value is not None:
        val["value"] = value
    if secret is not None:
        val["secret"] = secret
    if dependencies is not None:
        val["dependencies"] = dependencies
    return val

test_urn = "urn:pulumi:stack::project::test:index:MockResource::name"
test_id = "name_id"

class UnmarshalOutputTestCase:
    def __init__(self,
                 name: str,
                 input_: Any,
                 deps: Optional[List[str]] = None,
                 expected: Optional[Any] = None,
                 assert_: Optional[Callable[[Any], Awaitable]] = None):
        self.name = name
        self.input_ = input_
        self.deps = deps
        self.expected = expected
        self.assert_ = assert_

    async def run(self):
        pulumi.runtime.set_mocks(TestMocks(), "project", "stack", True)
        pulumi.runtime.register_resource_module("test", "index", TestModule())
        # This registers the resource purely for the purpose of the test.
        pulumi.runtime.settings.get_monitor().resources[test_urn] = \
            pulumi.runtime.mocks.MockMonitor.ResourceRegistration(test_urn, test_id, dict())

        inputs = { "value": self.input_ }
        input_struct = _as_struct(inputs)
        req = ConstructRequest(inputs=input_struct)
        result = await ProviderServicer._construct_inputs(
            req.inputs, MockInputDependencies(self.deps)) # pylint: disable=no-member
        actual = result["value"]
        if self.assert_:
            await self.assert_(actual)
        else:
            assert actual == self.expected


class Assert:
    """Describes a series of asserts to be performed.

    Each assert can be:
    - An async value to be awaited and asserted.
      assert await val

    - A sync function to be called and asserted on. This will be called on the
      same set of arguments that the class was called on.
      assert fn(actual)

    - A plain value to be asserted on.
      assert val

    """
    def __init__(self, *asserts):
        self.asserts = asserts

    async def __call__(self, *args, **kargs):
        for assert_ in self.asserts:
            assert await Assert.__eval(assert_, *args, **kargs)

    @staticmethod
    async def __eval(a, *args, **kargs) -> Any:
        if isinstance(a, Awaitable):
            return await a
        elif isinstance(a, Callable):
            a_res = a(*args, **kargs)
            return await Assert.__eval(a_res, *args, **kargs)
        return a

    @staticmethod
    def async_equal(a, b):
        """Asserts that two values are equal when evaluated with async and
        given the args that `Asserts` were called on.
        """
        async def check(*args, **kargs):
            a_res = await Assert.__eval(a, *args, **kargs)
            b_res = await Assert.__eval(b, *args, **kargs)
            assert a_res == b_res
            return True
        return check

async def array_nested_resource_ref(actual):
    async def helper(v: Any):
        assert isinstance(v, list)
        assert isinstance(v[0], MockResource)
        assert await v[0].urn.future() == test_urn
        assert await v[0].id.future() == test_id
    await assert_output_equal(helper, True, False, [test_urn])(actual)

async def object_nested_resource_ref(actual):
    async def helper(v: Any):
        assert isinstance(v["foo"], MockResource)
        assert await v["foo"].urn.future() == test_urn
        assert await v["foo"].id.future() == test_id
    await assert_output_equal(helper, True, False, [test_urn])(actual)

async def object_nested_resource_ref_and_secret(actual):
    async def helper(v: Any):
        assert isinstance(v["foo"], MockResource)
        assert await v["foo"].urn.future() == test_urn
        assert await v["foo"].id.future() == test_id
        assert v["bar"] == "ssh"
    await assert_output_equal(helper, True, True, [test_urn])(actual)


deserialization_tests = [
    UnmarshalOutputTestCase(
        name="unknown",
        input_=rpc.UNKNOWN,
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown",
        input_=[rpc.UNKNOWN],
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown",
        input_={"foo": rpc.UNKNOWN},
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="unknown output value",
        input_=create_output_value(None, False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="unknown output value (no deps)",
        input_=create_output_value(),
        assert_=assert_output_equal(None, False, False),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown output value",
        input_=[create_output_value(None, False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual[0])
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown output value (no deps)",
        input_=[create_output_value(None, False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual[0])
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value",
        input_= { "foo": create_output_value(None, False, ["fakeURN"]) },
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value (no deps)",
        input_= { "foo": create_output_value(None, False, ["fakeURN"]) },
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="string value (no deps)",
        input_="hi",
        expected="hi",
    ),
    UnmarshalOutputTestCase(
        name="array nested string value (no deps)",
        input_=["hi"],
        expected=["hi"],
    ),
    UnmarshalOutputTestCase(
        name="object nested string value (no deps)",
        input_= { "foo": "hi" },
        expected= { "foo": "hi" },
    ),
    UnmarshalOutputTestCase(
        name="string output value",
        input_=create_output_value("hi", False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal("hi", True, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="string output value (no deps)",
        input_=create_output_value("hi"),
        assert_=assert_output_equal("hi", True, False),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value (no deps)",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value",
        input_={ "foo": create_output_value("hi", False, ["fakeURN"])},
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual["foo"])
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value (no deps)",
        input_={ "foo": create_output_value("hi", False, ["fakeURN"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual["foo"])
        ),
    ),
    UnmarshalOutputTestCase(
        name="string secrets (no deps)",
        input_=create_secret("shh"),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secrets (no deps)",
        input_=[create_secret("shh")],
        assert_=assert_output_equal(["shh"], True, True),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secrets (no deps)",
        input_={ "foo": create_secret("shh")},
        assert_=assert_output_equal({"foo": "shh"}, True, True),
    ),
    UnmarshalOutputTestCase(
        name="string secret output value (no deps)",
        input_=create_output_value("shh", True),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value (no deps)",
        input_=[create_output_value("shh", True)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True)(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value (no deps)",
        input_={"foo": create_output_value("shh", True)},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True)(actual["foo"]),
            ),
    ),
    UnmarshalOutputTestCase(
        name="string secret output value",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        deps=["fakeURN1", "fakeURN2"],
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    UnmarshalOutputTestCase(
        name="string secret output value (no deps)",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value (no deps)",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value",
        input_={ "foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value (no deps)",
        input_={ "foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="resource ref",
        input_=create_resource_ref(test_urn, test_id),
        deps=[test_urn],
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="resource ref (no deps)",
        input_=create_resource_ref(test_urn, test_id),
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref",
        input_=[create_resource_ref(test_urn, test_id)],
        deps=[test_urn],
        assert_=array_nested_resource_ref
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref (no deps)",
        input_=[create_resource_ref(test_urn, test_id)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: isinstance(actual[0], MockResource),
            Assert.async_equal(lambda actual: actual[0].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual[0].id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref",
        input_={ "foo": create_resource_ref(test_urn, test_id) },
        deps=[test_urn],
        assert_=object_nested_resource_ref
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref (no deps)",
        input_={ "foo": create_resource_ref(test_urn, test_id) },
        assert_=Assert(
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_secret("ssh"),
        },
        deps=[test_urn],
        assert_=object_nested_resource_ref_and_secret
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        deps=[test_urn],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value (no deps)",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
]


@pytest.mark.parametrize(
    "testcase", deserialization_tests, ids=list(map(lambda x: x.name, deserialization_tests)))
@pulumi_test
async def test_deserialize_correctly(testcase):
    await testcase.run()