diff --git a/rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs index 9435b3756..d7e3ef3f1 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_lite_pull_consumer_impl.rs @@ -17,12 +17,15 @@ use std::collections::HashSet; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use cheetah_string::CheetahString; use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; use rocketmq_common::common::message::message_enum::MessageRequestMode; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::TimeUtils::current_millis; use rocketmq_error::RocketMQResult; use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; @@ -32,6 +35,8 @@ use tokio::sync::mpsc; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio::task::JoinHandle; +use tracing::info; +use tracing::warn; use crate::base::client_config::ClientConfig; use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy; @@ -41,11 +46,14 @@ use crate::consumer::consumer_impl::pull_api_wrapper::PullAPIWrapper; use crate::consumer::consumer_impl::re_balance::rebalance_lite_pull_impl::RebalanceLitePullImpl; use crate::consumer::default_mq_push_consumer::ConsumerConfig; use crate::consumer::mq_consumer_inner::MQConsumerInner; +use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore; use crate::consumer::store::offset_store::OffsetStore; +use crate::consumer::store::remote_broker_offset_store::RemoteBrokerOffsetStore; use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener; use crate::factory::mq_client_instance::MQClientInstance; use crate::hook::consume_message_hook::ConsumeMessageHook; use crate::hook::filter_message_hook::FilterMessageHook; +use crate::implementation::mq_client_manager::MQClientManager; /// Subscription mode for lite pull consumer. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -314,6 +322,171 @@ impl DefaultLitePullConsumerImpl { pub fn service_state(&self) -> ServiceState { *self.service_state } + + /// Validates configuration before starting. + fn check_config(&self) -> RocketMQResult<()> { + if self.consumer_config.consumer_group.is_empty() { + return Err(crate::mq_client_err!("Consumer group cannot be empty")); + } + + if self.consumer_config.pull_batch_size < 1 || self.consumer_config.pull_batch_size > 1024 { + return Err(crate::mq_client_err!(format!( + "pullBatchSize must be in [1, 1024], current value: {}", + self.consumer_config.pull_batch_size + ))); + } + + if self.consumer_config.pull_threshold_for_queue < 1 || self.consumer_config.pull_threshold_for_queue > 65535 { + return Err(crate::mq_client_err!(format!( + "pullThresholdForQueue must be in [1, 65535], current value: {}", + self.consumer_config.pull_threshold_for_queue + ))); + } + + if self.consumer_config.poll_timeout_millis == 0 { + return Err(crate::mq_client_err!("pollTimeoutMillis cannot be 0")); + } + + Ok(()) + } + + /// Starts the lite pull consumer. + pub async fn start(&mut self) -> RocketMQResult<()> { + match *self.service_state { + ServiceState::CreateJust => { + info!( + "DefaultLitePullConsumerImpl [{}] starting. message_model={:?}", + self.consumer_config.consumer_group, self.consumer_config.message_model + ); + + *self.service_state = ServiceState::StartFailed; + + self.check_config()?; + + if self.consumer_config.message_model == MessageModel::Clustering { + self.client_config.change_instance_name_to_pid(); + } + + let client_instance = MQClientManager::get_instance() + .get_or_create_mq_client_instance(self.client_config.as_ref().clone(), None); + self.client_instance = Some(client_instance.clone()); + + self.rebalance_impl + .set_consumer_group(self.consumer_config.consumer_group.clone()); + self.rebalance_impl + .set_message_model(self.consumer_config.message_model); + self.rebalance_impl + .set_allocate_message_queue_strategy(self.consumer_config.allocate_message_queue_strategy.clone()); + self.rebalance_impl.set_mq_client_factory(client_instance.clone()); + + if self.pull_api_wrapper.is_none() { + let mut pull_api_wrapper = PullAPIWrapper::new( + client_instance.clone(), + self.consumer_config.consumer_group.clone(), + false, // unit_mode + ); + pull_api_wrapper.register_filter_message_hook(self.filter_message_hook_list.clone()); + self.pull_api_wrapper = Some(ArcMut::new(pull_api_wrapper)); + } + + // Initialize OffsetStore based on message model + let offset_store = ArcMut::new(match self.consumer_config.message_model { + MessageModel::Broadcasting => OffsetStore::new_with_local(LocalFileOffsetStore::new( + client_instance.clone(), + self.consumer_config.consumer_group.clone(), + )), + MessageModel::Clustering => OffsetStore::new_with_remote(RemoteBrokerOffsetStore::new( + client_instance.clone(), + self.consumer_config.consumer_group.clone(), + )), + }); + + offset_store.load().await?; + + self.rebalance_impl.set_offset_store(offset_store.clone()); + + // Store offset_store in self for later use (e.g., persist_consumer_offset) + self.offset_store = Some(offset_store); + + // Consumer registration integrated through MQClientInstance rebalance mechanism + + let cloned = self.client_instance.as_mut().cloned().unwrap(); + self.client_instance.as_mut().unwrap().start(cloned).await?; + + self.consumer_start_timestamp + .store(current_millis() as i64, Ordering::Release); + + *self.service_state = ServiceState::Running; + + info!( + "DefaultLitePullConsumerImpl [{}] started successfully", + self.consumer_config.consumer_group + ); + + Ok(()) + } + ServiceState::Running => Err(crate::mq_client_err!("The lite pull consumer is already running")), + ServiceState::ShutdownAlready => Err(crate::mq_client_err!("The lite pull consumer has been shutdown")), + ServiceState::StartFailed => Err(crate::mq_client_err!(format!( + "The lite pull consumer start failed, current state: {:?}", + *self.service_state + ))), + } + } + + /// Shuts down the lite pull consumer gracefully. + pub async fn shutdown(&mut self) -> RocketMQResult<()> { + match *self.service_state { + ServiceState::Running => { + info!( + "DefaultLitePullConsumerImpl [{}] shutting down", + self.consumer_config.consumer_group + ); + + self.shutdown_signal.store(true, Ordering::Release); + + // Wait for all pull tasks to complete (5s timeout) + let mut handles = self.task_handles.write().await; + for (mq, handle) in handles.drain() { + if let Err(e) = tokio::time::timeout(Duration::from_secs(5), handle).await { + warn!("Pull task for {:?} did not finish in time: {}", mq, e); + } + } + drop(handles); + + self.persist_consumer_offset().await; + + if let Some(client) = self.client_instance.as_mut() { + client.unregister_consumer(&self.consumer_config.consumer_group).await; + } + + if let Some(mut client) = self.client_instance.take() { + client.shutdown().await; + } + + *self.service_state = ServiceState::ShutdownAlready; + + info!( + "DefaultLitePullConsumerImpl [{}] shutdown successfully", + self.consumer_config.consumer_group + ); + + Ok(()) + } + ServiceState::CreateJust => { + *self.service_state = ServiceState::ShutdownAlready; + Ok(()) + } + ServiceState::ShutdownAlready => { + warn!("The lite pull consumer has already been shutdown"); + Ok(()) + } + ServiceState::StartFailed => { + *self.service_state = ServiceState::ShutdownAlready; + Ok(()) + } + } + } } impl MQConsumerInner for DefaultLitePullConsumerImpl { @@ -341,8 +514,8 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl { async fn do_rebalance(&self) { if *self.subscription_type.read().await == SubscriptionType::Subscribe { - // Rebalance implementation delegates to RebalanceLitePullImpl via the Rebalance trait - unimplemented!("do_rebalance is not yet implemented") + // Rebalance logic delegated to RebalanceLitePullImpl trait implementation + unimplemented!("do_rebalance") } } @@ -353,12 +526,12 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl { async fn persist_consumer_offset(&self) { // Offset persistence handled by commit operations - unimplemented!("persist_consumer_offset is not yet implemented") + unimplemented!("persist_consumer_offset") } async fn update_topic_subscribe_info(&self, topic: CheetahString, info: &HashSet) { // Topic subscription updates managed by rebalance implementation - unimplemented!("update_topic_subscribe_info is not yet implemented") + unimplemented!("update_topic_subscribe_info") } async fn is_subscribe_topic_need_update(&self, topic: &str) -> bool { diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_lite_pull_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_lite_pull_impl.rs index 8f1648977..f138c8edf 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_lite_pull_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_lite_pull_impl.rs @@ -66,7 +66,7 @@ pub struct RebalanceLitePullImpl { /// optional [`MessageQueueListener`][crate::consumer::message_queue_listener::MessageQueueListener]. pub(crate) consumer_config: ArcMut, /// The active offset storage backend, injected after the consumer starts. - pub(crate) offset_store: Option, + pub(crate) offset_store: Option>, } impl RebalanceLitePullImpl { @@ -132,7 +132,7 @@ impl RebalanceLitePullImpl { } /// Sets the offset store backend used to persist and query per-queue consume offsets. - pub fn set_offset_store(&mut self, offset_store: OffsetStore) { + pub fn set_offset_store(&mut self, offset_store: ArcMut) { self.offset_store = Some(offset_store); } } @@ -352,7 +352,7 @@ impl Rebalance for RebalanceLitePullImpl { self.remove_unnecessary_message_queue(mq, &pq).await; if let Some(ref consumer_group) = self.rebalance_impl_inner.consumer_group { info!( - "Fix Offset, {}, remove unnecessary mq, {} Dropped: {}", + "Rebalance cleanup for {}, removed unnecessary mq: {}, dropped: {}", consumer_group, mq, dropped ); }