logpuller: fix potential stuck #4358
📝 Walkthrough
Added context-cancellation checks and mutex protection to subscription client's pause/wakeup and event-pushing paths; Close now clears paused under lock and broadcasts to wake waiters. Tests add a mock dynamic stream and a test verifying pushRegionEventToDS unblocks on Close (duplicate test/mock blocks present).
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Code Review
The pull request effectively addresses a potential stuck issue in the logpuller by integrating context cancellation checks and ensuring correct synchronization with condition variables. The addition of mutex locks around s.cond.Broadcast() and the new test case TestPushRegionEventToDSUnblocksOnClose significantly improve the robustness and correctness of the subscriptionClient's pause/resume and shutdown mechanisms.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@logservice/logpuller/subscription_client_test.go`:
- Around line 37-39: The import ordering is incorrect: the import
"github.com/pingcap/ticdc/utils/dynstream" is in a separate group causing gci
formatting failure; move that import into the same block as the other
"github.com/pingcap/ticdc/*" imports in subscription_client_test.go (so it
groups with existing ticdc imports), then run make fmt (or gofmt/gci) to
reformat and verify the import groups are correct.
📒 Files selected for processing (2)
logservice/logpuller/subscription_client.go
logservice/logpuller/subscription_client_test.go
🧹 Nitpick comments (1)
logservice/logpuller/subscription_client_test.go (1)
225-235: Harden the pre-close blocking assertion to avoid flaky pass conditions.
Line 231-Line 235 infers “blocked” via a fixed 100ms timeout. If the goroutine is late-scheduled, this can pass without truly validating the paused wait path. Prefer explicit synchronization before calling `Close()`.
♻️ Suggested refactor
-	done := make(chan struct{})
+	entered := make(chan struct{})
+	done := make(chan struct{})
 	go func() {
+		close(entered)
 		client.pushRegionEventToDS(SubscriptionID(1), regionEvent{})
 		close(done)
 	}()

+	<-entered
 	select {
 	case <-done:
 		t.Fatal("pushRegionEventToDS should block when paused")
-	case <-time.After(100 * time.Millisecond):
+	default:
 	}
As per coding guidelines: "Applies to **/*_test.go : Use unit test files named `*_test.go` in Go; favor deterministic tests and use `testify/require`".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client_test.go` around lines 225 - 235, The test's assertion that pushRegionEventToDS is blocked uses a fixed 100ms timeout and can be flaky; change the test to deterministically synchronize with the goroutine before calling Close() by adding an explicit "entered wait" signal channel (e.g., started := make(chan struct{})) in the test wrapper goroutine that sends on started immediately before calling client.pushRegionEventToDS(SubscriptionID(1), regionEvent{}), then have the main test wait for <-started to guarantee the goroutine reached the paused state, assert that it hasn't completed, call client.Close() to unblock, and finally wait for completion; also replace t.Fatal/time.After checks with testify/require assertions for clarity.
📒 Files selected for processing (1)
logservice/logpuller/subscription_client_test.go
🧹 Nitpick comments (1)
logservice/logpuller/subscription_client.go (1)
477-486: Proper shutdown unblocking with graceful Push() handling.
The mutex-protected broadcast correctly ensures blocked goroutines wake up during shutdown. There is a race window where a woken goroutine may call `ds.Push()` (line 423) after or concurrent with `ds.Close()`, but this is safely handled: the `DynamicStream.Push()` implementation checks if the stream is closed and silently drops the event via `handler.OnDrop()` rather than panicking.
Adding a defensive context check after exiting the wait loop (`if s.ctx.Err() != nil { return }`) would avoid the unnecessary Push() call when the context is already cancelled, but it is not required for correctness.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@logservice/logpuller/subscription_client.go` around lines 477 - 486, The Close method currently broadcasts to wake blocked goroutines but can let a woken goroutine continue past the wait loop and call ds.Push(); add a defensive context cancellation check (check s.ctx.Err() != nil) immediately after the wait loop in the goroutine (the same loop that currently calls ds.Push()) so that if the subscriptionClient context is cancelled the goroutine returns early and skips calling DynamicStream.Push(); update the goroutine that references ds.Push() and the wait/cond logic to perform this check before performing any Push() or other work to avoid unnecessary calls during shutdown.
📒 Files selected for processing (1)
logservice/logpuller/subscription_client.go
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: asddongmen, wk989898
What problem does this PR solve?
Issue Number: close #4359
What is changed and how it works?
This pull request addresses a potential deadlock or stuck state within the logpuller's subscription client. It enhances the robustness of the `pushRegionEventToDS` mechanism by integrating context cancellation checks and ensuring proper unblocking during client shutdown, thereby preventing indefinite waits and improving the client's lifecycle management.
Highlights
- `pushRegionEventToDS`: The `pushRegionEventToDS` method now checks for context cancellation (`s.ctx.Err()`) before and during its waiting loop, ensuring it can exit gracefully if the context is canceled while the client is paused.
- `paused` state updates: Mutex locks (`s.mu.Lock()` and `s.mu.Unlock()`) were added around `s.paused.Store()` operations within `handleDSFeedBack` to prevent race conditions when modifying the client's paused state.
- `Close`: The `Close` method now explicitly sets `s.paused.Store(false)` and broadcasts to the condition variable (`s.cond.Broadcast()`), which ensures any goroutines blocked in `pushRegionEventToDS` are released during shutdown.
- `TestPushRegionEventToDSUnblocksOnClose`: This test was introduced to verify that the `pushRegionEventToDS` method correctly unblocks when the `subscriptionClient` is closed.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Bug Fixes
Tests