# Source file: pulumi/sdk/python/lib/test/provider/test_server.py
# (file-listing metadata: 608 lines, 21 KiB, Python)

# Copyright 2016-2021, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple
import pulumi.output
import pytest
from google.protobuf import struct_pb2
from pulumi.provider.server import ProviderServicer
from pulumi.resource import CustomResource, ResourceOptions
from pulumi.runtime import Mocks, ResourceModule, proto, rpc, rpc_manager
from pulumi.runtime.proto.provider_pb2 import ConstructRequest
from pulumi.runtime.settings import Settings, configure
from semver import VersionInfo as Version
def pulumi_test(coro):
    """Decorate a test coroutine so it runs through ``pulumi.runtime.test``.

    Every invocation of the returned wrapper first re-configures the runtime
    with ``Settings("project", "stack")`` and empties
    ``rpc._RESOURCE_PACKAGES`` and ``rpc._RESOURCE_MODULES``, then calls the
    wrapped test.  The wrapped test's return value is discarded.
    """
    harnessed = pulumi.runtime.test(coro)

    def _reset_runtime_state():
        # Fresh settings first, then drop whatever earlier tests registered.
        configure(Settings("project", "stack"))
        for registry in (rpc._RESOURCE_PACKAGES, rpc._RESOURCE_MODULES):
            registry.clear()

    @functools.wraps(harnessed)
    def wrapper(*args, **kwargs):
        _reset_runtime_state()
        harnessed(*args, **kwargs)

    return wrapper
@pytest.mark.asyncio
async def test_construct_inputs_parses_request():
    """A plain string input is returned by ``_construct_inputs`` unchanged."""
    expected = "foobar"
    request = ConstructRequest(inputs=_as_struct({"echo": expected}))
    parsed = await ProviderServicer._construct_inputs(request.inputs, request.inputDependencies)  # pylint: disable=no-member
    # Exactly one key comes back, carrying the original value.
    assert len(parsed) == 1
    assert parsed["echo"] == expected
@pytest.mark.asyncio
async def test_construct_inputs_preserves_unknowns():
    """The unknown sentinel string comes back as ``pulumi.output.Unknown``."""
    # NOTE(review): presumably the same literal as ``rpc.UNKNOWN`` -- confirm.
    sentinel = "04da6b54-80e4-46f7-96ec-b56ff0331ba9"
    request = ConstructRequest(inputs=_as_struct({"echo": sentinel}))
    parsed = await ProviderServicer._construct_inputs(request.inputs, request.inputDependencies)  # pylint: disable=no-member
    assert len(parsed) == 1
    assert isinstance(parsed["echo"], pulumi.output.Unknown)
def _as_struct(key_values: Dict[str, Any]) -> struct_pb2.Struct:
    """Return a new protobuf ``Struct`` updated with ``key_values``."""
    result = struct_pb2.Struct()
    result.update(key_values)  # pylint: disable=no-member
    return result
class MockResource(CustomResource):
    """Custom resource registered under the type ``test:index:MockResource``."""

    def __init__(self, name: str, opts: Optional[ResourceOptions] = None):
        super().__init__("test:index:MockResource", name, opts=opts)
class MockInputDependencies:
    """Stand-in for ``ConstructRequest.inputDependencies``.

    The only operation required is ``get()``, which must return an object
    exposing ``urns: List[str]``.  This mock returns itself for every lookup,
    so every key reports the same URN list.
    """

    def __init__(self, urns: Optional[List[str]]):
        # ``None`` and an empty list both end up as "no URNs".
        self.urns = urns or []

    def get(self, *args):
        # pylint: disable=unused-argument
        return self
class TestModule(ResourceModule):
    """Resource module that can only construct ``MockResource`` instances."""

    def construct(self, name: str, typ: str, urn: str):
        # Reject anything other than the single type these tests use.
        if typ != "test:index:MockResource":
            raise Exception(f"unknown resource type {typ}")
        return MockResource(name, opts=ResourceOptions(urn=urn))

    def version(self) -> Optional[Version]:
        return None
class TestMocks(Mocks):
    """Mocks that accept any resource registration and reject every call."""

    def call(self, args: pulumi.runtime.MockCallArgs) -> Any:
        raise Exception(f"unknown function {args.token}")

    def new_resource(
        self, args: pulumi.runtime.MockResourceArgs
    ) -> Tuple[Optional[str], dict]:
        # The id is the resource name plus "_id"; inputs are echoed back as state.
        resource_id = args.name + "_id"
        return resource_id, args.inputs
def assert_output_equal(
    value: Any, known: bool, secret: bool, deps: Optional[List[str]] = None
):
    """Build an async checker for a ``pulumi.Output``.

    The returned coroutine function asserts that its argument is a
    ``pulumi.Output`` and then verifies, in order:

    - the resolved value: if ``value`` is callable it is called with the
      resolved value (and its result awaited when that result is awaitable;
      the result itself is not asserted on), otherwise the resolved value
      must equal ``value``;
    - ``is_known()`` equals ``known``;
    - ``is_secret()`` equals ``secret``;
    - the set of URNs of the output's resources equals ``deps`` (``None`` or
      an empty list means no dependencies).

    The checker returns ``True`` when every assertion passes.
    """

    async def check(actual: Any):
        assert isinstance(actual, pulumi.Output)

        if not callable(value):
            assert (await actual.future()) == value
        else:
            outcome = value(await actual.future())
            if isinstance(outcome, Awaitable):
                await outcome

        assert known == await actual.is_known()
        assert secret == await actual.is_secret()

        # Compare dependencies by URN so ordering does not matter.
        dep_urns: Set[Optional[str]] = {
            await res.urn.future() for res in await actual.resources()
        }
        assert dep_urns == set(deps or [])

        return True

    return check
def create_secret(value: Any):
    """Wrap ``value`` in a dict tagged with the secret signature."""
    wrapped = {rpc._special_sig_key: rpc._special_secret_sig}
    wrapped["value"] = value
    return wrapped
def create_resource_ref(urn: str, id_: Optional[str]):
    """Build a dict tagged with the resource signature for ``urn``.

    The ``"id"`` key is only present when ``id_`` is not ``None``.
    """
    optional_id = {} if id_ is None else {"id": id_}
    return {
        rpc._special_sig_key: rpc._special_resource_sig,
        "urn": urn,
        **optional_id,
    }
def create_output_value(
    value: Optional[Any] = None,
    secret: Optional[bool] = None,
    dependencies: Optional[List[str]] = None,
):
    """Build a dict tagged with the output-value signature.

    Each of ``"value"``, ``"secret"`` and ``"dependencies"`` is included only
    when the corresponding argument is not ``None``.
    """
    val: Dict[str, Any] = {rpc._special_sig_key: rpc._special_output_value_sig}
    optional_fields = (
        ("value", value),
        ("secret", secret),
        ("dependencies", dependencies),
    )
    val.update(
        {key: field for key, field in optional_fields if field is not None}
    )
    return val
# URN under which `UnmarshalOutputTestCase.run` registers a resource with the
# mock monitor, and which the resource-reference cases point at.
test_urn = "urn:pulumi:stack::project::test:index:MockResource::name"
# Id registered alongside `test_urn`; it has the same `<name>_id` shape that
# `TestMocks.new_resource` produces.
test_id = "name_id"
class UnmarshalOutputTestCase:
    """One table-driven case for ``ProviderServicer._construct_inputs``.

    ``input_`` is sent as the single input ``"value"``; ``deps`` is the URN
    list exposed through the mocked input dependencies.  The deserialized
    value is checked with ``assert_`` when one is given, and compared to
    ``expected`` otherwise.
    """

    def __init__(
        self,
        name: str,
        input_: Any,
        deps: Optional[List[str]] = None,
        expected: Optional[Any] = None,
        assert_: Optional[Callable[[Any], Awaitable]] = None,
    ):
        self.name, self.input_, self.deps = name, input_, deps
        self.expected, self.assert_ = expected, assert_

    async def run(self):
        """Install the mocks, deserialize ``input_`` and verify the result."""
        pulumi.runtime.set_mocks(TestMocks(), "project", "stack", True)
        pulumi.runtime.register_resource_module("test", "index", TestModule())

        # This registers the resource purely for the purpose of the test.
        registration = pulumi.runtime.mocks.MockMonitor.ResourceRegistration(
            test_urn, test_id, {}
        )
        pulumi.runtime.settings.get_monitor().resources[test_urn] = registration

        request = ConstructRequest(inputs=_as_struct({"value": self.input_}))
        result = await ProviderServicer._construct_inputs(
            request.inputs, MockInputDependencies(self.deps)
        )  # pylint: disable=no-member

        actual = result["value"]
        if not self.assert_:
            # No custom checker: fall back to plain equality.
            assert actual == self.expected
            return
        await self.assert_(actual)
class Assert:
    """A sequence of assertions evaluated against the same call arguments.

    Each entry may be:

    - an awaitable, which is awaited and its result asserted::

          assert await val

    - a callable, which is invoked with the arguments the ``Assert`` instance
      was called with; its result is then resolved by the same rules::

          assert fn(actual)

    - any other value, which is asserted on directly::

          assert val
    """

    def __init__(self, *asserts):
        self.asserts = asserts

    async def __call__(self, *args, **kargs):
        for candidate in self.asserts:
            outcome = await Assert.__eval(candidate, *args, **kargs)
            assert outcome

    @staticmethod
    async def __eval(a, *args, **kargs) -> Any:
        # Keep unwrapping: callables are invoked (always with the same
        # arguments) until an awaitable or a plain value is reached.
        current = a
        while True:
            if isinstance(current, Awaitable):
                return await current
            if not isinstance(current, Callable):
                return current
            current = current(*args, **kargs)

    @staticmethod
    def async_equal(a, b):
        """Return a checker asserting that ``a`` and ``b`` resolve to equal values.

        Both sides are resolved like any other entry (awaited, or called with
        the arguments the checker receives) before being compared.
        """

        async def check(*args, **kargs):
            left = await Assert.__eval(a, *args, **kargs)
            right = await Assert.__eval(b, *args, **kargs)
            assert left == right
            return True

        return check
async def array_nested_resource_ref(actual):
    """Check an Output whose value is a list starting with the mock resource.

    The Output must be known, not secret, and depend on ``test_urn``.
    """

    async def helper(v: Any):
        assert isinstance(v, list)
        first = v[0]
        assert isinstance(first, MockResource)
        assert await first.urn.future() == test_urn
        assert await first.id.future() == test_id

    checker = assert_output_equal(helper, True, False, [test_urn])
    await checker(actual)
async def object_nested_resource_ref(actual):
    """Check an Output whose value maps ``"foo"`` to the mock resource.

    The Output must be known, not secret, and depend on ``test_urn``.
    """

    async def helper(v: Any):
        resource = v["foo"]
        assert isinstance(resource, MockResource)
        assert await resource.urn.future() == test_urn
        assert await resource.id.future() == test_id

    checker = assert_output_equal(helper, True, False, [test_urn])
    await checker(actual)
async def object_nested_resource_ref_and_secret(actual):
    """Check an Output holding the mock resource under ``"foo"`` and ``"ssh"`` under ``"bar"``.

    The Output must be known, secret, and depend on ``test_urn``.
    """

    async def helper(v: Any):
        resource = v["foo"]
        assert isinstance(resource, MockResource)
        assert await resource.urn.future() == test_urn
        assert await resource.id.future() == test_id
        assert v["bar"] == "ssh"

    checker = assert_output_equal(helper, True, True, [test_urn])
    await checker(actual)
# Table of cases fed to `test_deserialize_correctly`.  Each case sends its
# `input_` as the single input "value" through
# `ProviderServicer._construct_inputs` and then either runs `assert_` on the
# deserialized value or compares it to `expected`.  Cases tagged "(no deps)"
# omit the `deps=` argument, so the mocked input dependencies expose an empty
# URN list.
deserialization_tests = [
    # --- Unknown values -----------------------------------------------------
    # A plain unknown, even when nested in a list or dict, is expected to come
    # back as one not-known Output that depends on the request-level deps.
    UnmarshalOutputTestCase(
        name="unknown",
        input_=rpc.UNKNOWN,
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown",
        input_=[rpc.UNKNOWN],
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown",
        input_={"foo": rpc.UNKNOWN},
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    # --- Unknown output values ----------------------------------------------
    # An output value created without a "value" field is expected to be a
    # not-known Output.
    UnmarshalOutputTestCase(
        name="unknown output value",
        input_=create_output_value(None, False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="unknown output value (no deps)",
        input_=create_output_value(),
        assert_=assert_output_equal(None, False, False),
    ),
    # When an output value is nested, the container is expected to stay a
    # plain list/dict and only the nested element is checked as an Output.
    UnmarshalOutputTestCase(
        name="array nested unknown output value",
        input_=[create_output_value(None, False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown output value (no deps)",
        input_=[create_output_value(None, False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value",
        input_={"foo": create_output_value(None, False, ["fakeURN"])},
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value (no deps)",
        input_={"foo": create_output_value(None, False, ["fakeURN"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    # --- Plain strings ------------------------------------------------------
    # Without deps, plain values are compared to `expected` by equality.
    UnmarshalOutputTestCase(
        name="string value (no deps)",
        input_="hi",
        expected="hi",
    ),
    UnmarshalOutputTestCase(
        name="array nested string value (no deps)",
        input_=["hi"],
        expected=["hi"],
    ),
    UnmarshalOutputTestCase(
        name="object nested string value (no deps)",
        input_={"foo": "hi"},
        expected={"foo": "hi"},
    ),
    # --- Known string output values -----------------------------------------
    UnmarshalOutputTestCase(
        name="string output value",
        input_=create_output_value("hi", False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal("hi", True, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="string output value (no deps)",
        input_=create_output_value("hi"),
        assert_=assert_output_equal("hi", True, False),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value (no deps)",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual[0]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value",
        input_={"foo": create_output_value("hi", False, ["fakeURN"])},
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value (no deps)",
        input_={"foo": create_output_value("hi", False, ["fakeURN"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(
                actual["foo"]
            ),
        ),
    ),
    # --- Secrets built with `create_secret` ---------------------------------
    # Here a nested secret is expected to turn the whole value into one
    # secret Output whose resolved value is the plain container.
    UnmarshalOutputTestCase(
        name="string secrets (no deps)",
        input_=create_secret("shh"),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secrets (no deps)",
        input_=[create_secret("shh")],
        assert_=assert_output_equal(["shh"], True, True),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secrets (no deps)",
        input_={"foo": create_secret("shh")},
        assert_=assert_output_equal({"foo": "shh"}, True, True),
    ),
    # --- Secret output values -----------------------------------------------
    UnmarshalOutputTestCase(
        name="string secret output value (no deps)",
        input_=create_output_value("shh", True),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value (no deps)",
        input_=[create_output_value("shh", True)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True)(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value (no deps)",
        input_={"foo": create_output_value("shh", True)},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True)(actual["foo"]),
        ),
    ),
    # Secret output values that carry their own dependency URNs.
    UnmarshalOutputTestCase(
        name="string secret output value",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        deps=["fakeURN1", "fakeURN2"],
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    # NOTE(review): this name (and the two nested "(no deps)" names below)
    # repeats the name of an earlier case in this table, so the parametrize
    # ids are not unique -- consider renaming.
    UnmarshalOutputTestCase(
        name="string secret output value (no deps)",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value (no deps)",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value",
        input_={"foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value (no deps)",
        input_={"foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(
                "shh", True, True, ["fakeURN1", "fakeURN2"]
            )(actual["foo"]),
        ),
    ),
    # --- Resource references ------------------------------------------------
    # A top-level reference is expected to come back as a MockResource.  When
    # nested, the expectation depends on `deps`: with deps the whole value is
    # an Output (see the named helper functions); without deps the container
    # is inspected directly.
    UnmarshalOutputTestCase(
        name="resource ref",
        input_=create_resource_ref(test_urn, test_id),
        deps=[test_urn],
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="resource ref (no deps)",
        input_=create_resource_ref(test_urn, test_id),
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref",
        input_=[create_resource_ref(test_urn, test_id)],
        deps=[test_urn],
        assert_=array_nested_resource_ref,
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref (no deps)",
        input_=[create_resource_ref(test_urn, test_id)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: isinstance(actual[0], MockResource),
            Assert.async_equal(lambda actual: actual[0].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual[0].id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref",
        input_={"foo": create_resource_ref(test_urn, test_id)},
        deps=[test_urn],
        assert_=object_nested_resource_ref,
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref (no deps)",
        input_={"foo": create_resource_ref(test_urn, test_id)},
        assert_=Assert(
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
        ),
    ),
    # --- Resource references combined with secrets --------------------------
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_secret("ssh"),
        },
        deps=[test_urn],
        assert_=object_nested_resource_ref_and_secret,
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        deps=[test_urn],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value (no deps)",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
]
@pytest.mark.parametrize(
    "testcase",
    deserialization_tests,
    ids=[case.name for case in deserialization_tests],
)
@pulumi_test
async def test_deserialize_correctly(testcase):
    """Run one entry of ``deserialization_tests``; the case name is the test id."""
    await testcase.run()