2121//! let block_range_scanner = BlockRangeScanner::new()
2222//! .with_blocks_read_per_epoch(1000)
2323//! .with_reorg_rewind_depth(5)
24- //! .with_retry_interval(Duration::from_secs(12))
2524//! .with_block_confirmations(5)
2625//! .connect_ws::<Ethereum>(Url::parse("ws://localhost:8546").unwrap())
2726//! .await?;
6968//! }
7069//! ```
7170
72- use std:: { ops:: Range , time :: Duration } ;
71+ use std:: ops:: Range ;
7372
7473use tokio:: sync:: { mpsc, oneshot} ;
7574use tokio_stream:: wrappers:: ReceiverStream ;
@@ -93,13 +92,11 @@ use tracing::{debug, error, info, warn};
9392
9493// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
9594const DEFAULT_BLOCKS_READ_PER_EPOCH : usize = 1000 ;
96- const DEFAULT_RETRY_INTERVAL : Duration = Duration :: from_secs ( 12 ) ;
9795const DEFAULT_BLOCK_CONFIRMATIONS : u64 = 0 ;
9896// const BACK_OFF_MAX_RETRIES: u64 = 5;
9997
10098const MAX_BUFFERED_MESSAGES : usize = 50000 ;
10199
102- // TODO: determine check exact default value
103100const DEFAULT_REORG_REWIND_DEPTH : u64 = 0 ;
104101
105102// // State sync aware retry settings
@@ -182,16 +179,16 @@ impl BlockHashAndNumber {
182179struct Config {
183180 blocks_read_per_epoch : usize ,
184181 reorg_rewind_depth : u64 ,
185- #[ allow( dead_code, reason = "TODO: will be used in smart retry mechanism" ) ]
186- retry_interval : Duration ,
187- #[ allow( dead_code, reason = "TODO: will be used in reorg mechanism" ) ]
182+ #[ allow(
183+ dead_code,
184+ reason = "Will be used in reorg mechanism: https://github.com/OpenZeppelin/Event-Scanner/issues/5"
185+ ) ]
188186 block_confirmations : u64 ,
189187}
190188
191189pub struct BlockRangeScanner {
192190 blocks_read_per_epoch : usize ,
193191 reorg_rewind_depth : u64 ,
194- retry_interval : Duration ,
195192 block_confirmations : u64 ,
196193}
197194
@@ -207,7 +204,6 @@ impl BlockRangeScanner {
207204 Self {
208205 blocks_read_per_epoch : DEFAULT_BLOCKS_READ_PER_EPOCH ,
209206 reorg_rewind_depth : DEFAULT_REORG_REWIND_DEPTH ,
210- retry_interval : DEFAULT_RETRY_INTERVAL ,
211207 block_confirmations : DEFAULT_BLOCK_CONFIRMATIONS ,
212208 }
213209 }
@@ -224,12 +220,6 @@ impl BlockRangeScanner {
224220 self
225221 }
226222
227- #[ must_use]
228- pub fn with_retry_interval ( mut self , retry_interval : Duration ) -> Self {
229- self . retry_interval = retry_interval;
230- self
231- }
232-
233223 #[ must_use]
234224 pub fn with_block_confirmations ( mut self , block_confirmations : u64 ) -> Self {
235225 self . block_confirmations = block_confirmations;
@@ -252,7 +242,6 @@ impl BlockRangeScanner {
252242 config : Config {
253243 blocks_read_per_epoch : self . blocks_read_per_epoch ,
254244 reorg_rewind_depth : self . reorg_rewind_depth ,
255- retry_interval : self . retry_interval ,
256245 block_confirmations : self . block_confirmations ,
257246 } ,
258247 } )
@@ -273,7 +262,6 @@ impl BlockRangeScanner {
273262 config : Config {
274263 blocks_read_per_epoch : self . blocks_read_per_epoch ,
275264 reorg_rewind_depth : self . reorg_rewind_depth ,
276- retry_interval : self . retry_interval ,
277265 block_confirmations : self . block_confirmations ,
278266 } ,
279267 } )
@@ -292,7 +280,6 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
292280 & self . provider
293281 }
294282
295- // TODO: use wrapper errors
296283 /// Starts the subscription service and returns a client for sending commands.
297284 ///
298285 /// # Errors
@@ -393,8 +380,6 @@ impl<N: Network> Service<N> {
393380 return Err ( Error :: MultipleSubscribers ) ;
394381 }
395382
396- // TODO: update local state relate to reorg and validate data
397-
398383 info ! ( "Starting subscription from point: {start_height:?}" ) ;
399384 self . subscriber = Some ( sender) ;
400385
@@ -415,11 +400,8 @@ impl<N: Network> Service<N> {
415400 let ( start_block, sync_end_block) = if let Some ( end_height) = end_height {
416401 let start_block =
417402 self . provider . get_block_by_number ( start_height) . await ?. expect ( "already checked" ) ;
418- let end_block = self
419- . provider
420- . get_block ( end_height. into ( ) )
421- . await ?
422- . expect ( "TODO: check if really valid" ) ;
403+ let end_block =
404+ self . provider . get_block ( end_height. into ( ) ) . await ?. expect ( "should be valid" ) ;
423405 ( start_block, end_block)
424406 } else {
425407 let start_block =
@@ -428,7 +410,7 @@ impl<N: Network> Service<N> {
428410 . provider
429411 . get_block ( BlockId :: Number ( BlockNumberOrTag :: Latest ) )
430412 . await ?
431- . expect ( "TODO: check if really valid" ) ;
413+ . expect ( "should be valid" ) ;
432414 ( start_block, end_block)
433415 } ;
434416
@@ -447,7 +429,6 @@ impl<N: Network> Service<N> {
447429 }
448430 } ) ;
449431
450- // TODO: invoke with smart retry mechanism with backoff
451432 if let Err ( e) = self . sync_historical_data ( start_block, sync_end_block) . await {
452433 warn ! ( "aborting ws_task" ) ;
453434 ws_task. abort ( ) ;
@@ -494,11 +475,8 @@ impl<N: Network> Service<N> {
494475 self . current . as_ref ( ) . unwrap ( ) . number + self . config . blocks_read_per_epoch as u64
495476 } ;
496477
497- let batch_end_block = self
498- . provider
499- . get_block_by_number ( batch_to. into ( ) )
500- . await ?
501- . expect ( "TODO: check if really valid" ) ;
478+ let batch_end_block =
479+ self . provider . get_block_by_number ( batch_to. into ( ) ) . await ?. expect ( "should be valid" ) ;
502480
503481 self . send_to_subscriber ( Ok ( self . current . as_ref ( ) . unwrap ( ) . number ..batch_to) ) . await ;
504482
@@ -560,18 +538,12 @@ impl<N: Network> Service<N> {
560538 provider : P ,
561539 buffer_sender : mpsc:: Sender < Range < BlockNumber > > ,
562540 ) {
563- // TODO: consider passing errors to the caller for handling
564-
565- // TODO: use smart retry mechanism
566541 match Self :: get_block_subscription ( & provider) . await {
567542 Ok ( mut ws_stream) => {
568543 info ! ( "WebSocket connected for buffering" ) ;
569544
570- // TODO: if latest != ws_stream.next(), then return latest.number and empty the
571- // ws_stream backlog
572545 while let Ok ( header_resp) = ws_stream. recv ( ) . await {
573546 info ! ( "Received block header: {}" , header_resp. number( ) ) ;
574- // TODO: handle reorgs
575547 if current == header_resp. number ( ) {
576548 continue ;
577549 }
@@ -612,7 +584,6 @@ impl<N: Network> Service<N> {
612584 }
613585 processed += end - start;
614586 } else if end > cutoff {
615- // TODO: verify the math
616587 discarded += cutoff - start;
617588
618589 let start = cutoff;
@@ -778,7 +749,6 @@ mod tests {
778749 let client = BlockRangeScanner :: new ( )
779750 . with_blocks_read_per_epoch ( 3 )
780751 . with_reorg_rewind_depth ( 5 )
781- . with_retry_interval ( Duration :: from_secs ( 1 ) )
782752 . with_block_confirmations ( 1 )
783753 . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
784754 . await ?
0 commit comments