@@ -86,7 +86,7 @@ pub struct RollupNodeManager<
8686 /// The engine driver used to communicate with the engine.
8787 engine : EngineDriver < EC , CS , P > ,
8888 /// The derivation pipeline, used to derive payload attributes from batches.
89- derivation_pipeline : Option < DerivationPipeline < L1P > > ,
89+ derivation_pipeline : DerivationPipeline < L1P > ,
9090 /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
9191 l1_notification_rx : Option < ReceiverStream < Arc < L1Notification > > > ,
9292 /// An indexer used to index data for the rollup node.
@@ -153,7 +153,7 @@ where
153153 pub fn new (
154154 network : ScrollNetworkManager < N > ,
155155 engine : EngineDriver < EC , CS , P > ,
156- l1_provider : Option < L1P > ,
156+ l1_provider : L1P ,
157157 database : Arc < Database > ,
158158 l1_notification_rx : Option < Receiver < Arc < L1Notification > > > ,
159159 consensus : Box < dyn Consensus > ,
@@ -165,8 +165,7 @@ where
165165 ) -> ( Self , RollupManagerHandle ) {
166166 let ( handle_tx, handle_rx) = mpsc:: channel ( EVENT_CHANNEL_SIZE ) ;
167167 let indexer = Indexer :: new ( database. clone ( ) , chain_spec. clone ( ) ) ;
168- let derivation_pipeline =
169- l1_provider. map ( |provider| DerivationPipeline :: new ( provider, database) ) ;
168+ let derivation_pipeline = DerivationPipeline :: new ( l1_provider, database) ;
170169 let rnm = Self {
171170 handle_rx,
172171 chain_spec,
@@ -240,17 +239,13 @@ where
240239 IndexerEvent :: BatchCommitIndexed { batch_info, safe_head, l1_block_number } => {
241240 // if we detected a batch revert event, we reset the pipeline and the engine driver.
242241 if let Some ( new_safe_head) = safe_head {
243- if let Some ( pipeline) = self . derivation_pipeline . as_mut ( ) {
244- pipeline. flush ( )
245- }
242+ self . derivation_pipeline . flush ( ) ;
246243 self . engine . clear_l1_payload_attributes ( ) ;
247244 self . engine . set_head_block_info ( new_safe_head) ;
248245 self . engine . set_safe_block_info ( new_safe_head) ;
249246 }
250247 // push the batch info into the derivation pipeline.
251- if let Some ( pipeline) = & mut self . derivation_pipeline {
252- pipeline. handle_batch_commit ( batch_info, l1_block_number) ;
253- }
248+ self . derivation_pipeline . handle_batch_commit ( batch_info, l1_block_number) ;
254249 }
255250 IndexerEvent :: BatchFinalizationIndexed ( _, Some ( finalized_block) ) => {
256251 // update the fcs on new finalized block.
@@ -285,9 +280,7 @@ where
285280 }
286281
287282 // Handle the reorg in the derivation pipeline.
288- if let Some ( pipeline) = self . derivation_pipeline . as_mut ( ) {
289- pipeline. handle_reorg ( l1_block_number) ;
290- }
283+ self . derivation_pipeline . handle_reorg ( l1_block_number) ;
291284 }
292285 IndexerEvent :: L1MessageIndexed ( index) => {
293286 if let Some ( event_sender) = self . event_sender . as_ref ( ) {
@@ -520,9 +513,7 @@ where
520513 ) ;
521514
522515 // Poll Derivation Pipeline and push attribute in queue if any.
523- while let Some ( Poll :: Ready ( Some ( attributes) ) ) =
524- this. derivation_pipeline . as_mut ( ) . map ( |f| f. poll_next_unpin ( cx) )
525- {
516+ while let Poll :: Ready ( Some ( attributes) ) = this. derivation_pipeline . poll_next_unpin ( cx) {
526517 this. engine . handle_l1_consolidation ( attributes)
527518 }
528519
0 commit comments