@@ -30,6 +30,7 @@ use rollup_node_providers::{BlockDataProvider, L1Provider};
3030use scroll_alloy_rpc_types_engine:: { BlockDataHint , ScrollPayloadAttributes } ;
3131use scroll_codec:: Codec ;
3232use scroll_db:: { Database , DatabaseOperations } ;
33+ use tokio:: time:: Interval ;
3334
3435/// A future that resolves to a stream of [`ScrollPayloadAttributesWithBatchInfo`].
3536type DerivationPipelineFuture = Pin <
@@ -43,6 +44,9 @@ type DerivationPipelineFuture = Pin<
4344 > ,
4445> ;
4546
47+ /// The interval (in ms) at which the derivation pipeline should report queue size metrics.
48+ const QUEUE_METRICS_INTERVAL : u64 = 1000 ;
49+
4650/// A structure holding the current unresolved futures for the derivation pipeline.
4751pub struct DerivationPipeline < P > {
4852 /// The current derivation pipeline futures polled.
@@ -59,6 +63,8 @@ pub struct DerivationPipeline<P> {
5963 waker : Option < Waker > ,
6064 /// The metrics of the pipeline.
6165 metrics : DerivationPipelineMetrics ,
66+ /// The interval at which the derivation pipeline should report queue size metrics.
67+ queue_metrics_interval : Interval ,
6268}
6369
6470impl < P : Debug > Debug for DerivationPipeline < P > {
9298 attributes_queue : Default :: default ( ) ,
9399 waker : None ,
94100 metrics : DerivationPipelineMetrics :: default ( ) ,
101+ queue_metrics_interval : delayed_interval ( QUEUE_METRICS_INTERVAL ) ,
95102 }
96103 }
97104
@@ -160,6 +167,12 @@ where
160167 self . batch_queue . clear ( ) ;
161168 self . pipeline_future = None ;
162169 }
170+
171+ /// Emits the queue size metrics for the batch and payload attributes queues.
172+ fn emit_queue_gauges ( & self ) {
173+ self . metrics . batch_queue_size . set ( self . batch_queue . len ( ) as f64 ) ;
174+ self . metrics . payload_attributes_queue_size . set ( self . attributes_queue . len ( ) as f64 ) ;
175+ }
163176}
164177
165178impl < P > Stream for DerivationPipeline < P >
@@ -171,6 +184,11 @@ where
171184 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
172185 let this = self . get_mut ( ) ;
173186
187+ // report queue size metrics if the interval has elapsed.
188+ while this. queue_metrics_interval . poll_tick ( cx) . is_ready ( ) {
189+ this. emit_queue_gauges ( ) ;
190+ }
191+
174192 // return attributes from the queue if any.
175193 if let Some ( attribute) = this. attributes_queue . pop_front ( ) {
176194 return Poll :: Ready ( Some ( attribute. inner ) )
@@ -295,6 +313,14 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
295313 Ok ( attributes)
296314}
297315
316+ /// Creates a delayed interval that will not skip ticks if the interval is missed but will delay
317+ /// the next tick until the interval has passed.
318+ fn delayed_interval ( interval : u64 ) -> Interval {
319+ let mut interval = tokio:: time:: interval ( tokio:: time:: Duration :: from_millis ( interval) ) ;
320+ interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Delay ) ;
321+ interval
322+ }
323+
298324#[ cfg( test) ]
299325mod tests {
300326 use super :: * ;
@@ -388,6 +414,7 @@ mod tests {
388414 . into ( ) ,
389415 waker : None ,
390416 metrics : Default :: default ( ) ,
417+ queue_metrics_interval : delayed_interval ( QUEUE_METRICS_INTERVAL ) ,
391418 } ;
392419
393420 // flush and verify all relevant fields are emptied.
@@ -810,6 +837,7 @@ mod tests {
810837 attributes_queue : attributes,
811838 waker : None ,
812839 metrics : Default :: default ( ) ,
840+ queue_metrics_interval : delayed_interval ( QUEUE_METRICS_INTERVAL ) ,
813841 }
814842 }
815843
0 commit comments