[ISSUE #6603]🚀Add seek functionality and offset retrieval methods in DefaultLitePullConsumerImpl#6604
Conversation
…DefaultLitePullConsumerImpl
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughThe PR introduces seek functionality to DefaultLitePullConsumerImpl, enabling consumers to navigate to specific offsets, queue beginnings, or queue ends. It adds offset retrieval via a Changes
Sequence DiagramsequenceDiagram
participant Consumer as Consumer
participant StateStore as State/Cache
participant ProcessQueue as ProcessQueue
participant Broker as Broker
Consumer->>StateStore: validate state & assignment
StateStore-->>Consumer: state validated
Consumer->>ProcessQueue: check not dropped
ProcessQueue-->>Consumer: status OK
Consumer->>StateStore: clear queue from cache
StateStore-->>Consumer: cache cleared
Consumer->>ProcessQueue: abort existing pull task
ProcessQueue-->>Consumer: task aborted
Consumer->>StateStore: set seek offset
StateStore-->>Consumer: offset persisted
Consumer->>Broker: fetch messages from new offset
Broker-->>Consumer: messages returned
Consumer->>ProcessQueue: restart pull task
ProcessQueue-->>Consumer: task running
Estimated Code Review Effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs (2)
1001-1014: Redundant conditional check after task removal.Line 1003 removes the task handle for
message_queue, so the check at line 1011!task_handles.contains_key(message_queue)will always be true. The conditional is unnecessary.♻️ Suggested simplification
// Stop old pull task let mut task_handles = self.task_handles.write().await; if let Some(handle) = task_handles.remove(message_queue) { handle.abort(); } // Set seek offset self.assigned_message_queue.set_seek_offset(message_queue, offset).await; // Start new pull task - if !task_handles.contains_key(message_queue) { - drop(task_handles); // Release write lock before starting pull task - self.start_pull_task(message_queue.clone()).await?; - } + drop(task_handles); // Release write lock before starting pull task + self.start_pull_task(message_queue.clone()).await?; Ok(())🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs` around lines 1001 - 1014, The check for task_handles.contains_key(message_queue) after removing the handle is redundant because task_handles.remove(message_queue) just ran; drop the conditional and always release the write lock then call self.start_pull_task(message_queue.clone()).await? in the same place—i.e., after remove and set_seek_offset, drop task_handles and invoke self.start_pull_task(message_queue.clone()).await? unconditionally (keep abort() and set_seek_offset logic intact) so the flow uses task_handles.remove(...), set_seek_offset(...), drop(task_handles), then start_pull_task(...).
1055-1061: Consider replacing.unwrap()with proper error handling.Lines 1057-1058 and 1081-1082 use
.unwrap()onmq_client_api_impl.as_mut(), which could panic if the API impl is unexpectedlyNone. While this should not happen whenservice_stateisRunning, defensive error handling would be more robust.♻️ Suggested improvement for max_offset
- client_instance - .mq_client_api_impl - .as_mut() - .unwrap() - .get_max_offset(broker_result.broker_addr.as_str(), message_queue, 3000) - .await + client_instance + .mq_client_api_impl + .as_mut() + .ok_or_else(|| crate::mq_client_err!("MQ client API not initialized"))? + .get_max_offset(broker_result.broker_addr.as_str(), message_queue, 3000) + .awaitApply the same pattern to
min_offset.Also applies to: 1079-1085
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs` around lines 1055 - 1061, Replace the vulnerable .unwrap() calls on client_instance.mq_client_api_impl with explicit error handling: check mq_client_api_impl.as_mut() (e.g., if let Some(api) = ...) before calling get_max_offset/get_min_offset and return or propagate a meaningful error if None (include context like broker_result.broker_addr and message_queue). Update both places where get_max_offset and get_min_offset are invoked so they return a Result or map the absence to a domain error rather than panicking; refer to the mq_client_api_impl field and the get_max_offset/get_min_offset calls to locate and fix the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 1001-1014: The check for task_handles.contains_key(message_queue)
after removing the handle is redundant because
task_handles.remove(message_queue) just ran; drop the conditional and always
release the write lock then call
self.start_pull_task(message_queue.clone()).await? in the same place—i.e., after
remove and set_seek_offset, drop task_handles and invoke
self.start_pull_task(message_queue.clone()).await? unconditionally (keep abort()
and set_seek_offset logic intact) so the flow uses task_handles.remove(...),
set_seek_offset(...), drop(task_handles), then start_pull_task(...).
- Around line 1055-1061: Replace the vulnerable .unwrap() calls on
client_instance.mq_client_api_impl with explicit error handling: check
mq_client_api_impl.as_mut() (e.g., if let Some(api) = ...) before calling
get_max_offset/get_min_offset and return or propagate a meaningful error if None
(include context like broker_result.broker_addr and message_queue). Update both
places where get_max_offset and get_min_offset are invoked so they return a
Result or map the absence to a domain error rather than panicking; refer to the
mq_client_api_impl field and the get_max_offset/get_min_offset calls to locate
and fix the code.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rsrocketmq-client/src/implementation/mq_client_api_impl.rs
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6604 +/- ##
==========================================
- Coverage 41.63% 41.58% -0.06%
==========================================
Files 959 959
Lines 134280 134446 +166
==========================================
Hits 55907 55907
- Misses 78373 78539 +166 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit