@@ -175,8 +175,8 @@ pub enum Command {
175175 } ,
176176 Rewind {
177177 sender : mpsc:: Sender < BlockRangeMessage > ,
178- start_height : Option < BlockNumberOrTag > ,
179- end_height : Option < BlockNumberOrTag > ,
178+ start_height : BlockNumberOrTag ,
179+ end_height : BlockNumberOrTag ,
180180 response : oneshot:: Sender < Result < ( ) , BlockRangeScannerError > > ,
181181 } ,
182182 Unsubscribe {
@@ -402,8 +402,6 @@ impl<N: Network> Service<N> {
402402 Command :: Rewind { sender, start_height, end_height, response } => {
403403 self . ensure_no_subscriber ( ) ?;
404404 self . subscriber = Some ( sender) ;
405- let start_height = start_height. unwrap_or ( BlockNumberOrTag :: Latest ) ;
406- let end_height = end_height. unwrap_or ( BlockNumberOrTag :: Earliest ) ;
407405 info ! ( start_height = ?start_height, end_height = ?end_height, "Starting rewind" ) ;
408406 let result = self . handle_rewind ( start_height, end_height) . await ;
409407 let _ = response. send ( result) ;
@@ -977,16 +975,16 @@ impl BlockRangeScannerClient {
977975 /// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
978976 pub async fn rewind < BN : Into < BlockNumberOrTag > > (
979977 & self ,
980- start_height : Option < BN > ,
981- end_height : Option < BN > ,
978+ start_height : BN ,
979+ end_height : BN ,
982980 ) -> Result < ReceiverStream < BlockRangeMessage > , BlockRangeScannerError > {
983981 let ( blocks_sender, blocks_receiver) = mpsc:: channel ( MAX_BUFFERED_MESSAGES ) ;
984982 let ( response_tx, response_rx) = oneshot:: channel ( ) ;
985983
986984 let command = Command :: Rewind {
987985 sender : blocks_sender,
988- start_height : start_height. map ( |n| n . into ( ) ) ,
989- end_height : end_height. map ( |n| n . into ( ) ) ,
986+ start_height : start_height. into ( ) ,
987+ end_height : end_height. into ( ) ,
990988 response : response_tx,
991989 } ;
992990
@@ -1754,4 +1752,96 @@ mod tests {
17541752 assert_eq ! ( received, vec![ 8 ..=8 , 7 ..=7 , 6 ..=6 , 5 ..=5 ] ) ;
17551753 Ok ( ( ) )
17561754 }
1755+
1756+ #[ tokio:: test]
1757+ async fn command_rewind_defaults_latest_to_earliest_batches_correctly ( ) -> anyhow:: Result < ( ) > {
1758+ let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
1759+
1760+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
1761+ // Mine 20 blocks, so the total number of blocks is 21 (including 0th block)
1762+ provider. anvil_mine ( Option :: Some ( 20 ) , Option :: None ) . await ?;
1763+
1764+ let client = BlockRangeScanner :: new ( )
1765+ . with_blocks_read_per_epoch ( 7 )
1766+ . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
1767+ . await ?
1768+ . run ( ) ?;
1769+
1770+ let mut stream = client
1771+ . rewind :: < BlockNumberOrTag > ( BlockNumberOrTag :: Earliest , BlockNumberOrTag :: Latest )
1772+ . await ?;
1773+
1774+ let mut received = Vec :: new ( ) ;
1775+ while let Some ( msg) = stream. next ( ) . await {
1776+ if let BlockRangeMessage :: Data ( range) = msg {
1777+ received. push ( range) ;
1778+ }
1779+ }
1780+
1781+ // With epoch=7 over [0..=20] -> [14..=20, 7..=13, 0..=6]
1782+ assert_eq ! ( received, vec![ 14 ..=20 , 7 ..=13 , 0 ..=6 ] ) ;
1783+ Ok ( ( ) )
1784+ }
1785+
1786+ #[ tokio:: test]
1787+ async fn command_rewind_handles_start_and_end_in_any_order ( ) -> anyhow:: Result < ( ) > {
1788+ let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
1789+
1790+ let provider = ProviderBuilder :: new ( ) . connect ( anvil. endpoint ( ) . as_str ( ) ) . await ?;
1791+ // Ensure blocks at 3 and 15 exist
1792+ provider. anvil_mine ( Option :: Some ( 16 ) , Option :: None ) . await ?;
1793+
1794+ let client = BlockRangeScanner :: new ( )
1795+ . with_blocks_read_per_epoch ( 5 )
1796+ . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
1797+ . await ?
1798+ . run ( ) ?;
1799+
1800+ let mut stream = client. rewind ( 15 , 3 ) . await ?;
1801+
1802+ let mut received = Vec :: new ( ) ;
1803+ while let Some ( msg) = stream. next ( ) . await {
1804+ if let BlockRangeMessage :: Data ( range) = msg {
1805+ received. push ( range) ;
1806+ }
1807+ }
1808+
1809+ // Range normalized to [3..=15] with epoch=5 -> [11..=15, 6..=10, 3..=5]
1810+ assert_eq ! ( received, vec![ 11 ..=15 , 6 ..=10 , 3 ..=5 ] ) ;
1811+
1812+ let mut stream = client. rewind ( 3 , 15 ) . await ?;
1813+
1814+ let mut received = Vec :: new ( ) ;
1815+ while let Some ( msg) = stream. next ( ) . await {
1816+ if let BlockRangeMessage :: Data ( range) = msg {
1817+ received. push ( range) ;
1818+ }
1819+ }
1820+
1821+ // Range normalized to [3..=15] with epoch=5 -> [11..=15, 6..=10, 3..=5]
1822+ assert_eq ! ( received, vec![ 11 ..=15 , 6 ..=10 , 3 ..=5 ] ) ;
1823+
1824+ Ok ( ( ) )
1825+ }
1826+
1827+ #[ tokio:: test]
1828+ async fn command_rewind_propagates_block_not_found_error ( ) -> anyhow:: Result < ( ) > {
1829+ let anvil = Anvil :: new ( ) . try_spawn ( ) ?;
1830+
1831+ // Do not mine up to 999 so start won't exist
1832+ let client = BlockRangeScanner :: new ( )
1833+ . with_blocks_read_per_epoch ( 5 )
1834+ . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
1835+ . await ?
1836+ . run ( ) ?;
1837+
1838+ let res = client. rewind ( 0 , 999 ) . await ;
1839+
1840+ match res {
1841+ Err ( BlockRangeScannerError :: BlockNotFound ( _) ) => { }
1842+ other => panic ! ( "unexpected result: {other:?}" ) ,
1843+ }
1844+
1845+ Ok ( ( ) )
1846+ }
17571847}
0 commit comments