@@ -714,18 +714,25 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
714714 // Provides a stream over all L1 messages with increasing queue index starting that have
715715 // not been included in an L2 block and have a block number less than or equal to the
716716 // finalized L1 block number (they have been finalized on L1).
717- Some ( L1MessageKey :: NotIncluded ( NotIncludedStart :: Finalized ) ) => {
717+ Some ( L1MessageKey :: NotIncluded ( NotIncludedStart :: FinalizedWithBlockDepth ( depth ) ) ) => {
718718 // Lookup the finalized L1 block number.
719719 let finalized_block_number = self . get_finalized_l1_block_number ( ) . await ?;
720720
721+ // Calculate the target block number by subtracting the depth from the finalized
722+ // block number. If the depth is greater than the finalized block number, we return
723+ // None as there are no messages that satisfy the condition.
724+ let target_block_number =
725+ if let Some ( target_block_number) = finalized_block_number. checked_sub ( depth) {
726+ target_block_number
727+ } else {
728+ return Ok ( None ) ;
729+ } ;
730+
721731 // Create a filter condition for messages that have an L1 block number less than or
722732 // equal to the finalized block number and have not been included in an L2 block
723733 // (i.e. L2BlockNumber is null).
724734 let condition = Condition :: all ( )
725- . add (
726- models:: l1_message:: Column :: L1BlockNumber
727- . lte ( finalized_block_number as i64 ) ,
728- )
735+ . add ( models:: l1_message:: Column :: L1BlockNumber . lte ( target_block_number as i64 ) )
729736 . add ( models:: l1_message:: Column :: L2BlockNumber . is_null ( ) ) ;
730737 // Yield a stream of messages matching the condition ordered by increasing queue
731738 // index.
@@ -749,32 +756,28 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync {
749756 // Calculate the target block number by subtracting the depth from the latest block
750757 // number. If the depth is greater than the latest block number, we return None as
751758 // there are no messages that satisfy the condition.
752- let target_block_number = latest_block_number. checked_sub ( depth) ;
753- if let Some ( target_block_number) = target_block_number {
754- // Create a filter condition for messages that have an L1 block number less than
755- // or equal to the target block number and have not been included in an L2 block
756- // (i.e. L2BlockNumber is null).
757- let condition = Condition :: all ( )
758- . add (
759- models:: l1_message:: Column :: L1BlockNumber
760- . lte ( target_block_number as i64 ) ,
761- )
762- . add ( models:: l1_message:: Column :: L2BlockNumber . is_null ( ) ) ;
763- // Yield a stream of messages matching the condition ordered by increasing
764- // queue index.
765- Ok ( Some (
766- models:: l1_message:: Entity :: find ( )
767- . filter ( condition)
768- . order_by_asc ( models:: l1_message:: Column :: QueueIndex )
769- . stream ( self . get_connection ( ) )
770- . await ?
771- . map ( map_l1_message_result) ,
772- ) )
773- } else {
774- // If the depth is greater than the latest block number, return None as there
775- // are no messages that satisfy the condition.
776- Ok ( None )
777- }
759+ let target_block_number =
760+ if let Some ( target_block_number) = latest_block_number. checked_sub ( depth) {
761+ target_block_number
762+ } else {
763+ return Ok ( None ) ;
764+ } ;
765+ // Create a filter condition for messages that have an L1 block number less than
766+ // or equal to the target block number and have not been included in an L2 block
767+ // (i.e. L2BlockNumber is null).
768+ let condition = Condition :: all ( )
769+ . add ( models:: l1_message:: Column :: L1BlockNumber . lte ( target_block_number as i64 ) )
770+ . add ( models:: l1_message:: Column :: L2BlockNumber . is_null ( ) ) ;
771+ // Yield a stream of messages matching the condition ordered by increasing
772+ // queue index.
773+ Ok ( Some (
774+ models:: l1_message:: Entity :: find ( )
775+ . filter ( condition)
776+ . order_by_asc ( models:: l1_message:: Column :: QueueIndex )
777+ . stream ( self . get_connection ( ) )
778+ . await ?
779+ . map ( map_l1_message_result) ,
780+ ) )
778781 }
779782 // Provides a stream over all L1 messages with increasing queue index starting from the
780783 // beginning.
@@ -966,8 +969,10 @@ impl L1MessageKey {
966969/// block yet.
967970#[ derive( Debug , Clone , PartialEq , Eq ) ]
968971pub enum NotIncludedStart {
969- /// Start from finalized messages that have not been included in a block yet.
970- Finalized ,
972+ /// Start from finalized messages that have not been included in a block yet and have a L1
973+ /// block number that is a specified number of blocks below the current finalized L1 block
974+ /// number.
975+ FinalizedWithBlockDepth ( u64 ) ,
971976 /// Start from unfinalized messages that are included in L1 blocks at a specific depth.
972977 BlockDepth ( u64 ) ,
973978}
@@ -986,7 +991,9 @@ impl fmt::Display for L1MessageKey {
986991 Self :: TransactionHash ( hash) => write ! ( f, "TransactionHash({hash:#x})" ) ,
987992 Self :: BlockNumber ( number) => write ! ( f, "BlockNumber({number})" ) ,
988993 Self :: NotIncluded ( start) => match start {
989- NotIncludedStart :: Finalized => write ! ( f, "NotIncluded(Finalized)" ) ,
994+ NotIncludedStart :: FinalizedWithBlockDepth ( depth) => {
995+ write ! ( f, "NotIncluded(Finalized:{depth})" )
996+ }
990997 NotIncludedStart :: BlockDepth ( depth) => {
991998 write ! ( f, "NotIncluded(BlockDepth({depth}))" )
992999 }
0 commit comments