1- use std:: collections:: { HashMap , VecDeque } ;
1+ use std:: cmp:: Reverse ;
2+ use std:: collections:: { BinaryHeap , HashMap , VecDeque } ;
23use std:: hash:: { Hash , Hasher } ;
34use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
45use std:: sync:: Arc ;
@@ -180,8 +181,11 @@ struct SubscriberQueue {
180181#[ derive( Debug ) ]
181182struct SubscriberQueueInner {
182183 next_tag : u64 ,
183- pending : VecDeque < QueueEntry > ,
184+ pending : VecDeque < DeliveryTag > ,
185+ pending_entries : HashMap < DeliveryTag , QueueEntry > ,
184186 inflight : HashMap < DeliveryTag , QueueEntry > ,
187+ expiration_heap : BinaryHeap < Reverse < ExpirationEntry > > ,
188+ retry_heap : BinaryHeap < Reverse < RetryEntry > > ,
185189}
186190
187191#[ derive( Debug , Clone ) ]
@@ -197,6 +201,46 @@ struct QueueEntry {
197201 next_delivery_at : std:: time:: Instant ,
198202}
199203
204+ #[ derive( Debug , Clone , Copy , Eq , PartialEq ) ]
205+ struct ExpirationEntry {
206+ expires_at : std:: time:: Instant ,
207+ tag : DeliveryTag ,
208+ }
209+
210+ impl Ord for ExpirationEntry {
211+ fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
212+ self . expires_at
213+ . cmp ( & other. expires_at )
214+ . then_with ( || self . tag . value ( ) . cmp ( & other. tag . value ( ) ) )
215+ }
216+ }
217+
218+ impl PartialOrd for ExpirationEntry {
219+ fn partial_cmp ( & self , other : & Self ) -> Option < std:: cmp:: Ordering > {
220+ Some ( self . cmp ( other) )
221+ }
222+ }
223+
224+ #[ derive( Debug , Clone , Copy , Eq , PartialEq ) ]
225+ struct RetryEntry {
226+ next_delivery_at : std:: time:: Instant ,
227+ tag : DeliveryTag ,
228+ }
229+
230+ impl Ord for RetryEntry {
231+ fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
232+ self . next_delivery_at
233+ . cmp ( & other. next_delivery_at )
234+ . then_with ( || self . tag . value ( ) . cmp ( & other. tag . value ( ) ) )
235+ }
236+ }
237+
238+ impl PartialOrd for RetryEntry {
239+ fn partial_cmp ( & self , other : & Self ) -> Option < std:: cmp:: Ordering > {
240+ Some ( self . cmp ( other) )
241+ }
242+ }
243+
200244#[ derive( Debug , Clone ) ]
201245struct WalMessageRecord {
202246 topic : String ,
@@ -275,7 +319,10 @@ impl SubscriberQueue {
275319 inner : Mutex :: new ( SubscriberQueueInner {
276320 next_tag : 1 ,
277321 pending : VecDeque :: new ( ) ,
322+ pending_entries : HashMap :: new ( ) ,
278323 inflight : HashMap :: new ( ) ,
324+ expiration_heap : BinaryHeap :: new ( ) ,
325+ retry_heap : BinaryHeap :: new ( ) ,
279326 } ) ,
280327 }
281328 }
@@ -291,9 +338,10 @@ impl SubscriberQueue {
291338 let mut inner = self . inner . lock ( ) ;
292339
293340 // Bound total number of stored messages.
294- while inner. pending . len ( ) + inner. inflight . len ( ) >= self . capacity {
295- inner. pending . pop_front ( ) ;
296- if inner. pending . is_empty ( ) {
341+ while inner. pending_entries . len ( ) + inner. inflight . len ( ) >= self . capacity {
342+ if let Some ( tag) = inner. pending . pop_front ( ) {
343+ inner. pending_entries . remove ( & tag) ;
344+ } else {
297345 break ;
298346 }
299347 }
@@ -308,7 +356,7 @@ impl SubscriberQueue {
308356
309357 let now = std:: time:: Instant :: now ( ) ;
310358
311- inner . pending . push_back ( QueueEntry {
359+ let entry = QueueEntry {
312360 tag,
313361 payload,
314362 qos : effective_qos,
@@ -317,13 +365,28 @@ impl SubscriberQueue {
317365 ttl,
318366 delivery_attempts : 0 ,
319367 next_delivery_at : now,
320- } ) ;
368+ } ;
369+
370+ if let Some ( ttl) = entry. ttl {
371+ inner. expiration_heap . push ( Reverse ( ExpirationEntry {
372+ expires_at : entry. created_at + ttl,
373+ tag : entry. tag ,
374+ } ) ) ;
375+ }
376+
377+ inner. pending . push_back ( entry. tag ) ;
378+ inner. pending_entries . insert ( entry. tag , entry) ;
321379 }
322380
323381 #[ inline( always) ]
324382 fn dequeue ( & self , base_delay : Duration ) -> Option < ( QueueEntry , Option < DeliveryTag > ) > {
325383 let mut inner = self . inner . lock ( ) ;
326- let mut entry = inner. pending . pop_front ( ) ?;
384+ let mut entry = loop {
385+ let tag = inner. pending . pop_front ( ) ?;
386+ if let Some ( entry) = inner. pending_entries . remove ( & tag) {
387+ break entry;
388+ }
389+ } ;
327390
328391 let span = tracing:: trace_span!( "subscriber_dequeue" , qos = ?entry. qos) ;
329392 let _guard = span. enter ( ) ;
@@ -346,6 +409,10 @@ impl SubscriberQueue {
346409
347410 let tag = Some ( entry. tag ) ;
348411 inner. inflight . insert ( entry. tag , entry. clone ( ) ) ;
412+ inner. retry_heap . push ( Reverse ( RetryEntry {
413+ next_delivery_at : entry. next_delivery_at ,
414+ tag : entry. tag ,
415+ } ) ) ;
349416 Some ( ( entry, tag) )
350417 }
351418 }
@@ -354,7 +421,10 @@ impl SubscriberQueue {
354421 #[ allow( dead_code) ]
355422 fn peek ( & self ) -> Option < Bytes > {
356423 let inner = self . inner . lock ( ) ;
357- inner. pending . front ( ) . map ( |e| e. payload . clone ( ) )
424+ inner
425+ . pending
426+ . iter ( )
427+ . find_map ( |tag| inner. pending_entries . get ( tag) . map ( |e| e. payload . clone ( ) ) )
358428 }
359429
360430 fn ack ( & self , tag : DeliveryTag ) -> bool {
@@ -367,51 +437,70 @@ impl SubscriberQueue {
367437 inner. inflight . len ( )
368438 }
369439
440+ fn expiration_heap_len ( & self ) -> usize {
441+ let inner = self . inner . lock ( ) ;
442+ inner. expiration_heap . len ( )
443+ }
444+
445+ fn retry_heap_len ( & self ) -> usize {
446+ let inner = self . inner . lock ( ) ;
447+ inner. retry_heap . len ( )
448+ }
449+
370450 fn maintenance_tick ( & self , now : std:: time:: Instant , max_retries : u32 , _base_delay : Duration ) {
371451 let mut inner = self . inner . lock ( ) ;
372452
373- // Drop expired messages from pending.
374- inner. pending . retain ( |entry| {
375- if let Some ( ttl) = entry. ttl {
376- entry. created_at + ttl > now
377- } else {
378- true
453+ while let Some ( Reverse ( expiration) ) = inner. expiration_heap . peek ( ) . copied ( ) {
454+ if expiration. expires_at > now {
455+ break ;
379456 }
380- } ) ;
381-
382- // Drop expired messages from inflight.
383- inner. inflight . retain ( |_, entry| {
384- if let Some ( ttl) = entry. ttl {
385- entry. created_at + ttl > now
386- } else {
387- true
457+ inner. expiration_heap . pop ( ) ;
458+
459+ if let Some ( entry) = inner. pending_entries . get ( & expiration. tag ) {
460+ if entry
461+ . ttl
462+ . map ( |ttl| entry. created_at + ttl == expiration. expires_at )
463+ . unwrap_or ( false )
464+ {
465+ inner. pending_entries . remove ( & expiration. tag ) ;
466+ }
467+ continue ;
388468 }
389- } ) ;
390-
391- // Collect inflight messages that should be retried or dropped.
392- let mut to_retry = Vec :: new ( ) ;
393- let mut to_drop = Vec :: new ( ) ;
394469
395- for ( tag, entry) in inner. inflight . iter ( ) {
396- if now >= entry. next_delivery_at {
397- if entry. delivery_attempts < max_retries {
398- to_retry. push ( * tag) ;
399- } else {
400- to_drop. push ( * tag) ;
470+ if let Some ( entry) = inner. inflight . get ( & expiration. tag ) {
471+ if entry
472+ . ttl
473+ . map ( |ttl| entry. created_at + ttl == expiration. expires_at )
474+ . unwrap_or ( false )
475+ {
476+ inner. inflight . remove ( & expiration. tag ) ;
401477 }
402478 }
403479 }
404480
405- for tag in to_drop {
406- inner. inflight . remove ( & tag) ;
407- }
481+ while let Some ( Reverse ( retry) ) = inner. retry_heap . peek ( ) . copied ( ) {
482+ if retry. next_delivery_at > now {
483+ break ;
484+ }
485+ inner. retry_heap . pop ( ) ;
486+
487+ let Some ( entry) = inner. inflight . get ( & retry. tag ) else {
488+ continue ;
489+ } ;
490+
491+ if entry. next_delivery_at != retry. next_delivery_at {
492+ continue ;
493+ }
494+
495+ if entry. delivery_attempts >= max_retries {
496+ inner. inflight . remove ( & retry. tag ) ;
497+ continue ;
498+ }
408499
409- for tag in to_retry {
410- if let Some ( mut entry) = inner. inflight . remove ( & tag) {
411- // Reset next_delivery_at; it will be set when the message is
412- // delivered again via `dequeue`.
500+ if let Some ( mut entry) = inner. inflight . remove ( & retry. tag ) {
413501 entry. next_delivery_at = now;
414- inner. pending . push_back ( entry) ;
502+ inner. pending . push_back ( entry. tag ) ;
503+ inner. pending_entries . insert ( entry. tag , entry) ;
415504 }
416505 }
417506 }
@@ -640,6 +729,22 @@ impl Broker {
640729 . sum ( )
641730 }
642731
732+ pub fn expiration_heap_size ( & self ) -> usize {
733+ let subscriptions = self . subscriptions . read ( ) ;
734+ subscriptions
735+ . values ( )
736+ . map ( |sub_ref| sub_ref. subscriber . queue . expiration_heap_len ( ) )
737+ . sum ( )
738+ }
739+
740+ pub fn retry_heap_size ( & self ) -> usize {
741+ let subscriptions = self . subscriptions . read ( ) ;
742+ subscriptions
743+ . values ( )
744+ . map ( |sub_ref| sub_ref. subscriber . queue . retry_heap_len ( ) )
745+ . sum ( )
746+ }
747+
643748 /// Perform periodic maintenance such as TTL expiration and retry
644749 /// scheduling. Intended to be called from a Tokio interval in the
645750 /// daemon.
0 commit comments