@@ -50,6 +50,10 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
5050}
5151
5252#[ allow( clippy:: too_many_arguments) ]
53+ #[ cfg_attr(
54+ feature = "tracing" ,
55+ tracing:: instrument( level = "trace" , skip( subscription, sender, provider, reorg_handler) )
56+ ) ]
5357pub ( crate ) async fn stream_live_blocks < N : Network > (
5458 stream_start : BlockNumber ,
5559 subscription : RobustSubscription < N > ,
@@ -69,6 +73,16 @@ pub(crate) async fn stream_live_blocks<N: Network>(
6973 return ;
7074 } ;
7175
76+ debug ! (
77+ first_block = first_block. number( ) ,
78+ stream_start = stream_start,
79+ "Received first relevant block, starting live streaming"
80+ ) ;
81+
82+ // This check is necessary when running `sync` modes. It makes sense to stream this notification
83+ // only once the first relevant block is received from the subscription, and not before that;
84+ // otherwise callers might perform certain operations expecting the relevant blocks to start
85+ // coming, when in fact they are not.
7286 if notify_after_first_block && !sender. try_stream ( Notification :: SwitchingToLive ) . await {
7387 return ;
7488 }
@@ -100,8 +114,6 @@ pub(crate) async fn stream_live_blocks<N: Network>(
100114 reorg_handler,
101115 )
102116 . await ;
103-
104- warn ! ( "Live block subscription ended" ) ;
105117}
106118
107119async fn get_first_block <
@@ -119,7 +131,6 @@ async fn get_first_block<
119131 subscription:: Error :: Lagged ( _) => {
120132 // scanner already accounts for skipped block numbers
121133 // next block will be the actual incoming block
122- info ! ( "Skipping Error::Lagged, next block should be the first live block" ) ;
123134 }
124135 subscription:: Error :: Timeout => {
125136 _ = sender. try_stream ( ScannerError :: Timeout ) . await ;
@@ -154,8 +165,8 @@ fn skip_to_first_relevant_block<N: Network>(
154165 } )
155166}
156167
157- /// Initializes the streaming state after receiving the first block
158- /// Returns None if the channel is closed
168+ /// Initializes the streaming state after receiving the first block.
169+ /// Returns None if the channel is closed.
159170async fn initialize_live_streaming_state < N : Network > (
160171 first_block : N :: HeaderResponse ,
161172 stream_start : BlockNumber ,
@@ -165,10 +176,7 @@ async fn initialize_live_streaming_state<N: Network>(
165176 provider : & RobustProvider < N > ,
166177 reorg_handler : & mut ReorgHandler < N > ,
167178) -> Option < LiveStreamingState < N > > {
168- let incoming_block_num = first_block. number ( ) ;
169- info ! ( block_number = incoming_block_num, "Received first block header" ) ;
170-
171- let confirmed = incoming_block_num. saturating_sub ( block_confirmations) ;
179+ let confirmed = first_block. number ( ) . saturating_sub ( block_confirmations) ;
172180
173181 // The minimum common ancestor is the block before the stream start
174182 let min_common_ancestor = stream_start. saturating_sub ( 1 ) ;
@@ -210,10 +218,9 @@ async fn stream_blocks_continuously<
210218 let incoming_block = match incoming_block {
211219 Ok ( block) => block,
212220 Err ( e) => {
213- error ! ( error = %e, "Error receiving block from stream" ) ;
214221 match e {
215222 subscription:: Error :: Lagged ( _) => {
216- // scanner already accounts for skipped block numbers
223+ // scanner already accounts for skipped block numbers,
217224 // next block will be the actual incoming block
218225 continue ;
219226 }
@@ -233,8 +240,8 @@ async fn stream_blocks_continuously<
233240 }
234241 } ;
235242
236- let incoming_block_num = incoming_block. number ( ) ;
237- info ! ( block_number = incoming_block_num , "Received block header " ) ;
243+ let incoming_block = incoming_block. number ( ) ;
244+ trace ! ( received = incoming_block , "Received item from block subscription " ) ;
238245
239246 let Some ( previous_batch_end) = state. previous_batch_end . as_ref ( ) else {
240247 // previously detected reorg wasn't fully handled
@@ -244,7 +251,7 @@ async fn stream_blocks_continuously<
244251 let common_ancestor = match reorg_handler. check ( previous_batch_end) . await {
245252 Ok ( reorg_opt) => reorg_opt,
246253 Err ( e) => {
247- error ! ( error = %e , "Failed to perform reorg check" ) ;
254+ error ! ( "Failed to perform reorg check" ) ;
248255 _ = sender. try_stream ( e) . await ;
249256 return ;
250257 }
@@ -260,7 +267,7 @@ async fn stream_blocks_continuously<
260267 }
261268
262269 // Stream the next batch of confirmed blocks
263- let batch_end_num = incoming_block_num . saturating_sub ( block_confirmations) ;
270+ let batch_end_num = incoming_block . saturating_sub ( block_confirmations) ;
264271 if !stream_next_batch (
265272 batch_end_num,
266273 state,
@@ -287,23 +294,33 @@ async fn handle_reorg_detected<N: Network>(
287294) -> bool {
288295 let ancestor_num = common_ancestor. header ( ) . number ( ) ;
289296
297+ info ! (
298+ common_ancestor = ancestor_num,
299+ stream_start = stream_start,
300+ "Reorg detected during live streaming"
301+ ) ;
302+
290303 if !sender. try_stream ( Notification :: ReorgDetected { common_ancestor : ancestor_num } ) . await {
291304 return false ;
292305 }
293306
294307 // Reset streaming position based on common ancestor
295308 if ancestor_num < stream_start {
296309 // Reorg went before our starting point - restart from stream_start
297- info ! (
298- ancestor_block = ancestor_num,
310+ debug ! (
311+ common_ancestor = ancestor_num,
299312 stream_start = stream_start,
300- "Reorg detected before stream start, resetting to stream start "
313+ "Reorg predates stream start, restarting from stream_start "
301314 ) ;
302315 state. batch_start = stream_start;
303316 state. previous_batch_end = None ;
304317 } else {
305318 // Resume from after the common ancestor
306- info ! ( ancestor_block = ancestor_num, "Reorg detected, resuming from common ancestor" ) ;
319+ debug ! (
320+ common_ancestor = ancestor_num,
321+ resume_from = ancestor_num + 1 ,
322+ "Resuming from after common ancestor"
323+ ) ;
307324 state. batch_start = ancestor_num + 1 ;
308325 state. previous_batch_end = Some ( common_ancestor) ;
309326 }
@@ -361,6 +378,10 @@ struct LiveStreamingState<N: Network> {
361378}
362379
363380#[ must_use]
381+ #[ cfg_attr(
382+ feature = "tracing" ,
383+ tracing:: instrument( level = "trace" , skip( sender, provider, reorg_handler) )
384+ ) ]
364385pub ( crate ) async fn stream_historical_range < N : Network > (
365386 start : BlockNumber ,
366387 end : BlockNumber ,
@@ -369,20 +390,31 @@ pub(crate) async fn stream_historical_range<N: Network>(
369390 provider : & RobustProvider < N > ,
370391 reorg_handler : & mut ReorgHandler < N > ,
371392) -> Option < ( ) > {
372- info ! ( "Getting finalized block number" ) ;
373393 let finalized = match provider. get_block_number_by_id ( BlockNumberOrTag :: Finalized . into ( ) ) . await
374394 {
375395 Ok ( block) => block,
376396 Err ( e) => {
377- error ! ( error = %e , "Failed to get finalized block" ) ;
397+ warn ! ( "Failed to get finalized block" ) ;
378398 _ = sender. try_stream ( e) . await ;
379399 return None ;
380400 }
381401 } ;
382402
403+ debug ! ( finalized_block = finalized, "Got finalized block for historical range" ) ;
404+
383405 // no reorg check for finalized blocks
384406 let finalized_batch_end = finalized. min ( end) ;
407+ let finalized_range_count =
408+ RangeIterator :: forward ( start, finalized_batch_end, max_block_range) . count ( ) ;
409+ trace ! (
410+ start = start,
411+ finalized_batch_end = finalized_batch_end,
412+ batch_count = finalized_range_count,
413+ "Streaming finalized blocks (no reorg check)"
414+ ) ;
415+
385416 for range in RangeIterator :: forward ( start, finalized_batch_end, max_block_range) {
417+ trace ! ( range_start = * range. start( ) , range_end = * range. end( ) , "Streaming finalized range" ) ;
386418 if !sender. try_stream ( range) . await {
387419 return None ; // channel closed
388420 }
@@ -423,6 +455,10 @@ pub(crate) async fn stream_historical_range<N: Network>(
423455}
424456
425457/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
458+ #[ cfg_attr(
459+ feature = "tracing" ,
460+ tracing:: instrument( level = "trace" , skip( sender, provider, reorg_handler) )
461+ ) ]
426462pub ( crate ) async fn stream_range_with_reorg_handling < N : Network > (
427463 min_common_ancestor : BlockNumber ,
428464 next_start_block : BlockNumber ,
@@ -440,7 +476,11 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
440476 let batch_end = match provider. get_block_by_number ( batch_end_num. into ( ) ) . await {
441477 Ok ( block) => block,
442478 Err ( e) => {
443- error ! ( batch_start = batch. start( ) , batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch" ) ;
479+ error ! (
480+ batch_start = batch. start( ) ,
481+ batch_end = batch_end_num,
482+ "Failed to get ending block of the current batch"
483+ ) ;
444484 _ = sender. try_stream ( e) . await ;
445485 return None ;
446486 }
@@ -453,23 +493,28 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
453493 let reorged_opt = match reorg_handler. check ( & batch_end) . await {
454494 Ok ( opt) => opt,
455495 Err ( e) => {
456- error ! ( error = %e , "Failed to perform reorg check" ) ;
496+ error ! ( "Failed to perform reorg check" ) ;
457497 _ = sender. try_stream ( e) . await ;
458498 return None ;
459499 }
460500 } ;
461501
462502 if let Some ( common_ancestor) = reorged_opt {
463503 let common_ancestor = common_ancestor. header ( ) . number ( ) ;
504+ info ! (
505+ common_ancestor = common_ancestor,
506+ "Reorg detected during historical streaming, resetting range iterator"
507+ ) ;
464508 if !sender. try_stream ( Notification :: ReorgDetected { common_ancestor } ) . await {
465509 return None ;
466510 }
467- iter. reset_to ( ( common_ancestor + 1 ) . max ( min_common_ancestor) ) ;
511+ let reset_to = ( common_ancestor + 1 ) . max ( min_common_ancestor) ;
512+ debug ! ( reset_to = reset_to, "Resetting range iterator after reorg" ) ;
513+ iter. reset_to ( reset_to) ;
468514 }
469515
470516 last_batch_end = Some ( batch_end) ;
471517 }
472518
473- info ! ( batch_count = iter. batch_count( ) , "Historical sync completed" ) ;
474519 last_batch_end
475520}
0 commit comments