pulumi/sdk/nodejs/runtime/asyncIterableUtil.ts

85 lines
3.0 KiB
TypeScript
Raw Normal View History

`StreamInvoke` should return `AsyncIterable` that completes A user who calls `StreamInvoke` probably expects the `AsyncIterable` that is returned to gracefully terminate. This is currently not the case. Where does something like this go wrong? A better question might be where any of this went right, because several days later, after wandering into civilization from the great Wilderness of Bugs, I must confess that I've forgotten if any of it had. `AsyncIterable` is a pull-based API. `for await (...)` will continuously call `next` ("pull") on the underlying `AsyncIterator` until the iterable is exhausted. But, gRPC's streaming-return API is _push_ based. That is to say, when a streaming RPC is called, data is provided by callback on the stream object, like: call.on("data", (thing: any) => {... do thing ...}); Our goal in `StreamInvoke` is to convert the push-based gRPC routines into the pull-based `AsyncIterable` retrun type. You may remember your CS theory this is one of those annoying "fundamental mismatches" in abstraction. So we're off to a good start. Until this point, we've depended on a library, `callback-to-async-iterator` to handle the details of being this bridge. Our trusting nature and innocent charm has mislead us. This library is not worthy of our trust. Instead of doing what we'd like it to do, it returns (in our case) an `AsyncIterable` that will never complete. Yes,, this `AsyncIterable` will patiently wait for eternity, which honestly is kind of poetic when you sit down in a nice bath and think about that fun time you considered eating your computer instead of finishing this idiotic bug. Indeed, this is the sort of bug that you wonder where it even comes from. Our query libraries? Why aren't these `finally` blocks executing? Is our language host terminating early? Is gRPC angry at me, and just passive-aggrssively not servicing some of my requests? Oh god I've been up for 48 hours, why is that wallpaper starting to move? And by the way, a fun interlude to take in an otherwise very productive week is to try to understand the gRPC streaming node client, which is code-gen'd, but which also takes the liberty of generating itself at runtime, so that gRPC is code-gen'ing a code-gen routine, which makes the whole thing un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't need to understand any of this anyway, thanks friends. But we've come out the other side knowing that the weak link in this very sorry chain of incredibly weak links, is this dependency. This commit removes this dependency for a better monster: the one we know. It is at this time that I'd like to announce that I am quitting my job at Pulumi. I thank you all for the good times, but mostly, for taking this code over for me.
2019-11-11 23:00:46 +00:00
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { AsyncIterable } from "@pulumi/query/interfaces";
type CloseValue = "7473659d-924c-414d-84e5-b1640b2a6296";
const closeValue: CloseValue = "7473659d-924c-414d-84e5-b1640b2a6296";
// PushableAsyncIterable is an `AsyncIterable` that data can be pushed to. It is useful for turning
// push-based callback APIs into pull-based `AsyncIterable` APIs. For example, a user can write:
//
// const queue = new PushableAsyncIterable();
// call.on("data", (thing: any) => queue.push(live));
//
// And then later consume `queue` as any other `AsyncIterable`:
//
// for await (const l of list) {
// console.log(l.metadata.name);
// }
//
// Note that this class implements `AsyncIterable<T | undefined>`. This is for a fundamental reason:
// the user can call `complete` at any time. `AsyncIteratable` would normally know when an element
// is the last, but in this case it can't. Or, another way to look at it is, the last element is
// guaranteed to be `undefined`.
/** @internal */
`StreamInvoke` should return `AsyncIterable` that completes A user who calls `StreamInvoke` probably expects the `AsyncIterable` that is returned to gracefully terminate. This is currently not the case. Where does something like this go wrong? A better question might be where any of this went right, because several days later, after wandering into civilization from the great Wilderness of Bugs, I must confess that I've forgotten if any of it had. `AsyncIterable` is a pull-based API. `for await (...)` will continuously call `next` ("pull") on the underlying `AsyncIterator` until the iterable is exhausted. But, gRPC's streaming-return API is _push_ based. That is to say, when a streaming RPC is called, data is provided by callback on the stream object, like: call.on("data", (thing: any) => {... do thing ...}); Our goal in `StreamInvoke` is to convert the push-based gRPC routines into the pull-based `AsyncIterable` retrun type. You may remember your CS theory this is one of those annoying "fundamental mismatches" in abstraction. So we're off to a good start. Until this point, we've depended on a library, `callback-to-async-iterator` to handle the details of being this bridge. Our trusting nature and innocent charm has mislead us. This library is not worthy of our trust. Instead of doing what we'd like it to do, it returns (in our case) an `AsyncIterable` that will never complete. Yes,, this `AsyncIterable` will patiently wait for eternity, which honestly is kind of poetic when you sit down in a nice bath and think about that fun time you considered eating your computer instead of finishing this idiotic bug. Indeed, this is the sort of bug that you wonder where it even comes from. Our query libraries? Why aren't these `finally` blocks executing? Is our language host terminating early? Is gRPC angry at me, and just passive-aggrssively not servicing some of my requests? Oh god I've been up for 48 hours, why is that wallpaper starting to move? And by the way, a fun interlude to take in an otherwise very productive week is to try to understand the gRPC streaming node client, which is code-gen'd, but which also takes the liberty of generating itself at runtime, so that gRPC is code-gen'ing a code-gen routine, which makes the whole thing un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't need to understand any of this anyway, thanks friends. But we've come out the other side knowing that the weak link in this very sorry chain of incredibly weak links, is this dependency. This commit removes this dependency for a better monster: the one we know. It is at this time that I'd like to announce that I am quitting my job at Pulumi. I thank you all for the good times, but mostly, for taking this code over for me.
2019-11-11 23:00:46 +00:00
export class PushableAsyncIterable<T> implements AsyncIterable<T | undefined> {
private bufferedData: T[] = [];
private nextQueue: ((payload: T | CloseValue) => void)[] = [];
private completed = false;
push(payload: T) {
if (this.nextQueue.length === 0) {
this.bufferedData.push(payload);
} else {
const resolve = this.nextQueue.shift()!;
resolve(payload);
}
}
complete() {
this.completed = true;
if (this.nextQueue.length > 0) {
const resolve = this.nextQueue.shift()!;
resolve(closeValue);
}
}
private shift(): Promise<T | CloseValue> {
return new Promise((resolve) => {
`StreamInvoke` should return `AsyncIterable` that completes A user who calls `StreamInvoke` probably expects the `AsyncIterable` that is returned to gracefully terminate. This is currently not the case. Where does something like this go wrong? A better question might be where any of this went right, because several days later, after wandering into civilization from the great Wilderness of Bugs, I must confess that I've forgotten if any of it had. `AsyncIterable` is a pull-based API. `for await (...)` will continuously call `next` ("pull") on the underlying `AsyncIterator` until the iterable is exhausted. But, gRPC's streaming-return API is _push_ based. That is to say, when a streaming RPC is called, data is provided by callback on the stream object, like: call.on("data", (thing: any) => {... do thing ...}); Our goal in `StreamInvoke` is to convert the push-based gRPC routines into the pull-based `AsyncIterable` retrun type. You may remember your CS theory this is one of those annoying "fundamental mismatches" in abstraction. So we're off to a good start. Until this point, we've depended on a library, `callback-to-async-iterator` to handle the details of being this bridge. Our trusting nature and innocent charm has mislead us. This library is not worthy of our trust. Instead of doing what we'd like it to do, it returns (in our case) an `AsyncIterable` that will never complete. Yes,, this `AsyncIterable` will patiently wait for eternity, which honestly is kind of poetic when you sit down in a nice bath and think about that fun time you considered eating your computer instead of finishing this idiotic bug. Indeed, this is the sort of bug that you wonder where it even comes from. Our query libraries? Why aren't these `finally` blocks executing? Is our language host terminating early? Is gRPC angry at me, and just passive-aggrssively not servicing some of my requests? Oh god I've been up for 48 hours, why is that wallpaper starting to move? And by the way, a fun interlude to take in an otherwise very productive week is to try to understand the gRPC streaming node client, which is code-gen'd, but which also takes the liberty of generating itself at runtime, so that gRPC is code-gen'ing a code-gen routine, which makes the whole thing un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't need to understand any of this anyway, thanks friends. But we've come out the other side knowing that the weak link in this very sorry chain of incredibly weak links, is this dependency. This commit removes this dependency for a better monster: the one we know. It is at this time that I'd like to announce that I am quitting my job at Pulumi. I thank you all for the good times, but mostly, for taking this code over for me.
2019-11-11 23:00:46 +00:00
if (this.bufferedData.length === 0) {
if (this.completed === true) {
resolve(closeValue);
}
this.nextQueue.push(resolve);
} else {
resolve(this.bufferedData.shift());
}
});
}
[Symbol.asyncIterator]() {
const t = this;
return {
async next(): Promise<{ done: boolean; value: T | undefined }> {
`StreamInvoke` should return `AsyncIterable` that completes A user who calls `StreamInvoke` probably expects the `AsyncIterable` that is returned to gracefully terminate. This is currently not the case. Where does something like this go wrong? A better question might be where any of this went right, because several days later, after wandering into civilization from the great Wilderness of Bugs, I must confess that I've forgotten if any of it had. `AsyncIterable` is a pull-based API. `for await (...)` will continuously call `next` ("pull") on the underlying `AsyncIterator` until the iterable is exhausted. But, gRPC's streaming-return API is _push_ based. That is to say, when a streaming RPC is called, data is provided by callback on the stream object, like: call.on("data", (thing: any) => {... do thing ...}); Our goal in `StreamInvoke` is to convert the push-based gRPC routines into the pull-based `AsyncIterable` retrun type. You may remember your CS theory this is one of those annoying "fundamental mismatches" in abstraction. So we're off to a good start. Until this point, we've depended on a library, `callback-to-async-iterator` to handle the details of being this bridge. Our trusting nature and innocent charm has mislead us. This library is not worthy of our trust. Instead of doing what we'd like it to do, it returns (in our case) an `AsyncIterable` that will never complete. Yes,, this `AsyncIterable` will patiently wait for eternity, which honestly is kind of poetic when you sit down in a nice bath and think about that fun time you considered eating your computer instead of finishing this idiotic bug. Indeed, this is the sort of bug that you wonder where it even comes from. Our query libraries? Why aren't these `finally` blocks executing? Is our language host terminating early? Is gRPC angry at me, and just passive-aggrssively not servicing some of my requests? Oh god I've been up for 48 hours, why is that wallpaper starting to move? And by the way, a fun interlude to take in an otherwise very productive week is to try to understand the gRPC streaming node client, which is code-gen'd, but which also takes the liberty of generating itself at runtime, so that gRPC is code-gen'ing a code-gen routine, which makes the whole thing un-introspectable, un-debuggable, and un-knowable. That's fine, I didn't need to understand any of this anyway, thanks friends. But we've come out the other side knowing that the weak link in this very sorry chain of incredibly weak links, is this dependency. This commit removes this dependency for a better monster: the one we know. It is at this time that I'd like to announce that I am quitting my job at Pulumi. I thank you all for the good times, but mostly, for taking this code over for me.
2019-11-11 23:00:46 +00:00
const value = await t.shift();
if (value === closeValue) {
return { value: undefined, done: true };
}
return { value, done: false };
},
};
}
}