pulumi/sdk/python/lib/pulumi/runtime/invoke.py

267 lines
12 KiB
Python

# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import traceback
from typing import Any, Dict, List, NamedTuple, Optional, Set, TYPE_CHECKING
import grpc
from .. import log
from .. import _types
from ..invoke import InvokeOptions
from ..runtime.proto import provider_pb2
from . import rpc
from .rpc_manager import RPC_MANAGER
from .settings import get_monitor, grpc_error_to_exception, handle_grpc_error
from .sync_await import _sync_await
if TYPE_CHECKING:
from .. import Resource, Inputs, Output
# This setting overrides a hardcoded maximum protobuf size in the python protobuf bindings. This avoids deserialization
# exceptions on large gRPC payloads, but makes it possible to use enough memory to cause an OOM error instead [1].
# Note: We hit the default maximum protobuf size in practice when processing Kubernetes CRDs [2]. If this setting ends
# up causing problems, it should be possible to work around it with more intelligent resource chunking in the k8s
# provider.
#
# [1] https://github.com/protocolbuffers/protobuf/blob/0a59054c30e4f0ba10f10acfc1d7f3814c63e1a7/python/google/protobuf/pyext/message.cc#L2017-L2024
# [2] https://github.com/pulumi/pulumi-kubernetes/issues/984
#
# This setting requires a platform-specific and python version-specific .so file called
# `_message.cpython-[py-version]-[platform].so`, which is not present in situations when a new python version is
# released but the corresponding dist wheel has not been. So, we wrap the import in a try/except to avoid breaking all
# python programs using a new version.
try:
from google.protobuf.pyext._message import SetAllowOversizeProtos # pylint: disable-msg=E0611
SetAllowOversizeProtos(True)
except ImportError:
pass
class InvokeResult:
"""
InvokeResult is a helper type that wraps a prompt value in an Awaitable.
"""
def __init__(self, value):
self.value = value
# pylint: disable=using-constant-test
def __await__(self):
# We need __await__ to be an iterator, but we only want it to return one value. As such, we use
# `if False: yield` to construct this.
if False:
yield self.value
return self.value
__iter__ = __await__
def invoke(tok: str, props: 'Inputs', opts: Optional[InvokeOptions] = None, typ: Optional[type] = None) -> InvokeResult:
"""
invoke dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
can be a bag of computed values (Ts or Awaitable[T]s), and the result is a Awaitable[Any] that
resolves when the invoke finishes.
"""
log.debug(f"Invoking function: tok={tok}")
if opts is None:
opts = InvokeOptions()
if typ and not _types.is_output_type(typ):
raise TypeError("Expected typ to be decorated with @output_type")
async def do_invoke():
# If a parent was provided, but no provider was provided, use the parent's provider if one was specified.
if opts.parent is not None and opts.provider is None:
opts.provider = opts.parent.get_provider(tok)
# Construct a provider reference from the given provider, if one was provided to us.
provider_ref = None
if opts.provider is not None:
provider_urn = await opts.provider.urn.future()
provider_id = (await opts.provider.id.future()) or rpc.UNKNOWN
provider_ref = f"{provider_urn}::{provider_id}"
log.debug(f"Invoke using provider {provider_ref}")
monitor = get_monitor()
inputs = await rpc.serialize_properties(props, {})
version = opts.version or ""
plugin_download_url = opts.plugin_download_url or ""
accept_resources = not (os.getenv("PULUMI_DISABLE_RESOURCE_REFERENCES", "").upper() in {"TRUE", "1"})
log.debug(f"Invoking function prepared: tok={tok}")
req = provider_pb2.InvokeRequest(
tok=tok,
args=inputs,
provider=provider_ref,
version=version,
acceptResources=accept_resources,
pluginDownloadURL=plugin_download_url,
)
def do_invoke():
try:
return monitor.Invoke(req), None
except grpc.RpcError as exn:
return None, grpc_error_to_exception(exn)
resp, error = await asyncio.get_event_loop().run_in_executor(None, do_invoke)
log.debug(f"Invoking function completed: tok={tok}, error={error}")
# If the invoke failed, raise an error.
if error is not None:
return None, Exception(f"invoke of {tok} failed: {error}")
if resp.failures:
return None, Exception(f"invoke of {tok} failed: {resp.failures[0].reason} ({resp.failures[0].property})")
# Otherwise, return the output properties.
ret_obj = getattr(resp, 'return')
if ret_obj:
deserialized = rpc.deserialize_properties(ret_obj)
# If typ is not None, call translate_output_properties to instantiate any output types.
return rpc.translate_output_properties(deserialized, lambda prop: prop, typ) if typ else deserialized, None
return None, None
async def do_rpc():
resp, exn = await RPC_MANAGER.do_rpc("invoke", do_invoke)()
# If there was an RPC level exception, we will raise it. Note that this will also crash the
# process because it will have been considered "unhandled". For semantic level errors, such
# as errors from the data source itself, we return that as part of the returned tuple instead.
if exn is not None:
raise exn
return resp
# Run the RPC callback asynchronously and then immediately await it.
# If there was a semantic error, raise it now, otherwise return the resulting value.
invoke_result, invoke_error = _sync_await(asyncio.ensure_future(do_rpc()))
if invoke_error is not None:
raise invoke_error
return InvokeResult(invoke_result)
def call(tok: str, props: 'Inputs', res: Optional['Resource'] = None, typ: Optional[type] = None) -> 'Output[Any]':
"""
call dynamically invokes the function, tok, which is offered by a provider plugin. The inputs
can be a bag of computed values (Ts or Awaitable[T]s).
"""
log.debug(f"Calling function: tok={tok}")
if typ and not _types.is_output_type(typ):
raise TypeError("Expected typ to be decorated with @output_type")
# Setup the futures for the output.
resolve_value: 'asyncio.Future' = asyncio.Future()
resolve_is_known: 'asyncio.Future[bool]' = asyncio.Future()
resolve_is_secret: 'asyncio.Future[bool]' = asyncio.Future()
resolve_deps: 'asyncio.Future[Set[Resource]]' = asyncio.Future()
from .. import Output # pylint: disable=import-outside-toplevel
out = Output(resolve_deps, resolve_value, resolve_is_known, resolve_is_secret)
async def do_call():
try:
# Construct a provider reference from the given provider, if one is available on the resource.
provider_ref, version, plugin_download_url = None, "", ""
if res is not None:
if res._provider is not None:
provider_urn = await res._provider.urn.future()
provider_id = (await res._provider.id.future()) or rpc.UNKNOWN
provider_ref = f"{provider_urn}::{provider_id}"
log.debug(f"Call using provider {provider_ref}")
version = res._version or ""
plugin_download_url = res._plugin_download_url or ""
monitor = get_monitor()
# Serialize out all props to their final values. In doing so, we'll also collect all the Resources pointed to
# by any Dependency objects we encounter, adding them to 'implicit_dependencies'.
property_dependencies_resources: Dict[str, List['Resource']] = {}
# We keep output values when serializing inputs for call.
inputs = await rpc.serialize_properties(props, property_dependencies_resources, keep_output_values=True)
property_dependencies = {}
for key, property_deps in property_dependencies_resources.items():
urns = set()
for dep in property_deps:
urn = await dep.urn.future()
urns.add(urn)
property_dependencies[key] = provider_pb2.CallRequest.ArgumentDependencies(urns=list(urns))
req = provider_pb2.CallRequest(
tok=tok,
args=inputs,
argDependencies=property_dependencies,
provider=provider_ref,
version=version,
pluginDownloadURL=plugin_download_url,
)
def do_rpc_call():
try:
return monitor.Call(req)
except grpc.RpcError as exn:
handle_grpc_error(exn)
return None
resp = await asyncio.get_event_loop().run_in_executor(None, do_rpc_call)
if resp is None:
return
if resp.failures:
raise Exception(f"call of {tok} failed: {resp.failures[0].reason} ({resp.failures[0].property})")
log.debug(f"Call successful: tok={tok}")
value = None
is_known = True
is_secret = False
deps: Set['Resource'] = set()
ret_obj = getattr(resp, "return")
if ret_obj:
deserialized = rpc.deserialize_properties(ret_obj)
is_known = not rpc.contains_unknowns(deserialized)
# Keep track of whether we need to mark the resulting output a secret,
# and unwrap each individual value.
for k, v in deserialized.items():
if rpc.is_rpc_secret(v):
is_secret = True
deserialized[k] = rpc.unwrap_rpc_secret(v)
# Combine the individual dependencies into a single set of dependency resources.
rpc_deps = resp.returnDependencies
deps_urns: Set[str] = {urn for v in rpc_deps.values() for urn in v.urns} if rpc_deps else set()
from ..resource import DependencyResource # pylint: disable=import-outside-toplevel
deps = set(map(DependencyResource, deps_urns))
if is_known:
# If typ is not None, call translate_output_properties to instantiate any output types.
value = rpc.translate_output_properties(deserialized, lambda p: p, typ) if typ else deserialized
resolve_value.set_result(value)
resolve_is_known.set_result(is_known)
resolve_is_secret.set_result(is_secret)
resolve_deps.set_result(deps)
except Exception as exn:
log.debug(f"exception when preparing or executing rpc: {traceback.format_exc()}")
resolve_value.set_exception(exn)
resolve_is_known.set_exception(exn)
resolve_is_secret.set_exception(exn)
resolve_deps.set_result(set())
raise
asyncio.ensure_future(RPC_MANAGER.do_rpc("call", do_call)())
return out