[ISSUE #6592]🚀Add lite pull consumer implementation with assigned message queue management#6593
[ISSUE #6592]🚀Add lite pull consumer implementation with assigned message queue management#6593rocketmq-rust-bot merged 1 commit intomainfrom
Conversation
…sage queue management
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThis PR introduces a lite pull consumer implementation for RocketMQ by adding three new modules: Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs (1)
168-175: Use.try_into()or clamp unsigned conversions for safer config mapping.Lines 168, 170, and 175 cast from signed types (i64, i32) to u32 without validation. While these fields are never set to negative values in practice (defaults: 1000, 100, 10) and -1 sentinels appear only on different fields (pull_threshold_for_topic/-size), defensive conversion patterns would improve code robustness:
+ let pull_threshold_for_queue = u32::try_from(self.pull_threshold_for_queue) + .unwrap_or(1000); + let pull_threshold_size_for_queue = u32::try_from(self.pull_threshold_size_for_queue) + .unwrap_or(100); + let pull_batch_size = u32::try_from(self.pull_batch_size) + .unwrap_or(10); + ArcMut::new(ConsumerConfig { - pull_threshold_for_queue: self.pull_threshold_for_queue as u32, + pull_threshold_for_queue, - pull_threshold_size_for_queue: self.pull_threshold_size_for_queue as u32, + pull_threshold_size_for_queue, - pull_batch_size: self.pull_batch_size as u32, + pull_batch_size,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs` around lines 168 - 175, The struct construction in default_lite_pull_consumer_impl.rs is unsafely casting signed integers to u32 for pull_threshold_for_queue, pull_threshold_size_for_queue and pull_batch_size; replace the direct casts with fallible conversions (e.g. use std::convert::TryInto or u32::try_from) and handle the Err case by clamping negatives to 0 and values exceeding u32::MAX to u32::MAX (or otherwise providing a safe default), so the mapping from the original signed types to the target u32 fields is validated and defensive.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rocketmq-client/src/consumer/consumer_impl/assigned_message_queue.rs`:
- Around line 150-155: The update_consume_offset currently updates offsets by
MessageQueue key only and can be overwritten by stale tasks; modify
update_consume_offset to first read the queue_map entry (as you do) and then
verify the assigned ProcessQueue identity/generation token on the retrieved
value before storing: e.g., read an atomic identifier field on the retrieved aq
(use the existing process_queue_id/generation field on the AssignedMessageQueue
or add one if missing), compare it to the expected id from the incoming
MessageQueue (or pass the expected id into update_consume_offset), and only call
aq.consume_offset.store(offset, Ordering::Release) if the ids match; otherwise
skip the update to avoid stale-task overwrites. Ensure the id read and
comparison are done atomically to avoid races.
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 342-346: The current do_rebalance method uses unimplemented! and
will panic at runtime; replace the panic with a concrete implementation that
either delegates to the existing RebalanceLitePullImpl via the Rebalance trait
or performs a safe no-op that returns/propagates an appropriate error;
specifically, in async fn do_rebalance(&self) when SubscriptionType::Subscribe
is detected, call the Rebalance trait implementation on the
RebalanceLitePullImpl instance used by this consumer (or call its rebalance
method) and propagate any Result/Err back to the caller instead of panicking,
and for the other similar methods (the places flagged at the other line ranges)
follow the same pattern: remove unimplemented!, implement delegation to the
Rebalance impl or return a defined error type so callers never cause a process
panic.
---
Nitpick comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 168-175: The struct construction in
default_lite_pull_consumer_impl.rs is unsafely casting signed integers to u32
for pull_threshold_for_queue, pull_threshold_size_for_queue and pull_batch_size;
replace the direct casts with fallible conversions (e.g. use
std::convert::TryInto or u32::try_from) and handle the Err case by clamping
negatives to 0 and values exceeding u32::MAX to u32::MAX (or otherwise providing
a safe default), so the mapping from the original signed types to the target u32
fields is validated and defensive.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
rocketmq-client/src/consumer/consumer_impl.rsrocketmq-client/src/consumer/consumer_impl/assigned_message_queue.rsrocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rsrocketmq-client/src/consumer/consumer_impl/lite_pull_consume_request.rs
| /// Updates the consume offset for the specified queue. | ||
| pub async fn update_consume_offset(&self, mq: &MessageQueue, offset: i64) { | ||
| let map = self.queue_map.read().await; | ||
| if let Some(aq) = map.get(mq) { | ||
| aq.consume_offset.store(offset, Ordering::Release); | ||
| } |
There was a problem hiding this comment.
Guard consume offset updates with process-queue identity.
At Line 151-Line 155, consume offsets are updated by MessageQueue key only. A stale task from a previous assignment can overwrite offsets after reassignment.
Proposed fix
- pub async fn update_consume_offset(&self, mq: &MessageQueue, offset: i64) {
+ pub async fn update_consume_offset(
+ &self,
+ mq: &MessageQueue,
+ offset: i64,
+ process_queue: &Arc<ProcessQueue>,
+ ) {
let map = self.queue_map.read().await;
if let Some(aq) = map.get(mq) {
- aq.consume_offset.store(offset, Ordering::Release);
+ if Arc::ptr_eq(&aq.process_queue, process_queue) {
+ aq.consume_offset.store(offset, Ordering::Release);
+ }
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Updates the consume offset for the specified queue. | |
| pub async fn update_consume_offset(&self, mq: &MessageQueue, offset: i64) { | |
| let map = self.queue_map.read().await; | |
| if let Some(aq) = map.get(mq) { | |
| aq.consume_offset.store(offset, Ordering::Release); | |
| } | |
| /// Updates the consume offset for the specified queue. | |
| pub async fn update_consume_offset( | |
| &self, | |
| mq: &MessageQueue, | |
| offset: i64, | |
| process_queue: &Arc<ProcessQueue>, | |
| ) { | |
| let map = self.queue_map.read().await; | |
| if let Some(aq) = map.get(mq) { | |
| if Arc::ptr_eq(&aq.process_queue, process_queue) { | |
| aq.consume_offset.store(offset, Ordering::Release); | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rocketmq-client/src/consumer/consumer_impl/assigned_message_queue.rs` around
lines 150 - 155, The update_consume_offset currently updates offsets by
MessageQueue key only and can be overwritten by stale tasks; modify
update_consume_offset to first read the queue_map entry (as you do) and then
verify the assigned ProcessQueue identity/generation token on the retrieved
value before storing: e.g., read an atomic identifier field on the retrieved aq
(use the existing process_queue_id/generation field on the AssignedMessageQueue
or add one if missing), compare it to the expected id from the incoming
MessageQueue (or pass the expected id into update_consume_offset), and only call
aq.consume_offset.store(offset, Ordering::Release) if the ids match; otherwise
skip the update to avoid stale-task overwrites. Ensure the id read and
comparison are done atomically to avoid races.
| async fn do_rebalance(&self) { | ||
| if *self.subscription_type.read().await == SubscriptionType::Subscribe { | ||
| // Rebalance implementation delegates to RebalanceLitePullImpl via the Rebalance trait | ||
| unimplemented!("do_rebalance is not yet implemented") | ||
| } |
There was a problem hiding this comment.
unimplemented! in runtime trait methods will panic consumers.
At Line 345, Line 356, and Line 361, any call path hitting these methods will crash the process. These need concrete behavior (or a safe no-op with explicit error propagation) before merge.
Also applies to: 354-357, 359-362
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 342 - 346, The current do_rebalance method uses unimplemented! and
will panic at runtime; replace the panic with a concrete implementation that
either delegates to the existing RebalanceLitePullImpl via the Rebalance trait
or performs a safe no-op that returns/propagates an appropriate error;
specifically, in async fn do_rebalance(&self) when SubscriptionType::Subscribe
is detected, call the Rebalance trait implementation on the
RebalanceLitePullImpl instance used by this consumer (or call its rebalance
method) and propagate any Result/Err back to the caller instead of panicking,
and for the other similar methods (the places flagged at the other line ranges)
follow the same pattern: remove unimplemented!, implement delegation to the
Rebalance impl or return a defined error type so callers never cause a process
panic.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6593 +/- ##
==========================================
- Coverage 41.84% 41.77% -0.07%
==========================================
Files 956 959 +3
Lines 133557 133824 +267
==========================================
+ Hits 55884 55905 +21
- Misses 77673 77919 +246 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit