@@ -472,15 +472,13 @@ impl<N: Network> Service<N> {
472472 } ;
473473
474474 let end_num = end_block. header ( ) . number ( ) ;
475- println ! ( "spawning monitor.." ) ;
476- let monitor = self . spawn_live_header_monitor_if_needed ( end_num, subscriber) . await ?;
477- println ! ( "monitor spawned!" ) ;
475+ let monitor = self . spawn_reorg_monitor_if_needed ( end_num, subscriber) . await ?;
478476
479477 match self . stream_historical_blocks ( start_block, end_block. clone ( ) ) . await {
480478 Ok ( ( ) ) => {
481479 // Post-sync: stop header collection and evaluate correction range
482480 if let Some ( ( handle, rx) ) = monitor {
483- self . drain_header_monitor_and_emit_correction ( rx, end_num) . await ;
481+ self . drain_reorg_monitor_and_emit_correction ( rx, end_num) . await ;
484482 handle. abort ( ) ;
485483 }
486484 }
@@ -614,7 +612,6 @@ impl<N: Network> Service<N> {
614612
615613 self . next_start_block = BlockHashAndNumber :: from_header :: < N > ( start. header ( ) ) ;
616614
617- println ! ( "lock" ) ;
618615 #[ cfg( test) ]
619616 lock_historical_for_testing ( ) . await ;
620617
@@ -820,7 +817,7 @@ impl<N: Network> Service<N> {
820817 Ok ( ws_stream)
821818 }
822819
823- async fn spawn_live_header_monitor_if_needed (
820+ async fn spawn_reorg_monitor_if_needed (
824821 & self ,
825822 end_num : BlockNumber ,
826823 subscriber : mpsc:: Sender < BlockRangeMessage > ,
@@ -883,7 +880,7 @@ impl<N: Network> Service<N> {
883880 Ok ( Option :: Some ( ( live_subscription_task, live_block_num_receiver) ) )
884881 }
885882
886- async fn drain_header_monitor_and_emit_correction (
883+ async fn drain_reorg_monitor_and_emit_correction (
887884 & mut self ,
888885 mut rx : mpsc:: Receiver < BlockNumber > ,
889886 end_num : BlockNumber ,
@@ -1086,7 +1083,11 @@ impl BlockRangeScannerClient {
10861083mod tests {
10871084
10881085 use std:: time:: Duration ;
1089- use tokio:: { join, sync:: mpsc:: Receiver , time:: timeout} ;
1086+ use tokio:: {
1087+ join,
1088+ sync:: mpsc:: Receiver ,
1089+ time:: { sleep, timeout} ,
1090+ } ;
10901091
10911092 use alloy:: {
10921093 network:: Ethereum ,
@@ -1533,6 +1534,156 @@ mod tests {
15331534 Ok ( ( ) )
15341535 }
15351536
1537+ #[ tokio:: test]
1538+ async fn historical_emits_correction_range_when_reorg_below_end ( ) -> anyhow:: Result < ( ) > {
1539+ let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
1540+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
1541+
1542+ provider. anvil_mine ( Option :: Some ( 120 ) , Option :: None ) . await ?;
1543+
1544+ let finalized_block = provider. get_block_by_number ( BlockNumberOrTag :: Finalized ) . await ?;
1545+ let finalized_num = finalized_block. unwrap ( ) . header ( ) . number ( ) ;
1546+
1547+ let end_num = finalized_num + 10 ;
1548+ let head_before = provider. get_block_number ( ) . await ?;
1549+ assert ! ( end_num <= head_before) ;
1550+
1551+ let client = BlockRangeScanner :: new ( )
1552+ . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
1553+ . await ?
1554+ . run ( ) ?;
1555+
1556+ let lock = super :: TEST_HIST_LOCK . get_or_init ( || tokio:: sync:: Mutex :: new ( ( ) ) ) . lock ( ) . await ;
1557+
1558+ let fut_stream = client
1559+ . stream_historical ( BlockNumberOrTag :: Number ( 0 ) , BlockNumberOrTag :: Number ( end_num) ) ;
1560+
1561+ let roerg = async {
1562+ sleep ( Duration :: from_millis ( 100 ) ) . await ;
1563+ let _ = provider. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await ;
1564+ let head_now = provider. get_block_number ( ) . await . unwrap ( ) ;
1565+ let reorg_start = end_num - 5 ;
1566+ let depth = head_now - reorg_start + 1 ;
1567+ let _ = provider. anvil_reorg ( ReorgOptions { depth, tx_block_pairs : vec ! [ ] } ) . await ;
1568+ let _ = provider. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await ;
1569+
1570+ drop ( lock) ;
1571+ } ;
1572+
1573+ let ( res_stream, ( ) ) = join ! ( fut_stream, roerg) ;
1574+
1575+ let mut stream = res_stream. unwrap ( ) ;
1576+
1577+ let mut data_ranges = Vec :: new ( ) ;
1578+ let mut reorg = false ;
1579+ while let Some ( msg) = stream. next ( ) . await {
1580+ match msg {
1581+ BlockRangeMessage :: Data ( range) => data_ranges. push ( range) ,
1582+ BlockRangeMessage :: Status ( status) => {
1583+ if matches ! ( status, ScannerStatus :: ReorgDetected ) {
1584+ reorg = true ;
1585+ }
1586+ }
1587+ BlockRangeMessage :: Error ( _) => {
1588+ panic ! ( "error" ) ;
1589+ }
1590+ }
1591+ }
1592+
1593+ assert ! ( reorg, "no reorg detected" ) ;
1594+
1595+ let reorg_start = end_num - 5 ;
1596+ let last_range = data_ranges. last ( ) . expect ( "should have at least one range" ) ;
1597+ assert_eq ! (
1598+ * last_range. start( ) ,
1599+ reorg_start,
1600+ "expected last range to start at {reorg_start}, got: {last_range:?}"
1601+ ) ;
1602+ assert_eq ! (
1603+ * last_range. end( ) ,
1604+ end_num,
1605+ "expected last range to end at {end_num}, got: {last_range:?}"
1606+ ) ;
1607+
1608+ Ok ( ( ) )
1609+ }
1610+
1611+ #[ tokio:: test]
1612+ async fn historical_emits_correction_range_when_end_num_reorgs ( ) -> anyhow:: Result < ( ) > {
1613+ let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
1614+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
1615+
1616+ provider. anvil_mine ( Option :: Some ( 120 ) , Option :: None ) . await ?;
1617+
1618+ let head_block = provider. get_block_by_number ( BlockNumberOrTag :: Latest ) . await ?;
1619+ let end_num = head_block. unwrap ( ) . header ( ) . number ( ) ;
1620+
1621+ println ! ( "END NUM {end_num}" ) ;
1622+
1623+ let client = BlockRangeScanner :: new ( )
1624+ . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
1625+ . await ?
1626+ . run ( ) ?;
1627+
1628+ let lock = super :: TEST_HIST_LOCK . get_or_init ( || tokio:: sync:: Mutex :: new ( ( ) ) ) . lock ( ) . await ;
1629+
1630+ let fut_stream = client
1631+ . stream_historical ( BlockNumberOrTag :: Number ( 0 ) , BlockNumberOrTag :: Number ( end_num) ) ;
1632+
1633+ let roerg = async {
1634+ sleep ( Duration :: from_millis ( 100 ) ) . await ;
1635+ let pre_reorg_mine = 20 ;
1636+ let _ = provider. anvil_mine ( Option :: Some ( pre_reorg_mine) , Option :: None ) . await ;
1637+ let head_now = provider. get_block_number ( ) . await . unwrap ( ) ;
1638+ // Reorg back to previous head aka our end num
1639+ let depth = pre_reorg_mine + 1 ;
1640+ println ! ( "HEAD {head_now}" ) ;
1641+ println ! ( "DEPTH {depth}" ) ;
1642+ let _ = provider. anvil_reorg ( ReorgOptions { depth, tx_block_pairs : vec ! [ ] } ) . await ;
1643+ let _ = provider. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await ;
1644+
1645+ drop ( lock) ;
1646+ } ;
1647+
1648+ let ( res_stream, ( ) ) = join ! ( fut_stream, roerg) ;
1649+
1650+ let mut stream = res_stream. unwrap ( ) ;
1651+
1652+ let mut data_ranges = Vec :: new ( ) ;
1653+ let mut reorg = false ;
1654+ while let Some ( msg) = stream. next ( ) . await {
1655+ match msg {
1656+ BlockRangeMessage :: Data ( range) => data_ranges. push ( range) ,
1657+ BlockRangeMessage :: Status ( status) => {
1658+ if matches ! ( status, ScannerStatus :: ReorgDetected ) {
1659+ reorg = true ;
1660+ }
1661+ }
1662+ BlockRangeMessage :: Error ( _) => {
1663+ panic ! ( "error" ) ;
1664+ }
1665+ }
1666+ }
1667+
1668+ assert ! ( reorg, "no reorg detected" ) ;
1669+
1670+ // reorg of 1 so last range should be end num
1671+ let reorg_start = end_num;
1672+ let last_range = data_ranges. last ( ) . expect ( "should have at least one range" ) ;
1673+ assert_eq ! (
1674+ * last_range. start( ) ,
1675+ reorg_start,
1676+ "expected last range to start at {reorg_start}, got: {last_range:?}"
1677+ ) ;
1678+ assert_eq ! (
1679+ * last_range. end( ) ,
1680+ end_num,
1681+ "expected last range to end at {end_num}, got: {last_range:?}"
1682+ ) ;
1683+
1684+ Ok ( ( ) )
1685+ }
1686+
15361687 #[ tokio:: test]
15371688 async fn historic_mode_respects_blocks_read_per_epoch ( ) -> anyhow:: Result < ( ) > {
15381689 let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
@@ -1755,76 +1906,4 @@ mod tests {
17551906
17561907 Ok ( ( ) )
17571908 }
1758-
1759- #[ tokio:: test]
1760- async fn historical_emits_correction_range_when_reorg_below_end ( ) -> anyhow:: Result < ( ) > {
1761- let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
1762- let provider = ProviderBuilder :: new ( ) . connect ( anvil. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
1763-
1764- provider. anvil_mine ( Option :: Some ( 120 ) , Option :: None ) . await ?;
1765-
1766- let finalized_block = provider. get_block_by_number ( BlockNumberOrTag :: Finalized ) . await ?;
1767- let finalized_num = finalized_block. unwrap ( ) . header ( ) . number ( ) ;
1768-
1769- let end_num = finalized_num + 10 ;
1770- let head_before = provider. get_block_number ( ) . await ?;
1771- assert ! ( end_num <= head_before) ;
1772-
1773- let client = BlockRangeScanner :: new ( )
1774- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
1775- . await ?
1776- . run ( ) ?;
1777-
1778- let lock = super :: TEST_HIST_LOCK . get_or_init ( || tokio:: sync:: Mutex :: new ( ( ) ) ) . lock ( ) . await ;
1779-
1780- let fut = client
1781- . stream_historical ( BlockNumberOrTag :: Number ( 0 ) , BlockNumberOrTag :: Number ( end_num) ) ;
1782-
1783- let roerg = async {
1784- let head_now = provider. get_block_number ( ) . await . unwrap ( ) ;
1785- let reorg_start = end_num - 5 ;
1786- let depth = head_now - reorg_start + 1 ;
1787- let _ = provider. anvil_reorg ( ReorgOptions { depth, tx_block_pairs : vec ! [ ] } ) . await ;
1788- let _ = provider. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await ;
1789-
1790- drop ( lock) ;
1791- } ;
1792-
1793- let ( res_stream, ( ) ) = join ! ( fut, roerg) ;
1794-
1795- let mut stream = res_stream. unwrap ( ) ;
1796-
1797- let mut data_ranges = Vec :: new ( ) ;
1798- let mut reorg = false ;
1799- while let Some ( msg) = stream. next ( ) . await {
1800- match msg {
1801- BlockRangeMessage :: Data ( range) => data_ranges. push ( range) ,
1802- BlockRangeMessage :: Status ( status) => {
1803- if matches ! ( status, ScannerStatus :: ReorgDetected ) {
1804- reorg = true ;
1805- }
1806- }
1807- BlockRangeMessage :: Error ( _) => {
1808- panic ! ( "error" ) ;
1809- }
1810- }
1811- }
1812-
1813- assert ! ( reorg, "no reorg detected" ) ;
1814-
1815- let reorg_start = end_num - 5 ;
1816- let last_range = data_ranges. last ( ) . expect ( "should have at least one range" ) ;
1817- assert_eq ! (
1818- * last_range. start( ) ,
1819- reorg_start,
1820- "expected last range to start at {reorg_start}, got: {last_range:?}"
1821- ) ;
1822- assert_eq ! (
1823- * last_range. end( ) ,
1824- end_num,
1825- "expected last range to end at {end_num}, got: {last_range:?}"
1826- ) ;
1827-
1828- Ok ( ( ) )
1829- }
18301909}
0 commit comments