[ISSUE #6608]šImplement topic unsubscription and message queue removal in DefaultLitePullConsumerImpl#6609
Conversation
ā¦l in DefaultLitePullConsumerImpl
|
š@mxsm šThanks for your contributionšļ¼ š”CodeRabbit(AI) will review your code firstš„ļ¼ Note šØThe code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code reviewš„. |
WalkthroughThis pull request adds topic unsubscription and message queue removal capabilities to the RocketMQ lite pull consumer. It introduces a new Changes
Sequence DiagramsequenceDiagram
participant Client as Consumer Client
participant Consumer as DefaultLitePullConsumerImpl
participant TaskMgr as PullTaskManager
participant Queue as AssignedMessageQueue
participant NS as NameServer/Router
Client->>Consumer: unsubscribe(topic)
Consumer->>TaskMgr: terminate_pull_task(topic)
TaskMgr->>TaskMgr: stop & remove tasks
Consumer->>Consumer: remove subscribed topic
Consumer->>Queue: remove_by_topic(topic)
Queue->>Queue: acquire write lock
Queue->>Queue: filter & drop queues
Queue-->>Consumer: Vec<ProcessQueue>
Consumer-->>Client: Result<()>
Client->>Consumer: fetch_message_queues(topic)
Consumer->>NS: fetch topic route
NS-->>Consumer: MessageQueue set
Consumer->>Consumer: parse_message_queues()
Consumer->>Consumer: strip namespace
Consumer-->>Client: HashSet<MessageQueue>
Client->>Consumer: search_offset(mq, timestamp)
Consumer->>NS: query offset for timestamp
NS-->>Consumer: i64 offset
Consumer-->>Client: RocketMQResult<i64>
Estimated code review effortšÆ 3 (Moderate) | ā±ļø ~22 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
š„ Pre-merge checks | ā 5ā Passed checks (5 passed)
āļø Tip: You can configure your own custom pre-merge checks in the settings. ⨠Finishing Touches
š§Ŗ Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
š§¹ Nitpick comments (2)
rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs (2)
1219-1229: Doc comment inconsistent with implementation.The documentation states "This is an alias for
search_offset" but the implementation usestodo!()instead of delegating. If this is meant to be an alias, consider delegating tosearch_offset.š§ Proposed fix to match documentation
#[allow(dead_code)] pub async fn offset_for_timestamp(&self, _mq: &MessageQueue, _timestamp: u64) -> RocketMQResult<i64> { - todo!("offset_for_timestamp: Low priority method, not yet implemented") + self.search_offset(_mq, _timestamp).await }š¤ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs` around lines 1219 - 1229, The doc says offset_for_timestamp is an alias for search_offset but the impl uses todo!(); change pub async fn offset_for_timestamp(&self, _mq: &MessageQueue, _timestamp: u64) -> RocketMQResult<i64> to delegate to the existing async search_offset method (call self.search_offset(mq, timestamp).await and return its result), removing the todo!() and using the same parameter names (mq, timestamp) to match search_offset's signature and propagate its RocketMQResult<i64>.
1273-1281:is_auto_commitcan be trivially implemented.This method has a straightforward implementation available - simply return the config value. Consider implementing it instead of using
todo!()to avoid potential runtime panics.š§ Proposed implementation
#[allow(dead_code)] pub fn is_auto_commit(&self) -> bool { - todo!("is_auto_commit: Low priority method, not yet implemented") + self.consumer_config.auto_commit }š¤ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs` around lines 1273 - 1281, Replace the todo! in the is_auto_commit(&self) -> bool method with a direct return of the configured value (e.g., return the boolean from the consumer's config such as self.config.auto_commit or by calling a config getter like self.config.is_auto_commit()). Update the implementation of is_auto_commit to simply read and return that config flag instead of panicking.
š¤ Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 1107-1135: The unsubscribe method removes local subscription state
but doesn't notify brokers like subscribe does; after removing the topic from
rebalance_impl, stopping tasks, and clearing assigned_message_queue in
unsubscribe, invoke the same broker-notify logic used by subscribe (e.g. call
the rebalance_impl send-heartbeat method such as send_heartbeat_to_all_brokers
or equivalent) when the consumer is running (check the consumer running flag
used in subscribe) and await/handle its result (log errors if the heartbeat
fails) so brokers receive the unsubscription update.
---
Nitpick comments:
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`:
- Around line 1219-1229: The doc says offset_for_timestamp is an alias for
search_offset but the impl uses todo!(); change pub async fn
offset_for_timestamp(&self, _mq: &MessageQueue, _timestamp: u64) ->
RocketMQResult<i64> to delegate to the existing async search_offset method (call
self.search_offset(mq, timestamp).await and return its result), removing the
todo!() and using the same parameter names (mq, timestamp) to match
search_offset's signature and propagate its RocketMQResult<i64>.
- Around line 1273-1281: Replace the todo! in the is_auto_commit(&self) -> bool
method with a direct return of the configured value (e.g., return the boolean
from the consumer's config such as self.config.auto_commit or by calling a
config getter like self.config.is_auto_commit()). Update the implementation of
is_auto_commit to simply read and return that config flag instead of panicking.
ā¹ļø Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
š Files selected for processing (2)
rocketmq-client/src/consumer/consumer_impl/assigned_message_queue.rsrocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs
| /// Unsubscribes from the specified topic. | ||
| /// | ||
| /// Removes the topic subscription, stops and removes all pull tasks for the topic, | ||
| /// and clears assigned message queues for the topic. | ||
| /// | ||
| /// This operation can be performed regardless of the consumer state. | ||
| pub async fn unsubscribe(&mut self, topic: impl Into<CheetahString>) -> RocketMQResult<()> { | ||
| let topic = topic.into(); | ||
|
|
||
| // Remove from rebalance_impl subscription | ||
| self.rebalance_impl.get_subscription_inner().remove(&topic); | ||
|
|
||
| // Stop and remove pull tasks for this topic | ||
| let mut task_handles = self.task_handles.write().await; | ||
| task_handles.retain(|mq, handle| { | ||
| if mq.topic() == topic.as_str() { | ||
| handle.abort(); | ||
| false | ||
| } else { | ||
| true | ||
| } | ||
| }); | ||
| drop(task_handles); | ||
|
|
||
| // Remove from assigned_message_queue | ||
| self.assigned_message_queue.remove_by_topic(topic.as_str()).await; | ||
|
|
||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Consider notifying brokers of unsubscription.
The subscribe method (lines 393-398) sends a heartbeat to all brokers when running to notify them of the subscription change. However, unsubscribe does not notify brokers that the subscription was removed. This could leave stale subscription data on brokers until the next scheduled heartbeat.
š ļø Proposed fix to add heartbeat notification
// Remove from assigned_message_queue
self.assigned_message_queue.remove_by_topic(topic.as_str()).await;
+ // Notify brokers of subscription change if running
+ if *self.service_state == ServiceState::Running {
+ if let Some(ref client_instance) = self.client_instance {
+ client_instance.send_heartbeat_to_all_broker_with_lock().await;
+ }
+ }
+
Ok(())
}š Committable suggestion
ā¼ļø IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Unsubscribes from the specified topic. | |
| /// | |
| /// Removes the topic subscription, stops and removes all pull tasks for the topic, | |
| /// and clears assigned message queues for the topic. | |
| /// | |
| /// This operation can be performed regardless of the consumer state. | |
| pub async fn unsubscribe(&mut self, topic: impl Into<CheetahString>) -> RocketMQResult<()> { | |
| let topic = topic.into(); | |
| // Remove from rebalance_impl subscription | |
| self.rebalance_impl.get_subscription_inner().remove(&topic); | |
| // Stop and remove pull tasks for this topic | |
| let mut task_handles = self.task_handles.write().await; | |
| task_handles.retain(|mq, handle| { | |
| if mq.topic() == topic.as_str() { | |
| handle.abort(); | |
| false | |
| } else { | |
| true | |
| } | |
| }); | |
| drop(task_handles); | |
| // Remove from assigned_message_queue | |
| self.assigned_message_queue.remove_by_topic(topic.as_str()).await; | |
| Ok(()) | |
| } | |
| /// Unsubscribes from the specified topic. | |
| /// | |
| /// Removes the topic subscription, stops and removes all pull tasks for the topic, | |
| /// and clears assigned message queues for the topic. | |
| /// | |
| /// This operation can be performed regardless of the consumer state. | |
| pub async fn unsubscribe(&mut self, topic: impl Into<CheetahString>) -> RocketMQResult<()> { | |
| let topic = topic.into(); | |
| // Remove from rebalance_impl subscription | |
| self.rebalance_impl.get_subscription_inner().remove(&topic); | |
| // Stop and remove pull tasks for this topic | |
| let mut task_handles = self.task_handles.write().await; | |
| task_handles.retain(|mq, handle| { | |
| if mq.topic() == topic.as_str() { | |
| handle.abort(); | |
| false | |
| } else { | |
| true | |
| } | |
| }); | |
| drop(task_handles); | |
| // Remove from assigned_message_queue | |
| self.assigned_message_queue.remove_by_topic(topic.as_str()).await; | |
| // Notify brokers of subscription change if running | |
| if *self.service_state == ServiceState::Running { | |
| if let Some(ref client_instance) = self.client_instance { | |
| client_instance.send_heartbeat_to_all_broker_with_lock().await; | |
| } | |
| } | |
| Ok(()) | |
| } |
š¤ Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 1107 - 1135, The unsubscribe method removes local subscription
state but doesn't notify brokers like subscribe does; after removing the topic
from rebalance_impl, stopping tasks, and clearing assigned_message_queue in
unsubscribe, invoke the same broker-notify logic used by subscribe (e.g. call
the rebalance_impl send-heartbeat method such as send_heartbeat_to_all_brokers
or equivalent) when the consumer is running (check the consumer running flag
used in subscribe) and await/handle its result (log errors if the heartbeat
fails) so brokers receive the unsubscription update.
Codecov Reportā Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6609 +/- ##
==========================================
- Coverage 41.56% 41.54% -0.03%
==========================================
Files 959 959
Lines 134490 134575 +85
==========================================
Hits 55907 55907
- Misses 78583 78668 +85 ā View full report in Codecov by Sentry. š New features to boost your workflow:
|
rocketmq-rust-bot
left a comment
There was a problem hiding this comment.
LGTM - All CI checks passed ā
Which Issue(s) This PR Fixes(Closes)
Brief Description
How Did You Test This Change?
Summary by CodeRabbit