@@ -34,13 +34,7 @@ use scroll_engine::Engine;
3434use scroll_network:: {
3535 BlockImportOutcome , NewBlockWithPeer , ScrollNetwork , ScrollNetworkManagerEvent ,
3636} ;
37- use std:: {
38- collections:: { HashMap , VecDeque } ,
39- sync:: Arc ,
40- time:: Instant ,
41- vec,
42- } ;
43- use strum:: IntoEnumIterator ;
37+ use std:: { collections:: VecDeque , sync:: Arc , time:: Instant , vec} ;
4438use tokio:: sync:: mpsc:: { self , Receiver , UnboundedReceiver } ;
4539
4640mod config;
@@ -50,7 +44,7 @@ mod consensus;
5044pub use consensus:: { Consensus , NoopConsensus , SystemContractConsensus } ;
5145
5246mod consolidation;
53- use consolidation:: reconcile_batch;
47+ use consolidation:: { reconcile_batch, BlockConsolidationAction } ;
5448
5549mod event;
5650pub use event:: ChainOrchestratorEvent ;
@@ -62,15 +56,26 @@ mod handle;
6256pub use handle:: { ChainOrchestratorCommand , ChainOrchestratorHandle , DatabaseQuery } ;
6357
6458mod metrics;
65- pub use metrics:: { ChainOrchestratorItem , ChainOrchestratorMetrics } ;
59+ use metrics:: { MetricsHandler , Task } ;
6660
6761mod sync;
6862pub use sync:: { SyncMode , SyncState } ;
6963
7064mod status;
7165pub use status:: ChainOrchestratorStatus ;
7266
73- use crate :: consolidation:: BlockConsolidationAction ;
67+ /// Wraps a future, metering the completion of it.
68+ macro_rules! metered {
69+ ( $task: expr, $self: ident, $method: ident( $( $args: expr) ,* ) ) => {
70+ {
71+ let metric = $self. metric_handler. get( $task) . expect( "metric exists" ) . clone( ) ;
72+ let now = Instant :: now( ) ;
73+ let res =$self. $method( $( $args) ,* ) . await ;
74+ metric. task_duration. record( now. elapsed( ) . as_secs_f64( ) ) ;
75+ res
76+ }
77+ } ;
78+ }
7479
7580/// The mask used to mask the L1 message queue hash.
7681const L1_MESSAGE_QUEUE_HASH_MASK : B256 =
@@ -102,8 +107,6 @@ pub struct ChainOrchestrator<
102107 l2_client : Arc < L2P > ,
103108 /// The reference to database.
104109 database : Arc < Database > ,
105- /// The metrics for the chain orchestrator.
106- metrics : HashMap < ChainOrchestratorItem , ChainOrchestratorMetrics > ,
107110 /// The current sync state of the [`ChainOrchestrator`].
108111 sync_state : SyncState ,
109112 /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
@@ -122,6 +125,8 @@ pub struct ChainOrchestrator<
122125 derivation_pipeline : DerivationPipeline ,
123126 /// Optional event sender for broadcasting events to listeners.
124127 event_sender : Option < EventSender < ChainOrchestratorEvent > > ,
128+ /// The metrics handler.
129+ metric_handler : MetricsHandler ,
125130}
126131
127132impl <
@@ -155,12 +160,6 @@ impl<
155160 l2_client : Arc :: new ( l2_provider) ,
156161 database,
157162 config,
158- metrics : ChainOrchestratorItem :: iter ( )
159- . map ( |i| {
160- let label = i. as_str ( ) ;
161- ( i, ChainOrchestratorMetrics :: new_with_labels ( & [ ( "item" , label) ] ) )
162- } )
163- . collect ( ) ,
164163 sync_state : SyncState :: default ( ) ,
165164 l1_notification_rx,
166165 network,
@@ -171,6 +170,7 @@ impl<
171170 derivation_pipeline,
172171 handle_rx,
173172 event_sender : None ,
173+ metric_handler : MetricsHandler :: default ( ) ,
174174 } ,
175175 handle,
176176 ) )
@@ -211,7 +211,7 @@ impl<
211211 self . handle_outcome( res) ;
212212 }
213213 Some ( batch) = self . derivation_pipeline. next( ) => {
214- let res = self . handle_derived_batch( batch) . await ;
214+ let res = metered! ( Task :: BatchReconciliation , self , handle_derived_batch( batch) ) ;
215215 self . handle_outcome( res) ;
216216 }
217217 Some ( event) = self . network. events( ) . next( ) => {
@@ -266,7 +266,7 @@ impl<
266266 /// Handles an event from the sequencer.
267267 async fn handle_sequencer_event (
268268 & mut self ,
269- event : rollup_node_sequencer :: SequencerEvent ,
269+ event : SequencerEvent ,
270270 ) -> Result < Option < ChainOrchestratorEvent > , ChainOrchestratorError > {
271271 tracing:: info!( target: "scroll::chain_orchestrator" , ?event, "Handling sequencer event" ) ;
272272 match event {
@@ -277,6 +277,7 @@ impl<
277277 . map ( |s| & s. address )
278278 . expect ( "signer must be set if sequencer is present" ) ,
279279 ) {
280+ self . metric_handler . start_block_building_recording ( ) ;
280281 self . sequencer
281282 . as_mut ( )
282283 . expect ( "sequencer must be present" )
@@ -300,6 +301,7 @@ impl<
300301 . as_mut ( )
301302 . expect ( "signer must be present" )
302303 . sign_block ( block. clone ( ) ) ?;
304+ self . metric_handler . finish_block_building_recording ( ) ;
303305 return Ok ( Some ( ChainOrchestratorEvent :: BlockSequenced ( block) ) ) ;
304306 }
305307 }
@@ -508,30 +510,38 @@ impl<
508510 self . database . set_processed_l1_block_number ( block_number) . await ?;
509511 Ok ( None )
510512 }
511- L1Notification :: Reorg ( block_number) => self . handle_l1_reorg ( * block_number) . await ,
513+ L1Notification :: Reorg ( block_number) => {
514+ metered ! ( Task :: L1Reorg , self , handle_l1_reorg( * block_number) )
515+ }
512516 L1Notification :: Consensus ( update) => {
513517 self . consensus . update_config ( update) ;
514518 Ok ( None )
515519 }
516520 L1Notification :: NewBlock ( block_number) => self . handle_l1_new_block ( * block_number) . await ,
517521 L1Notification :: Finalized ( block_number) => {
518- self . handle_l1_finalized ( * block_number) . await
522+ metered ! ( Task :: L1Finalization , self , handle_l1_finalized( * block_number) )
523+ }
524+ L1Notification :: BatchCommit ( batch) => {
525+ metered ! ( Task :: BatchCommit , self , handle_batch_commit( batch. clone( ) ) )
519526 }
520- L1Notification :: BatchCommit ( batch) => self . handle_batch_commit ( batch. clone ( ) ) . await ,
521527 L1Notification :: L1Message { message, block_number, block_timestamp : _ } => {
522- self . handle_l1_message ( message. clone ( ) , * block_number) . await
528+ metered ! ( Task :: L1Message , self , handle_l1_message( message. clone( ) , * block_number) )
523529 }
524530 L1Notification :: Synced => {
525531 tracing:: info!( target: "scroll::chain_orchestrator" , "L1 is now synced" ) ;
526532 self . sync_state . l1_mut ( ) . set_synced ( ) ;
527533 if self . sync_state . is_synced ( ) {
528- self . consolidate_chain ( ) . await ?;
534+ metered ! ( Task :: ChainConsolidation , self , consolidate_chain( ) ) ?;
529535 }
530536 self . notify ( ChainOrchestratorEvent :: L1Synced ) ;
531537 Ok ( None )
532538 }
533539 L1Notification :: BatchFinalization { hash : _hash, index, block_number } => {
534- self . handle_l1_batch_finalization ( * index, * block_number) . await
540+ metered ! (
541+ Task :: BatchFinalization ,
542+ self ,
543+ handle_batch_finalization( * index, * block_number)
544+ )
535545 }
536546 }
537547 }
@@ -550,8 +560,6 @@ impl<
550560 & mut self ,
551561 block_number : u64 ,
552562 ) -> Result < Option < ChainOrchestratorEvent > , ChainOrchestratorError > {
553- let metric = self . metrics . get ( & ChainOrchestratorItem :: L1Reorg ) . expect ( "metric exists" ) ;
554- let now = Instant :: now ( ) ;
555563 let genesis_hash = self . config . chain_spec ( ) . genesis_hash ( ) ;
556564 let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } =
557565 self . database . unwind ( genesis_hash, block_number) . await ?;
@@ -594,8 +602,6 @@ impl<
594602 self . engine . update_fcs ( l2_head_block_info, l2_safe_block_info, None ) . await ?;
595603 }
596604
597- metric. task_duration . record ( now. elapsed ( ) . as_secs_f64 ( ) ) ;
598-
599605 let event = ChainOrchestratorEvent :: L1Reorg {
600606 l1_block_number,
601607 queue_index,
@@ -612,10 +618,6 @@ impl<
612618 & mut self ,
613619 block_number : u64 ,
614620 ) -> Result < Option < ChainOrchestratorEvent > , ChainOrchestratorError > {
615- let metric =
616- self . metrics . get ( & ChainOrchestratorItem :: L1Finalization ) . expect ( "metric exists" ) ;
617- let now = Instant :: now ( ) ;
618-
619621 let finalized_batches = self
620622 . database
621623 . tx_mut ( move |tx| async move {
@@ -632,8 +634,6 @@ impl<
632634 self . derivation_pipeline . push_batch ( Arc :: new ( * batch) ) . await ;
633635 }
634636
635- metric. task_duration . record ( now. elapsed ( ) . as_secs_f64 ( ) ) ;
636-
637637 Ok ( Some ( ChainOrchestratorEvent :: L1BlockFinalized ( block_number, finalized_batches) ) )
638638 }
639639
@@ -642,9 +642,6 @@ impl<
642642 & self ,
643643 batch : BatchCommitData ,
644644 ) -> Result < Option < ChainOrchestratorEvent > , ChainOrchestratorError > {
645- let metric = self . metrics . get ( & ChainOrchestratorItem :: BatchCommit ) . expect ( "metric exists" ) ;
646- let now = Instant :: now ( ) ;
647-
648645 let event = self
649646 . database
650647 . tx_mut ( move |tx| {
@@ -682,13 +679,11 @@ impl<
682679 } )
683680 . await ?;
684681
685- metric. task_duration . record ( now. elapsed ( ) . as_secs_f64 ( ) ) ;
686-
687682 Ok ( event)
688683 }
689684
690685 /// Handles a batch finalization event by updating the batch input in the database.
691- async fn handle_l1_batch_finalization (
686+ async fn handle_batch_finalization (
692687 & mut self ,
693688 batch_index : u64 ,
694689 block_number : u64 ,
@@ -732,9 +727,6 @@ impl<
732727 l1_message : TxL1Message ,
733728 l1_block_number : u64 ,
734729 ) -> Result < Option < ChainOrchestratorEvent > , ChainOrchestratorError > {
735- let metric = self . metrics . get ( & ChainOrchestratorItem :: L1Message ) . expect ( "metric exists" ) ;
736- let now = Instant :: now ( ) ;
737-
738730 let event = ChainOrchestratorEvent :: L1MessageCommitted ( l1_message. queue_index ) ;
739731 let queue_hash = compute_l1_message_queue_hash (
740732 & self . database ,
@@ -770,35 +762,17 @@ impl<
770762 } )
771763 . await ?;
772764
773- metric. task_duration . record ( now. elapsed ( ) . as_secs_f64 ( ) ) ;
774-
775765 Ok ( Some ( event) )
776766 }
777767
778- // /// Wraps a pending chain orchestrator future, metering the completion of it.
779- // pub fn handle_metered(
780- // &mut self,
781- // item: ChainOrchestratorItem,
782- // chain_orchestrator_fut: PendingChainOrchestratorFuture,
783- // ) -> PendingChainOrchestratorFuture {
784- // let metric = self.metrics.get(&item).expect("metric exists").clone();
785- // let fut_wrapper = Box::pin(async move {
786- // let now = Instant::now();
787- // let res = chain_orchestrator_fut.await;
788- // metric.task_duration.record(now.elapsed().as_secs_f64());
789- // res
790- // });
791- // fut_wrapper
792- // }
793-
794768 async fn handle_network_event (
795769 & mut self ,
796770 event : ScrollNetworkManagerEvent ,
797771 ) -> Result < Option < ChainOrchestratorEvent > , ChainOrchestratorError > {
798772 match event {
799773 ScrollNetworkManagerEvent :: NewBlock ( block_with_peer) => {
800774 self . notify ( ChainOrchestratorEvent :: NewBlockReceived ( block_with_peer. clone ( ) ) ) ;
801- Ok ( self . handle_block_from_peer ( block_with_peer) . await ? )
775+ metered ! ( Task :: L2BlockImport , self , handle_block_from_peer( block_with_peer) )
802776 }
803777 }
804778 }
0 commit comments