mirror of https://github.com/pulumi/pulumi.git
551 lines
20 KiB
Python
551 lines
20 KiB
Python
# Copyright 2016-2021, Pulumi Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import functools
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple
|
|
|
|
import pulumi.output
|
|
import pytest
|
|
from google.protobuf import struct_pb2
|
|
from pulumi.provider.server import ProviderServicer
|
|
from pulumi.resource import CustomResource, ResourceOptions
|
|
from pulumi.runtime import Mocks, ResourceModule, proto, rpc, rpc_manager
|
|
from pulumi.runtime.proto.provider_pb2 import ConstructRequest
|
|
from pulumi.runtime.settings import Settings, configure
|
|
from semver import VersionInfo as Version
|
|
|
|
|
|
def pulumi_test(coro):
    """Decorate a test coroutine so every invocation starts from reset state.

    ``coro`` is first wrapped with ``pulumi.runtime.test``. Each call of the
    returned function reconfigures the runtime settings for project
    ``"project"`` / stack ``"stack"``, empties the registered resource
    packages and modules, and then invokes the wrapped test. The wrapped
    test's return value is not propagated.
    """
    runnable = pulumi.runtime.test(coro)

    def _reset_runtime_state():
        # Fresh settings first, then drop registrations left by earlier tests.
        configure(Settings("project", "stack"))
        for registry in (rpc._RESOURCE_PACKAGES, rpc._RESOURCE_MODULES):
            registry.clear()

    @functools.wraps(runnable)
    def wrapper(*args, **kwargs):
        _reset_runtime_state()
        runnable(*args, **kwargs)

    return wrapper
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_construct_inputs_parses_request():
    """A plain string input is returned by `_construct_inputs` unchanged."""
    expected = 'foobar'
    request = ConstructRequest(inputs=_as_struct({'echo': expected}))

    parsed = await ProviderServicer._construct_inputs(
        request.inputs, request.inputDependencies)  # pylint: disable=no-member

    assert len(parsed) == 1
    assert parsed['echo'] == expected
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_construct_inputs_preserves_unknowns():
    """The string below must come back as a `pulumi.output.Unknown` instance."""
    unknown_marker = '04da6b54-80e4-46f7-96ec-b56ff0331ba9'
    request = ConstructRequest(inputs=_as_struct({'echo': unknown_marker}))

    parsed = await ProviderServicer._construct_inputs(
        request.inputs, request.inputDependencies)  # pylint: disable=no-member

    assert len(parsed) == 1
    assert isinstance(parsed['echo'], pulumi.output.Unknown)
|
|
|
|
|
|
def _as_struct(key_values: Dict[str, Any]) -> struct_pb2.Struct:
    """Return a new protobuf ``Struct`` updated with ``key_values``."""
    populated = struct_pb2.Struct()
    populated.update(key_values)  # pylint: disable=no-member
    return populated
|
|
|
|
class MockResource(CustomResource):
    """Custom resource with the type token ``test:index:MockResource``."""

    def __init__(self, name: str, opts: Optional[ResourceOptions] = None):
        super().__init__("test:index:MockResource", name, opts=opts)
|
|
|
|
class MockInputDependencies:
    """Stand-in for ConstructRequest.inputDependencies.

    Only a `get() -> T where T.urns: List[str]` operation is needed, so the
    instance acts as its own lookup result.
    """

    def __init__(self, urns: Optional[List[str]]):
        # A missing (or empty) list is stored as a fresh empty list.
        self.urns = urns or []

    def get(self, *_args):
        """Return this object, whatever arguments are supplied."""
        return self
|
|
|
|
class TestModule(ResourceModule):
    """Resource module that only knows how to build ``MockResource``."""

    def construct(self, name: str, typ: str, urn: str):
        """Return a ``MockResource`` bound to ``urn``.

        Raises ``Exception`` for any type token other than
        ``test:index:MockResource``.
        """
        if typ != "test:index:MockResource":
            raise Exception(f"unknown resource type {typ}")
        return MockResource(name, opts=ResourceOptions(urn=urn))

    def version(self) -> Optional[Version]:
        """Report no version for this module."""
        return None
|
|
|
|
class TestMocks(Mocks):
    """Mocks that reject every call and echo the inputs of new resources."""

    def call(self, args: pulumi.runtime.MockCallArgs) -> Any:
        """No function invocation is supported; always raise."""
        raise Exception(f"unknown function {args.token}")

    def new_resource(self, args: pulumi.runtime.MockResourceArgs) -> Tuple[Optional[str], dict]:
        """Return ``<name>_id`` as the id and the inputs as the state."""
        resource_id = args.name + "_id"
        return resource_id, args.inputs
|
|
|
|
def assert_output_equal(value: Any,
                        known: bool, secret: bool,
                        deps: Optional[List[str]] = None):
    """Build an async checker for a ``pulumi.Output``.

    The returned coroutine function asserts that its argument is an
    ``Output`` whose resolved value matches ``value``, whose known/secret
    flags equal ``known``/``secret``, and whose resources' URNs are exactly
    ``deps`` (compared as sets; ``None`` means no dependencies).

    If ``value`` is callable it is called with the resolved value instead of
    being compared; an awaitable result is awaited. The checker returns
    ``True`` when every assertion holds.
    """
    async def check(actual: Any):
        assert isinstance(actual, pulumi.Output)

        resolved = await actual.future()
        if not callable(value):
            assert resolved == value
        else:
            outcome = value(resolved)
            if isinstance(outcome, Awaitable):
                await outcome

        assert known == await actual.is_known()
        assert secret == await actual.is_secret()

        # Collect the URN of every resource the output depends on.
        actual_deps: Set[Optional[str]] = set()
        for resource in await actual.resources():
            actual_deps.add(await resource.urn.future())

        assert actual_deps == set(deps or [])
        return True

    return check
|
|
|
|
|
|
def create_secret(value: Any):
    """Wrap ``value`` in a dict tagged with the secret signature."""
    secret_envelope = {
        rpc._special_sig_key: rpc._special_secret_sig,
        "value": value,
    }
    return secret_envelope
|
|
|
|
def create_resource_ref(urn: str, id_: Optional[str]):
    """Return a dict tagged with the resource signature for ``urn``.

    The ``"id"`` key is only present when ``id_`` is not ``None``.
    """
    reference = {rpc._special_sig_key: rpc._special_resource_sig, "urn": urn}
    if id_ is None:
        return reference
    reference["id"] = id_
    return reference
|
|
|
|
def create_output_value(value: Optional[Any] = None,
                        secret: Optional[bool] = None,
                        dependencies: Optional[List[str]] = None):
    """Return a dict tagged with the output-value signature.

    ``"value"``, ``"secret"`` and ``"dependencies"`` are added, in that
    order, only for the arguments that are not ``None``.
    """
    val: Dict[str, Any] = {rpc._special_sig_key: rpc._special_output_value_sig}
    optional_fields = (
        ("value", value),
        ("secret", secret),
        ("dependencies", dependencies),
    )
    for key, field in optional_fields:
        if field is not None:
            val[key] = field
    return val
|
|
|
|
# URN of the MockResource that UnmarshalOutputTestCase.run registers with the
# mock monitor and that the resource-reference test cases refer to.
test_urn = "urn:pulumi:stack::project::test:index:MockResource::name"
|
|
# Id recorded alongside test_urn in the mock monitor's resource registration.
test_id = "name_id"
|
|
|
|
class UnmarshalOutputTestCase:
    """One input-deserialization scenario for `_construct_inputs`.

    ``input_`` is sent under the ``"value"`` key with ``deps`` as its input
    dependencies. The deserialized value is then either handed to the async
    ``assert_`` callable or, when none is given, compared to ``expected``.
    """

    def __init__(self,
                 name: str,
                 input_: Any,
                 deps: Optional[List[str]] = None,
                 expected: Optional[Any] = None,
                 assert_: Optional[Callable[[Any], Awaitable]] = None):
        self.name = name
        self.input_ = input_
        self.deps = deps
        self.expected = expected
        self.assert_ = assert_

    async def run(self):
        """Install the mocks, deserialize the input and verify the result."""
        pulumi.runtime.set_mocks(TestMocks(), "project", "stack", True)
        pulumi.runtime.register_resource_module("test", "index", TestModule())

        # This registers the resource purely for the purpose of the test.
        registration = pulumi.runtime.mocks.MockMonitor.ResourceRegistration(
            test_urn, test_id, {})
        pulumi.runtime.settings.get_monitor().resources[test_urn] = registration

        request = ConstructRequest(inputs=_as_struct({"value": self.input_}))
        deserialized = await ProviderServicer._construct_inputs(
            request.inputs, MockInputDependencies(self.deps))  # pylint: disable=no-member
        actual = deserialized["value"]

        if not self.assert_:
            assert actual == self.expected
            return
        await self.assert_(actual)
|
|
|
|
|
|
class Assert:
    """A sequence of checks that are evaluated in order and must all be truthy.

    Each check may be:

    - An awaitable, which is awaited and its result asserted.
      assert await val

    - A callable, which is called with the same arguments the ``Assert``
      instance was called with; its result is then evaluated the same way.
      assert fn(actual)

    - Any other value, which is asserted directly.
      assert val
    """

    def __init__(self, *asserts):
        self.asserts = asserts

    async def __call__(self, *args, **kargs):
        for candidate in self.asserts:
            outcome = await Assert.__eval(candidate, *args, **kargs)
            assert outcome

    @staticmethod
    async def __eval(a, *args, **kargs) -> Any:
        # Keep calling while we hold a plain (non-awaitable) callable, then
        # await the final awaitable if there is one. An awaited result is
        # returned as-is and is never called or awaited again.
        current = a
        while not isinstance(current, Awaitable) and isinstance(current, Callable):
            current = current(*args, **kargs)
        if isinstance(current, Awaitable):
            return await current
        return current

    @staticmethod
    def async_equal(a, b):
        """Return an async check that ``a`` and ``b`` evaluate to equal values.

        Both sides are evaluated with the arguments the check is called
        with, using the same rules as the checks of an ``Assert``.
        """
        async def check(*args, **kargs):
            left = await Assert.__eval(a, *args, **kargs)
            right = await Assert.__eval(b, *args, **kargs)
            assert left == right
            return True

        return check
|
|
|
|
async def array_nested_resource_ref(actual):
    """Check that ``actual`` is a known, non-secret output of a list whose
    first element is the ``MockResource`` registered under ``test_urn``.
    """
    async def verify_resolved(resolved: Any):
        assert isinstance(resolved, list)
        first = resolved[0]
        assert isinstance(first, MockResource)
        assert await first.urn.future() == test_urn
        assert await first.id.future() == test_id

    checker = assert_output_equal(verify_resolved, True, False, [test_urn])
    await checker(actual)
|
|
|
|
async def object_nested_resource_ref(actual):
    """Check that ``actual`` is a known, non-secret output of a mapping whose
    ``"foo"`` entry is the ``MockResource`` registered under ``test_urn``.
    """
    async def verify_resolved(resolved: Any):
        nested = resolved["foo"]
        assert isinstance(nested, MockResource)
        assert await nested.urn.future() == test_urn
        assert await nested.id.future() == test_id

    checker = assert_output_equal(verify_resolved, True, False, [test_urn])
    await checker(actual)
|
|
|
|
async def object_nested_resource_ref_and_secret(actual):
    """Check that ``actual`` is a known, secret output of a mapping holding
    the registered ``MockResource`` under ``"foo"`` and ``"ssh"`` under
    ``"bar"``.
    """
    async def verify_resolved(resolved: Any):
        nested = resolved["foo"]
        assert isinstance(nested, MockResource)
        assert await nested.urn.future() == test_urn
        assert await nested.id.future() == test_id
        assert resolved["bar"] == "ssh"

    checker = assert_output_equal(verify_resolved, True, True, [test_urn])
    await checker(actual)
|
|
|
|
|
|
# Table of deserialization scenarios. Each case's `name` is used as its pytest
# id, so names must be unique. In a name, "(no deps)" means the case passes no
# `deps` (input dependencies); the output value itself may still carry
# dependencies, which is spelled out as "with dependencies" where a name would
# otherwise collide with another case.
deserialization_tests = [
    # --- unknown values ---
    UnmarshalOutputTestCase(
        name="unknown",
        input_=rpc.UNKNOWN,
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown",
        input_=[rpc.UNKNOWN],
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown",
        input_={"foo": rpc.UNKNOWN},
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="unknown output value",
        input_=create_output_value(None, False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal(None, False, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="unknown output value (no deps)",
        input_=create_output_value(),
        assert_=assert_output_equal(None, False, False),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown output value",
        input_=[create_output_value(None, False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested unknown output value (no deps)",
        input_=[create_output_value(None, False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value",
        input_={"foo": create_output_value(None, False, ["fakeURN"])},
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested unknown output value (no deps)",
        input_={"foo": create_output_value(None, False, ["fakeURN"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal(None, False, False, ["fakeURN"])(actual["foo"]),
        ),
    ),
    # --- plain string values ---
    UnmarshalOutputTestCase(
        name="string value (no deps)",
        input_="hi",
        expected="hi",
    ),
    UnmarshalOutputTestCase(
        name="array nested string value (no deps)",
        input_=["hi"],
        expected=["hi"],
    ),
    UnmarshalOutputTestCase(
        name="object nested string value (no deps)",
        input_={"foo": "hi"},
        expected={"foo": "hi"},
    ),
    # --- string output values ---
    UnmarshalOutputTestCase(
        name="string output value",
        input_=create_output_value("hi", False, ["fakeURN"]),
        deps=["fakeURN"],
        assert_=assert_output_equal("hi", True, False, ["fakeURN"]),
    ),
    UnmarshalOutputTestCase(
        name="string output value (no deps)",
        input_=create_output_value("hi"),
        assert_=assert_output_equal("hi", True, False),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested string output value (no deps)",
        input_=[create_output_value("hi", False, ["fakeURN"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value",
        input_={"foo": create_output_value("hi", False, ["fakeURN"])},
        deps=["fakeURN"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string output value (no deps)",
        input_={"foo": create_output_value("hi", False, ["fakeURN"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("hi", True, False, ["fakeURN"])(actual["foo"]),
        ),
    ),
    # --- secrets ---
    UnmarshalOutputTestCase(
        name="string secrets (no deps)",
        input_=create_secret("shh"),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secrets (no deps)",
        input_=[create_secret("shh")],
        assert_=assert_output_equal(["shh"], True, True),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secrets (no deps)",
        input_={"foo": create_secret("shh")},
        assert_=assert_output_equal({"foo": "shh"}, True, True),
    ),
    # --- secret output values without dependencies ---
    UnmarshalOutputTestCase(
        name="string secret output value (no deps)",
        input_=create_output_value("shh", True),
        assert_=assert_output_equal("shh", True, True),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value (no deps)",
        input_=[create_output_value("shh", True)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True)(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value (no deps)",
        input_={"foo": create_output_value("shh", True)},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True)(actual["foo"]),
        ),
    ),
    # --- secret output values that carry their own dependencies ---
    UnmarshalOutputTestCase(
        name="string secret output value",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        deps=["fakeURN1", "fakeURN2"],
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    UnmarshalOutputTestCase(
        # Renamed: this name used to duplicate the dependency-free case above.
        name="string secret output value with dependencies (no deps)",
        input_=create_output_value("shh", True, ["fakeURN1", "fakeURN2"]),
        assert_=assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"]),
    ),
    UnmarshalOutputTestCase(
        name="array nested string secret output value",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        # Renamed: this name used to duplicate the dependency-free case above.
        name="array nested string secret output value with dependencies (no deps)",
        input_=[create_output_value("shh", True, ["fakeURN1", "fakeURN2"])],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual[0]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested string secret output value",
        input_={"foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        deps=["fakeURN1", "fakeURN2"],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual["foo"]),
        ),
    ),
    UnmarshalOutputTestCase(
        # Renamed: this name used to duplicate the dependency-free case above.
        name="object nested string secret output value with dependencies (no deps)",
        input_={"foo": create_output_value("shh", True, ["fakeURN1", "fakeURN2"])},
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: assert_output_equal("shh", True, True, ["fakeURN1", "fakeURN2"])(actual["foo"]),
        ),
    ),
    # --- resource references ---
    UnmarshalOutputTestCase(
        name="resource ref",
        input_=create_resource_ref(test_urn, test_id),
        deps=[test_urn],
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="resource ref (no deps)",
        input_=create_resource_ref(test_urn, test_id),
        assert_=Assert(
            lambda actual: isinstance(actual, MockResource),
            Assert.async_equal(lambda actual: actual.urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual.id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref",
        input_=[create_resource_ref(test_urn, test_id)],
        deps=[test_urn],
        assert_=array_nested_resource_ref,
    ),
    UnmarshalOutputTestCase(
        name="array nested resource ref (no deps)",
        input_=[create_resource_ref(test_urn, test_id)],
        assert_=Assert(
            lambda actual: isinstance(actual, list),
            lambda actual: isinstance(actual[0], MockResource),
            Assert.async_equal(lambda actual: actual[0].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual[0].id.future(), test_id),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref",
        input_={"foo": create_resource_ref(test_urn, test_id)},
        deps=[test_urn],
        assert_=object_nested_resource_ref,
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref (no deps)",
        input_={"foo": create_resource_ref(test_urn, test_id)},
        assert_=Assert(
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
        ),
    ),
    # --- resource references mixed with secrets ---
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_secret("ssh"),
        },
        deps=[test_urn],
        assert_=object_nested_resource_ref_and_secret,
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        deps=[test_urn],
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
    UnmarshalOutputTestCase(
        name="object nested resource ref and secret output value (no deps)",
        input_={
            "foo": create_resource_ref(test_urn, test_id),
            "bar": create_output_value("shh", True),
        },
        assert_=Assert(
            lambda actual: not isinstance(actual, pulumi.Output),
            lambda actual: isinstance(actual["foo"], MockResource),
            Assert.async_equal(lambda actual: actual["foo"].urn.future(), test_urn),
            Assert.async_equal(lambda actual: actual["foo"].id.future(), test_id),
            lambda actual: assert_output_equal("shh", True, True)(actual["bar"]),
        ),
    ),
]
|
|
|
|
|
|
@pytest.mark.parametrize(
    "testcase",
    deserialization_tests,
    ids=[case.name for case in deserialization_tests])
@pulumi_test
async def test_deserialize_correctly(testcase):
    """Run a single deserialization case; its pytest id is the case name."""
    await testcase.run()
|