@@ -411,15 +411,16 @@ impl<N: Network> Service<N> {
411411
412412 async fn handle_live ( & mut self ) -> Result < ( ) , BlockRangeScannerError > {
413413 let provider = self . provider . clone ( ) ;
414- let start = self . provider . get_block_number ( ) . await ?;
414+ let latest = self . provider . get_block_number ( ) . await ?;
415415
416416 let Some ( sender) = self . subscriber . clone ( ) else {
417417 return Err ( BlockRangeScannerError :: ServiceShutdown ) ;
418418 } ;
419419
420420 let block_confirmations = self . config . block_confirmations ;
421+ let expected_next = latest. saturating_sub ( block_confirmations) ;
421422 tokio:: spawn ( async move {
422- Self :: stream_live_blocks ( start , provider, sender, block_confirmations) . await ;
423+ Self :: stream_live_blocks ( expected_next , provider, sender, block_confirmations) . await ;
423424 } ) ;
424425
425426 Ok ( ( ) )
@@ -474,8 +475,40 @@ impl<N: Network> Service<N> {
474475 ) ) ,
475476 ) ?;
476477
477- let end_block = self . provider . get_block_by_number ( BlockNumberOrTag :: Latest ) . await ?. ok_or (
478- BlockRangeScannerError :: HistoricalSyncError ( "Latest block not found" . to_string ( ) ) ,
478+ let latest_block =
479+ self . provider . get_block_by_number ( BlockNumberOrTag :: Latest ) . await ?. ok_or (
480+ BlockRangeScannerError :: HistoricalSyncError ( "Latest block not found" . to_string ( ) ) ,
481+ ) ?;
482+
483+ let block_confirmations = self . config . block_confirmations ;
484+ let confirmed_tip_num = latest_block. header ( ) . number ( ) . saturating_sub ( block_confirmations) ;
485+
486+ // If start is beyond confirmed tip, skip historical and go straight to live
487+ if start_block. header ( ) . number ( ) > confirmed_tip_num {
488+ info ! (
489+ start_block = start_block. header( ) . number( ) ,
490+ confirmed_tip = confirmed_tip_num,
491+ "Start block is beyond confirmed tip, starting live stream"
492+ ) ;
493+
494+ let Some ( sender) = self . subscriber . clone ( ) else {
495+ return Err ( BlockRangeScannerError :: ServiceShutdown ) ;
496+ } ;
497+
498+ let provider = self . provider . clone ( ) ;
499+ let expected_next = start_block. header ( ) . number ( ) ;
500+ tokio:: spawn ( async move {
501+ Self :: stream_live_blocks ( expected_next, provider, sender, block_confirmations)
502+ . await ;
503+ } ) ;
504+
505+ return Ok ( ( ) ) ;
506+ }
507+
508+ let end_block = self . provider . get_block_by_number ( confirmed_tip_num. into ( ) ) . await ?. ok_or (
509+ BlockRangeScannerError :: HistoricalSyncError ( format ! (
510+ "Confirmed tip block {confirmed_tip_num} not found"
511+ ) ) ,
479512 ) ?;
480513
481514 info ! (
@@ -495,8 +528,6 @@ impl<N: Network> Service<N> {
495528 // Any block > cutoff will come from the live stream
496529 let cutoff = end_block. header ( ) . number ( ) ;
497530
498- let block_confirmations = self . config . block_confirmations ;
499-
500531 // This task runs independently, accumulating new blocks while wehistorical data is syncing
501532 let live_subscription_task = tokio:: spawn ( async move {
502533 Self :: stream_live_blocks (
@@ -1049,7 +1080,10 @@ mod tests {
10491080
10501081 #[ tokio:: test]
10511082 async fn live_mode_respects_block_confirmations ( ) -> anyhow:: Result < ( ) > {
1052- let anvil = Anvil :: new ( ) . block_time_f64 ( 0.1 ) . try_spawn ( ) ?;
1083+ let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
1084+
1085+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
1086+ provider. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await ?;
10531087
10541088 let block_confirmations = 5 ;
10551089
@@ -1062,18 +1096,18 @@ mod tests {
10621096 let expected_blocks = 10 ;
10631097
10641098 let mut receiver = client. stream_live ( ) . await ?. take ( expected_blocks) ;
1099+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
1100+ let latest_head = provider. get_block_number ( ) . await ?;
1101+ provider. anvil_mine ( Option :: Some ( expected_blocks as u64 ) , Option :: None ) . await ?;
10651102
10661103 let mut block_range_start = 0 ;
10671104
10681105 while let Some ( BlockRangeMessage :: Data ( range) ) = receiver. next ( ) . await {
1069- info ! ( "Received block range: [{range:?}]" ) ;
10701106 if block_range_start == 0 {
10711107 block_range_start = * range. start ( ) ;
1108+ assert_eq ! ( * range. start( ) , latest_head. saturating_sub( block_confirmations) ) ;
10721109 }
10731110
1074- let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
1075- let latest_head = provider. get_block_number ( ) . await ?;
1076- assert_eq ! ( * range. end( ) , latest_head. saturating_sub( block_confirmations) ) ;
10771111 assert_eq ! ( block_range_start, * range. start( ) ) ;
10781112 assert ! ( * range. end( ) >= * range. start( ) ) ;
10791113 block_range_start = * range. end ( ) + 1 ;
0 commit comments