-
-
Notifications
You must be signed in to change notification settings - Fork 53
Fix for cancellation token registration in case of already canceled token #317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR ensures that when a cancellation token is already canceled, the consumer immediately sets the channel to canceled instead of registering and queuing a waiter.
- Adds a
synchronouslyCanceledflag to detect and short-circuit already-canceled tokens in both single-message and batch receive paths - Updates both
MultiTopicsConsumerImpl.fsandConsumerImpl.fswith the new registration logic - Removes warning code
40from the project’s<NoWarn>settings
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/Pulsar.Client/Pulsar.Client.fsproj | Dropped warning code 40 from <NoWarn> |
| src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs | Introduced synchronous cancellation detection in receive/batch methods |
| src/Pulsar.Client/Internal/ConsumerImpl.fs | Mirrored synchronous cancellation logic for the single and batch paths |
Comments suppressed due to low confidence (2)
src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs:574
- Use the same variable name (
tokenRegistration) in both branches for consistency instead ofregistrationhere.
let registration =
src/Pulsar.Client/Pulsar.Client.fsproj:36
- [nitpick] Removing warning code
40from<NoWarn>will re-enable those warnings; please confirm if that was intended or add a comment explaining the rationale.
<NoWarn>3186</NoWarn>
| None | ||
| waiters.AddLast(struct(tokenRegistration, channel)) |> ignore | ||
| Log.Logger.LogDebug("{0} Receive waiting", prefix) | ||
| if synchronouslyCanceled then |
Copilot
AI
May 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before calling channel.SetCanceled(), dispose the registration (tokenRegistration.Value.Dispose()) when present to avoid leaking the cancellation callback.
| if synchronouslyCanceled then | |
| if synchronouslyCanceled then | |
| match tokenRegistration with | |
| | Some registration -> registration.Dispose() | |
| | None -> () |
| let mutable synchronouslyCanceled = false | ||
| let tokenRegistration = | ||
| if cancellationToken.CanBeCanceled then | ||
| let rec cancellationTokenRegistration = | ||
| let mutable cancellationTokenRegistration = None | ||
| cancellationTokenRegistration <- | ||
| cancellationToken.Register((fun () -> | ||
| Log.Logger.LogDebug("{0} receive cancelled", prefix) | ||
| post this.Mb (CancelWaiter(cancellationTokenRegistration, channel)) | ||
| match cancellationTokenRegistration with | ||
| | None -> synchronouslyCanceled <- true | ||
| | Some _ -> post this.Mb (CancelWaiter(cancellationTokenRegistration, channel)) | ||
| ), false) |> Some | ||
| cancellationTokenRegistration | ||
| else | ||
| None |
Copilot
AI
May 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The cancellation-registration logic is duplicated between the single and batch receive branches; consider extracting it into a shared helper to reduce duplication.
| None | ||
| waiters.AddLast(struct(tokenRegistration, channel)) |> ignore | ||
| Log.Logger.LogDebug("{0} Receive waiting", prefix) | ||
| if synchronouslyCanceled then |
Copilot
AI
May 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here, dispose the tokenRegistration when present before calling channel.SetCanceled() to release the registration resource.
| if synchronouslyCanceled then | |
| if synchronouslyCanceled then | |
| match tokenRegistration with | |
| | Some registration -> registration.Dispose() | |
| | None -> () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes cancellation token registration issues that occur when the token is already canceled, by introducing logic to detect synchronous cancellation.
- Update package version and release notes in the project file.
- Refactor cancellation token handling in both MultiTopicsConsumerImpl.fs and ConsumerImpl.fs to properly detect and respond to synchronous cancellation events.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/Pulsar.Client/Pulsar.Client.fsproj | Bumps version to 3.9.4, updates release notes, and adjusts warning settings. |
| src/Pulsar.Client/Internal/MultiTopicsConsumerImpl.fs | Modifies cancellation token registration logic to handle already canceled tokens. |
| src/Pulsar.Client/Internal/ConsumerImpl.fs | Similar cancellation token handling updates as in MultiTopicsConsumerImpl.fs for consistency. |
Comments suppressed due to low confidence (1)
src/Pulsar.Client/Pulsar.Client.fsproj:36
- Confirm that removing warning code '40' is intentional and that any new warnings triggered by this change are acceptable in the build process.
<NoWarn>3186</NoWarn>
| let tokenRegistration = | ||
| if cancellationToken.CanBeCanceled then | ||
| let rec cancellationTokenRegistration = | ||
| let mutable cancellationTokenRegistration = None |
Copilot
AI
May 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a comment explaining the use of mutable cancellationTokenRegistration for synchronous cancellation detection to help future maintainers understand the ordering logic.
| let mutable cancellationTokenRegistration = None | |
| let mutable cancellationTokenRegistration = None | |
| // The mutable cancellationTokenRegistration is used to handle synchronous cancellation detection. | |
| // If the cancellation token is triggered before registration completes, the registration will be None, | |
| // and we set synchronouslyCanceled to true. Otherwise, we post a CancelWaiter message to handle | |
| // the cancellation asynchronously. This ensures correct ordering and avoids race conditions. |
| else | ||
| let mutable synchronouslyCanceled = false | ||
| let tokenRegistration = | ||
| if cancellationToken.CanBeCanceled then |
Copilot
AI
May 28, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a comment to clarify the rationale behind using a mutable cancellationTokenRegistration for detecting synchronous cancellations, which will aid in understanding the flow in concurrent scenarios.
| if cancellationToken.CanBeCanceled then | |
| if cancellationToken.CanBeCanceled then | |
| // Using a mutable cancellationTokenRegistration to detect synchronous cancellations. | |
| // This allows us to handle cases where the cancellation is requested immediately after | |
| // the token is registered but before the registration is stored. The mutable variable | |
| // ensures that we can track and dispose of the registration correctly in such scenarios. |
Issue #312