from async generator #6699
Replies: 6 comments 2 replies
-
@benjamingr wdyt?
-
Moved to a discussion, since GitHub issues are for bug tracking.
-
Also, FYI, I feel there's some overlap with the official AbortSignal support discussion (#5863), though it is not exactly the same thing.
-
I would be cool (in Node.js) with making our streams
-
@ronag I've actually been thinking quite a lot about this lately b/c I'm currently working on a library that exposes InteropObservables. However, since InteropObservables are tricky to work with, I'm actually thinking about the possibility of exposing "abortable" Async Iterables instead. But that's trickier than it looks, because RxJS

These are some ideas that I got:
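import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';

// AbortError is not a global; it is defined here so the snippet is self-contained.
class AbortError extends Error {
  constructor(message = 'The operation was aborted') {
    super(message);
    this.name = 'AbortError';
  }
}

// Resolves after one second, or rejects with an AbortError if the signal fires first.
const delayOneSec = (signal: AbortSignal) =>
  new Promise<void>((res, rej) => {
    // A signal that is already aborted never emits 'abort' again, so check up front.
    if (signal.aborted) return rej(new AbortError());
    function onAbort() {
      clearTimeout(token);
      rej(new AbortError());
    }
    signal.addEventListener('abort', onAbort, { once: true });
    const token = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      res();
    }, 1_000);
  });

async function* asyncCounter(signal: AbortSignal, init = 1) {
  let count = init;
  try {
    while (true) {
      await delayOneSec(signal);
      yield count++;
    }
  } catch (e) {
    // An abort is an expected way to stop, so end the generator quietly.
    if (e instanceof Error && e.name === 'AbortError') return;
    throw e;
  } finally {
    // Maybe some extra cleanup goes here...
  }
}

const fromCancellableAsyncIterable = <T, A extends Array<unknown>>(
  cancelableAsyncGenerator: (
    signal: AbortSignal,
    ...args: A
  ) => AsyncIterableIterator<T>,
  ...args: A
) =>
  new Observable<T>((observer) => {
    const ac = new AbortController();
    const asyncIterator = cancelableAsyncGenerator(ac.signal, ...args);
    (async () => {
      for await (const value of asyncIterator) observer.next(value);
      // Without this, a generator that finishes on its own never completes the Observable.
      observer.complete();
    })().catch((e) => {
      observer.error(e);
    });
    // Unsubscribing aborts the signal, which stops the generator.
    return () => {
      ac.abort();
    };
  });

fromCancellableAsyncIterable(asyncCounter, 10)
  .pipe(take(3))
  .subscribe((x) => {
    console.log('count', x);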
  });

You can find the full example here, in case you want to fiddle with it.

That way the

There are other possible conventions, of course, but IMO this one (passing

The real problem is that it's difficult to enforce a convention like this when there is no way to identify abort errors originating from controllers... So yeah, that's a bit of a bummer 😞.

I mean, I guess that as part of the convention the

Anyways, just some food for thought, I guess...
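On the point about identifying abort errors that originate from a particular controller: runtimes that support `AbortSignal.reason` allow one possible convention, where producers reject with `signal.reason` and consumers compare by identity. The following is only a minimal sketch under that assumption; `abortableDelay` and `cameFrom` are hypothetical names for illustration, not part of RxJS or any other library.

```typescript
// Hypothetical helper: rejects with the signal's own reason, so the rejection
// can later be traced back to this signal by identity.
function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    function onAbort() {
      clearTimeout(timer);
      reject(signal.reason);
    }
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Hypothetical helper: true only when `error` is the reason carried by `signal`.
function cameFrom(signal: AbortSignal, error: unknown): boolean {
  return signal.aborted && error === signal.reason;
}
```

A generator following this convention could then write `if (cameFrom(signal, e)) return;` in its `catch` block instead of matching on the error name. This only works if every producer in the chain rejects with `signal.reason`, so it remains a convention rather than something that can be enforced.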
-
I'm not fully convinced

I guess that's focusing on async generators, but as far as I know, there's no standard way of passing an

If that's the case, then you'll have projects that put the AbortSignal in the first parameter. Others will put it in the last parameter. Others will put it in a config object (similar to DOM's

Wouldn't it be better to leave this up to the consumer? If we didn't use RxJS and wanted to expose a "push" version of an async generator, we would do something like:

const ac = new AbortController();
const iterable = myAsyncGenerator(myParams, { signal: ac.signal, otherOption: 'blah' });
const result = function (callback: (value: any) => void) {
  // Pull values from the generator and push each one to the callback.
  async function run() {
    let next;
    while (!(next = await iterable.next()).done) {
      callback(next.value);
    }
  }
  run();
  // The returned function cancels the generator through the signal.
  return () => ac.abort();
};

If we want to do this in RxJS, it would be greatly simplified (assuming

import { from } from 'rxjs';
import { finalize } from 'rxjs/operators';

const ac = new AbortController();
const result$ = from(
  myAsyncGenerator(myParams, { signal: ac.signal, otherOption: 'blah' })
).pipe(
  finalize(() => ac.abort())
);

I haven't tried it and I'm probably missing something, but it should be really close to this. But if you look closely,

So as developers we need to abort that signal when our consumer unsubscribes from it. In my implementation it will also abort the signal once the async generator ends, but I think that should be alright?

If a project uses a specific convention for how to use AbortSignals, then maybe it's up to a developer on that project to write a helper function that suits their own use case. But I don't think the AbortController should be of any concern to the internals of RxJS for this particular case. In a way it would be like adding a function in

const ac = new AbortController();
const myPromise = doSomethingAsync(ac.signal);
from(myPromise).pipe(
  finalize(() => ac.abort())
);

Like... yes, it could. But how exactly? What specific convention should we use? It's completely up to the developer calling the function to pass in the parameters they want, in the order and shape they want.

Edit - To make the examples simpler I eagerly called the generator/promises. You'd probably want to wrap these into a
-
Would be nice if we somehow had a helper for the following pattern:
Or something like that... The point here is that creating the generator should take a signal that is connected to the lifetime of the observable.
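The pattern itself did not survive in this copy of the thread, so the following is not the original snippet. It is only a rough sketch of what a helper with that property might look like: a factory receives a signal, and the signal is aborted when the subscription is torn down. `abortableSubscribe` and `ObserverLike` are hypothetical names; the sketch avoids importing RxJS and instead returns a function with the same shape as the subscribe callback that `new Observable(...)` accepts.

```typescript
// Minimal observer shape, mirroring the callbacks an RxJS subscriber exposes.
interface ObserverLike<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Hypothetical helper: builds a subscribe function whose teardown aborts the
// signal handed to the generator factory.
function abortableSubscribe<T>(
  factory: (signal: AbortSignal) => AsyncIterable<T>,
): (observer: ObserverLike<T>) => () => void {
  return (observer) => {
    // One controller per subscription, created lazily at subscribe time.
    const ac = new AbortController();
    (async () => {
      try {
        for await (const value of factory(ac.signal)) {
          // Leaving the loop also calls the iterator's return() for cleanup.
          if (ac.signal.aborted) return;
          observer.next(value);
        }
        if (!ac.signal.aborted) observer.complete();
      } catch (err) {
        // Errors that surface after our own abort are treated as cancellation.
        if (!ac.signal.aborted) observer.error(err);
      }
    })();
    return () => ac.abort();
  };
}
```

With RxJS this would presumably be used as `new Observable<number>(abortableSubscribe((signal) => asyncCounter(signal, 10)))`. Because the factory decides where the signal goes (first argument, last argument, or an options object), the helper does not need to impose any calling convention on the generator.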