@@ -3,7 +3,7 @@ use crate::DatabaseConnectionProvider;
33use alloy_primitives:: B256 ;
44use futures:: { Stream , StreamExt } ;
55use rollup_node_primitives:: {
6- BatchCommitData , BatchInfo , BlockInfo , L1MessageEnvelope , L2BlockInfoWithL1Messages ,
6+ BatchCommitData , BatchInfo , BlockInfo , L1MessageEnvelope , L2BlockInfoWithL1Messages , Metadata ,
77} ;
88use scroll_alloy_rpc_types_engine:: BlockDataHint ;
99use sea_orm:: {
@@ -19,8 +19,20 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
1919 async fn insert_batch ( & self , batch_commit : BatchCommitData ) -> Result < ( ) , DatabaseError > {
2020 tracing:: trace!( target: "scroll::db" , batch_hash = ?batch_commit. hash, batch_index = batch_commit. index, "Inserting batch input into database." ) ;
2121 let batch_commit: models:: batch_commit:: ActiveModel = batch_commit. into ( ) ;
22- batch_commit. insert ( self . get_connection ( ) ) . await ?;
23- Ok ( ( ) )
22+ Ok ( models:: batch_commit:: Entity :: insert ( batch_commit)
23+ . on_conflict (
24+ OnConflict :: column ( models:: batch_commit:: Column :: Index )
25+ . update_columns ( vec ! [
26+ models:: batch_commit:: Column :: Hash ,
27+ models:: batch_commit:: Column :: BlockNumber ,
28+ models:: batch_commit:: Column :: BlockTimestamp ,
29+ models:: batch_commit:: Column :: FinalizedBlockNumber ,
30+ ] )
31+ . to_owned ( ) ,
32+ )
33+ . exec ( self . get_connection ( ) )
34+ . await
35+ . map ( |_| ( ) ) ?)
2436 }
2537
2638 /// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
@@ -68,6 +80,37 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
6880 . map ( |x| x. map ( Into :: into) ) ?)
6981 }
7082
83+ /// Set the latest finalized L1 block number.
84+ async fn set_latest_finalized_l1_block_number (
85+ & self ,
86+ block_number : u64 ,
87+ ) -> Result < ( ) , DatabaseError > {
88+ tracing:: trace!( target: "scroll::db" , block_number, "Updating the latest finalized L1 block number in the database." ) ;
89+ let metadata: models:: metadata:: ActiveModel =
90+ Metadata { l1_finalized_block : block_number } . into ( ) ;
91+ Ok ( models:: metadata:: Entity :: insert ( metadata)
92+ . on_conflict (
93+ OnConflict :: column ( models:: metadata:: Column :: Key )
94+ . update_column ( models:: metadata:: Column :: Value )
95+ . to_owned ( ) ,
96+ )
97+ . exec ( self . get_connection ( ) )
98+ . await
99+ . map ( |_| ( ) ) ?)
100+ }
101+
102+ /// Get the finalized L1 block number from the database.
103+ async fn get_finalized_l1_block_number ( & self ) -> Result < Option < u64 > , DatabaseError > {
104+ Ok ( models:: metadata:: Entity :: find ( )
105+ . filter ( models:: metadata:: Column :: Key . eq ( "l1_finalized_block" ) )
106+ . select_only ( )
107+ . column ( models:: metadata:: Column :: Value )
108+ . into_tuple :: < String > ( )
109+ . one ( self . get_connection ( ) )
110+ . await
111+ . map ( |x| x. and_then ( |x| x. parse :: < u64 > ( ) . ok ( ) ) ) ?)
112+ }
113+
71114 /// Get the newest finalized batch hash up to or at the provided height.
72115 async fn get_finalized_batch_hash_at_height (
73116 & self ,
@@ -113,7 +156,23 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
113156 async fn insert_l1_message ( & self , l1_message : L1MessageEnvelope ) -> Result < ( ) , DatabaseError > {
114157 tracing:: trace!( target: "scroll::db" , queue_index = l1_message. transaction. queue_index, "Inserting L1 message into database." ) ;
115158 let l1_message: models:: l1_message:: ActiveModel = l1_message. into ( ) ;
116- l1_message. insert ( self . get_connection ( ) ) . await ?;
159+ models:: l1_message:: Entity :: insert ( l1_message)
160+ . on_conflict (
161+ OnConflict :: column ( models:: l1_message:: Column :: QueueIndex )
162+ . update_columns ( vec ! [
163+ models:: l1_message:: Column :: QueueHash ,
164+ models:: l1_message:: Column :: Hash ,
165+ models:: l1_message:: Column :: L1BlockNumber ,
166+ models:: l1_message:: Column :: GasLimit ,
167+ models:: l1_message:: Column :: To ,
168+ models:: l1_message:: Column :: Value ,
169+ models:: l1_message:: Column :: Sender ,
170+ models:: l1_message:: Column :: Input ,
171+ ] )
172+ . to_owned ( ) ,
173+ )
174+ . exec ( self . get_connection ( ) )
175+ . await ?;
117176 Ok ( ( ) )
118177 }
119178
@@ -208,15 +267,25 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
208267 } ) ?)
209268 }
210269
211- /// Get the latest safe L2 [`BlockInfo`] from the database.
212- async fn get_latest_safe_l2_block ( & self ) -> Result < Option < BlockInfo > , DatabaseError > {
270+ /// Get the latest safe L2 ([`BlockInfo`], [`BatchInfo`]) from the database.
271+ async fn get_latest_safe_l2_info (
272+ & self ,
273+ ) -> Result < Option < ( BlockInfo , BatchInfo ) > , DatabaseError > {
213274 tracing:: trace!( target: "scroll::db" , "Fetching latest safe L2 block from database." ) ;
214275 Ok ( models:: l2_block:: Entity :: find ( )
215276 . filter ( models:: l2_block:: Column :: BatchIndex . is_not_null ( ) )
216277 . order_by_desc ( models:: l2_block:: Column :: BlockNumber )
217278 . one ( self . get_connection ( ) )
218279 . await
219- . map ( |x| x. map ( |x| x. block_info ( ) ) ) ?)
280+ . map ( |x| {
281+ x. map ( |x| {
282+ (
283+ x. block_info ( ) ,
284+ x. batch_info ( )
285+ . expect ( "Batch info must be present due to database query arguments" ) ,
286+ )
287+ } )
288+ } ) ?)
220289 }
221290
222291 /// Get the latest L2 [`BlockInfo`] from the database.
@@ -229,6 +298,44 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
229298 . map ( |x| x. map ( |x| x. block_info ( ) ) ) ?)
230299 }
231300
301+ /// Prepare the database on startup and return metadata used for other components in the
302+ /// rollup-node.
303+ ///
304+ /// This method first unwinds the database to the finalized L1 block. It then fetches the batch
305+ /// info for the latest safe L2 block. It takes note of the L1 block number at which
306+ /// this batch was produced. It then retrieves the latest block for the previous batch
307+ /// (i.e., the batch before the latest safe block). It returns a tuple of this latest
308+ /// fetched block and the L1 block number of the batch.
309+ async fn prepare_on_startup (
310+ & self ,
311+ genesis_hash : B256 ,
312+ ) -> Result < ( Option < BlockInfo > , Option < u64 > ) , DatabaseError > {
313+ tracing:: trace!( target: "scroll::db" , "Fetching startup safe block from database." ) ;
314+ let finalized_block_number = self . get_finalized_l1_block_number ( ) . await ?. unwrap_or ( 0 ) ;
315+ self . unwind ( genesis_hash, finalized_block_number) . await ?;
316+ let safe = if let Some ( batch_info) = self
317+ . get_latest_safe_l2_info ( )
318+ . await ?
319+ . map ( |( _, batch_info) | batch_info)
320+ . filter ( |b| b. index > 1 )
321+ {
322+ let batch = self
323+ . get_batch_by_index ( batch_info. index )
324+ . await ?
325+ . expect ( "Batch info must be present due to database query arguments" ) ;
326+ let previous_batch = self
327+ . get_batch_by_index ( batch_info. index - 1 )
328+ . await ?
329+ . expect ( "Batch info must be present due to database query arguments" ) ;
330+ let l2_block = self . get_highest_block_for_batch ( previous_batch. hash ) . await ?;
331+ ( l2_block, Some ( batch. block_number ) )
332+ } else {
333+ ( None , None )
334+ } ;
335+
336+ Ok ( safe)
337+ }
338+
232339 /// Delete all L2 blocks with a block number greater than the provided block number.
233340 async fn delete_l2_blocks_gt ( & self , block_number : u64 ) -> Result < u64 , DatabaseError > {
234341 tracing:: trace!( target: "scroll::db" , block_number, "Deleting L2 blocks greater than provided block number." ) ;
@@ -312,6 +419,64 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
312419 Ok ( None )
313420 }
314421 }
422+
423+ /// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number.
424+ async fn unwind (
425+ & self ,
426+ genesis_hash : B256 ,
427+ l1_block_number : u64 ,
428+ ) -> Result < UnwindResult , DatabaseError > {
429+ // delete batch inputs and l1 messages
430+ let batches_removed = self . delete_batches_gt ( l1_block_number) . await ?;
431+ let deleted_messages = self . delete_l1_messages_gt ( l1_block_number) . await ?;
432+
433+ // filter and sort the executed L1 messages
434+ let mut removed_executed_l1_messages: Vec < _ > =
435+ deleted_messages. into_iter ( ) . filter ( |x| x. l2_block_number . is_some ( ) ) . collect ( ) ;
436+ removed_executed_l1_messages
437+ . sort_by ( |a, b| a. transaction . queue_index . cmp ( & b. transaction . queue_index ) ) ;
438+
439+ // check if we need to reorg the L2 head and delete some L2 blocks
440+ let ( queue_index, l2_head_block_info) =
441+ if let Some ( msg) = removed_executed_l1_messages. first ( ) {
442+ let l2_reorg_block_number = msg
443+ . l2_block_number
444+ . expect ( "we guarantee that this is Some(u64) due to the filter above" ) -
445+ 1 ;
446+ let l2_block_info = self . get_l2_block_info_by_number ( l2_reorg_block_number) . await ?;
447+ self . delete_l2_blocks_gt ( l2_reorg_block_number) . await ?;
448+ ( Some ( msg. transaction . queue_index ) , l2_block_info)
449+ } else {
450+ ( None , None )
451+ } ;
452+
453+ // check if we need to reorg the L2 safe block
454+ let l2_safe_block_info = if batches_removed > 0 {
455+ if let Some ( x) = self . get_latest_safe_l2_info ( ) . await ? {
456+ Some ( x. 0 )
457+ } else {
458+ Some ( BlockInfo :: new ( 0 , genesis_hash) )
459+ }
460+ } else {
461+ None
462+ } ;
463+
464+ // commit the transaction
465+ Ok ( UnwindResult { l1_block_number, queue_index, l2_head_block_info, l2_safe_block_info } )
466+ }
467+ }
468+
469+ /// The result of [`DatabaseOperations::unwind`].
470+ #[ derive( Debug ) ]
471+ pub struct UnwindResult {
472+ /// The L1 block number that we unwinded to.
473+ pub l1_block_number : u64 ,
474+ /// The latest unconsumed queue index after the uwnind.
475+ pub queue_index : Option < u64 > ,
476+ /// The L2 head block info after the unwind. This is only populated if the L2 head has reorged.
477+ pub l2_head_block_info : Option < BlockInfo > ,
478+ /// The L2 safe block info after the unwind. This is only populated if the L2 safe has reorged.
479+ pub l2_safe_block_info : Option < BlockInfo > ,
315480}
316481
317482impl < T > DatabaseOperations for T where T : DatabaseConnectionProvider { }
0 commit comments