@@ -27,6 +27,7 @@ use rocketmq_common::common::message::message_enum::MessageRequestMode;
2727use rocketmq_common:: common:: message:: message_queue:: MessageQueue ;
2828use rocketmq_common:: TimeUtils :: current_millis;
2929use rocketmq_error:: RocketMQResult ;
30+ use rocketmq_remoting:: protocol:: filter:: filter_api:: FilterAPI ;
3031use rocketmq_remoting:: protocol:: heartbeat:: consume_type:: ConsumeType ;
3132use rocketmq_remoting:: protocol:: heartbeat:: message_model:: MessageModel ;
3233use rocketmq_remoting:: protocol:: heartbeat:: subscription_data:: SubscriptionData ;
@@ -44,6 +45,7 @@ use crate::consumer::consumer_impl::assigned_message_queue::AssignedMessageQueue
4445use crate :: consumer:: consumer_impl:: lite_pull_consume_request:: LitePullConsumeRequest ;
4546use crate :: consumer:: consumer_impl:: pull_api_wrapper:: PullAPIWrapper ;
4647use crate :: consumer:: consumer_impl:: re_balance:: rebalance_lite_pull_impl:: RebalanceLitePullImpl ;
48+ use crate :: consumer:: consumer_impl:: re_balance:: Rebalance ;
4749use crate :: consumer:: default_mq_push_consumer:: ConsumerConfig ;
4850use crate :: consumer:: mq_consumer_inner:: MQConsumerInner ;
4951use crate :: consumer:: store:: local_file_offset_store:: LocalFileOffsetStore ;
@@ -350,6 +352,128 @@ impl DefaultLitePullConsumerImpl {
350352 Ok ( ( ) )
351353 }
352354
355+ /// Sets subscription type and validates no conflicts.
356+ async fn set_subscription_type ( & self , sub_type : SubscriptionType ) -> RocketMQResult < ( ) > {
357+ let mut subscription_type = self . subscription_type . write ( ) . await ;
358+ if * subscription_type == SubscriptionType :: None {
359+ * subscription_type = sub_type;
360+ Ok ( ( ) )
361+ } else if * subscription_type != sub_type {
362+ Err ( crate :: mq_client_err!( "Subscribe and assign are mutually exclusive." ) )
363+ } else {
364+ Ok ( ( ) )
365+ }
366+ }
367+
368+ /// Subscribes to a topic with optional tag expression filter.
369+ pub async fn subscribe (
370+ & mut self ,
371+ topic : impl Into < CheetahString > ,
372+ sub_expression : impl Into < CheetahString > ,
373+ ) -> RocketMQResult < ( ) > {
374+ let topic = topic. into ( ) ;
375+ let sub_expression = sub_expression. into ( ) ;
376+
377+ if topic. is_empty ( ) {
378+ return Err ( crate :: mq_client_err!( "Topic cannot be empty" ) ) ;
379+ }
380+
381+ self . set_subscription_type ( SubscriptionType :: Subscribe ) . await ?;
382+
383+ let subscription_data = FilterAPI :: build_subscription_data ( & topic, & sub_expression)
384+ . map_err ( |e| crate :: mq_client_err!( format!( "Failed to build subscription data: {}" , e) ) ) ?;
385+
386+ self . rebalance_impl . put_subscription_data ( topic, subscription_data) ;
387+
388+ if * self . service_state == ServiceState :: Running {
389+ if let Some ( ref client_instance) = self . client_instance {
390+ client_instance. send_heartbeat_to_all_broker_with_lock ( ) . await ;
391+ self . update_topic_subscribe_info_when_subscription_changed ( ) . await ?;
392+ }
393+ }
394+
395+ Ok ( ( ) )
396+ }
397+
398+ /// Manually assigns specific message queues (ASSIGN mode).
399+ pub async fn assign ( & mut self , message_queues : Vec < MessageQueue > ) -> RocketMQResult < ( ) > {
400+ if message_queues. is_empty ( ) {
401+ return Err ( crate :: mq_client_err!( "Message queues cannot be empty" ) ) ;
402+ }
403+
404+ self . set_subscription_type ( SubscriptionType :: Assign ) . await ?;
405+
406+ self . update_assigned_message_queue_for_assign ( & message_queues) . await ;
407+
408+ if * self . service_state == ServiceState :: Running {
409+ self . update_pull_task_for_assign ( & message_queues) . await ?;
410+ }
411+
412+ Ok ( ( ) )
413+ }
414+
415+ /// Updates assigned message queues for ASSIGN mode.
416+ async fn update_assigned_message_queue_for_assign ( & self , assigned : & [ MessageQueue ] ) {
417+ let assigned_set: HashSet < MessageQueue > = assigned. iter ( ) . cloned ( ) . collect ( ) ;
418+
419+ let to_remove: Vec < MessageQueue > = {
420+ let task_handles = self . task_handles . read ( ) . await ;
421+ task_handles
422+ . keys ( )
423+ . filter ( |mq| !assigned_set. contains ( mq) )
424+ . cloned ( )
425+ . collect ( )
426+ } ;
427+
428+ for mq in & to_remove {
429+ if let Some ( pq) = self . assigned_message_queue . remove ( mq) . await {
430+ pq. set_dropped ( true ) ;
431+ }
432+ }
433+
434+ if !to_remove. is_empty ( ) {
435+ let mut task_handles = self . task_handles . write ( ) . await ;
436+ for mq in & to_remove {
437+ task_handles. remove ( mq) ;
438+ }
439+ }
440+
441+ for mq in assigned {
442+ self . assigned_message_queue . put ( mq. clone ( ) ) . await ;
443+ }
444+ }
445+
446+ /// Starts pull tasks for assigned queues in ASSIGN mode.
447+ async fn update_pull_task_for_assign ( & self , assigned : & [ MessageQueue ] ) -> RocketMQResult < ( ) > {
448+ let to_start: Vec < MessageQueue > = {
449+ let task_handles = self . task_handles . read ( ) . await ;
450+ assigned
451+ . iter ( )
452+ . filter ( |mq| !task_handles. contains_key ( mq) )
453+ . cloned ( )
454+ . collect ( )
455+ } ;
456+
457+ for mq in to_start {
458+ self . start_pull_task ( mq) . await ?;
459+ }
460+ Ok ( ( ) )
461+ }
462+
463+ /// Updates topic subscription info when subscription changes (SUBSCRIBE mode).
464+ async fn update_topic_subscribe_info_when_subscription_changed ( & mut self ) -> RocketMQResult < ( ) > {
465+ let subscription_inner = self . rebalance_impl . get_subscription_inner ( ) ;
466+ if let Some ( ref mut client_instance) = self . client_instance {
467+ for entry in subscription_inner. iter ( ) {
468+ let topic = entry. key ( ) ;
469+ client_instance
470+ . update_topic_route_info_from_name_server_topic ( topic)
471+ . await ;
472+ }
473+ }
474+ Ok ( ( ) )
475+ }
476+
353477 /// Starts the lite pull consumer.
354478 pub async fn start ( & mut self ) -> RocketMQResult < ( ) > {
355479 match * self . service_state {
@@ -490,6 +614,13 @@ impl DefaultLitePullConsumerImpl {
490614
491615 /// Spawns an async pull task for a message queue.
492616 async fn start_pull_task ( & self , mq : MessageQueue ) -> RocketMQResult < ( ) > {
617+ {
618+ let task_handles = self . task_handles . read ( ) . await ;
619+ if task_handles. contains_key ( & mq) {
620+ return Ok ( ( ) ) ;
621+ }
622+ }
623+
493624 let default_impl = self
494625 . default_lite_pull_consumer_impl
495626 . clone ( )
@@ -534,7 +665,15 @@ impl DefaultLitePullConsumerImpl {
534665 }
535666 } ) ;
536667
537- self . task_handles . write ( ) . await . insert ( mq, handle) ;
668+ let mut task_handles = self . task_handles . write ( ) . await ;
669+ match task_handles. entry ( mq) {
670+ std:: collections:: hash_map:: Entry :: Vacant ( e) => {
671+ e. insert ( handle) ;
672+ }
673+ std:: collections:: hash_map:: Entry :: Occupied ( _) => {
674+ handle. abort ( ) ;
675+ }
676+ }
538677 Ok ( ( ) )
539678 }
540679
@@ -624,8 +763,7 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl {
624763
625764 async fn do_rebalance ( & self ) {
626765 if * self . subscription_type . read ( ) . await == SubscriptionType :: Subscribe {
627- // Rebalance logic delegated to RebalanceLitePullImpl trait implementation
628- unimplemented ! ( "do_rebalance" )
766+ self . rebalance_impl . mut_from_ref ( ) . do_rebalance ( false ) . await ;
629767 }
630768 }
631769
@@ -640,12 +778,32 @@ impl MQConsumerInner for DefaultLitePullConsumerImpl {
640778 }
641779
642780 async fn update_topic_subscribe_info ( & self , topic : CheetahString , info : & HashSet < MessageQueue > ) {
643- // Topic subscription updates managed by rebalance implementation
644- unimplemented ! ( "update_topic_subscribe_info" )
781+ let subscription_inner = self . rebalance_impl . get_subscription_inner ( ) ;
782+ if subscription_inner. contains_key ( & topic) {
783+ self . rebalance_impl
784+ . rebalance_impl_inner
785+ . topic_subscribe_info_table
786+ . write ( )
787+ . await
788+ . insert ( topic, info. clone ( ) ) ;
789+ }
645790 }
646791
647792 async fn is_subscribe_topic_need_update ( & self , topic : & str ) -> bool {
648- // Subscription updates determined by metadata changes
793+ let subscription_inner = self . rebalance_impl . get_subscription_inner ( ) ;
794+
795+ for entry in subscription_inner. iter ( ) {
796+ if entry. key ( ) . as_str ( ) == topic {
797+ let contains = self
798+ . rebalance_impl
799+ . rebalance_impl_inner
800+ . topic_subscribe_info_table
801+ . read ( )
802+ . await
803+ . contains_key ( entry. key ( ) ) ;
804+ return !contains;
805+ }
806+ }
649807 false
650808 }
651809
0 commit comments