@@ -322,69 +322,84 @@ impl SubscriptionManager {
322322 let from_ts = self . processed_ts . succ ( ) ?;
323323
324324 let mut to_notify = BTreeSet :: new ( ) ;
325- let mut buffer = IndexKeyBuffer :: new ( ) ;
326- self . log . for_each ( from_ts, next_ts, |_, writes| {
327- for ( _, document_change) in writes {
328- // We're applying a mutation to the document so if it already exists
329- // we need to remove it before writing the new version.
330- if let Some ( ref old_document) = document_change. old_document {
331- self . overlapping (
332- old_document,
333- & mut to_notify,
334- self . persistence_version ,
335- & mut buffer,
336- ) ;
325+ {
326+ let _timer = metrics:: subscriptions_log_iterate_timer ( ) ;
327+ let mut buffer = IndexKeyBuffer :: new ( ) ;
328+ let mut log_len = 0 ;
329+ let mut num_writes = 0 ;
330+ self . log . for_each ( from_ts, next_ts, |_, writes| {
331+ log_len += 1 ;
332+ num_writes += writes. len ( ) ;
333+ for ( _, document_change) in writes {
334+ // We're applying a mutation to the document so if it already exists
335+ // we need to remove it before writing the new version.
336+ if let Some ( ref old_document) = document_change. old_document {
337+ self . overlapping (
338+ old_document,
339+ & mut to_notify,
340+ self . persistence_version ,
341+ & mut buffer,
342+ ) ;
343+ }
344+ // If we're doing anything other than deleting the document then
345+ // we'll also need to insert a new value.
346+ if let Some ( ref new_document) = document_change. new_document {
347+ self . overlapping (
348+ new_document,
349+ & mut to_notify,
350+ self . persistence_version ,
351+ & mut buffer,
352+ ) ;
353+ }
337354 }
338- // If we're doing anything other than deleting the document then
339- // we'll also need to insert a new value.
340- if let Some ( ref new_document) = document_change. new_document {
341- self . overlapping (
342- new_document,
343- & mut to_notify,
344- self . persistence_version ,
345- & mut buffer,
346- ) ;
355+ } ) ?;
356+ metrics:: log_subscriptions_log_length ( log_len) ;
357+ metrics:: log_subscriptions_log_writes ( num_writes) ;
358+ }
359+
360+ {
361+ let _timer = metrics:: subscriptions_invalidate_timer ( ) ;
362+ // First, do a pass where we advance all of the valid subscriptions.
363+ for ( subscriber_id, subscriber) in & mut self . subscribers {
364+ if !to_notify. contains ( & subscriber_id) {
365+ subscriber
366+ . sender
367+ . valid_ts
368+ . store ( i64:: from ( next_ts) , Ordering :: SeqCst ) ;
347369 }
348370 }
349- } ) ?;
350-
351- // First, do a pass where we advance all of the valid subscriptions.
352- for ( subscriber_id, subscriber) in & mut self . subscribers {
353- if !to_notify. contains ( & subscriber_id) {
354- subscriber
355- . sender
356- . valid_ts
357- . store ( i64:: from ( next_ts) , Ordering :: SeqCst ) ;
371+ // Then, invalidate all the remaining subscriptions.
372+ let num_subscriptions_invalidated = to_notify. len ( ) ;
373+ let should_splay_invalidations =
374+ num_subscriptions_invalidated > * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD ;
375+ if should_splay_invalidations {
376+ tracing:: info!(
377+ "Splaying subscription invalidations since there are {} subscriptions to \
378+ invalidate. The threshold is {}",
379+ num_subscriptions_invalidated,
380+ * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD
381+ ) ;
358382 }
359- }
360- // Then, invalidate all the remaining subscriptions.
361- let num_subscriptions_invalidated = to_notify. len ( ) ;
362- let should_splay_invalidations =
363- num_subscriptions_invalidated > * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD ;
364- if should_splay_invalidations {
365- tracing:: info!(
366- "Splaying subscription invalidations since there are {} subscriptions to \
367- invalidate. The threshold is {}",
368- num_subscriptions_invalidated,
369- * SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD
370- ) ;
371- }
372- for subscriber_id in to_notify {
373- let delay = should_splay_invalidations. then ( || {
374- Duration :: from_millis ( rand:: random_range (
375- 0 ..=num_subscriptions_invalidated as u64
376- * * SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER ,
377- ) )
378- } ) ;
379- self . _remove ( subscriber_id, delay) ;
380- }
381- log_subscriptions_invalidated ( num_subscriptions_invalidated) ;
383+ for subscriber_id in to_notify {
384+ let delay = should_splay_invalidations. then ( || {
385+ Duration :: from_millis ( rand:: random_range (
386+ 0 ..=num_subscriptions_invalidated as u64
387+ * * SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER ,
388+ ) )
389+ } ) ;
390+ self . _remove ( subscriber_id, delay) ;
391+ }
392+ log_subscriptions_invalidated ( num_subscriptions_invalidated) ;
382393
383- assert ! ( self . processed_ts <= next_ts) ;
384- self . processed_ts = next_ts;
394+ assert ! ( self . processed_ts <= next_ts) ;
395+ self . processed_ts = next_ts;
396+ }
385397
386398 // Enforce retention after we have processed the subscriptions.
387- self . log . enforce_retention_policy ( next_ts) ;
399+ {
400+ let _timer = metrics:: subscriptions_log_enforce_retention_timer ( ) ;
401+ self . log . enforce_retention_policy ( next_ts) ;
402+ }
388403
389404 Ok ( ( ) )
390405 } )
0 commit comments