[ISSUE #6508]♻️Refactor ProcessQueue to use a dedicated store struct for improved organization and clarity#6509
Conversation
…for improved organization and clarity
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThe PR refactors ProcessQueue to encapsulate message tree maps and offset tracking within a new ProcessQueueStore struct behind RwLock, improving organization and thread safety. Flow-control logging in the consumer implementation is updated to use an async get_offset_span() helper instead of direct map access. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rocketmq-client/src/consumer/consumer_impl/process_queue.rs (1)
428-430: Consider asserting msg_count never goes negative.
If negatives are not expected, a debug_assert can catch underflow early instead of silently clamping.💡 Possible tweak
pub fn msg_count(&self) -> u64 { - self.msg_count.load(Ordering::Acquire).max(0) as u64 + let v = self.msg_count.load(Ordering::Acquire); + debug_assert!(v >= 0, "msg_count should not be negative"); + v.max(0) as u64 }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/process_queue.rs` around lines 428 - 430, The msg_count method currently clamps a possibly-negative atomic value to 0; instead, load the atomic into a local (e.g., let v = self.msg_count.load(Ordering::Acquire)), add a debug_assert!(v >= 0, "msg_count underflow") to catch unexpected negatives in debug builds, then return v as u64 (or cast after verifying non-negative). Modify the msg_count function and reference the self.msg_count atomic in your change so underflow is detected early rather than silently clamped.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@rocketmq-client/src/consumer/consumer_impl/process_queue.rs`:
- Around line 428-430: The msg_count method currently clamps a possibly-negative
atomic value to 0; instead, load the atomic into a local (e.g., let v =
self.msg_count.load(Ordering::Acquire)), add a debug_assert!(v >= 0, "msg_count
underflow") to catch unexpected negatives in debug builds, then return v as u64
(or cast after verifying non-negative). Modify the msg_count function and
reference the self.msg_count atomic in your change so underflow is detected
early rather than silently clamped.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rsrocketmq-client/src/consumer/consumer_impl/process_queue.rs
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6509 +/- ##
==========================================
- Coverage 42.16% 42.15% -0.01%
==========================================
Files 946 946
Lines 132130 132145 +15
==========================================
+ Hits 55708 55710 +2
- Misses 76422 76435 +13 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Refactor
Tests