[ISSUE #6511]♻️Refactor consume message handling and concurrency management for improved clarity and performance#6512
Conversation
…gement for improved clarity and performance
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThis PR refactors message consumption handling and lock management in the RocketMQ client consumer. The primary changes involve replacing direct listener invocation with spawned blocking tasks, adjusting status code mappings for auto-commit and manual-commit scenarios, updating lock types from a custom wrapper to standard Tokio RwLock, and refining concurrency synchronization semantics. Changes
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1)
125-130: Consider releasing the consume_lock guard before async I/O if it’s only a try-lock check.
consume_lockstays alive acrosspersist/remove_offset/unlock, which can hold the queue lock during network I/O. If the intent is only to check lock availability, drop it before the awaits; otherwise, add a short comment to clarify the deliberate long hold.♻️ Optional refactor (release guard before async I/O)
- let consume_lock = tokio::time::timeout(Duration::from_millis(500), pq.consume_lock.write()) - .await - .ok(); - if force_unlock || consume_lock.is_some() { + let consume_lock = tokio::time::timeout(Duration::from_millis(500), pq.consume_lock.write()) + .await + .ok(); + let lock_acquired = consume_lock.is_some(); + drop(consume_lock); + if force_unlock || lock_acquired { let Some(offset_store) = default_mqpush_consumer_impl.offset_store.as_mut() else { error!("Offset store not initialized"); return false; };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs` around lines 125 - 130, The consume_lock guard returned by pq.consume_lock.write() is held across subsequent async I/O calls (persist/remove_offset/unlock) — either drop the guard immediately after using it as a mere availability check or explicitly document why the guard must be held across awaits; locate the check around pq.is_dropped(), UNLOCK_DELAY_TIME_MILLS and the consume_lock variable in rebalance_push_impl.rs (where consume_lock is assigned from tokio::time::timeout(... pq.consume_lock.write() ...)) and: if the intent is only to test lock availability, release (drop) consume_lock before calling persist/remove_offset/unlock so the queue lock is not held during network I/O, otherwise add a short comment above the block explaining the deliberate long-lived lock and why awaits are safe while holding it.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In
`@rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs`:
- Around line 125-130: The consume_lock guard returned by
pq.consume_lock.write() is held across subsequent async I/O calls
(persist/remove_offset/unlock) — either drop the guard immediately after using
it as a mere availability check or explicitly document why the guard must be
held across awaits; locate the check around pq.is_dropped(),
UNLOCK_DELAY_TIME_MILLS and the consume_lock variable in rebalance_push_impl.rs
(where consume_lock is assigned from tokio::time::timeout(...
pq.consume_lock.write() ...)) and: if the intent is only to test lock
availability, release (drop) consume_lock before calling
persist/remove_offset/unlock so the queue lock is not held during network I/O,
otherwise add a short comment above the block explaining the deliberate
long-lived lock and why awaits are safe while holding it.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rsrocketmq-client/src/consumer/consumer_impl/process_queue.rsrocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6512 +/- ##
==========================================
- Coverage 42.15% 42.15% -0.01%
==========================================
Files 946 946
Lines 132145 132158 +13
==========================================
+ Hits 55710 55711 +1
- Misses 76435 76447 +12 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit