[ISSUE #6605]🚀Add consume message hooks and topic namespace handling in DefaultLitePullConsumerImpl#6606
Conversation
…in DefaultLitePullConsumerImpl
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThis change adds consume message hooks support to the DefaultLitePullConsumerImpl, enabling hook execution before and after message consumption with panic protection. It also introduces namespace stripping for messages via a reset_topic method to normalize topic names by removing configured namespace prefixes. Changes
Sequence Diagram(s)sequenceDiagram
participant Consumer as DefaultLitePullConsumerImpl
participant Hooks as Registered Hooks
participant Context as ConsumeMessageContext
participant Handler as Message Handler
Consumer->>Consumer: poll() or consume
Consumer->>Consumer: Reset topics via reset_topic()
Consumer->>Context: Create ConsumeMessageContext
Consumer->>Hooks: Execute before_consume hook
Note over Hooks: With panic protection (catch_unwind)
Hooks-->>Consumer: Hook completed (log errors if any)
Consumer->>Handler: Process messages
Handler-->>Consumer: Processing complete
Consumer->>Hooks: Execute after_consume hook
Note over Hooks: With panic protection (catch_unwind)
Hooks-->>Consumer: Hook completed (log errors if any)
Consumer-->>Consumer: Return result
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs (1)
955-969: Avoid per-message allocation inreset_topichot path.Line 964 allocates a new
Stringfor every message. You can use a borrowed&strand only callset_topicwhen the value actually changes.Refactor sketch
- for msg in messages.iter_mut() { - let topic = msg.message.topic().to_string(); - let topic_without_namespace = - NamespaceUtil::without_namespace_with_namespace(&topic, namespace.as_str()); - msg.message - .set_topic(CheetahString::from_string(topic_without_namespace)); - } + for msg in messages.iter_mut() { + let current_topic = msg.message.topic(); + let topic_without_namespace = NamespaceUtil::without_namespace_with_namespace( + current_topic.as_str(), + namespace.as_str(), + ); + if topic_without_namespace != current_topic.as_str() { + msg.message + .set_topic(CheetahString::from_string(topic_without_namespace)); + } + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs` around lines 955 - 969, In reset_topic, avoid allocating a new String per message: obtain the topic as a &str (via msg.message.topic()) instead of calling to_string(), call NamespaceUtil::without_namespace_with_namespace with that &str and the namespace, compare the returned value to the original topic and only call msg.message.set_topic(...) when the topic actually changed; construct a CheetahString (e.g. from_string or from_str) only in the branch where set_topic is needed so allocations happen only on changed topics. Use the existing symbols reset_topic, ArcMut<MessageExt>, NamespaceUtil::without_namespace_with_namespace, and msg.message.set_topic/CheetahString when applying the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 801-809: The panic guards around hook execution are incomplete
because hook.hook_name() is invoked outside the catch_unwind blocks; if
hook_name() panics it will still unwind poll(). Wrap both the hook call and any
inspection of the hook (including hook.hook_name()) inside the
std::panic::catch_unwind boundaries for the before and after paths (the loops
over self.consume_message_hook_list where hook.consume_message_before /
hook.consume_message_after are invoked), and log the hook name only from within
the catch_unwind success branch or capture it beforehand using a panic-safe
string (e.g., call hook_name inside the same AssertUnwindSafe closure or derive
a debug name via a provided safe accessor) so no code outside the catch_unwind
can panic.
---
Nitpick comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 955-969: In reset_topic, avoid allocating a new String per
message: obtain the topic as a &str (via msg.message.topic()) instead of calling
to_string(), call NamespaceUtil::without_namespace_with_namespace with that &str
and the namespace, compare the returned value to the original topic and only
call msg.message.set_topic(...) when the topic actually changed; construct a
CheetahString (e.g. from_string or from_str) only in the branch where set_topic
is needed so allocations happen only on changed topics. Use the existing symbols
reset_topic, ArcMut<MessageExt>,
NamespaceUtil::without_namespace_with_namespace, and
msg.message.set_topic/CheetahString when applying the change.
| for hook in &self.consume_message_hook_list { | ||
| if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { | ||
| hook.consume_message_before(Some(&mut context)); | ||
| })) { | ||
| tracing::error!( | ||
| "consumeMessageHook {} executeHookBefore panicked: {:?}", | ||
| hook.hook_name(), | ||
| err | ||
| ); |
There was a problem hiding this comment.
Panic isolation is incomplete in the hook error path.
On Line 806 and Line 823, hook.hook_name() is called outside catch_unwind. If hook_name() itself panics, poll() can still unwind despite the panic guards.
Suggested hardening
- for hook in &self.consume_message_hook_list {
+ for (idx, hook) in self.consume_message_hook_list.iter().enumerate() {
if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
hook.consume_message_before(Some(&mut context));
})) {
tracing::error!(
- "consumeMessageHook {} executeHookBefore panicked: {:?}",
- hook.hook_name(),
+ "consumeMessageHook[{}] executeHookBefore panicked: {:?}",
+ idx,
err
);
}
}
@@
- for hook in &self.consume_message_hook_list {
+ for (idx, hook) in self.consume_message_hook_list.iter().enumerate() {
if let Err(err) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
hook.consume_message_after(Some(&mut context));
})) {
tracing::error!(
- "consumeMessageHook {} executeHookAfter panicked: {:?}",
- hook.hook_name(),
+ "consumeMessageHook[{}] executeHookAfter panicked: {:?}",
+ idx,
err
);
}
}Also applies to: 819-826
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 801 - 809, The panic guards around hook execution are incomplete
because hook.hook_name() is invoked outside the catch_unwind blocks; if
hook_name() panics it will still unwind poll(). Wrap both the hook call and any
inspection of the hook (including hook.hook_name()) inside the
std::panic::catch_unwind boundaries for the before and after paths (the loops
over self.consume_message_hook_list where hook.consume_message_before /
hook.consume_message_after are invoked), and log the hook name only from within
the catch_unwind success branch or capture it beforehand using a panic-safe
string (e.g., call hook_name inside the same AssertUnwindSafe closure or derive
a debug name via a provided safe accessor) so no code outside the catch_unwind
can panic.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6606 +/- ##
==========================================
- Coverage 41.58% 41.57% -0.02%
==========================================
Files 959 959
Lines 134446 134490 +44
==========================================
+ Hits 55907 55908 +1
- Misses 78539 78582 +43 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit