@@ -26,7 +26,9 @@ use mz_compute_types::sinks::{
2626} ;
2727use mz_compute_types:: sources:: SourceInstanceDesc ;
2828use mz_compute_types:: ComputeInstanceId ;
29- use mz_controller_types:: dyncfgs:: WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL ;
29+ use mz_controller_types:: dyncfgs:: {
30+ WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL , WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL ,
31+ } ;
3032use mz_dyncfg:: ConfigSet ;
3133use mz_expr:: RowSetFinishing ;
3234use mz_ore:: cast:: CastFrom ;
@@ -35,9 +37,10 @@ use mz_ore::now::NowFn;
3537use mz_ore:: tracing:: OpenTelemetryContext ;
3638use mz_ore:: { soft_assert_or_log, soft_panic_or_log} ;
3739use mz_repr:: adt:: interval:: Interval ;
40+ use mz_repr:: adt:: timestamp:: CheckedTimestamp ;
3841use mz_repr:: refresh_schedule:: RefreshSchedule ;
39- use mz_repr:: { Datum , Diff , GlobalId , Row } ;
40- use mz_storage_client:: controller:: IntrospectionType ;
42+ use mz_repr:: { Datum , DatumMap , Diff , GlobalId , Row } ;
43+ use mz_storage_client:: controller:: { IntrospectionType , WallclockLagHistogramPeriod } ;
4144use mz_storage_types:: read_holds:: { self , ReadHold } ;
4245use mz_storage_types:: read_policy:: ReadPolicy ;
4346use serde:: Serialize ;
@@ -291,6 +294,8 @@ pub(super) struct Instance<T: ComputeControllerTimestamp> {
291294 wallclock_lag : WallclockLagFn < T > ,
292295 /// The last time `WallclockLagHistory` introspection was refreshed.
293296 wallclock_lag_history_last_refresh : Instant ,
297+ /// The last time `WallclockLagHistogram` introspection was refreshed.
298+ wallclock_lag_histogram_last_refresh : Instant ,
294299
295300 /// Sender for updates to collection read holds.
296301 ///
@@ -555,17 +560,20 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
555560 let refresh_history = !self . read_only
556561 && self . wallclock_lag_history_last_refresh . elapsed ( )
557562 >= WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL . get ( & self . dyncfg ) ;
563+ let refresh_histogram = !self . read_only
564+ && self . wallclock_lag_histogram_last_refresh . elapsed ( )
565+ >= WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL . get ( & self . dyncfg ) ;
558566
559- let mut history_updates = Vec :: new ( ) ;
560-
561- let now = mz_ore:: now:: to_datetime ( ( self . now ) ( ) ) ;
562- let now_tz = now. try_into ( ) . expect ( "must fit" ) ;
567+ let now_ms = ( self . now ) ( ) ;
568+ let now_dt = mz_ore:: now:: to_datetime ( now_ms) ;
569+ let now_ts: CheckedTimestamp < _ > = now_dt. try_into ( ) . expect ( "must fit" ) ;
563570
564571 let frontier_lag = |frontier : & Antichain < _ > | match frontier. as_option ( ) {
565572 Some ( ts) => ( self . wallclock_lag ) ( ts) ,
566573 None => Duration :: ZERO ,
567574 } ;
568575
576+ let mut history_updates = Vec :: new ( ) ;
569577 for ( replica_id, replica) in & mut self . replicas {
570578 for ( collection_id, collection) in & mut replica. collections {
571579 let lag = frontier_lag ( & collection. write_frontier ) ;
@@ -580,7 +588,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
580588 Datum :: String ( & collection_id. to_string ( ) ) ,
581589 Datum :: String ( & replica_id. to_string ( ) ) ,
582590 Datum :: Interval ( Interval :: new ( 0 , 0 , max_lag_us) ) ,
583- Datum :: TimestampTz ( now_tz ) ,
591+ Datum :: TimestampTz ( now_ts ) ,
584592 ] ) ;
585593 history_updates. push ( ( row, 1 ) ) ;
586594 }
@@ -591,14 +599,46 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
591599 } ;
592600 }
593601 }
594-
595602 if !history_updates. is_empty ( ) {
596603 self . deliver_introspection_updates (
597604 IntrospectionType :: WallclockLagHistory ,
598605 history_updates,
599606 ) ;
600607 self . wallclock_lag_history_last_refresh = Instant :: now ( ) ;
601608 }
609+
610+ let histogram_period = WallclockLagHistogramPeriod :: from_epoch_millis ( now_ms, & self . dyncfg ) ;
611+
612+ let mut histogram_updates = Vec :: new ( ) ;
613+ for ( collection_id, collection) in & mut self . collections {
614+ if let Some ( stash) = & mut collection. wallclock_lag_histogram_stash {
615+ let lag = collection. shared . lock_write_frontier ( |f| frontier_lag ( f) ) ;
616+ let bucket = lag. as_secs ( ) . next_power_of_two ( ) ;
617+
618+ let key = ( histogram_period, bucket) ;
619+ * stash. entry ( key) . or_default ( ) += 1 ;
620+
621+ if refresh_histogram {
622+ for ( ( period, lag) , count) in std:: mem:: take ( stash) {
623+ let row = Row :: pack_slice ( & [
624+ Datum :: String ( & collection_id. to_string ( ) ) ,
625+ Datum :: UInt64 ( lag) ,
626+ Datum :: TimestampTz ( period. start ) ,
627+ Datum :: TimestampTz ( period. end ) ,
628+ Datum :: Map ( DatumMap :: empty ( ) ) ,
629+ ] ) ;
630+ histogram_updates. push ( ( row, count) ) ;
631+ }
632+ }
633+ }
634+ }
635+ if !histogram_updates. is_empty ( ) {
636+ self . deliver_introspection_updates (
637+ IntrospectionType :: WallclockLagHistogram ,
638+ histogram_updates,
639+ ) ;
640+ self . wallclock_lag_histogram_last_refresh = Instant :: now ( ) ;
641+ }
602642 }
603643
604644 /// Report updates (inserts or retractions) to the identified collection's dependencies.
@@ -819,6 +859,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
819859 now : _,
820860 wallclock_lag : _,
821861 wallclock_lag_history_last_refresh,
862+ wallclock_lag_histogram_last_refresh,
822863 read_hold_tx : _,
823864 replica_tx : _,
824865 replica_rx : _,
@@ -854,6 +895,8 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
854895 . map ( |( id, epoch) | ( id. to_string ( ) , epoch) )
855896 . collect ( ) ;
856897 let wallclock_lag_history_last_refresh = format ! ( "{wallclock_lag_history_last_refresh:?}" ) ;
898+ let wallclock_lag_histogram_last_refresh =
899+ format ! ( "{wallclock_lag_histogram_last_refresh:?}" ) ;
857900
858901 let map = serde_json:: Map :: from_iter ( [
859902 field ( "initialized" , initialized) ?,
@@ -869,6 +912,10 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
869912 "wallclock_lag_history_last_refresh" ,
870913 wallclock_lag_history_last_refresh,
871914 ) ?,
915+ field (
916+ "wallclock_lag_histogram_last_refresh" ,
917+ wallclock_lag_histogram_last_refresh,
918+ ) ?,
872919 ] ) ;
873920 Ok ( serde_json:: Value :: Object ( map) )
874921 }
@@ -934,6 +981,7 @@ where
934981 now,
935982 wallclock_lag,
936983 wallclock_lag_history_last_refresh : Instant :: now ( ) ,
984+ wallclock_lag_histogram_last_refresh : Instant :: now ( ) ,
937985 read_hold_tx,
938986 replica_tx,
939987 replica_rx,
@@ -2155,6 +2203,12 @@ struct CollectionState<T: ComputeControllerTimestamp> {
21552203
21562204 /// Introspection state associated with this collection.
21572205 introspection : CollectionIntrospection < T > ,
2206+
2207+ /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
2208+ /// introspection update.
2209+ ///
2210+ /// If this is `None`, wallclock lag is not tracked for this collection.
2211+ wallclock_lag_histogram_stash : Option < BTreeMap < ( WallclockLagHistogramPeriod , u64 ) , i64 > > ,
21582212}
21592213
21602214impl < T : ComputeControllerTimestamp > CollectionState < T > {
@@ -2189,6 +2243,14 @@ impl<T: ComputeControllerTimestamp> CollectionState<T> {
21892243 c. update_iter ( updates) ;
21902244 } ) ;
21912245
2246+ // In an effort to keep the produced wallclock lag introspection data small and
2247+ // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
2248+ // select indexes and subscribes.
2249+ let wallclock_lag_histogram_stash = match collection_id. is_transient ( ) {
2250+ true => None ,
2251+ false => Some ( Default :: default ( ) ) ,
2252+ } ;
2253+
21922254 Self {
21932255 log_collection : false ,
21942256 dropped : false ,
@@ -2200,6 +2262,7 @@ impl<T: ComputeControllerTimestamp> CollectionState<T> {
22002262 storage_dependencies,
22012263 compute_dependencies,
22022264 introspection,
2265+ wallclock_lag_histogram_stash,
22032266 }
22042267 }
22052268
0 commit comments