6767
6868use std:: { ops:: RangeInclusive , sync:: Arc } ;
6969
70- #[ cfg( test) ]
71- use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
70+ use std:: sync:: OnceLock ;
7271
7372use tokio:: sync:: { mpsc, oneshot} ;
7473use tokio_stream:: { StreamExt , wrappers:: ReceiverStream } ;
@@ -317,29 +316,15 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
317316 }
318317}
319318
320- #[ cfg( test) ]
321- static HIST_FETCH_DELAY_MS : AtomicU64 = AtomicU64 :: new ( 0 ) ;
322-
323- #[ cfg( test) ]
324- fn set_historical_fetch_delay_ms ( ms : u64 ) {
325- HIST_FETCH_DELAY_MS . store ( ms, Ordering :: Relaxed ) ;
326- }
327-
328- #[ cfg( test) ]
329- async fn maybe_delay_historical ( ) {
330- let ms = HIST_FETCH_DELAY_MS . load ( Ordering :: Relaxed ) ;
331- if ms > 0 {
332- use std:: time:: Duration ;
333-
334- use tokio:: time:: sleep;
335-
336- sleep ( Duration :: from_millis ( ms) ) . await ;
337- }
338- }
319+ #[ allow( dead_code) ]
320+ static TEST_HIST_LOCK : OnceLock < tokio:: sync:: Mutex < ( ) > > = OnceLock :: new ( ) ;
339321
340- #[ cfg ( not ( test ) ) ]
322+ #[ allow ( dead_code ) ]
341323#[ allow( clippy:: unused_async) ]
342- async fn maybe_delay_historical ( ) { }
324+ async fn lock_historical_for_testing ( ) {
325+ let lock = TEST_HIST_LOCK . get_or_init ( || tokio:: sync:: Mutex :: new ( ( ) ) ) ;
326+ let _guard = lock. lock ( ) . await ;
327+ }
343328
344329struct Service < N : Network > {
345330 config : Config ,
@@ -484,7 +469,9 @@ impl<N: Network> Service<N> {
484469 } ;
485470
486471 let end_num = end_block. header ( ) . number ( ) ;
472+ println ! ( "spawning monitor.." ) ;
487473 let monitor = self . spawn_live_header_monitor_if_needed ( end_num, subscriber) . await ?;
474+ println ! ( "monitor spawned!" ) ;
488475
489476 match self . stream_historical_blocks ( start_block, end_block. clone ( ) ) . await {
490477 Ok ( ( ) ) => {
@@ -624,6 +611,10 @@ impl<N: Network> Service<N> {
624611
625612 self . current = BlockHashAndNumber :: from_header :: < N > ( start. header ( ) ) ;
626613
614+ println ! ( "lock" ) ;
615+ #[ cfg( test) ]
616+ lock_historical_for_testing ( ) . await ;
617+
627618 while self . current . number < end. header ( ) . number ( ) {
628619 self . ensure_current_not_reorged ( ) . await ?;
629620
@@ -642,8 +633,6 @@ impl<N: Network> Service<N> {
642633
643634 self . send_to_subscriber ( BlockRangeMessage :: Data ( self . current . number ..=batch_to) ) . await ;
644635
645- maybe_delay_historical ( ) . await ;
646-
647636 self . current = BlockHashAndNumber :: from_header :: < N > ( batch_end_block. header ( ) ) ;
648637
649638 batch_count += 1 ;
@@ -846,6 +835,7 @@ impl<N: Network> Service<N> {
846835 Ok ( sub) => {
847836 let mut stream = sub. into_stream ( ) ;
848837 while let Some ( h) = stream. next ( ) . await {
838+ println ! ( "h {}" , h. number( ) ) ;
849839 if live_block_num_sender. send ( h. number ( ) ) . await . is_err ( ) {
850840 warn ! ( "Downstream channel closed, stopping live header monitor" ) ;
851841 break ;
@@ -1063,7 +1053,7 @@ impl BlockRangeScannerClient {
10631053mod tests {
10641054
10651055 use std:: time:: Duration ;
1066- use tokio:: time:: timeout;
1056+ use tokio:: { join , time:: timeout} ;
10671057
10681058 use alloy:: {
10691059 network:: Ethereum ,
@@ -1077,7 +1067,7 @@ mod tests {
10771067 } ;
10781068 use alloy_node_bindings:: Anvil ;
10791069 use serde_json:: { Value , json} ;
1080- use tokio:: { join , sync:: mpsc} ;
1070+ use tokio:: sync:: mpsc;
10811071 use tokio_stream:: StreamExt ;
10821072
10831073 use super :: * ;
@@ -1645,10 +1635,7 @@ mod tests {
16451635 }
16461636
16471637 #[ tokio:: test]
1648- #[ ignore = "Flaky test as it relies on timing roerg between start of historical scan and end (when its getting live blocks)" ]
16491638 async fn historical_emits_correction_range_when_reorg_below_end ( ) -> anyhow:: Result < ( ) > {
1650- super :: set_historical_fetch_delay_ms ( 100 ) ;
1651-
16521639 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
16531640 let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
16541641
@@ -1666,26 +1653,24 @@ mod tests {
16661653 . await ?
16671654 . run ( ) ?;
16681655
1669- let provider_reorg = provider . clone ( ) ;
1656+ let lock = super :: TEST_HIST_LOCK . get_or_init ( || tokio :: sync :: Mutex :: new ( ( ) ) ) . lock ( ) . await ;
16701657
1671- let reorg = async move {
1672- tokio :: time :: sleep ( Duration :: from_millis ( 500 ) ) . await ;
1658+ let fut = client
1659+ . stream_historical ( BlockNumberOrTag :: Number ( 0 ) , BlockNumberOrTag :: Number ( end_num ) ) ;
16731660
1674- let head_now = provider_reorg. get_block_number ( ) . await . unwrap ( ) ;
1661+ let roerg = async {
1662+ let head_now = provider. get_block_number ( ) . await . unwrap ( ) ;
16751663 let reorg_start = end_num - 5 ;
16761664 let depth = head_now - reorg_start + 1 ;
1677- provider_reorg
1678- . anvil_reorg ( ReorgOptions { depth, tx_block_pairs : vec ! [ ] } )
1679- . await
1680- . unwrap ( ) ;
1681- provider_reorg. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await . unwrap ( ) ;
1665+ let _ = provider. anvil_reorg ( ReorgOptions { depth, tx_block_pairs : vec ! [ ] } ) . await ;
1666+ let _ = provider. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await ;
1667+
1668+ drop ( lock) ;
16821669 } ;
16831670
1684- let fut_stream = client
1685- . stream_historical ( BlockNumberOrTag :: Number ( 0 ) , BlockNumberOrTag :: Number ( end_num) ) ;
1671+ let ( res_stream, ( ) ) = join ! ( fut, roerg) ;
16861672
1687- let ( stream_res, ( ) ) = join ! ( fut_stream, reorg) ;
1688- let mut stream = stream_res. unwrap ( ) ;
1673+ let mut stream = res_stream. unwrap ( ) ;
16891674
16901675 let mut data_ranges = Vec :: new ( ) ;
16911676 let mut reorg = false ;
0 commit comments