@@ -54,9 +54,11 @@ use crate::processor::pop_message_processor::PopMessageProcessor;
5454use crate :: processor:: pop_message_processor:: QueueLockManager ;
5555
5656pub ( crate ) struct PopBufferMergeService < MS > {
57- buffer : DashMap < CheetahString /* mergeKey */ , PopCheckPointWrapper > ,
58- commit_offsets :
59- DashMap < CheetahString /* topic@cid@queueId */ , QueueWithTime < PopCheckPointWrapper > > ,
57+ buffer : DashMap < CheetahString /* mergeKey */ , ArcMut < PopCheckPointWrapper > > ,
58+ commit_offsets : DashMap <
59+ CheetahString , /* topic@cid@queueId */
60+ QueueWithTime < ArcMut < PopCheckPointWrapper > > ,
61+ > ,
6062 serving : AtomicBool ,
6163 counter : AtomicI32 ,
6264 scan_times : u64 ,
@@ -102,14 +104,94 @@ impl<MS> PopBufferMergeService<MS> {
102104}
103105
104106impl < MS : MessageStore > PopBufferMergeService < MS > {
107+ /// Adds a checkpoint to the buffer
105108 pub fn add_ck (
106- & mut self ,
109+ & self ,
107110 point : & PopCheckPoint ,
108111 revive_queue_id : i32 ,
109112 revive_queue_offset : i64 ,
110113 next_begin_offset : i64 ,
111114 ) -> bool {
112- unimplemented ! ( "add_ck not implemented" )
115+ // Check if buffer merge is enabled
116+ let broker_config = self . broker_runtime_inner . broker_config ( ) ;
117+ if !broker_config. enable_pop_buffer_merge {
118+ return false ;
119+ }
120+
121+ // Check if service is active
122+ if !self . serving . load ( Ordering :: Acquire ) {
123+ return false ;
124+ }
125+
126+ // Check timeout condition
127+ let now = get_current_millis ( ) as i64 ;
128+
129+ if point. get_revive_time ( ) - now < broker_config. pop_ck_stay_buffer_time_out as i64 + 1500 {
130+ if broker_config. enable_pop_log {
131+ warn ! ( "[PopBuffer]add ck, timeout, {:?}, {}" , point, now) ;
132+ }
133+ return false ;
134+ }
135+
136+ // Check buffer size
137+ if self . counter . load ( Ordering :: Acquire ) as i64 > broker_config. pop_ck_max_buffer_size {
138+ warn ! (
139+ "[PopBuffer]add ck, max size, {:?}, {}" ,
140+ point,
141+ self . counter. load( Ordering :: Acquire )
142+ ) ;
143+ return false ;
144+ }
145+
146+ // Create wrapper
147+ let point_wrapper = ArcMut :: new ( PopCheckPointWrapper :: new (
148+ revive_queue_id,
149+ revive_queue_offset,
150+ Arc :: new ( point. clone ( ) ) ,
151+ next_begin_offset,
152+ ) ) ;
153+
154+ // Check if queue is valid
155+ if !self . check_queue_ok ( & point_wrapper) {
156+ return false ;
157+ }
158+
159+ // Check for merge key conflict
160+ let merge_key = point_wrapper. get_merge_key ( ) ;
161+ if self . buffer . contains_key ( merge_key) {
162+ warn ! (
163+ "[PopBuffer]mergeKey conflict when add ck. ck:{:?}, mergeKey:{}" ,
164+ point_wrapper, merge_key
165+ ) ;
166+ return false ;
167+ }
168+
169+ // Add to offset queue
170+ self . put_offset_queue ( point_wrapper. clone ( ) ) ;
171+
172+ if broker_config. enable_pop_log {
173+ info ! ( "[PopBuffer]add ck, {:?}" , point_wrapper) ;
174+ }
175+ // Add to buffer
176+ self . buffer . insert ( merge_key. clone ( ) , point_wrapper) ;
177+ self . counter . fetch_add ( 1 , Ordering :: AcqRel ) ;
178+
179+ true
180+ }
181+
182+ // Helper methods
183+ fn check_queue_ok ( & self , point_wrapper : & PopCheckPointWrapper ) -> bool {
184+ let queue = self . commit_offsets . get ( point_wrapper. get_lock_key ( ) ) ;
185+ match queue {
186+ None => true ,
187+ Some ( value) => {
188+ value. get ( ) . lock ( ) . len ( )
189+ < self
190+ . broker_runtime_inner
191+ . broker_config ( )
192+ . pop_ck_offset_max_queue_size as usize
193+ }
194+ }
113195 }
114196
115197 pub fn add_ck_just_offset (
@@ -167,7 +249,7 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
167249 {
168250 if self . broker_runtime_inner . broker_config ( ) . enable_pop_log {
169251 warn ! (
170- "[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}" ,
252+ "[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {:? }, {}, {}" ,
171253 revive_qid,
172254 point_wrapper. value( ) ,
173255 ack_msg,
@@ -185,7 +267,7 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
185267 {
186268 if self . broker_runtime_inner . broker_config ( ) . enable_pop_log {
187269 warn ! (
188- "[PopBuffer]add ack fail, rqId={}, timeout for revive, {}, {}, {}" ,
270+ "[PopBuffer]add ack fail, rqId={}, timeout for revive, {:? }, {}, {}" ,
189271 revive_qid,
190272 point_wrapper. value( ) ,
191273 ack_msg,
@@ -552,7 +634,7 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
552634 ) ;
553635 point_wrapper. set_ck_stored ( true ) ;
554636
555- self . put_offset_queue ( point_wrapper) ;
637+ self . put_offset_queue ( ArcMut :: new ( point_wrapper) ) ;
556638 }
557639
558640 fn is_ck_done_for_finish ( & self , point_wrapper : & PopCheckPointWrapper ) -> bool {
@@ -567,7 +649,7 @@ impl<MS: MessageStore> PopBufferMergeService<MS> {
567649 true
568650 }
569651
570- fn put_offset_queue ( & mut self , point_wrapper : PopCheckPointWrapper ) -> bool {
652+ fn put_offset_queue ( & self , point_wrapper : ArcMut < PopCheckPointWrapper > ) -> bool {
571653 let queue = self
572654 . commit_offsets
573655 . entry ( point_wrapper. lock_key . clone ( ) )
@@ -759,6 +841,7 @@ impl<T> QueueWithTime<T> {
759841 }
760842}
761843
844+ #[ derive( Debug ) ]
762845pub struct PopCheckPointWrapper {
763846 revive_queue_id : i32 ,
764847 // -1: not stored, >=0: stored, Long.MAX: storing.
@@ -887,7 +970,7 @@ impl PopCheckPointWrapper {
887970 & self . lock_key
888971 }
889972
890- pub fn get_merge_key ( & self ) -> & str {
973+ pub fn get_merge_key ( & self ) -> & CheetahString {
891974 & self . merge_key
892975 }
893976
0 commit comments