[ISSUE #6529]♻️Refactor pop order consume message handling for improved error handling and clarity#6530
[ISSUE #6529]♻️Refactor pop order consume message handling for improved error handling and clarity#6530rocketmq-rust-bot merged 2 commits intomainfrom
Conversation
…ed error handling and clarity
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThis refactoring improves ordered pop message consumption by replacing direct message mutation with MessageAccessor-based updates, introducing blocking tasks for message processing, implementing explicit context passing throughout the flow, and splitting commit/rollback handling into distinct branches with specialized logic paths. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 3❌ Failed checks (1 warning, 2 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…ed error handling and clarity
There was a problem hiding this comment.
🧹 Nitpick comments (2)
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs (2)
319-328: Inconsistent suspend time handling for deprecatedRollbackstatus.The
SuspendCurrentQueueAMomentbranch (lines 301-305) derivessuspend_timefrom the context, whileRollbackuses a hardcoded1000ms. For consistency and to respect any custom suspend time the listener may have set, consider using the same pattern:♻️ Suggested change for consistency
#[allow(deprecated)] ConsumeOrderlyStatus::Rollback => { warn!( "Consumer group {} received deprecated Rollback status, reverting messages", self.consumer_group ); + let suspend_time = if context.get_suspend_current_queue_time_millis() > 0 { + context.get_suspend_current_queue_time_millis() as u64 + } else { + 1000 + }; for msg in msgs { - self.change_invisible_time(msg.as_ref(), 1000).await; + self.change_invisible_time(msg.as_ref(), suspend_time).await; } false }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs` around lines 319 - 328, The Rollback branch of ConsumeOrderlyStatus uses a hardcoded 1000ms suspend time; change it to use the same suspend_time value derived in the SuspendCurrentQueueAMoment branch (i.e., obtain suspend_time from the listener/context where SuspendCurrentQueueAMoment does) and call change_invisible_time(msg.as_ref(), suspend_time). Update the ConsumeOrderlyStatus::Rollback arm to mirror SuspendCurrentQueueAMoment's suspend_time retrieval and application so custom suspend times are respected for self.consumer_group and msgs.
301-305: Consider extracting suspend time derivation to reduce duplication.The same pattern for deriving
suspend_timefrom context appears at lines 301-305 and 607-609. A small helper method could improve clarity and reduce duplication:♻️ Optional helper extraction
impl ConsumeMessagePopOrderlyService { fn get_suspend_time(&self, context: &ConsumeOrderlyContext) -> u64 { if context.get_suspend_current_queue_time_millis() > 0 { context.get_suspend_current_queue_time_millis() as u64 } else { self.consumer_config.suspend_current_queue_time_millis } } }Then use
self.get_suspend_time(&context)in both locations.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs` around lines 301 - 305, Extract the repeated suspend_time derivation into a helper on ConsumeMessagePopOrderlyService: create a method fn get_suspend_time(&self, context: &ConsumeOrderlyContext) -> u64 that returns context.get_suspend_current_queue_time_millis() as u64 when > 0 otherwise returns self.consumer_config.suspend_current_queue_time_millis, then replace the duplicated logic at the sites using suspend_time (the blocks referencing context.get_suspend_current_queue_time_millis() at lines shown) with self.get_suspend_time(&context).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In
`@rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs`:
- Around line 319-328: The Rollback branch of ConsumeOrderlyStatus uses a
hardcoded 1000ms suspend time; change it to use the same suspend_time value
derived in the SuspendCurrentQueueAMoment branch (i.e., obtain suspend_time from
the listener/context where SuspendCurrentQueueAMoment does) and call
change_invisible_time(msg.as_ref(), suspend_time). Update the
ConsumeOrderlyStatus::Rollback arm to mirror SuspendCurrentQueueAMoment's
suspend_time retrieval and application so custom suspend times are respected for
self.consumer_group and msgs.
- Around line 301-305: Extract the repeated suspend_time derivation into a
helper on ConsumeMessagePopOrderlyService: create a method fn
get_suspend_time(&self, context: &ConsumeOrderlyContext) -> u64 that returns
context.get_suspend_current_queue_time_millis() as u64 when > 0 otherwise
returns self.consumer_config.suspend_current_queue_time_millis, then replace the
duplicated logic at the sites using suspend_time (the blocks referencing
context.get_suspend_current_queue_time_millis() at lines shown) with
self.get_suspend_time(&context).
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6530 +/- ##
==========================================
- Coverage 42.08% 42.07% -0.02%
==========================================
Files 947 947
Lines 132264 132311 +47
==========================================
+ Hits 55666 55667 +1
- Misses 76598 76644 +46 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Bug Fixes
Refactor