[ISSUE #6601]šDefaultLitePullConsumerImpl with message polling and offset management improvements#6602
Conversation
ā¦fset management improvements
|
š@mxsm šThanks for your contributionšļ¼ š”CodeRabbit(AI) will review your code firstš„ļ¼ Note šØThe code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code reviewš„. |
WalkthroughThis PR adds message polling and offset management capabilities to DefaultLitePullConsumerImpl through new public methods including Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Consumer as DefaultLitePullConsumerImpl
participant OffsetStore
participant Hooks
participant MQ as MessageQueue
Client->>Consumer: poll(timeout_millis)
Consumer->>Consumer: Validate consumer state
alt State is running
Consumer->>Consumer: Check if auto-commit enabled
alt Auto-commit enabled
Consumer->>OffsetStore: Commit current offsets
OffsetStore-->>Consumer: Offsets committed
end
Consumer->>MQ: Fetch messages
MQ-->>Consumer: Messages received
Consumer->>Consumer: Update consume offset
Consumer->>Hooks: Invoke post-consume hooks
Hooks-->>Consumer: Hooks executed
Consumer-->>Client: Return Vec<ArcMut<MessageExt>>
else Invalid state
Consumer-->>Client: Error
end
Estimated code review effortšÆ 3 (Moderate) | ā±ļø ~20 minutes Poem
š„ Pre-merge checks | ā 4 | ā 1ā Failed checks (1 inconclusive)
ā Passed checks (4 passed)
āļø Tip: You can configure your own custom pre-merge checks in the settings. ⨠Finishing Touches
š§Ŗ Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
š§¹ Nitpick comments (1)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs (1)
812-823: Consider propagating partial failure information fromcommit_all().The method always returns
Ok(())even when all individualcommit_synccalls fail. While logging errors is good, callers have no way to detect whether any commits succeeded. If this is intentional best-effort behavior, a brief doc comment would clarify the contract.š” Alternative: return summary of failures
- pub async fn commit_all(&self) -> RocketMQResult<()> { + /// Commits offsets for all assigned queues. + /// Returns Ok(()) on best-effort basis; individual failures are logged but not propagated. + pub async fn commit_all(&self) -> RocketMQResult<()> { let queues = self.assigned_message_queue.message_queues().await; + let mut failure_count = 0; for mq in queues { if let Err(e) = self.commit_sync(&mq, true).await { tracing::error!("Failed to commit offset for queue {:?}: {}", mq, e); + failure_count += 1; } } + if failure_count > 0 { + tracing::warn!("commit_all completed with {} failures", failure_count); + } Ok(()) }š¤ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs` around lines 812 - 823, commit_all currently swallows all commit_sync failures and always returns Ok(()), leaving callers unaware of partial or total failures; update commit_all (and its signature if needed) to propagate failure info by collecting errors from each commit_sync(&mq, true).await (refer to commit_all and commit_sync) and returning an Err (or a summary error type) if any individual commit failed, or alternatively document the method behavior clearly if best-effort is intentional; implement by aggregating errors (e.g., Vec<(MessageQueue, Error)> or a combined error) and returning RocketMQResult::Err when non-empty so callers can detect partial failures.
š¤ Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 779-783: The loop over consume_message_hook_list is a no-op so
hooks never run; either call the hook method or remove the dead loop: locate the
loop in default_lite_pull_consumer_impl.rs where consume_message_hook_list is
iterated and replace the empty body with invocations of the hook trait's method
(pass the current message(s) and any consumer/context needed per the hook trait,
e.g., call hook.consume_message(consumer, msgs, context) or the actual method
name defined by your ConsumeMessageHook trait), or if hooks are not yet
supported remove the loop and add a TODO explaining future intent.
---
Nitpick comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 812-823: commit_all currently swallows all commit_sync failures
and always returns Ok(()), leaving callers unaware of partial or total failures;
update commit_all (and its signature if needed) to propagate failure info by
collecting errors from each commit_sync(&mq, true).await (refer to commit_all
and commit_sync) and returning an Err (or a summary error type) if any
individual commit failed, or alternatively document the method behavior clearly
if best-effort is intentional; implement by aggregating errors (e.g.,
Vec<(MessageQueue, Error)> or a combined error) and returning
RocketMQResult::Err when non-empty so callers can detect partial failures.
ā¹ļø Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
š Files selected for processing (2)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rsrocketmq-client/src/consumer/consumer_impl/lite_pull_consume_request.rs
| if !self.consume_message_hook_list.is_empty() { | ||
| for hook in &self.consume_message_hook_list { | ||
| // Execute consume message hooks | ||
| } | ||
| } |
There was a problem hiding this comment.
Empty hook execution loop - hooks are not being invoked.
The loop iterates over consume_message_hook_list but the body is empty. Either invoke the hooks or remove the dead code.
š§ Suggested fix
If hooks should be invoked, implement the call:
if !self.consume_message_hook_list.is_empty() {
for hook in &self.consume_message_hook_list {
- // Execute consume message hooks
+ // TODO: Build ConsumeMessageContext and invoke hook
+ // hook.consume_message_before(&context);
}
}Alternatively, if this is placeholder code, consider removing it entirely until ready to implement, or add a TODO comment outside the loop to clarify intent.
š Committable suggestion
ā¼ļø IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if !self.consume_message_hook_list.is_empty() { | |
| for hook in &self.consume_message_hook_list { | |
| // Execute consume message hooks | |
| } | |
| } | |
| if !self.consume_message_hook_list.is_empty() { | |
| for hook in &self.consume_message_hook_list { | |
| // TODO: Build ConsumeMessageContext and invoke hook | |
| // hook.consume_message_before(&context); | |
| } | |
| } |
š¤ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 779 - 783, The loop over consume_message_hook_list is a no-op so
hooks never run; either call the hook method or remove the dead loop: locate the
loop in default_lite_pull_consumer_impl.rs where consume_message_hook_list is
iterated and replace the empty body with invocations of the hook trait's method
(pass the current message(s) and any consumer/context needed per the hook trait,
e.g., call hook.consume_message(consumer, msgs, context) or the actual method
name defined by your ConsumeMessageHook trait), or if hooks are not yet
supported remove the loop and add a TODO explaining future intent.
Codecov Reportā Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6602 +/- ##
==========================================
- Coverage 41.67% 41.63% -0.05%
==========================================
Files 959 959
Lines 134135 134280 +145
==========================================
+ Hits 55904 55907 +3
- Misses 78231 78373 +142 ā View full report in Codecov by Sentry. š New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ā
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit