mirror of https://github.com/pulumi/pulumi.git
143 lines
4.7 KiB
Python
143 lines
4.7 KiB
Python
# Copyright 2016-2018, Pulumi Corporation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import unittest
|
|
from google.protobuf import struct_pb2
|
|
from pulumi import CustomResource
|
|
from pulumi.runtime import rpc, known_types, Unknown
|
|
from pulumi.asset import FileAsset, StringAsset, RemoteAsset
|
|
|
|
class PropertySerializeTests(unittest.TestCase):
|
|
"""
|
|
Series of tests that ensures that we serialize Python datatypes
|
|
into Protobuf datatypes correctly.
|
|
"""
|
|
def test_empty(self):
|
|
"""
|
|
Tests that we serialize the empty Struct correctly.
|
|
"""
|
|
|
|
struct = rpc.serialize_resource_props({})
|
|
self.assertTrue(isinstance(struct, struct_pb2.Struct))
|
|
self.assertEqual(0, len(struct))
|
|
|
|
def test_dict_of_lists(self):
|
|
"""
|
|
Tests that we serialize a struct containing a list correctly.
|
|
"""
|
|
|
|
struct = rpc.serialize_resource_props({
|
|
"foo": [1, "2", True]
|
|
})
|
|
self.assertTrue(isinstance(struct, struct_pb2.Struct))
|
|
|
|
# pylint: disable=unsubscriptable-object
|
|
proto_list = struct["foo"]
|
|
self.assertTrue(isinstance(proto_list, struct_pb2.ListValue))
|
|
self.assertEqual(1, proto_list[0])
|
|
self.assertEqual("2", proto_list[1])
|
|
self.assertEqual(True, proto_list[2])
|
|
|
|
def test_unknown(self):
|
|
"""
|
|
Tests that we serialize instances of the Unknown class to
|
|
the unknown GUID.
|
|
"""
|
|
struct = rpc.serialize_resource_props({
|
|
"unknown_prop": Unknown()
|
|
})
|
|
|
|
# pylint: disable=unsubscriptable-object
|
|
unknown = struct["unknown_prop"]
|
|
self.assertEqual(rpc.UNKNOWN, unknown)
|
|
|
|
def test_file_asset(self):
|
|
"""
|
|
Tests that we serialize file assets correctly.
|
|
"""
|
|
struct = rpc.serialize_resource_props({
|
|
"asset": FileAsset("file.txt")
|
|
})
|
|
|
|
asset = struct["asset"]
|
|
self.assertEqual(rpc._special_asset_sig, asset[rpc._special_sig_key])
|
|
self.assertEqual("file.txt", asset["path"])
|
|
|
|
def test_string_asset(self):
|
|
"""
|
|
Tests that we serialize string assets correctly.
|
|
"""
|
|
struct = rpc.serialize_resource_props({
|
|
"asset": StringAsset("how do i python")
|
|
})
|
|
|
|
asset = struct["asset"]
|
|
self.assertEqual(rpc._special_asset_sig, asset[rpc._special_sig_key])
|
|
self.assertEqual("how do i python", asset["text"])
|
|
|
|
def test_remote_asset(self):
|
|
"""
|
|
Tests that we serialize remote assets correctly.
|
|
"""
|
|
struct = rpc.serialize_resource_props({
|
|
"asset": RemoteAsset("https://pulumi.io")
|
|
})
|
|
|
|
asset = struct["asset"]
|
|
self.assertEqual(rpc._special_asset_sig, asset[rpc._special_sig_key])
|
|
self.assertEqual("https://pulumi.io", asset["uri"])
|
|
|
|
|
|
class FakeCustomResource(object):
|
|
"""
|
|
Fake CustomResource class that duck-types to the real CustomResource.
|
|
This class is substituted for the real CustomResource for the below test.
|
|
"""
|
|
def __init__(self, id):
|
|
self.id = id
|
|
|
|
|
|
class CustomResourceSerializeTest(unittest.TestCase):
|
|
"""
|
|
Tests that we serialize CustomResources by serializing their ID.
|
|
"""
|
|
def setUp(self):
|
|
"""
|
|
Sets up the test by replacing the CustomResource that the rpc serialization
|
|
system knows about with the above FakeCustomResource, which doesn't interact
|
|
with the resource monitor.
|
|
"""
|
|
known_types._custom_resource_type = FakeCustomResource
|
|
|
|
def tearDown(self):
|
|
"""
|
|
Tears down the test by re-setting the rpc serialization system's known CustomResource.
|
|
"""
|
|
known_types._custom_resource_type = CustomResource
|
|
|
|
def test_custom_resource(self):
|
|
"""
|
|
Tests that the class registered by `register_custom_resource_type`
|
|
is serialized by serializing its ID field.
|
|
"""
|
|
struct = rpc.serialize_resource_props({
|
|
"fake": FakeCustomResource(42)
|
|
})
|
|
|
|
self.assertTrue(isinstance(struct, struct_pb2.Struct))
|
|
|
|
# pylint: disable=unsubscriptable-object
|
|
serialized_resource = struct["fake"]
|
|
self.assertEqual(42, serialized_resource)
|