Description
I started evaluating this AsyncHelper to process messages one after another (because the Node.js client library just starts them all at the same time).
I copied the code over as is and used it to wrap my async "userHandler".
After some short testing I saw that the messages were still handled simultaneously.
I also found a fix :)
This is the problematic area:
nodejs-pubsub/src/async-helper.ts
Lines 92 to 101 in 02b42bf
    this.tailPromise.then(() => {
      const message = this.queue.shift();
      if (!message) {
        // No message -> go back to resolve() to signal ready.
        this.tailPromise = Promise.resolve();
      } else {
        // Message -> chain to the previous tail and replace it.
        this.tailPromise = this.userHandler(message);
      }
    });
The tail is only replaced later, inside the `then` callback, so messages that arrive before that callback runs all chain themselves to the original Promise.resolve().
You can visualize it as the root of a tree with each message linked directly to the root, creating a broad "second layer", instead of the messages chaining to each other to form one long chain.
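To make the difference concrete, here is a minimal sketch that contrasts the two shapes. It is not the library's code: `FanOutQueue`, `ChainedQueue`, `SketchHandler`, and `peakConcurrency` are hypothetical names, and the handler is a 10 ms timer standing in for a real "userHandler". It only measures how many handlers are running at once when three messages are pushed synchronously.

```typescript
// Hypothetical stand-ins for illustration; none of these names come from the library.
type SketchHandler = (msg: string) => Promise<void>;

// Mirrors the problematic shape: the tail is replaced only inside the callback.
class FanOutQueue {
  private tail: Promise<void> = Promise.resolve();
  private queue: string[] = [];
  constructor(private handler: SketchHandler) {}

  push(msg: string): void {
    this.queue.push(msg);
    // Every synchronous push() attaches to the same already-resolved promise.
    this.tail.then(() => {
      const next = this.queue.shift();
      if (next !== undefined) {
        this.tail = this.handler(next);
      }
    });
  }
}

// Mirrors the chained shape: the tail is replaced synchronously on every push().
class ChainedQueue {
  private tail: Promise<void> = Promise.resolve();
  private queue: string[] = [];
  constructor(private handler: SketchHandler) {}

  push(msg: string): void {
    this.queue.push(msg);
    this.tail = this.tail.then(async () => {
      const next = this.queue.shift();
      if (next !== undefined) {
        // Awaiting here makes the next link wait for this handler to finish.
        await this.handler(next);
      }
    });
  }
}

// Pushes three messages back to back and reports the peak number of
// handlers that were running at the same time.
async function peakConcurrency(
  make: (h: SketchHandler) => { push(msg: string): void }
): Promise<number> {
  const total = 3;
  let active = 0;
  let peak = 0;
  let done = 0;
  let finished: () => void = () => {};
  const allDone = new Promise<void>((resolve) => {
    finished = resolve;
  });
  const handler: SketchHandler = async () => {
    active++;
    peak = Math.max(peak, active);
    await new Promise<void>((r) => setTimeout(r, 10));
    active--;
    if (++done === total) finished();
  };
  const q = make(handler);
  for (let i = 0; i < total; i++) q.push(`m${i}`);
  await allDone;
  return peak;
}
```

Under these assumptions, `peakConcurrency((h) => new FanOutQueue(h))` resolves to 3 (all handlers overlap), while `peakConcurrency((h) => new ChainedQueue(h))` resolves to 1.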
A quick fix that worked for me:
    this.tailPromise = this.tailPromise.then(async () => { // <-- immediately (and synchronously) replace the tail with "tail + then"
      const message = this.queue.shift();
      if (!message) {
        // No message -> go back to resolve() to signal ready.
        this.tailPromise = Promise.resolve();
      } else {
        return this.userHandler(message); // <-- return the handler's promise, so that a chained "then" actually waits for it to complete
      }
    });
...
This causes all messages to be handled one after another, which might be good or bad depending on your case, but I believe this is what this AsyncHelper was trying to do.
Feel free to correct me if I got it wrong.
BTW, I also continued to adapt this initial code so that there are multiple "chains", one for each orderingKey. That way only ordered messages (those with an ordering key) form these "wait chains", and they don't affect messages with other ordering keys or unordered messages. There's also some boring cleanup code to remove unused chains...
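For anyone curious what the per-key variant could look like, here is a rough sketch of the idea, not my actual code and not part of the library. `PerKeySerializer`, `KeyedMessage`, and `dispatch` are hypothetical names. Two choices in it are assumptions: a rejected handler does not block later messages on the same key, and a chain is dropped from the map once its last message settles.

```typescript
// Hypothetical message shape for illustration; the real Message class has more fields.
type KeyedMessage = { orderingKey?: string; data: string };
type KeyedHandler = (msg: KeyedMessage) => Promise<void>;

class PerKeySerializer {
  // One promise tail per ordering key that currently has work in flight.
  private tails = new Map<string, Promise<void>>();

  constructor(private handler: KeyedHandler) {}

  dispatch(msg: KeyedMessage): Promise<void> {
    const key = msg.orderingKey;
    if (!key) {
      // Unordered messages do not wait on anything.
      return this.handler(msg);
    }

    const prev = this.tails.get(key) ?? Promise.resolve();
    // Assumption: an earlier failure on this key should not block later messages,
    // so the previous rejection is swallowed before chaining.
    const next = prev.catch(() => {}).then(() => this.handler(msg));
    // Replace the tail synchronously so the next dispatch() chains onto this one.
    this.tails.set(key, next);

    // Cleanup: drop the chain if nothing newer was appended in the meantime.
    const cleanup = () => {
      if (this.tails.get(key) === next) {
        this.tails.delete(key);
      }
    };
    next.then(cleanup, cleanup);

    return next;
  }

  // Number of ordering keys that still have a chain in the map.
  get activeKeys(): number {
    return this.tails.size;
  }
}
```

With this shape, two messages sharing key "A" run strictly one after the other, while a message with key "B" or with no key at all starts without waiting for them.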