-
Notifications
You must be signed in to change notification settings - Fork 233
feat: add timeout option and graceful shutdown to Subscription.close() #2068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Implements a new `timeout` option (using `Duration`) for the `Subscription.close()` method. This provides more control over the shutdown process: - If `timeout` is zero, the subscription closes as quickly as possible without nacking buffered messages. - If `timeout` is positive, the subscription attempts to nack any buffered messages (in the lease manager) and waits up to the specified duration for pending acknowledgements and nacks to be sent to the server. - If no timeout is provided, the behavior remains as before (waits indefinitely for pending acks/modacks, no nacking). The core logic is implemented in `Subscriber.close()`. `PubSub.close()` documentation is updated to clarify its scope and recommend using `Subscription.close()` directly for this feature. Includes: - Unit tests for the new timeout behavior in `Subscriber.close()`. - A TypeScript sample (`samples/closeSubscriptionWithTimeout.ts`) demonstrating usage. - Updated JSDoc documentation for relevant methods.
| const remaining = this._inventory.clear(); | ||
|
|
||
| await this._waitForFlush(); | ||
| const options = this._options.closeOptions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional chaining?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this._options is guaranteed to be non-undefined, so I don't think it's necessary here. The lines that use options do optional chaining, since closeOptions may be undefined.
| // The timeout can't realistically be longer than the longest time we're willing | ||
| // to lease messages. | ||
| let timeout = durationAtMost( | ||
| options?.timeout ?? this.maxExtensionTime, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is maxExtensionTime a const? How do we know it will be available? If the former, maybe worth uppercasing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a config option with a default. So it's set by the constructor:
this.maxExtensionTime = defaultOptions.subscription.maxExtensionTime;
And then if the user passed a value, it overwrites the default.
miguelvelezsa
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would highly recommend to add types in all the new code :)
| * | ||
| * @private | ||
| */ | ||
| dispatched(): void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be dispatch()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's less of a command, more of an event. Basically there is something else doing dispatching, and this method is called to notify telemetry and such. So I don't think I'd call it dispatch, but I'm open to other ideas.
src/subscriber.ts
Outdated
| const err = e as [unknown, boolean]; | ||
| if (err[1] === false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for my own knowledge, how err[1] === true means was timeout? probably adding type for 'e' here will help :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this has to do with implementing the time-limited Promise wait... it's super annoying to make graceful. I'll look again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why it didn't give me a separate box for comments on the one above, but check out Sofia's comment on why I did this like I did.
I agree with you on the second one. I'll add an interface.
Can you elaborate here? The only time I tend to omit them is when they're super obvious (like making a class member like Edit: it didn't tag you :) @miguelvelezsa |
|
I need to look at a few more review comments before merging anything. |
…imeout' into feat-close-timeout
|
Warning: This pull request is touching the following templated files:
|
| if ( | ||
| behavior === SubscriberCloseBehaviors.WaitForProcessing && | ||
| !this._inventory.isEmpty | ||
| ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was just debugging the WaitForProcessing functionality locally and stumbled upon this check.
@feywind wouldn't this need to be the following? isEmpty seems to be undefined, instead this might have to be a method call?
| if ( | |
| behavior === SubscriberCloseBehaviors.WaitForProcessing && | |
| !this._inventory.isEmpty | |
| ) { | |
| if ( | |
| behavior === SubscriberCloseBehaviors.WaitForProcessing && | |
| !this._inventory.isEmpty() | |
| ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created #2117 accordingly
Implements a new
timeoutandbehavioroption (usingDuration) for theSubscription.close()method. These options are on the SubscriberOptions/SubscriptionOptions passed in when opening a subscriber.This provides more control over the shutdown process:
(This is a re-open of #2037 to make it from the main repo.)