Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ use std::collections::HashSet;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use cheetah_string::CheetahString;
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
use rocketmq_common::common::message::message_enum::MessageRequestMode;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::TimeUtils::current_millis;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
Expand All @@ -32,6 +35,8 @@ use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::info;
use tracing::warn;

use crate::base::client_config::ClientConfig;
use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
Expand All @@ -41,11 +46,14 @@ use crate::consumer::consumer_impl::pull_api_wrapper::PullAPIWrapper;
use crate::consumer::consumer_impl::re_balance::rebalance_lite_pull_impl::RebalanceLitePullImpl;
use crate::consumer::default_mq_push_consumer::ConsumerConfig;
use crate::consumer::mq_consumer_inner::MQConsumerInner;
use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore;
use crate::consumer::store::offset_store::OffsetStore;
use crate::consumer::store::remote_broker_offset_store::RemoteBrokerOffsetStore;
use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;
use crate::factory::mq_client_instance::MQClientInstance;
use crate::hook::consume_message_hook::ConsumeMessageHook;
use crate::hook::filter_message_hook::FilterMessageHook;
use crate::implementation::mq_client_manager::MQClientManager;

/// Subscription mode for lite pull consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -314,6 +322,171 @@ impl DefaultLitePullConsumerImpl {
pub fn service_state(&self) -> ServiceState {
// Dereference the `service_state` field and hand the current value back by value.
*self.service_state
}

/// Checks the consumer settings that must hold before the consumer may start.
///
/// # Errors
///
/// Returns a client error when the consumer group is empty, when
/// `pull_batch_size` lies outside `[1, 1024]`, when `pull_threshold_for_queue`
/// lies outside `[1, 65535]`, or when `poll_timeout_millis` is `0`.
fn check_config(&self) -> RocketMQResult<()> {
    let config = &self.consumer_config;

    if config.consumer_group.is_empty() {
        return Err(crate::mq_client_err!("Consumer group cannot be empty"));
    }

    // Inclusive-range membership mirrors the "[1, 1024]" wording of the message.
    let batch_size = config.pull_batch_size;
    if !(1..=1024).contains(&batch_size) {
        return Err(crate::mq_client_err!(format!(
            "pullBatchSize must be in [1, 1024], current value: {}",
            batch_size
        )));
    }

    let queue_threshold = config.pull_threshold_for_queue;
    if !(1..=65535).contains(&queue_threshold) {
        return Err(crate::mq_client_err!(format!(
            "pullThresholdForQueue must be in [1, 65535], current value: {}",
            queue_threshold
        )));
    }

    if config.poll_timeout_millis == 0 {
        return Err(crate::mq_client_err!("pollTimeoutMillis cannot be 0"));
    }

    Ok(())
}

/// Starts the lite pull consumer.
pub async fn start(&mut self) -> RocketMQResult<()> {
match *self.service_state {
ServiceState::CreateJust => {
info!(
"DefaultLitePullConsumerImpl [{}] starting. message_model={:?}",
self.consumer_config.consumer_group, self.consumer_config.message_model
);

*self.service_state = ServiceState::StartFailed;

self.check_config()?;

if self.consumer_config.message_model == MessageModel::Clustering {
self.client_config.change_instance_name_to_pid();
}

let client_instance = MQClientManager::get_instance()
.get_or_create_mq_client_instance(self.client_config.as_ref().clone(), None);
self.client_instance = Some(client_instance.clone());

self.rebalance_impl
.set_consumer_group(self.consumer_config.consumer_group.clone());
self.rebalance_impl
.set_message_model(self.consumer_config.message_model);
self.rebalance_impl
.set_allocate_message_queue_strategy(self.consumer_config.allocate_message_queue_strategy.clone());
self.rebalance_impl.set_mq_client_factory(client_instance.clone());

if self.pull_api_wrapper.is_none() {
let mut pull_api_wrapper = PullAPIWrapper::new(
client_instance.clone(),
self.consumer_config.consumer_group.clone(),
false, // unit_mode
);
pull_api_wrapper.register_filter_message_hook(self.filter_message_hook_list.clone());
self.pull_api_wrapper = Some(ArcMut::new(pull_api_wrapper));
}

// Initialize OffsetStore based on message model
let offset_store = ArcMut::new(match self.consumer_config.message_model {
MessageModel::Broadcasting => OffsetStore::new_with_local(LocalFileOffsetStore::new(
client_instance.clone(),
self.consumer_config.consumer_group.clone(),
)),
MessageModel::Clustering => OffsetStore::new_with_remote(RemoteBrokerOffsetStore::new(
client_instance.clone(),
self.consumer_config.consumer_group.clone(),
)),
});

offset_store.load().await?;

self.rebalance_impl.set_offset_store(offset_store.clone());

// Store offset_store in self for later use (e.g., persist_consumer_offset)
self.offset_store = Some(offset_store);

// Consumer registration integrated through MQClientInstance rebalance mechanism

let cloned = self.client_instance.as_mut().cloned().unwrap();
self.client_instance.as_mut().unwrap().start(cloned).await?;
Comment on lines +410 to +414
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for register_consumer patterns
echo "=== Searching for register_consumer patterns ==="
rg -i "register.*consumer" --type rust -n -B2 -A2

echo ""
echo "=== Searching for how client_instance is used ==="
rg "client_instance.*start" --type rust -n -B2 -A2

echo ""
echo "=== Checking MQClientInstance structure ==="
fd ".*client_instance.*" --type rust | head -20

Repository: mxsm/rocketmq-rust

Length of output: 20664


🏁 Script executed:

#!/bin/bash
# Look at the full context of the start method
echo "=== Examining default_lite_pull_consumer_impl.rs around lines 410-414 ==="
wc -l rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs

Repository: mxsm/rocketmq-rust

Length of output: 217


🏁 Script executed:

#!/bin/bash
# Get more context from the start method
echo "=== Reading the start method context ==="
sed -n '350,435p' rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs | cat -n

Repository: mxsm/rocketmq-rust

Length of output: 4664


🏁 Script executed:

#!/bin/bash
# Search for rebalance_impl registration or consumer registration
echo "=== Searching for rebalance setup or consumer group registration ==="
rg "rebalance_impl.*register\|register.*consumer\|consumer_group" rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs -n -B1 -A1

Repository: mxsm/rocketmq-rust

Length of output: 130


🏁 Script executed:

#!/bin/bash
# Check MQClientInstance to see what start method expects
echo "=== Checking MQClientInstance definition ==="
fd "client_instance.rs\|client_impl.rs" --type rust | grep -v test

echo ""
echo "=== Looking for MQClientInstance start method signature ==="
rg "fn start\(" --type rust -B2 -A8 | grep -A8 "MQClientInstance"

Repository: mxsm/rocketmq-rust

Length of output: 2321


🏁 Script executed:

#!/bin/bash
# Check if rebalance automatically registers consumer
echo "=== Checking RebalanceImpl for consumer registration ==="
rg "register_consumer" rocketmq-client/src/consumer/consumer_impl/re_balance/ -n -B2 -A2

echo ""
echo "=== Checking MQClientInstance start method ==="
sed -n '1,150p' rocketmq-client/src/factory/mq_client_instance.rs | grep -n "pub async fn start" -A20

Repository: mxsm/rocketmq-rust

Length of output: 166


🏁 Script executed:

#!/bin/bash
# Compare what happens in default_mq_push_consumer_impl around registration
echo "=== Push consumer registration pattern ==="
sed -n '300,330p' rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs | cat -n

Repository: mxsm/rocketmq-rust

Length of output: 1762


Consumer registration is missing before starting the client instance.

The lite pull consumer should call register_consumer() before start(), consistent with the push consumer implementation. Without explicit registration, the consumer won't be properly tracked by the broker for rebalancing and coordination.

The pattern should match default_mq_push_consumer_impl which calls:

client_instance.register_consumer(group, MQConsumerInnerImpl { ... }).await;
client_instance.start(cloned).await?;

Additionally, the double unwrap pattern on lines 413-414 can be simplified to reduce redundant calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
around lines 410 - 414, The consumer isn't registered before the client instance
is started, and the startup code uses redundant unwraps; update the startup sequence in
default_lite_pull_consumer_impl to call client_instance.register_consumer(group,
MQConsumerInnerImpl { ... }).await (mirroring default_mq_push_consumer_impl)
before calling client_instance.start(...). Also simplify the unwrap pattern by
taking a single mutable reference or cloning the client_instance once (reuse the
existing cloned variable) and pass that to start rather than calling
as_mut().cloned().unwrap() and as_mut().unwrap() twice.


self.consumer_start_timestamp
.store(current_millis() as i64, Ordering::Release);

*self.service_state = ServiceState::Running;

info!(
"DefaultLitePullConsumerImpl [{}] started successfully",
self.consumer_config.consumer_group
);

Ok(())
}
ServiceState::Running => Err(crate::mq_client_err!("The lite pull consumer is already running")),
ServiceState::ShutdownAlready => Err(crate::mq_client_err!("The lite pull consumer has been shutdown")),
ServiceState::StartFailed => Err(crate::mq_client_err!(format!(
"The lite pull consumer start failed, current state: {:?}",
*self.service_state
))),
}
}

/// Shuts down the lite pull consumer gracefully.
pub async fn shutdown(&mut self) -> RocketMQResult<()> {
match *self.service_state {
ServiceState::Running => {
info!(
"DefaultLitePullConsumerImpl [{}] shutting down",
self.consumer_config.consumer_group
);

self.shutdown_signal.store(true, Ordering::Release);

// Wait for all pull tasks to complete (5s timeout)
let mut handles = self.task_handles.write().await;
for (mq, handle) in handles.drain() {
if let Err(e) = tokio::time::timeout(Duration::from_secs(5), handle).await {
warn!("Pull task for {:?} did not finish in time: {}", mq, e);
}
}
drop(handles);

self.persist_consumer_offset().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Shutdown will panic due to calling unimplemented persist_consumer_offset.

Line 457 calls self.persist_consumer_offset().await, but the MQConsumerInner trait implementation at line 529 contains unimplemented!("persist_consumer_offset"). This will cause a runtime panic during shutdown, preventing graceful termination and potentially losing uncommitted offsets.

🐛 Proposed fix: implement offset persistence or guard the call

Option 1: Implement the method using the stored offset_store:

     async fn persist_consumer_offset(&self) {
-        // Offset persistence handled by commit operations
-        unimplemented!("persist_consumer_offset")
+        if let Some(offset_store) = &self.offset_store {
+            let mqs: Vec<MessageQueue> = self.assigned_message_queue.get_assigned_message_queues();
+            offset_store.persist_all(&mqs).await;
+        }
     }

Option 2: Replace the shutdown call with a no-op placeholder until the implementation is ready:

-                self.persist_consumer_offset().await;
+                if let Some(offset_store) = &self.offset_store {
+                    // TODO: Implement proper offset persistence
+                    let _ = offset_store;
+                }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self.persist_consumer_offset().await;
if let Some(offset_store) = &self.offset_store {
// TODO: Implement proper offset persistence
let _ = offset_store;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs`
at line 457, The shutdown path calls self.persist_consumer_offset().await but
MQConsumerInner's persist_consumer_offset is currently unimplemented
(unimplemented!("persist_consumer_offset")), which will panic at runtime; either
implement persist_consumer_offset to flush offsets from the consumer's
offset_store (use the existing offset_store field/API to collect and persist
offsets, update any remote broker/store and handle errors) or change the
shutdown call site to conditionally skip or safely await a no-op when
persistence is not available (e.g., replace the direct call with a safe wrapper
like try_persist_consumer_offset or check a flag/Option before awaiting) so
shutdown won't call the unimplemented method; focus changes on the
persist_consumer_offset implementation in the MQConsumerInner impl and the
shutdown caller where self.persist_consumer_offset().await is invoked.


if let Some(client) = self.client_instance.as_mut() {
client.unregister_consumer(&self.consumer_config.consumer_group).await;
}

if let Some(mut client) = self.client_instance.take() {
client.shutdown().await;
}

*self.service_state = ServiceState::ShutdownAlready;

info!(
"DefaultLitePullConsumerImpl [{}] shutdown successfully",
self.consumer_config.consumer_group
);

Ok(())
}
ServiceState::CreateJust => {
*self.service_state = ServiceState::ShutdownAlready;
Ok(())
}
ServiceState::ShutdownAlready => {
warn!("The lite pull consumer has already been shutdown");
Ok(())
}
ServiceState::StartFailed => {
*self.service_state = ServiceState::ShutdownAlready;
Ok(())
}
}
}
}

impl MQConsumerInner for DefaultLitePullConsumerImpl {
Expand Down Expand Up @@ -341,8 +514,8 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl {

async fn do_rebalance(&self) {
if *self.subscription_type.read().await == SubscriptionType::Subscribe {
// Rebalance implementation delegates to RebalanceLitePullImpl via the Rebalance trait
unimplemented!("do_rebalance is not yet implemented")
// Rebalance logic delegated to RebalanceLitePullImpl trait implementation
unimplemented!("do_rebalance")
}
}

Expand All @@ -353,12 +526,12 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl {

async fn persist_consumer_offset(&self) {
// Offset persistence handled by commit operations
unimplemented!("persist_consumer_offset is not yet implemented")
unimplemented!("persist_consumer_offset")
}

async fn update_topic_subscribe_info(&self, topic: CheetahString, info: &HashSet<MessageQueue>) {
// Topic subscription updates managed by rebalance implementation
unimplemented!("update_topic_subscribe_info is not yet implemented")
unimplemented!("update_topic_subscribe_info")
}

async fn is_subscribe_topic_need_update(&self, topic: &str) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct RebalanceLitePullImpl {
/// optional [`MessageQueueListener`][crate::consumer::message_queue_listener::MessageQueueListener].
pub(crate) consumer_config: ArcMut<ConsumerConfig>,
/// The active offset storage backend, injected after the consumer starts.
pub(crate) offset_store: Option<OffsetStore>,
pub(crate) offset_store: Option<ArcMut<OffsetStore>>,
}

impl RebalanceLitePullImpl {
Expand Down Expand Up @@ -132,7 +132,7 @@ impl RebalanceLitePullImpl {
}

/// Sets the offset store backend used to persist and query per-queue consume offsets.
pub fn set_offset_store(&mut self, offset_store: OffsetStore) {
pub fn set_offset_store(&mut self, offset_store: ArcMut<OffsetStore>) {
self.offset_store = Some(offset_store);
}
}
Expand Down Expand Up @@ -352,7 +352,7 @@ impl Rebalance for RebalanceLitePullImpl {
self.remove_unnecessary_message_queue(mq, &pq).await;
if let Some(ref consumer_group) = self.rebalance_impl_inner.consumer_group {
info!(
"Fix Offset, {}, remove unnecessary mq, {} Dropped: {}",
"Rebalance cleanup for {}, removed unnecessary mq: {}, dropped: {}",
consumer_group, mq, dropped
);
}
Expand Down
Loading