diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 1bf223e8..18747442 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -79,18 +79,21 @@ use alloy::{ network::{BlockResponse, Network, primitives::HeaderResponse}, primitives::BlockNumber, }; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; mod common; +mod range_iterator; mod reorg_handler; mod ring_buffer; mod sync_handler; +pub(crate) use range_iterator::RangeIterator; + use reorg_handler::ReorgHandler; pub use ring_buffer::RingBufferCapacity; pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000; -// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19 + pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000; @@ -472,8 +475,6 @@ impl Service { provider: &RobustProvider, reorg_handler: &mut ReorgHandler, ) { - let mut batch_count = 0; - // for checking whether reorg occurred let mut tip = from; @@ -490,29 +491,15 @@ impl Service { } }; - // we're iterating in reverse - let mut batch_from = from; let finalized_number = finalized_block.header().number(); // only check reorg if our tip is after the finalized block let check_reorg = tip.header().number() > finalized_number; - while batch_from >= to { - let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to); - + let mut iter = RangeIterator::reverse(from, to, max_block_range); + for range in &mut iter { // stream the range regularly, i.e. from smaller block number to greater - if !sender.try_stream(batch_to..=batch_from).await { - break; - } - - batch_count += 1; - if batch_count % 10 == 0 { - debug!(batch_count = batch_count, "Processed rewind batches"); - } - - // check early if end of stream achieved to avoid subtraction overflow when `to - // == 0` - if batch_to == to { + if !sender.try_stream(range).await { break; } @@ -539,11 +526,9 @@ impl Service { return; } } - - batch_from = batch_to - 1; } - info!(batch_count = batch_count, "Rewind completed"); + info!(batch_count = iter.batch_count(), "Rewind completed"); } /// Handles re-scanning of reorged blocks. @@ -586,18 +571,10 @@ impl Service { // Re-scan only the affected range (from common_ancestor + 1 up to tip) let rescan_from = common_ancestor + 1; - let mut rescan_batch_start = rescan_from; - while rescan_batch_start <= tip_number { - let rescan_batch_end = (rescan_batch_start + max_block_range - 1).min(tip_number); - - if !sender.try_stream(rescan_batch_start..=rescan_batch_end).await { + for batch in RangeIterator::forward(rescan_from, tip_number, max_block_range) { + if !sender.try_stream(batch).await { return false; } - - if rescan_batch_end == tip_number { - break; - } - rescan_batch_start = rescan_batch_end + 1; } true diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index 8d62fc1a..cbaf6824 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -3,7 +3,7 @@ use tokio_stream::StreamExt; use crate::{ ScannerError, - block_range_scanner::{BlockScannerResult, reorg_handler::ReorgHandler}, + block_range_scanner::{BlockScannerResult, RangeIterator, reorg_handler::ReorgHandler}, robust_provider::{RobustProvider, RobustSubscription, subscription}, types::{Notification, TryStream}, }; @@ -13,7 +13,7 @@ use alloy::{ network::{BlockResponse, Network}, primitives::BlockNumber, }; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; #[allow(clippy::too_many_arguments)] pub(crate) async fn stream_live_blocks( @@ -347,18 +347,17 @@ pub(crate) async fn stream_historical_range( }; // no reorg check for finalized blocks - let mut batch_start = start; let finalized_batch_end = finalized.min(end); - while batch_start <= finalized_batch_end { - let batch_end = batch_start.saturating_add(max_block_range - 1).min(finalized_batch_end); - - if !sender.try_stream(batch_start..=batch_end).await { + for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) { + if !sender.try_stream(range).await { return None; // channel closed } - - batch_start = batch_end + 1; } + // If start > finalized_batch_end, the loop above was empty and we should + // continue from start. Otherwise, continue from after finalized_batch_end. + let batch_start = start.max(finalized_batch_end + 1); + // covers case when `end <= finalized` if batch_start > end { return Some(()); // we're done @@ -392,35 +391,31 @@ pub(crate) async fn stream_historical_range( /// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks. pub(crate) async fn stream_range_with_reorg_handling( min_common_ancestor: BlockNumber, - mut next_start_block: BlockNumber, + next_start_block: BlockNumber, end: BlockNumber, max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, reorg_handler: &mut ReorgHandler, ) -> Option { - let mut batch_count = 0; + let mut last_batch_end: Option = None; + let mut iter = RangeIterator::forward(next_start_block, end, max_block_range); - loop { - let batch_end_num = next_start_block.saturating_add(max_block_range - 1).min(end); + while let Some(batch) = iter.next() { + let batch_end_num = *batch.end(); let batch_end = match provider.get_block_by_number(batch_end_num.into()).await { Ok(block) => block, Err(e) => { - error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch"); + error!(batch_start = batch.start(), batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch"); _ = sender.try_stream(e).await; return None; } }; - if !sender.try_stream(next_start_block..=batch_end_num).await { + if !sender.try_stream(batch).await { return None; // channel closed } - batch_count += 1; - if batch_count % 10 == 0 { - debug!(batch_count = batch_count, "Processed historical batches"); - } - let reorged_opt = match reorg_handler.check(&batch_end).await { Ok(opt) => opt, Err(e) => { @@ -430,19 +425,17 @@ pub(crate) async fn stream_range_with_reorg_handling( } }; - next_start_block = if let Some(common_ancestor) = reorged_opt { + if let Some(common_ancestor) = reorged_opt { let common_ancestor = common_ancestor.header().number(); if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await { return None; } - (common_ancestor + 1).max(min_common_ancestor) - } else { - batch_end_num + 1 - }; - - if next_start_block > end { - info!(batch_count = batch_count, "Historical sync completed"); - return Some(batch_end); + iter.reset_to((common_ancestor + 1).max(min_common_ancestor)); } + + last_batch_end = Some(batch_end); } + + info!(batch_count = iter.batch_count(), "Historical sync completed"); + last_batch_end } diff --git a/src/block_range_scanner/range_iterator.rs b/src/block_range_scanner/range_iterator.rs new file mode 100644 index 00000000..add17184 --- /dev/null +++ b/src/block_range_scanner/range_iterator.rs @@ -0,0 +1,337 @@ +use alloy::primitives::BlockNumber; +use std::{marker::PhantomData, ops::RangeInclusive}; +use tracing::debug; + +pub struct Forward; +pub struct Reverse; + +/// An iterator that yields block ranges of a configurable size. +#[derive(Debug, Clone)] +pub struct RangeIterator { + current: BlockNumber, + end: BlockNumber, + range_size: u64, + batch_count: u64, + total_batches: u64, + _direction: PhantomData, +} + +impl RangeIterator { + /// Creates a forward iterator (oldest to newest). + /// + /// Yields ranges from `start` toward `end`, inclusive. + /// + /// # Panics + /// + /// Panics if `max_block_range` is 0. + #[must_use] + pub const fn forward(start: BlockNumber, end: BlockNumber, max_block_range: u64) -> Self { + assert!(max_block_range >= 1, "max_block_range must be at least 1"); + let total_batches = if start > end { 0 } else { (end - start) / max_block_range + 1 }; + Self { + current: start, + end, + range_size: max_block_range, + batch_count: 0, + total_batches, + _direction: PhantomData, + } + } + + /// Resets the iterator to continue from a new position. + /// + /// Useful after detecting a reorg to rescan from a common ancestor. + pub fn reset_to(&mut self, block: BlockNumber) { + self.current = block; + self.total_batches = if block > self.end { + self.batch_count + } else { + self.batch_count + (self.end - block) / self.range_size + 1 + }; + } +} + +impl RangeIterator { + /// Creates a reverse iterator (newest to oldest). + /// + /// Yields ranges from `start` (higher) toward `end` (lower), inclusive. + /// Each yielded range is still formatted as `low..=high`. + /// + /// # Panics + /// + /// Panics if `max_block_range` is 0. + #[must_use] + pub const fn reverse(start: BlockNumber, end: BlockNumber, max_block_range: u64) -> Self { + assert!(max_block_range >= 1, "max_block_range must be at least 1"); + let total_batches = if start < end { 0 } else { (start - end) / max_block_range + 1 }; + Self { + current: start, + end, + range_size: max_block_range, + batch_count: 0, + total_batches, + _direction: PhantomData, + } + } +} + +impl RangeIterator { + /// Returns the number of batches yielded so far. + #[must_use] + pub fn batch_count(&self) -> u64 { + self.batch_count + } +} + +impl Iterator for RangeIterator { + type Item = RangeInclusive; + + fn next(&mut self) -> Option { + if self.batch_count >= self.total_batches { + return None; + } + + self.batch_count += 1; + if self.batch_count % 10 == 0 { + debug!(batch_count = self.batch_count, "Processed batches"); + } + + let batch_start = self.current; + let batch_end = batch_start.saturating_add(self.range_size - 1).min(self.end); + self.current = batch_end + 1; + + Some(batch_start..=batch_end) + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = + usize::try_from(self.total_batches - self.batch_count).unwrap_or(usize::MAX); + (remaining, None) + } +} + +impl Iterator for RangeIterator { + type Item = RangeInclusive; + + fn next(&mut self) -> Option { + if self.batch_count >= self.total_batches { + return None; + } + + self.batch_count += 1; + if self.batch_count % 10 == 0 { + debug!(batch_count = self.batch_count, "Processed batches"); + } + + let batch_high = self.current; + let batch_low = batch_high.saturating_sub(self.range_size - 1).max(self.end); + self.current = batch_low.saturating_sub(1); + + Some(batch_low..=batch_high) + } + + fn size_hint(&self) -> (usize, Option) { + match usize::try_from(self.total_batches - self.batch_count) { + Ok(remaining) => (remaining, Some(remaining)), + Err(_) => (usize::MAX, None), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn forward_basic() { + let mut iter = RangeIterator::forward(100, 250, 50); + assert_eq!(iter.next(), Some(100..=149)); + assert_eq!(iter.next(), Some(150..=199)); + assert_eq!(iter.next(), Some(200..=249)); + assert_eq!(iter.next(), Some(250..=250)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reverse_basic() { + let mut iter = RangeIterator::reverse(250, 100, 50); + assert_eq!(iter.next(), Some(201..=250)); + assert_eq!(iter.next(), Some(151..=200)); + assert_eq!(iter.next(), Some(101..=150)); + assert_eq!(iter.next(), Some(100..=100)); + assert_eq!(iter.next(), None); + } + + #[test] + fn forward_single_batch() { + let mut iter = RangeIterator::forward(100, 120, 50); + assert_eq!(iter.next(), Some(100..=120)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reverse_single_batch() { + let mut iter = RangeIterator::reverse(120, 100, 50); + assert_eq!(iter.next(), Some(100..=120)); + assert_eq!(iter.next(), None); + } + + #[test] + fn forward_exact_boundary() { + let mut iter = RangeIterator::forward(100, 199, 50); + assert_eq!(iter.next(), Some(100..=149)); + assert_eq!(iter.next(), Some(150..=199)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reverse_exact_boundary() { + let mut iter = RangeIterator::reverse(199, 100, 50); + assert_eq!(iter.next(), Some(150..=199)); + assert_eq!(iter.next(), Some(100..=149)); + assert_eq!(iter.next(), None); + } + + #[test] + fn forward_empty_range() { + let mut iter = RangeIterator::forward(200, 100, 50); + assert_eq!(iter.next(), None); + } + + #[test] + fn reverse_empty_range() { + let mut iter = RangeIterator::reverse(100, 200, 50); + assert_eq!(iter.next(), None); + } + + #[test] + fn forward_single_block() { + let mut iter = RangeIterator::forward(100, 100, 50); + assert_eq!(iter.next(), Some(100..=100)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reverse_single_block() { + let mut iter = RangeIterator::reverse(100, 100, 50); + assert_eq!(iter.next(), Some(100..=100)); + assert_eq!(iter.next(), None); + } + + #[test] + fn forward_max_block_range_one() { + let mut iter = RangeIterator::forward(100, 103, 1); + assert_eq!(iter.next(), Some(100..=100)); + assert_eq!(iter.next(), Some(101..=101)); + assert_eq!(iter.next(), Some(102..=102)); + assert_eq!(iter.next(), Some(103..=103)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reverse_max_block_range_one() { + let mut iter = RangeIterator::reverse(103, 100, 1); + assert_eq!(iter.next(), Some(103..=103)); + assert_eq!(iter.next(), Some(102..=102)); + assert_eq!(iter.next(), Some(101..=101)); + assert_eq!(iter.next(), Some(100..=100)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reset_to_rewinds_forward_iteration() { + let mut iter = RangeIterator::forward(100, 300, 50); + assert_eq!(iter.next(), Some(100..=149)); + assert_eq!(iter.next(), Some(150..=199)); + + iter.reset_to(175); + + assert_eq!(iter.next(), Some(175..=224)); + assert_eq!(iter.next(), Some(225..=274)); + assert_eq!(iter.next(), Some(275..=300)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reset_to_after_exhausted() { + let mut iter = RangeIterator::forward(100, 120, 50); + assert_eq!(iter.next(), Some(100..=120)); + assert_eq!(iter.next(), None); + + iter.reset_to(110); + + assert_eq!(iter.next(), Some(110..=120)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reset_to_beyond_end_exhausts_forward() { + let mut iter = RangeIterator::forward(100, 200, 50); + assert_eq!(iter.next(), Some(100..=149)); + + iter.reset_to(250); + + assert_eq!(iter.next(), None); + } + + #[test] + fn forward_starting_from_zero() { + let mut iter = RangeIterator::forward(0, 100, 50); + assert_eq!(iter.next(), Some(0..=49)); + assert_eq!(iter.next(), Some(50..=99)); + assert_eq!(iter.next(), Some(100..=100)); + assert_eq!(iter.next(), None); + } + + #[test] + fn reverse_ending_at_zero() { + let mut iter = RangeIterator::reverse(100, 0, 50); + assert_eq!(iter.next(), Some(51..=100)); + assert_eq!(iter.next(), Some(1..=50)); + assert_eq!(iter.next(), Some(0..=0)); + assert_eq!(iter.next(), None); + } + + #[test] + #[should_panic(expected = "max_block_range must be at least 1")] + fn forward_zero_max_block_range_panics() { + let _ = RangeIterator::forward(100, 200, 0); + } + + #[test] + #[should_panic(expected = "max_block_range must be at least 1")] + fn reverse_zero_max_block_range_panics() { + let _ = RangeIterator::reverse(200, 100, 0); + } + + #[test] + fn batch_count_increments() { + let mut iter = RangeIterator::forward(100, 300, 50); + assert_eq!(iter.batch_count(), 0); + + iter.next(); + assert_eq!(iter.batch_count(), 1); + + iter.next(); + assert_eq!(iter.batch_count(), 2); + + iter.next(); + assert_eq!(iter.batch_count(), 3); + } + + #[test] + fn batch_count_persists_after_reset() { + let mut iter = RangeIterator::forward(100, 300, 50); + iter.next(); + iter.next(); + assert_eq!(iter.batch_count(), 2); + + iter.reset_to(150); + + // batch_count is not reset + assert_eq!(iter.batch_count(), 2); + + iter.next(); + assert_eq!(iter.batch_count(), 3); + } +} diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 5d9e63a4..92dc3585 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -295,7 +295,8 @@ impl EventScannerBuilder { /// let mut scanner = EventScannerBuilder::latest(5) /// .from_block(1_000_000) /// .to_block(1_100_000) - /// .connect(robust_provider); + /// .connect(robust_provider) + /// .await?; /// # Ok(()) /// # } /// ```