2024-07-03 14:47:59 +00:00
|
|
|
// Copyright 2016-2024, Pulumi Corporation.
|
2018-05-22 19:43:36 +00:00
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2017-09-27 19:34:44 +00:00
|
|
|
|
2020-04-14 08:30:25 +00:00
|
|
|
import * as grpc from "@grpc/grpc-js";
|
2019-10-22 07:20:26 +00:00
|
|
|
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-11 23:00:46 +00:00
|
|
|
import { AsyncIterable } from "@pulumi/query/interfaces";
|
|
|
|
|
2024-07-03 14:47:59 +00:00
|
|
|
import { gatherExplicitDependencies } from "./dependsOn";
|
Implement first-class providers. (#1695)
### First-Class Providers
These changes implement support for first-class providers. First-class
providers are provider plugins that are exposed as resources via the
Pulumi programming model so that they may be explicitly and multiply
instantiated. Each instance of a provider resource may be configured
differently, and configuration parameters may be source from the
outputs of other resources.
### Provider Plugin Changes
In order to accommodate the need to verify and diff provider
configuration and configure providers without complete configuration
information, these changes adjust the high-level provider plugin
interface. Two new methods for validating a provider's configuration
and diffing changes to the same have been added (`CheckConfig` and
`DiffConfig`, respectively), and the type of the configuration bag
accepted by `Configure` has been changed to a `PropertyMap`.
These changes have not yet been reflected in the provider plugin gRPC
interface. We will do this in a set of follow-up changes. Until then,
these methods are implemented by adapters:
- `CheckConfig` validates that all configuration parameters are string
or unknown properties. This is necessary because existing plugins
only accept string-typed configuration values.
- `DiffConfig` either returns "never replace" if all configuration
values are known or "must replace" if any configuration value is
unknown. The justification for this behavior is given
[here](https://github.com/pulumi/pulumi/pull/1695/files#diff-a6cd5c7f337665f5bb22e92ca5f07537R106)
- `Configure` converts the config bag to a legacy config map and
configures the provider plugin if all config values are known. If any
config value is unknown, the underlying plugin is not configured and
the provider may only perform `Check`, `Read`, and `Invoke`, all of
which return empty results. We justify this behavior becuase it is
only possible during a preview and provides the best experience we
can manage with the existing gRPC interface.
### Resource Model Changes
Providers are now exposed as resources that participate in a stack's
dependency graph. Like other resources, they are explicitly created,
may have multiple instances, and may have dependencies on other
resources. Providers are referred to using provider references, which
are a combination of the provider's URN and its ID. This design
addresses the need during a preview to refer to providers that have not
yet been physically created and therefore have no ID.
All custom resources that are not themselves providers must specify a
single provider via a provider reference. The named provider will be
used to manage that resource's CRUD operations. If a resource's
provider reference changes, the resource must be replaced. Though its
URN is not present in the resource's dependency list, the provider
should be treated as a dependency of the resource when topologically
sorting the dependency graph.
Finally, `Invoke` operations must now specify a provider to use for the
invocation via a provider reference.
### Engine Changes
First-class providers support requires a few changes to the engine:
- The engine must have some way to map from provider references to
provider plugins. It must be possible to add providers from a stack's
checkpoint to this map and to register new/updated providers during
the execution of a plan in response to CRUD operations on provider
resources.
- In order to support updating existing stacks using existing Pulumi
programs that may not explicitly instantiate providers, the engine
must be able to manage the "default" providers for each package
referenced by a checkpoint or Pulumi program. The configuration for
a "default" provider is taken from the stack's configuration data.
The former need is addressed by adding a provider registry type that is
responsible for managing all of the plugins required by a plan. In
addition to loading plugins froma checkpoint and providing the ability
to map from a provider reference to a provider plugin, this type serves
as the provider plugin for providers themselves (i.e. it is the
"provider provider").
The latter need is solved via two relatively self-contained changes to
plan setup and the eval source.
During plan setup, the old checkpoint is scanned for custom resources
that do not have a provider reference in order to compute the set of
packages that require a default provider. Once this set has been
computed, the required default provider definitions are conjured and
prepended to the checkpoint's resource list. Each resource that
requires a default provider is then updated to refer to the default
provider for its package.
While an eval source is running, each custom resource registration,
resource read, and invoke that does not name a provider is trapped
before being returned by the source iterator. If no default provider
for the appropriate package has been registered, the eval source
synthesizes an appropriate registration, waits for it to complete, and
records the registered provider's reference. This reference is injected
into the original request, which is then processed as usual. If a
default provider was already registered, the recorded reference is
used and no new registration occurs.
### SDK Changes
These changes only expose first-class providers from the Node.JS SDK.
- A new abstract class, `ProviderResource`, can be subclassed and used
to instantiate first-class providers.
- A new field in `ResourceOptions`, `provider`, can be used to supply
a particular provider instance to manage a `CustomResource`'s CRUD
operations.
- A new type, `InvokeOptions`, can be used to specify options that
control the behavior of a call to `pulumi.runtime.invoke`. This type
includes a `provider` field that is analogous to
`ResourceOptions.provider`.
2018-08-07 00:50:29 +00:00
|
|
|
import { InvokeOptions } from "../invoke";
|
2017-10-08 19:10:46 +00:00
|
|
|
import * as log from "../log";
|
2019-10-15 05:08:06 +00:00
|
|
|
import { Inputs, Output } from "../output";
|
2017-09-27 19:34:44 +00:00
|
|
|
import { debuggablePromise } from "./debuggable";
|
2020-09-24 02:06:26 +00:00
|
|
|
import {
|
2023-04-28 22:27:10 +00:00
|
|
|
deserializeProperties,
|
|
|
|
isRpcSecret,
|
|
|
|
serializeProperties,
|
|
|
|
serializePropertiesReturnDeps,
|
|
|
|
unwrapRpcSecret,
|
|
|
|
} from "./rpc";
|
|
|
|
import { excessiveDebugOutput, getMonitor, rpcKeepAlive, terminateRpcs } from "./settings";
|
2019-10-15 05:08:06 +00:00
|
|
|
|
2021-07-07 23:03:56 +00:00
|
|
|
import { DependencyResource, ProviderResource, Resource } from "../resource";
|
2019-10-15 05:08:06 +00:00
|
|
|
import * as utils from "../utils";
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-11 23:00:46 +00:00
|
|
|
import { PushableAsyncIterable } from "./asyncIterableUtil";
|
2017-09-27 19:34:44 +00:00
|
|
|
|
2023-12-04 15:22:44 +00:00
|
|
|
import * as gstruct from "google-protobuf/google/protobuf/struct_pb";
|
|
|
|
import * as resourceproto from "../proto/resource_pb";
|
2024-03-02 00:00:57 +00:00
|
|
|
import * as providerproto from "../proto/provider_pb";
|
2017-09-27 19:34:44 +00:00
|
|
|
|
|
|
|
/**
|
2019-10-15 05:08:06 +00:00
|
|
|
* `invoke` dynamically invokes the function, `tok`, which is offered by a provider plugin. `invoke`
|
|
|
|
* behaves differently in the case that options contains `{async:true}` or not.
|
|
|
|
*
|
|
|
|
* In the case where `{async:true}` is present in the options bag:
|
|
|
|
*
|
|
|
|
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider plugin.
|
|
|
|
* 2. the `props` inputs can be a bag of computed values (including, `T`s, `Promise<T>`s,
|
|
|
|
* `Output<T>`s etc.).
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* In the case where `{async:true}` is not present in the options bag:
|
|
|
|
*
|
|
|
|
* 1. the result of `invoke` will be a Promise resolved to the result value of the provider call.
|
|
|
|
* However, that Promise will *also* have the respective values of the Provider result exposed
|
|
|
|
* directly on it as properties.
|
|
|
|
*
|
|
|
|
* 2. The inputs must be a bag of simple values, and the result is the result that the Provider
|
|
|
|
* produced.
|
|
|
|
*
|
|
|
|
* Simple values are:
|
|
|
|
* 1. `undefined`, `null`, string, number or boolean values.
|
|
|
|
* 2. arrays of simple values.
|
|
|
|
* 3. objects containing only simple values.
|
|
|
|
*
|
|
|
|
* Importantly, simple values do *not* include:
|
|
|
|
* 1. `Promise`s
|
|
|
|
* 2. `Output`s
|
|
|
|
* 3. `Asset`s or `Archive`s
|
|
|
|
* 4. `Resource`s.
|
|
|
|
*
|
|
|
|
* All of these contain async values that would prevent `invoke from being able to operate
|
|
|
|
* synchronously.
|
2017-09-27 19:34:44 +00:00
|
|
|
*/
|
2019-10-15 05:08:06 +00:00
|
|
|
export function invoke(tok: string, props: Inputs, opts: InvokeOptions = {}): Promise<any> {
|
2020-04-14 08:30:25 +00:00
|
|
|
return invokeAsync(tok, props, opts);
|
2019-10-15 05:08:06 +00:00
|
|
|
}
|
|
|
|
|
2022-12-06 21:15:40 +00:00
|
|
|
/*
|
|
|
|
* `invokeSingle` dynamically invokes the function, `tok`, which is offered by a provider plugin.
|
|
|
|
* Similar to `invoke`, but returns a single value instead of an object with a single key.
|
|
|
|
*/
|
|
|
|
export function invokeSingle(tok: string, props: Inputs, opts: InvokeOptions = {}): Promise<any> {
|
2023-04-28 22:27:10 +00:00
|
|
|
return invokeAsync(tok, props, opts).then((outputs) => {
|
2022-12-06 21:15:40 +00:00
|
|
|
// assume outputs have a single key
|
|
|
|
const keys = Object.keys(outputs);
|
|
|
|
// return the first key's value from the outputs
|
|
|
|
return outputs[keys[0]];
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
export async function streamInvoke(
|
2019-10-22 07:20:26 +00:00
|
|
|
tok: string,
|
|
|
|
props: Inputs,
|
|
|
|
opts: InvokeOptions = {},
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
): Promise<StreamInvokeResponse<any>> {
|
|
|
|
const label = `StreamInvoking function: tok=${tok} asynchronously`;
|
|
|
|
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
|
|
|
|
|
|
|
|
// Wait for all values to be available, and then perform the RPC.
|
|
|
|
const done = rpcKeepAlive();
|
|
|
|
try {
|
2024-07-03 14:47:59 +00:00
|
|
|
// Wait for any explicit dependencies to complete before proceeding.
|
|
|
|
await gatherExplicitDependencies(opts.dependsOn);
|
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
const serialized = await serializeProperties(`streamInvoke:${tok}`, props);
|
|
|
|
log.debug(
|
2023-04-28 22:27:10 +00:00
|
|
|
`StreamInvoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``,
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
// Fetch the monitor and make an RPC request.
|
|
|
|
const monitor: any = getMonitor();
|
|
|
|
|
|
|
|
const provider = await ProviderResource.register(getProvider(tok, opts));
|
|
|
|
const req = createInvokeRequest(tok, serialized, provider, opts);
|
|
|
|
|
|
|
|
// Call `streamInvoke`.
|
2021-07-07 23:03:56 +00:00
|
|
|
const result = monitor.streamInvoke(req, {});
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-11 23:00:46 +00:00
|
|
|
const queue = new PushableAsyncIterable();
|
2023-04-28 22:27:10 +00:00
|
|
|
result.on("data", function (thing: any) {
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-11 23:00:46 +00:00
|
|
|
const live = deserializeResponse(tok, thing);
|
|
|
|
queue.push(live);
|
|
|
|
});
|
2021-07-07 23:03:56 +00:00
|
|
|
result.on("error", (err: any) => {
|
2020-07-10 16:56:35 +00:00
|
|
|
if (err.code === 1) {
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-11 23:00:46 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
throw err;
|
|
|
|
});
|
2021-07-07 23:03:56 +00:00
|
|
|
result.on("end", () => {
|
`StreamInvoke` should return `AsyncIterable` that completes
A user who calls `StreamInvoke` probably expects the `AsyncIterable`
that is returned to gracefully terminate. This is currently not the
case.
Where does something like this go wrong? A better question might be
where any of this went right, because several days later, after
wandering into civilization from the great Wilderness of Bugs, I must
confess that I've forgotten if any of it had.
`AsyncIterable` is a pull-based API. `for await (...)` will continuously
call `next` ("pull") on the underlying `AsyncIterator` until the
iterable is exhausted. But, gRPC's streaming-return API is _push_ based.
That is to say, when a streaming RPC is called, data is provided by
callback on the stream object, like:
call.on("data", (thing: any) => {... do thing ...});
Our goal in `StreamInvoke` is to convert the push-based gRPC routines
into the pull-based `AsyncIterable` retrun type. You may remember your
CS theory this is one of those annoying "fundamental mismatches" in
abstraction. So we're off to a good start.
Until this point, we've depended on a library,
`callback-to-async-iterator` to handle the details of being this bridge.
Our trusting nature and innocent charm has mislead us. This library is
not worthy of our trust. Instead of doing what we'd like it to do, it
returns (in our case) an `AsyncIterable` that will never complete.
Yes,, this `AsyncIterable` will patiently wait for eternity, which
honestly is kind of poetic when you sit down in a nice bath and think
about that fun time you considered eating your computer instead of
finishing this idiotic bug.
Indeed, this is the sort of bug that you wonder where it even comes
from. Our query libraries? Why aren't these `finally` blocks executing?
Is our language host terminating early? Is gRPC angry at me, and just
passive-aggrssively not servicing some of my requests? Oh god I've been
up for 48 hours, why is that wallpaper starting to move? And by the way,
a fun interlude to take in an otherwise very productive week is to try
to understand the gRPC streaming node client, which is code-gen'd, but
which also takes the liberty of generating itself at runtime, so that
gRPC is code-gen'ing a code-gen routine, which makes the whole thing
un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't
need to understand any of this anyway, thanks friends.
But we've come out the other side knowing that the weak link in this
very sorry chain of incredibly weak links, is this dependency.
This commit removes this dependency for a better monster: the one we
know.
It is at this time that I'd like to announce that I am quitting my job
at Pulumi. I thank you all for the good times, but mostly, for taking
this code over for me.
2019-11-11 23:00:46 +00:00
|
|
|
queue.complete();
|
|
|
|
});
|
2019-10-22 07:20:26 +00:00
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
// Return a cancellable handle to the stream.
|
2023-04-28 22:27:10 +00:00
|
|
|
return new StreamInvokeResponse(queue, () => result.cancel());
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
} finally {
|
|
|
|
done();
|
|
|
|
}
|
2019-10-22 07:20:26 +00:00
|
|
|
}
|
|
|
|
|
2019-10-15 05:08:06 +00:00
|
|
|
async function invokeAsync(tok: string, props: Inputs, opts: InvokeOptions): Promise<any> {
|
|
|
|
const label = `Invoking function: tok=${tok} asynchronously`;
|
|
|
|
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
|
|
|
|
|
2018-01-25 23:26:39 +00:00
|
|
|
// Wait for all values to be available, and then perform the RPC.
|
|
|
|
const done = rpcKeepAlive();
|
|
|
|
try {
|
2024-07-03 14:47:59 +00:00
|
|
|
// Wait for any explicit dependencies to complete before proceeding.
|
|
|
|
await gatherExplicitDependencies(opts.dependsOn);
|
|
|
|
|
2019-10-15 05:08:06 +00:00
|
|
|
const serialized = await serializeProperties(`invoke:${tok}`, props);
|
2023-04-28 22:27:10 +00:00
|
|
|
log.debug(
|
|
|
|
`Invoke RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``,
|
|
|
|
);
|
2018-01-25 23:26:39 +00:00
|
|
|
|
|
|
|
// Fetch the monitor and make an RPC request.
|
|
|
|
const monitor: any = getMonitor();
|
|
|
|
|
2019-10-15 05:08:06 +00:00
|
|
|
const provider = await ProviderResource.register(getProvider(tok, opts));
|
|
|
|
const req = createInvokeRequest(tok, serialized, provider, opts);
|
Implement first-class providers. (#1695)
### First-Class Providers
These changes implement support for first-class providers. First-class
providers are provider plugins that are exposed as resources via the
Pulumi programming model so that they may be explicitly and multiply
instantiated. Each instance of a provider resource may be configured
differently, and configuration parameters may be source from the
outputs of other resources.
### Provider Plugin Changes
In order to accommodate the need to verify and diff provider
configuration and configure providers without complete configuration
information, these changes adjust the high-level provider plugin
interface. Two new methods for validating a provider's configuration
and diffing changes to the same have been added (`CheckConfig` and
`DiffConfig`, respectively), and the type of the configuration bag
accepted by `Configure` has been changed to a `PropertyMap`.
These changes have not yet been reflected in the provider plugin gRPC
interface. We will do this in a set of follow-up changes. Until then,
these methods are implemented by adapters:
- `CheckConfig` validates that all configuration parameters are string
or unknown properties. This is necessary because existing plugins
only accept string-typed configuration values.
- `DiffConfig` either returns "never replace" if all configuration
values are known or "must replace" if any configuration value is
unknown. The justification for this behavior is given
[here](https://github.com/pulumi/pulumi/pull/1695/files#diff-a6cd5c7f337665f5bb22e92ca5f07537R106)
- `Configure` converts the config bag to a legacy config map and
configures the provider plugin if all config values are known. If any
config value is unknown, the underlying plugin is not configured and
the provider may only perform `Check`, `Read`, and `Invoke`, all of
which return empty results. We justify this behavior becuase it is
only possible during a preview and provides the best experience we
can manage with the existing gRPC interface.
### Resource Model Changes
Providers are now exposed as resources that participate in a stack's
dependency graph. Like other resources, they are explicitly created,
may have multiple instances, and may have dependencies on other
resources. Providers are referred to using provider references, which
are a combination of the provider's URN and its ID. This design
addresses the need during a preview to refer to providers that have not
yet been physically created and therefore have no ID.
All custom resources that are not themselves providers must specify a
single provider via a provider reference. The named provider will be
used to manage that resource's CRUD operations. If a resource's
provider reference changes, the resource must be replaced. Though its
URN is not present in the resource's dependency list, the provider
should be treated as a dependency of the resource when topologically
sorting the dependency graph.
Finally, `Invoke` operations must now specify a provider to use for the
invocation via a provider reference.
### Engine Changes
First-class providers support requires a few changes to the engine:
- The engine must have some way to map from provider references to
provider plugins. It must be possible to add providers from a stack's
checkpoint to this map and to register new/updated providers during
the execution of a plan in response to CRUD operations on provider
resources.
- In order to support updating existing stacks using existing Pulumi
programs that may not explicitly instantiate providers, the engine
must be able to manage the "default" providers for each package
referenced by a checkpoint or Pulumi program. The configuration for
a "default" provider is taken from the stack's configuration data.
The former need is addressed by adding a provider registry type that is
responsible for managing all of the plugins required by a plan. In
addition to loading plugins froma checkpoint and providing the ability
to map from a provider reference to a provider plugin, this type serves
as the provider plugin for providers themselves (i.e. it is the
"provider provider").
The latter need is solved via two relatively self-contained changes to
plan setup and the eval source.
During plan setup, the old checkpoint is scanned for custom resources
that do not have a provider reference in order to compute the set of
packages that require a default provider. Once this set has been
computed, the required default provider definitions are conjured and
prepended to the checkpoint's resource list. Each resource that
requires a default provider is then updated to refer to the default
provider for its package.
While an eval source is running, each custom resource registration,
resource read, and invoke that does not name a provider is trapped
before being returned by the source iterator. If no default provider
for the appropriate package has been registered, the eval source
synthesizes an appropriate registration, waits for it to complete, and
records the registered provider's reference. This reference is injected
into the original request, which is then processed as usual. If a
default provider was already registered, the recorded reference is
used and no new registration occurs.
### SDK Changes
These changes only expose first-class providers from the Node.JS SDK.
- A new abstract class, `ProviderResource`, can be subclassed and used
to instantiate first-class providers.
- A new field in `ResourceOptions`, `provider`, can be used to supply
a particular provider instance to manage a `CustomResource`'s CRUD
operations.
- A new type, `InvokeOptions`, can be used to specify options that
control the behavior of a call to `pulumi.runtime.invoke`. This type
includes a `provider` field that is analogous to
`ResourceOptions.provider`.
2018-08-07 00:50:29 +00:00
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
const resp: any = await debuggablePromise(
|
|
|
|
new Promise((innerResolve, innerReject) =>
|
|
|
|
monitor.invoke(req, (err: grpc.ServiceError, innerResponse: any) => {
|
|
|
|
log.debug(`Invoke RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
|
|
|
|
if (err) {
|
|
|
|
// If the monitor is unavailable, it is in the process of shutting down or has already
|
|
|
|
// shut down. Don't emit an error and don't do any more RPCs, just exit.
|
|
|
|
if (err.code === grpc.status.UNAVAILABLE || err.code === grpc.status.CANCELLED) {
|
|
|
|
terminateRpcs();
|
|
|
|
err.message = "Resource monitor is terminating";
|
|
|
|
innerReject(err);
|
|
|
|
return;
|
|
|
|
}
|
2018-05-16 22:37:34 +00:00
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
// If the RPC failed, rethrow the error with a native exception and the message that
|
|
|
|
// the engine provided - it's suitable for user presentation.
|
|
|
|
innerReject(new Error(err.details));
|
|
|
|
} else {
|
|
|
|
innerResolve(innerResponse);
|
|
|
|
}
|
|
|
|
}),
|
|
|
|
),
|
|
|
|
label,
|
|
|
|
);
|
2017-09-27 19:34:44 +00:00
|
|
|
|
2018-01-25 23:26:39 +00:00
|
|
|
// Finally propagate any other properties that were given to us as outputs.
|
2019-10-15 05:08:06 +00:00
|
|
|
return deserializeResponse(tok, resp);
|
2023-04-28 22:27:10 +00:00
|
|
|
} finally {
|
2018-01-25 23:26:39 +00:00
|
|
|
done();
|
|
|
|
}
|
2017-09-27 19:34:44 +00:00
|
|
|
}
|
2019-10-15 05:08:06 +00:00
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
// StreamInvokeResponse represents a (potentially infinite) streaming response to `streamInvoke`,
|
|
|
|
// with facilities to gracefully cancel and clean up the stream.
|
|
|
|
export class StreamInvokeResponse<T> implements AsyncIterable<T> {
|
2024-06-24 11:14:56 +00:00
|
|
|
constructor(
|
|
|
|
private source: AsyncIterable<T>,
|
|
|
|
private cancelSource: () => void,
|
|
|
|
) {}
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
|
|
|
|
// cancel signals the `streamInvoke` should be cancelled and cleaned up gracefully.
|
|
|
|
public cancel() {
|
|
|
|
this.cancelSource();
|
|
|
|
}
|
2019-10-22 07:20:26 +00:00
|
|
|
|
Make `streamInvoke` gracefully-cancellable from SDKs
The @pulumi/pulumi TypScript SDK exposes `streamInvoke`, which returns a
(potentially infinite) stream of responses. This currently is _assumed_
to be infinite, in that there is no way to signal cancellation, and
prevents Pulumi from being able to clean up when we're finished using
the results of the `streamInvoke`.
This commit will introduce a `StreamInvokeResult` type, which is an
`AsyncIterable` that also exposes a `cancel` function, whih does just
this.
Use it like this:
// `streamInvoke` to retrieve all updates to any `Deployment`, enumerate 0
// updates from the stream, then `cancel` giving the Kubernetes provider to
// clean up and close gracefully.
const deployments = await streamInvoke("kubernetes:kubernetes:watch", {
group: "apps", version: "v1", kind: "Deployment",
break;
});
deployments.cancel();
2019-11-04 19:36:11 +00:00
|
|
|
[Symbol.asyncIterator]() {
|
|
|
|
return this.source[Symbol.asyncIterator]();
|
2019-10-22 07:20:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-15 05:08:06 +00:00
|
|
|
function createInvokeRequest(tok: string, serialized: any, provider: string | undefined, opts: InvokeOptions) {
|
|
|
|
if (provider !== undefined && typeof provider !== "string") {
|
|
|
|
throw new Error("Incorrect provider type.");
|
|
|
|
}
|
|
|
|
|
|
|
|
const obj = gstruct.Struct.fromJavaScript(serialized);
|
|
|
|
|
2022-04-14 09:59:46 +00:00
|
|
|
const req = new resourceproto.ResourceInvokeRequest();
|
2019-10-15 05:08:06 +00:00
|
|
|
req.setTok(tok);
|
|
|
|
req.setArgs(obj);
|
2023-12-04 15:22:44 +00:00
|
|
|
req.setProvider(provider || "");
|
2019-10-15 05:08:06 +00:00
|
|
|
req.setVersion(opts.version || "");
|
2020-12-10 18:21:05 +00:00
|
|
|
req.setAcceptresources(!utils.disableResourceReferences);
|
2019-10-15 05:08:06 +00:00
|
|
|
return req;
|
|
|
|
}
|
|
|
|
|
|
|
|
function getProvider(tok: string, opts: InvokeOptions) {
|
2023-04-28 22:27:10 +00:00
|
|
|
return opts.provider ? opts.provider : opts.parent ? opts.parent.getProvider(tok) : undefined;
|
2019-10-15 05:08:06 +00:00
|
|
|
}
|
|
|
|
|
2024-03-02 00:00:57 +00:00
|
|
|
function deserializeResponse(
|
|
|
|
tok: string,
|
|
|
|
resp: { getFailuresList(): Array<providerproto.CheckFailure>; getReturn(): gstruct.Struct | undefined },
|
|
|
|
): Inputs | undefined {
|
|
|
|
const failures = resp.getFailuresList();
|
2023-04-29 02:16:01 +00:00
|
|
|
if (failures?.length) {
|
2019-10-15 05:08:06 +00:00
|
|
|
let reasons = "";
|
|
|
|
for (let i = 0; i < failures.length; i++) {
|
|
|
|
if (reasons !== "") {
|
|
|
|
reasons += "; ";
|
|
|
|
}
|
|
|
|
|
|
|
|
reasons += `${failures[i].getReason()} (${failures[i].getProperty()})`;
|
|
|
|
}
|
|
|
|
|
|
|
|
throw new Error(`Invoke of '${tok}' failed: ${reasons}`);
|
|
|
|
}
|
|
|
|
|
2019-11-07 02:41:49 +00:00
|
|
|
const ret = resp.getReturn();
|
2023-04-28 22:27:10 +00:00
|
|
|
return ret === undefined ? ret : deserializeProperties(ret);
|
2019-10-15 05:08:06 +00:00
|
|
|
}
|
2021-07-07 23:03:56 +00:00
|
|
|
|
|
|
|
/**
|
|
|
|
* `call` dynamically calls the function, `tok`, which is offered by a provider plugin.
|
|
|
|
*/
|
|
|
|
export function call<T>(tok: string, props: Inputs, res?: Resource): Output<T> {
|
|
|
|
const label = `Calling function: tok=${tok}`;
|
|
|
|
log.debug(label + (excessiveDebugOutput ? `, props=${JSON.stringify(props)}` : ``));
|
|
|
|
|
|
|
|
const [out, resolver] = createOutput<T>(`call(${tok})`);
|
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
debuggablePromise(
|
|
|
|
Promise.resolve().then(async () => {
|
|
|
|
const done = rpcKeepAlive();
|
|
|
|
try {
|
|
|
|
// Construct a provider reference from the given provider, if one is available on the resource.
|
|
|
|
let provider: string | undefined = undefined;
|
|
|
|
let version: string | undefined = undefined;
|
|
|
|
let pluginDownloadURL: string | undefined = undefined;
|
|
|
|
if (res) {
|
|
|
|
if (res.__prov) {
|
|
|
|
provider = await ProviderResource.register(res.__prov);
|
|
|
|
}
|
|
|
|
version = res.__version;
|
|
|
|
pluginDownloadURL = res.__pluginDownloadURL;
|
2021-07-07 23:03:56 +00:00
|
|
|
}
|
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
// We keep output values when serializing inputs for call.
|
|
|
|
const [serialized, propertyDepsResources] = await serializePropertiesReturnDeps(`call:${tok}`, props, {
|
|
|
|
keepOutputValues: true,
|
|
|
|
});
|
|
|
|
log.debug(
|
|
|
|
`Call RPC prepared: tok=${tok}` + excessiveDebugOutput ? `, obj=${JSON.stringify(serialized)}` : ``,
|
|
|
|
);
|
|
|
|
|
|
|
|
const req = await createCallRequest(
|
|
|
|
tok,
|
|
|
|
serialized,
|
|
|
|
propertyDepsResources,
|
|
|
|
provider,
|
|
|
|
version,
|
|
|
|
pluginDownloadURL,
|
|
|
|
);
|
|
|
|
|
2024-03-02 00:00:57 +00:00
|
|
|
const monitor = getMonitor();
|
|
|
|
const resp = await debuggablePromise(
|
|
|
|
new Promise<providerproto.CallResponse>((innerResolve, innerReject) => {
|
|
|
|
if (monitor === undefined) {
|
|
|
|
throw new Error("No monitor available");
|
|
|
|
}
|
2023-04-28 22:27:10 +00:00
|
|
|
|
2024-03-02 00:00:57 +00:00
|
|
|
monitor.call(
|
|
|
|
req,
|
|
|
|
(err: grpc.ServiceError | null, innerResponse: providerproto.CallResponse) => {
|
|
|
|
log.debug(`Call RPC finished: tok=${tok}; err: ${err}, resp: ${innerResponse}`);
|
|
|
|
if (err) {
|
|
|
|
// If the monitor is unavailable, it is in the process of shutting down or has already
|
|
|
|
// shut down. Don't emit an error and don't do any more RPCs, just exit.
|
|
|
|
if (err.code === grpc.status.UNAVAILABLE || err.code === grpc.status.CANCELLED) {
|
|
|
|
terminateRpcs();
|
|
|
|
err.message = "Resource monitor is terminating";
|
|
|
|
innerReject(err);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// If the RPC failed, rethrow the error with a native exception and the message that
|
|
|
|
// the engine provided - it's suitable for user presentation.
|
|
|
|
innerReject(new Error(err.details));
|
|
|
|
} else {
|
|
|
|
innerResolve(innerResponse);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
);
|
|
|
|
}),
|
2023-04-28 22:27:10 +00:00
|
|
|
label,
|
|
|
|
);
|
|
|
|
|
|
|
|
// Deserialize the response and resolve the output.
|
|
|
|
const deserialized = deserializeResponse(tok, resp);
|
|
|
|
let isSecret = false;
|
|
|
|
const deps: Resource[] = [];
|
|
|
|
|
|
|
|
// Keep track of whether we need to mark the resulting output a secret.
|
|
|
|
// and unwrap each individual value.
|
2024-03-02 00:00:57 +00:00
|
|
|
if (deserialized !== undefined) {
|
|
|
|
for (const k of Object.keys(deserialized)) {
|
|
|
|
const v = deserialized[k];
|
|
|
|
if (isRpcSecret(v)) {
|
|
|
|
isSecret = true;
|
|
|
|
deserialized[k] = unwrapRpcSecret(v);
|
|
|
|
}
|
2023-04-28 22:27:10 +00:00
|
|
|
}
|
|
|
|
}
|
2021-07-07 23:03:56 +00:00
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
// Combine the individual dependencies into a single set of dependency resources.
|
|
|
|
const rpcDeps = resp.getReturndependenciesMap();
|
|
|
|
if (rpcDeps) {
|
|
|
|
const urns = new Set<string>();
|
|
|
|
for (const [, returnDeps] of rpcDeps.entries()) {
|
|
|
|
for (const urn of returnDeps.getUrnsList()) {
|
|
|
|
urns.add(urn);
|
2021-07-07 23:03:56 +00:00
|
|
|
}
|
|
|
|
}
|
2023-04-28 22:27:10 +00:00
|
|
|
for (const urn of urns) {
|
|
|
|
deps.push(new DependencyResource(urn));
|
2021-07-07 23:03:56 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
// If the value the engine handed back is or contains an unknown value, the resolver will mark its value as
|
|
|
|
// unknown automatically, so we just pass true for isKnown here. Note that unknown values will only be
|
|
|
|
// present during previews (i.e. isDryRun() will be true).
|
2024-03-02 00:00:57 +00:00
|
|
|
resolver(<any>deserialized, true, isSecret, deps);
|
2023-04-28 22:27:10 +00:00
|
|
|
} catch (e) {
|
|
|
|
resolver(<any>undefined, true, false, undefined, e);
|
|
|
|
} finally {
|
|
|
|
done();
|
2021-07-07 23:03:56 +00:00
|
|
|
}
|
2023-04-28 22:27:10 +00:00
|
|
|
}),
|
|
|
|
label,
|
|
|
|
);
|
2021-07-07 23:03:56 +00:00
|
|
|
|
|
|
|
return out;
|
|
|
|
}
|
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
function createOutput<T>(
|
|
|
|
label: string,
|
|
|
|
): [Output<T>, (v: T, isKnown: boolean, isSecret: boolean, deps?: Resource[], err?: Error | undefined) => void] {
|
2021-07-07 23:03:56 +00:00
|
|
|
let resolveValue: (v: T) => void;
|
|
|
|
let rejectValue: (err: Error) => void;
|
|
|
|
let resolveIsKnown: (v: boolean) => void;
|
|
|
|
let rejectIsKnown: (err: Error) => void;
|
|
|
|
let resolveIsSecret: (v: boolean) => void;
|
|
|
|
let rejectIsSecret: (err: Error) => void;
|
|
|
|
let resolveDeps: (v: Resource[]) => void;
|
|
|
|
let rejectDeps: (err: Error) => void;
|
|
|
|
|
|
|
|
const resolver = (v: T, isKnown: boolean, isSecret: boolean, deps: Resource[] = [], err?: Error) => {
|
2023-04-29 02:16:01 +00:00
|
|
|
if (err) {
|
2021-07-07 23:03:56 +00:00
|
|
|
rejectValue(err);
|
|
|
|
rejectIsKnown(err);
|
|
|
|
rejectIsSecret(err);
|
|
|
|
rejectDeps(err);
|
|
|
|
} else {
|
|
|
|
resolveValue(v);
|
|
|
|
resolveIsKnown(isKnown);
|
|
|
|
resolveIsSecret(isSecret);
|
|
|
|
resolveDeps(deps);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
const out = new Output(
|
|
|
|
[],
|
|
|
|
debuggablePromise(
|
|
|
|
new Promise<T>((resolve, reject) => {
|
|
|
|
resolveValue = resolve;
|
|
|
|
rejectValue = reject;
|
|
|
|
}),
|
2023-04-28 22:27:10 +00:00
|
|
|
`${label}Value`,
|
|
|
|
),
|
2021-07-07 23:03:56 +00:00
|
|
|
debuggablePromise(
|
|
|
|
new Promise<boolean>((resolve, reject) => {
|
|
|
|
resolveIsKnown = resolve;
|
|
|
|
rejectIsKnown = reject;
|
|
|
|
}),
|
2023-04-28 22:27:10 +00:00
|
|
|
`${label}IsKnown`,
|
|
|
|
),
|
2021-07-07 23:03:56 +00:00
|
|
|
debuggablePromise(
|
|
|
|
new Promise<boolean>((resolve, reject) => {
|
|
|
|
resolveIsSecret = resolve;
|
|
|
|
rejectIsSecret = reject;
|
|
|
|
}),
|
2023-04-28 22:27:10 +00:00
|
|
|
`${label}IsSecret`,
|
|
|
|
),
|
2021-07-07 23:03:56 +00:00
|
|
|
debuggablePromise(
|
|
|
|
new Promise<Resource[]>((resolve, reject) => {
|
|
|
|
resolveDeps = resolve;
|
|
|
|
rejectDeps = reject;
|
|
|
|
}),
|
2023-04-28 22:27:10 +00:00
|
|
|
`${label}Deps`,
|
|
|
|
),
|
|
|
|
);
|
2021-07-07 23:03:56 +00:00
|
|
|
|
|
|
|
return [out, resolver];
|
|
|
|
}
|
|
|
|
|
2023-04-28 22:27:10 +00:00
|
|
|
async function createCallRequest(
|
|
|
|
tok: string,
|
|
|
|
serialized: Record<string, any>,
|
|
|
|
serializedDeps: Map<string, Set<Resource>>,
|
|
|
|
provider?: string,
|
|
|
|
version?: string,
|
|
|
|
pluginDownloadURL?: string,
|
|
|
|
) {
|
2021-07-07 23:03:56 +00:00
|
|
|
if (provider !== undefined && typeof provider !== "string") {
|
|
|
|
throw new Error("Incorrect provider type.");
|
|
|
|
}
|
|
|
|
|
|
|
|
const obj = gstruct.Struct.fromJavaScript(serialized);
|
|
|
|
|
2024-02-08 13:16:23 +00:00
|
|
|
const req = new resourceproto.ResourceCallRequest();
|
2021-07-07 23:03:56 +00:00
|
|
|
req.setTok(tok);
|
|
|
|
req.setArgs(obj);
|
2023-12-04 15:22:44 +00:00
|
|
|
req.setProvider(provider || "");
|
2021-07-07 23:03:56 +00:00
|
|
|
req.setVersion(version || "");
|
2022-01-10 23:54:41 +00:00
|
|
|
req.setPlugindownloadurl(pluginDownloadURL || "");
|
2021-07-07 23:03:56 +00:00
|
|
|
|
|
|
|
const argDependencies = req.getArgdependenciesMap();
|
|
|
|
for (const [key, propertyDeps] of serializedDeps) {
|
|
|
|
const urns = new Set<string>();
|
|
|
|
for (const dep of propertyDeps) {
|
|
|
|
const urn = await dep.urn.promise();
|
|
|
|
urns.add(urn);
|
|
|
|
}
|
2024-02-08 13:16:23 +00:00
|
|
|
const deps = new resourceproto.ResourceCallRequest.ArgumentDependencies();
|
2021-07-07 23:03:56 +00:00
|
|
|
deps.setUrnsList(Array.from(urns));
|
|
|
|
argDependencies.set(key, deps);
|
|
|
|
}
|
|
|
|
|
|
|
|
return req;
|
|
|
|
}
|