diff --git a/src/block_range_scanner/builder.rs b/src/block_range_scanner/builder.rs new file mode 100644 index 00000000..8da5e8a9 --- /dev/null +++ b/src/block_range_scanner/builder.rs @@ -0,0 +1,101 @@ +use alloy::network::Network; + +use crate::{ + ScannerError, + block_range_scanner::{ + DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, RingBufferCapacity, + scanner::BlockRangeScanner, + }, + robust_provider::IntoRobustProvider, +}; + +/// Builder/configuration for the block-range streaming service. +#[derive(Clone, Debug)] +pub struct BlockRangeScannerBuilder { + /// Maximum number of blocks per streamed range. + pub max_block_range: u64, + /// How many past block hashes to keep in memory for reorg detection. + /// + /// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled. + pub past_blocks_storage_capacity: RingBufferCapacity, + pub buffer_capacity: usize, +} + +impl Default for BlockRangeScannerBuilder { + fn default() -> Self { + Self::new() + } +} + +impl BlockRangeScannerBuilder { + /// Creates a scanner with default configuration. + #[must_use] + pub fn new() -> Self { + Self { + max_block_range: DEFAULT_MAX_BLOCK_RANGE, + past_blocks_storage_capacity: RingBufferCapacity::Limited(10), + buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY, + } + } + + /// Sets the maximum number of blocks per streamed range. + /// + /// This controls batching for historical scans and for catch-up in live/sync scanners. + /// + /// Must be greater than 0. + #[must_use] + pub fn max_block_range(mut self, max_block_range: u64) -> Self { + self.max_block_range = max_block_range; + self + } + + /// Sets how many past block hashes to keep in memory for reorg detection. + /// + /// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled. + #[must_use] + pub fn past_blocks_storage_capacity( + mut self, + past_blocks_storage_capacity: RingBufferCapacity, + ) -> Self { + self.past_blocks_storage_capacity = past_blocks_storage_capacity; + self + } + + /// Sets the stream buffer capacity. + /// + /// Controls the maximum number of messages that can be buffered in the stream + /// before backpressure is applied. + /// + /// # Arguments + /// + /// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0) + #[must_use] + pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self { + self.buffer_capacity = buffer_capacity; + self + } + + /// Connects to an existing provider + /// + /// # Errors + /// + /// Returns an error if the provider connection fails. + pub async fn connect( + self, + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + if self.max_block_range == 0 { + return Err(ScannerError::InvalidMaxBlockRange); + } + if self.buffer_capacity == 0 { + return Err(ScannerError::InvalidBufferCapacity); + } + let provider = provider.into_robust_provider().await?; + Ok(BlockRangeScanner { + provider, + max_block_range: self.max_block_range, + past_blocks_storage_capacity: self.past_blocks_storage_capacity, + buffer_capacity: self.buffer_capacity, + }) + } +} diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs index b290d3b3..a7aa9e3f 100644 --- a/src/block_range_scanner/common.rs +++ b/src/block_range_scanner/common.rs @@ -1,11 +1,13 @@ +use std::ops::RangeInclusive; + use tokio::sync::mpsc; use tokio_stream::StreamExt; use crate::{ - ScannerError, - block_range_scanner::{BlockScannerResult, RangeIterator, reorg_handler::ReorgHandler}, + ScannerError, ScannerMessage, + block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler}, robust_provider::{RobustProvider, RobustSubscription, subscription}, - types::{Notification, TryStream}, + types::{IntoScannerResult, Notification, ScannerResult, TryStream}, }; use alloy::{ consensus::BlockHeader, @@ -14,6 +16,39 @@ use alloy::{ primitives::BlockNumber, }; +/// Default maximum number of blocks per streamed range. +pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000; + +/// Default confirmation depth used by scanners that accept a `block_confirmations` setting. +pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; + +/// Default per-stream buffer size used by scanners. +pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000; + +/// The result type yielded by block-range streams. +pub type BlockScannerResult = ScannerResult>; + +/// Convenience alias for a streamed block-range message. +pub type Message = ScannerMessage>; + +impl From> for Message { + fn from(range: RangeInclusive) -> Self { + Message::Data(range) + } +} + +impl PartialEq> for Message { + fn eq(&self, other: &RangeInclusive) -> bool { + if let Message::Data(range) = self { range.eq(other) } else { false } + } +} + +impl IntoScannerResult> for RangeInclusive { + fn into_scanner_message_result(self) -> BlockScannerResult { + Ok(Message::Data(self)) + } +} + #[allow(clippy::too_many_arguments)] pub(crate) async fn stream_live_blocks( stream_start: BlockNumber, diff --git a/src/block_range_scanner/mod.rs b/src/block_range_scanner/mod.rs new file mode 100644 index 00000000..642a7523 --- /dev/null +++ b/src/block_range_scanner/mod.rs @@ -0,0 +1,16 @@ +mod builder; +mod common; +mod range_iterator; +mod reorg_handler; +mod ring_buffer; +mod scanner; +mod sync_handler; + +pub use builder::BlockRangeScannerBuilder; +pub use common::BlockScannerResult; +pub use ring_buffer::RingBufferCapacity; +pub use scanner::BlockRangeScanner; + +pub use common::{ + DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, +}; diff --git a/src/block_range_scanner.rs b/src/block_range_scanner/scanner.rs similarity index 76% rename from src/block_range_scanner.rs rename to src/block_range_scanner/scanner.rs index 5920be28..1b919232 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner/scanner.rs @@ -31,10 +31,7 @@ //! //! use alloy::providers::{Provider, ProviderBuilder}; //! use event_scanner::{ -//! ScannerError, ScannerMessage, -//! block_range_scanner::{ -//! BlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, -//! }, +//! BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, ScannerError, ScannerMessage, //! robust_provider::RobustProviderBuilder, //! }; //! use tokio::time::Duration; @@ -48,7 +45,7 @@ //! // Configuration //! let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?; //! let robust_provider = RobustProviderBuilder::new(provider).build().await?; -//! let block_range_scanner = BlockRangeScanner::new().connect(robust_provider).await?; +//! let block_range_scanner = BlockRangeScannerBuilder::new().connect(robust_provider).await?; //! //! let mut stream = block_range_scanner //! .stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS) @@ -80,169 +77,39 @@ //! } //! ``` -use std::{cmp::Ordering, ops::RangeInclusive}; +use std::cmp::Ordering; use tokio::{sync::mpsc, try_join}; use tokio_stream::wrappers::ReceiverStream; use crate::{ - ScannerError, ScannerMessage, - block_range_scanner::sync_handler::SyncHandler, - robust_provider::{IntoRobustProvider, RobustProvider}, - types::{IntoScannerResult, Notification, ScannerResult, TryStream}, + Notification, ScannerError, + block_range_scanner::{ + RingBufferCapacity, + common::{self, BlockScannerResult}, + range_iterator::RangeIterator, + reorg_handler::ReorgHandler, + sync_handler::SyncHandler, + }, + robust_provider::RobustProvider, + types::TryStream, }; use alloy::{ consensus::BlockHeader, eips::{BlockId, BlockNumberOrTag}, network::{BlockResponse, Network}, - primitives::BlockNumber, }; -mod common; -mod range_iterator; -mod reorg_handler; -mod ring_buffer; -mod sync_handler; - -pub(crate) use range_iterator::RangeIterator; - -use reorg_handler::ReorgHandler; -pub use ring_buffer::RingBufferCapacity; - -/// Default maximum number of blocks per streamed range. -pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000; - -/// Default confirmation depth used by scanners that accept a `block_confirmations` setting. -pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; - -/// Default per-stream buffer size used by scanners. -pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000; - -/// The result type yielded by block-range streams. -pub type BlockScannerResult = ScannerResult>; - -/// Convenience alias for a streamed block-range message. -pub type Message = ScannerMessage>; - -impl From> for Message { - fn from(range: RangeInclusive) -> Self { - Message::Data(range) - } -} - -impl PartialEq> for Message { - fn eq(&self, other: &RangeInclusive) -> bool { - if let Message::Data(range) = self { range.eq(other) } else { false } - } -} - -impl IntoScannerResult> for RangeInclusive { - fn into_scanner_message_result(self) -> BlockScannerResult { - Ok(Message::Data(self)) - } -} - -/// Builder/configuration for the block-range streaming service. -#[derive(Clone, Debug)] -pub struct BlockRangeScanner { - /// Maximum number of blocks per streamed range. - pub max_block_range: u64, - /// How many past block hashes to keep in memory for reorg detection. - /// - /// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled. - pub past_blocks_storage_capacity: RingBufferCapacity, - pub buffer_capacity: usize, -} - -impl Default for BlockRangeScanner { - fn default() -> Self { - Self::new() - } -} - -impl BlockRangeScanner { - /// Creates a scanner with default configuration. - #[must_use] - pub fn new() -> Self { - Self { - max_block_range: DEFAULT_MAX_BLOCK_RANGE, - past_blocks_storage_capacity: RingBufferCapacity::Limited(10), - buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY, - } - } - - /// Sets the maximum number of blocks per streamed range. - /// - /// This controls batching for historical scans and for catch-up in live/sync scanners. - /// - /// Must be greater than 0. - #[must_use] - pub fn max_block_range(mut self, max_block_range: u64) -> Self { - self.max_block_range = max_block_range; - self - } - - /// Sets how many past block hashes to keep in memory for reorg detection. - /// - /// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled. - #[must_use] - pub fn past_blocks_storage_capacity( - mut self, - past_blocks_storage_capacity: RingBufferCapacity, - ) -> Self { - self.past_blocks_storage_capacity = past_blocks_storage_capacity; - self - } - - /// Sets the stream buffer capacity. - /// - /// Controls the maximum number of messages that can be buffered in the stream - /// before backpressure is applied. - /// - /// # Arguments - /// - /// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0) - #[must_use] - pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self { - self.buffer_capacity = buffer_capacity; - self - } - - /// Connects to an existing provider - /// - /// # Errors - /// - /// Returns an error if the provider connection fails. - pub async fn connect( - self, - provider: impl IntoRobustProvider, - ) -> Result, ScannerError> { - if self.max_block_range == 0 { - return Err(ScannerError::InvalidMaxBlockRange); - } - if self.buffer_capacity == 0 { - return Err(ScannerError::InvalidBufferCapacity); - } - let provider = provider.into_robust_provider().await?; - Ok(ConnectedBlockRangeScanner { - provider, - max_block_range: self.max_block_range, - past_blocks_storage_capacity: self.past_blocks_storage_capacity, - buffer_capacity: self.buffer_capacity, - }) - } -} - -/// A [`BlockRangeScanner`] connected to a provider. +/// A [`BlockRangeScanner`](crate::BlockRangeScanner) connected to a provider. #[derive(Debug)] -pub struct ConnectedBlockRangeScanner { - provider: RobustProvider, - max_block_range: u64, - past_blocks_storage_capacity: RingBufferCapacity, - buffer_capacity: usize, +pub struct BlockRangeScanner { + pub(crate) provider: RobustProvider, + pub(crate) max_block_range: u64, + pub(crate) past_blocks_storage_capacity: RingBufferCapacity, + pub(crate) buffer_capacity: usize, } -impl ConnectedBlockRangeScanner { +impl BlockRangeScanner { /// Returns the underlying [`RobustProvider`]. #[must_use] pub fn provider(&self) -> &RobustProvider { @@ -597,6 +464,10 @@ impl ConnectedBlockRangeScanner { #[cfg(test)] mod tests { + use crate::block_range_scanner::{ + BlockRangeScannerBuilder, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, + }; + use super::*; use alloy::{ eips::{BlockId, BlockNumberOrTag}, @@ -608,7 +479,7 @@ mod tests { #[test] fn block_range_scanner_defaults_match_constants() { - let scanner = BlockRangeScanner::new(); + let scanner = BlockRangeScannerBuilder::new(); assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE); assert_eq!(scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY); @@ -616,7 +487,7 @@ mod tests { #[test] fn builder_methods_update_configuration() { - let scanner = BlockRangeScanner::new().max_block_range(42).buffer_capacity(33); + let scanner = BlockRangeScannerBuilder::new().max_block_range(42).buffer_capacity(33); assert_eq!(scanner.max_block_range, 42); assert_eq!(scanner.buffer_capacity, 33); @@ -637,7 +508,7 @@ mod tests { #[tokio::test] async fn returns_error_with_zero_buffer_capacity() { let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let result = BlockRangeScanner::new().buffer_capacity(0).connect(provider).await; + let result = BlockRangeScannerBuilder::new().buffer_capacity(0).connect(provider).await; assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity))); } @@ -645,7 +516,7 @@ mod tests { #[tokio::test] async fn returns_error_with_zero_max_block_range() { let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let result = BlockRangeScanner::new().max_block_range(0).connect(provider).await; + let result = BlockRangeScannerBuilder::new().max_block_range(0).connect(provider).await; assert!(matches!(result, Err(ScannerError::InvalidMaxBlockRange))); } diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs index a9f4dcda..f14c4082 100644 --- a/src/block_range_scanner/sync_handler.rs +++ b/src/block_range_scanner/sync_handler.rs @@ -4,7 +4,9 @@ use tokio::sync::mpsc; use crate::{ Notification, ScannerError, block_range_scanner::{ - BlockScannerResult, common, reorg_handler::ReorgHandler, ring_buffer::RingBufferCapacity, + common::{self, BlockScannerResult}, + reorg_handler::ReorgHandler, + ring_buffer::RingBufferCapacity, }, robust_provider::RobustProvider, types::TryStream, diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/scanner/historic.rs index 621da264..66b8ffb1 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/scanner/historic.rs @@ -163,7 +163,7 @@ impl EventScanner { #[cfg(test)] mod tests { use crate::{ - DEFAULT_STREAM_BUFFER_CAPACITY, block_range_scanner::DEFAULT_MAX_BLOCK_RANGE, + block_range_scanner::{DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY}, event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, }; diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/scanner/latest.rs index c946f515..2571c161 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/scanner/latest.rs @@ -174,8 +174,9 @@ impl EventScanner { #[cfg(test)] mod tests { use crate::{ - DEFAULT_STREAM_BUFFER_CAPACITY, - block_range_scanner::{DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE}, + block_range_scanner::{ + DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, + }, event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, }; diff --git a/src/event_scanner/scanner/live.rs b/src/event_scanner/scanner/live.rs index 0a6e5bfb..369ec081 100644 --- a/src/event_scanner/scanner/live.rs +++ b/src/event_scanner/scanner/live.rs @@ -100,8 +100,9 @@ mod tests { use alloy_node_bindings::Anvil; use crate::{ - DEFAULT_STREAM_BUFFER_CAPACITY, - block_range_scanner::{DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE}, + block_range_scanner::{ + DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, + }, event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, }; diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 9eec42fa..0398830c 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -39,11 +39,8 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use crate::{ - EventFilter, ScannerError, - block_range_scanner::{ - BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, - RingBufferCapacity, - }, + BlockRangeScannerBuilder, EventFilter, ScannerError, + block_range_scanner::{BlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, RingBufferCapacity}, event_scanner::{EventScannerResult, listener::EventListener}, robust_provider::IntoRobustProvider, }; @@ -164,7 +161,7 @@ impl Default for Live { #[derive(Debug)] pub struct EventScanner { config: Mode, - block_range_scanner: ConnectedBlockRangeScanner, + block_range_scanner: BlockRangeScanner, listeners: Vec, } @@ -172,7 +169,7 @@ pub struct EventScanner { #[derive(Default, Debug)] pub struct EventScannerBuilder { pub(crate) config: Mode, - pub(crate) block_range_scanner: BlockRangeScanner, + pub(crate) block_range_scanner: BlockRangeScannerBuilder, } impl EventScannerBuilder { @@ -466,7 +463,7 @@ impl EventScannerBuilder { block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES, }, - block_range_scanner: BlockRangeScanner::default(), + block_range_scanner: BlockRangeScannerBuilder::default(), } } } @@ -480,7 +477,7 @@ impl EventScannerBuilder { block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES, }, - block_range_scanner: BlockRangeScanner::default(), + block_range_scanner: BlockRangeScannerBuilder::default(), } } } @@ -494,7 +491,7 @@ impl EventScannerBuilder { block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES, }, - block_range_scanner: BlockRangeScanner::default(), + block_range_scanner: BlockRangeScannerBuilder::default(), } } } @@ -605,7 +602,7 @@ mod tests { rpc::client::RpcClient, }; - use crate::DEFAULT_STREAM_BUFFER_CAPACITY; + use crate::block_range_scanner::DEFAULT_STREAM_BUFFER_CAPACITY; use super::*; diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/scanner/sync/from_block.rs index e5560246..8a457c0b 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/scanner/sync/from_block.rs @@ -114,8 +114,9 @@ mod tests { use alloy_node_bindings::Anvil; use crate::{ - DEFAULT_STREAM_BUFFER_CAPACITY, - block_range_scanner::{DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE}, + block_range_scanner::{ + DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, + }, event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, }; diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index f883677f..365d93b3 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -155,8 +155,9 @@ mod tests { use alloy_node_bindings::Anvil; use crate::{ - DEFAULT_STREAM_BUFFER_CAPACITY, - block_range_scanner::{DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE}, + block_range_scanner::{ + DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, + }, event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, }; diff --git a/src/lib.rs b/src/lib.rs index 54d585bc..81afbe9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,9 @@ mod event_scanner; mod types; pub use block_range_scanner::{ - DEFAULT_STREAM_BUFFER_CAPACITY, RingBufferCapacity as PastBlocksStorageCapacity, + BlockRangeScanner, BlockRangeScannerBuilder, BlockScannerResult, DEFAULT_BLOCK_CONFIRMATIONS, + DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, RingBufferCapacity, + RingBufferCapacity as PastBlocksStorageCapacity, }; pub use error::ScannerError; diff --git a/tests/block_range_scanner.rs b/tests/block_range_scanner.rs index 2952f6a6..7460cc9f 100644 --- a/tests/block_range_scanner.rs +++ b/tests/block_range_scanner.rs @@ -5,8 +5,8 @@ use alloy::{ }; use alloy_node_bindings::Anvil; use event_scanner::{ - Notification, ScannerError, assert_closed, assert_empty, assert_next, assert_range_coverage, - block_range_scanner::BlockRangeScanner, + BlockRangeScannerBuilder, Notification, ScannerError, assert_closed, assert_empty, assert_next, + assert_range_coverage, }; #[tokio::test] @@ -16,9 +16,9 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh // --- Zero block confirmations -> stream immediately --- - let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; + let mut brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?; - let mut stream = client.stream_live(0).await?; + let mut stream = brs.stream_live(0).await?; provider.anvil_mine(Some(5), None).await?; @@ -32,7 +32,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh // --- 1 block confirmation --- - let mut stream = client.stream_live(1).await?; + let mut stream = brs.stream_live(1).await?; provider.anvil_mine(Some(5), None).await?; @@ -51,9 +51,9 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh async fn live_with_block_confirmations_always_emits_genesis_block() -> anyhow::Result<()> { let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; + let mut brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?; - let mut stream = client.stream_live(3).await?; + let mut stream = brs.stream_live(3).await?; provider.anvil_mine(Some(1), None).await?; assert_next!(stream, 0..=0); @@ -78,11 +78,11 @@ async fn stream_from_starts_at_latest_once_it_has_enough_confirmations() -> anyh let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect(provider.clone()).await?; + let brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?; provider.anvil_mine(Some(20), None).await?; - let stream = client.stream_from(BlockNumberOrTag::Latest, 5).await?; + let stream = brs.stream_from(BlockNumberOrTag::Latest, 5).await?; let stream = assert_empty!(stream); @@ -106,9 +106,9 @@ async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Re let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; + let mut brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?; - let mut stream = client.stream_live(5).await?; + let mut stream = brs.stream_live(5).await?; // mine initial blocks provider.anvil_mine(Some(10), None).await?; @@ -138,9 +138,9 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result< let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; + let mut brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?; - let mut stream = client.stream_live(3).await?; + let mut stream = brs.stream_live(3).await?; // mine initial blocks provider.anvil_mine(Some(10), None).await?; @@ -182,9 +182,10 @@ async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Res let end_num = 110; - let mut client = BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(30).connect(provider.clone()).await?; - let mut stream = client + let mut stream = brs .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num)) .await?; @@ -213,10 +214,11 @@ async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Resu provider.anvil_mine(Some(120), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(30).connect(provider.clone()).await?; let mut stream = - client.stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Latest).await?; + brs.stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Latest).await?; assert_next!(stream, 0..=29); assert_next!(stream, 30..=56); @@ -245,10 +247,11 @@ async fn historical_reorg_occurring_immediately_after_finalized_processing_is_ig provider.anvil_mine(Some(11), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(10).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(10).connect(provider.clone()).await?; let mut stream = - client.stream_historical(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; + brs.stream_historical(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; assert_next!(stream, 0..=0); let mut stream = assert_empty!(stream); @@ -269,10 +272,11 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> { provider.anvil_mine(Some(100), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(5).connect(provider.clone()).await?; // ranges where each batch is of max blocks per epoch size - let mut stream = client.stream_historical(0, 19).await?; + let mut stream = brs.stream_historical(0, 19).await?; assert_next!(stream, 0..=4); assert_next!(stream, 5..=9); assert_next!(stream, 10..=14); @@ -280,29 +284,29 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> { assert_closed!(stream); // ranges where last batch is smaller than blocks per epoch - let mut stream = client.stream_historical(93, 99).await?; + let mut stream = brs.stream_historical(93, 99).await?; assert_next!(stream, 93..=97); assert_next!(stream, 98..=99); assert_closed!(stream); // range where blocks per epoch is larger than the number of blocks in the range - let mut stream = client.stream_historical(3, 5).await?; + let mut stream = brs.stream_historical(3, 5).await?; assert_next!(stream, 3..=5); assert_closed!(stream); // single item range - let mut stream = client.stream_historical(3, 3).await?; + let mut stream = brs.stream_historical(3, 3).await?; assert_next!(stream, 3..=3); assert_closed!(stream); // range where blocks per epoch is larger than the number of blocks on chain - let mut client = BlockRangeScanner::new().max_block_range(200).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(200).connect(provider).await?; - let mut stream = client.stream_historical(0, 20).await?; + let mut stream = brs.stream_historical(0, 20).await?; assert_next!(stream, 0..=20); assert_closed!(stream); - let mut stream = client.stream_historical(0, 99).await?; + let mut stream = brs.stream_historical(0, 99).await?; assert_next!(stream, 0..=36); assert_next!(stream, 37..=99); assert_closed!(stream); @@ -317,15 +321,15 @@ async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> { provider.anvil_mine(Some(11), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(5).connect(provider).await?; - let mut stream = client.stream_historical(10, 0).await?; + let mut stream = brs.stream_historical(10, 0).await?; assert_next!(stream, 0..=0); assert_next!(stream, 1..=5); assert_next!(stream, 6..=10); assert_closed!(stream); - let mut stream = client.stream_historical(0, 10).await?; + let mut stream = brs.stream_historical(0, 10).await?; assert_next!(stream, 0..=0); assert_next!(stream, 1..=5); assert_next!(stream, 6..=10); @@ -341,9 +345,9 @@ async fn rewind_single_batch_when_epoch_larger_than_range() -> anyhow::Result<() provider.anvil_mine(Some(150), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(100).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(100).connect(provider).await?; - let mut stream = client.stream_rewind(100, 150).await?; + let mut stream = brs.stream_rewind(100, 150).await?; // Range length is 51, epoch is 100 -> single batch [100..=150] assert_next!(stream, 100..=150); @@ -359,9 +363,9 @@ async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse() -> any provider.anvil_mine(Some(15), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(5).connect(provider).await?; - let mut stream = client.stream_rewind(0, 14).await?; + let mut stream = brs.stream_rewind(0, 14).await?; // 0..=14 with epoch 5 -> [10..=14, 5..=9, 0..=4] assert_next!(stream, 10..=14); @@ -379,9 +383,9 @@ async fn rewind_with_remainder_trims_first_batch_to_stream_start() -> anyhow::Re provider.anvil_mine(Some(15), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(4).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(4).connect(provider).await?; - let mut stream = client.stream_rewind(3, 12).await?; + let mut stream = brs.stream_rewind(3, 12).await?; // 3..=12 with epoch 4 -> ends: 12,8,4 -> batches: [9..=12, 5..=8, 3..=4] assert_next!(stream, 9..=12); @@ -399,9 +403,9 @@ async fn rewind_single_block_range() -> anyhow::Result<()> { provider.anvil_mine(Some(15), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(5).connect(provider).await?; - let mut stream = client.stream_rewind(7, 7).await?; + let mut stream = brs.stream_rewind(7, 7).await?; assert_next!(stream, 7..=7); assert_closed!(stream); @@ -416,9 +420,9 @@ async fn rewind_epoch_of_one_sends_each_block_in_reverse_order() -> anyhow::Resu provider.anvil_mine(Some(15), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(1).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(1).connect(provider).await?; - let mut stream = client.stream_rewind(5, 8).await?; + let mut stream = brs.stream_rewind(5, 8).await?; // 5..=8 with epoch 1 -> [8..=8, 7..=7, 6..=6, 5..=5] assert_next!(stream, 8..=8); @@ -438,10 +442,10 @@ async fn command_rewind_defaults_latest_to_earliest_batches_correctly() -> anyho // Mine 20 blocks, so the total number of blocks is 21 (including 0th block) provider.anvil_mine(Some(20), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(7).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(7).connect(provider).await?; let mut stream = - client.stream_rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; + brs.stream_rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; assert_next!(stream, 14..=20); assert_next!(stream, 7..=13); @@ -459,16 +463,16 @@ async fn command_rewind_handles_start_and_end_in_any_order() -> anyhow::Result<( // Ensure blocks at 3 and 15 exist provider.anvil_mine(Some(16), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(5).connect(provider).await?; - let mut stream = client.stream_rewind(15, 3).await?; + let mut stream = brs.stream_rewind(15, 3).await?; assert_next!(stream, 11..=15); assert_next!(stream, 6..=10); assert_next!(stream, 3..=5); assert_closed!(stream); - let mut stream = client.stream_rewind(3, 15).await?; + let mut stream = brs.stream_rewind(3, 15).await?; assert_next!(stream, 11..=15); assert_next!(stream, 6..=10); @@ -484,9 +488,9 @@ async fn command_rewind_propagates_block_not_found_error() -> anyhow::Result<()> // Do not mine up to 999 so start won't exist let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; + let mut brs = BlockRangeScannerBuilder::new().max_block_range(5).connect(provider).await?; - let stream = client.stream_rewind(0, 999).await; + let stream = brs.stream_rewind(0, 999).await; assert!(matches!( stream, @@ -504,9 +508,10 @@ async fn rewind_reorg_emits_notification_and_rescans_affected_range() -> anyhow: provider.anvil_mine(Some(20), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(5).connect(provider.clone()).await?; - let mut stream = client.stream_rewind(5, 20).await?; + let mut stream = brs.stream_rewind(5, 20).await?; assert_next!(stream, 16..=20); assert_next!(stream, 11..=15); @@ -533,9 +538,10 @@ async fn deep_rewind_reorg_streams_affected_range_in_chronologi() -> anyhow::Res provider.anvil_mine(Some(20), None).await?; - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(5).connect(provider.clone()).await?; - let mut stream = client.stream_rewind(5, 20).await?; + let mut stream = brs.stream_rewind(5, 20).await?; assert_next!(stream, 16..=20); @@ -565,10 +571,11 @@ async fn rewind_skips_reorg_check_when_tip_below_finalized() -> anyhow::Result<( let finalized = provider.get_block_by_number(BlockNumberOrTag::Finalized).await?.unwrap(); - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(5).connect(provider.clone()).await?; // Rewind with tip < finalized - let mut stream = client.stream_rewind(0, finalized.header.number - 1).await?; + let mut stream = brs.stream_rewind(0, finalized.header.number - 1).await?; assert_next!(stream, 31..=35); assert_next!(stream, 26..=30); @@ -597,10 +604,11 @@ async fn rewind_skips_reorg_when_tip_is_at_finalized() -> anyhow::Result<()> { provider.anvil_mine(Some(100), None).await?; let finalized = provider.get_block_by_number(BlockNumberOrTag::Finalized).await?.unwrap(); - let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; + let mut brs = + BlockRangeScannerBuilder::new().max_block_range(5).connect(provider.clone()).await?; // Rewind with tip == finalized - let mut stream = client.stream_rewind(0, finalized.header.number).await?; + let mut stream = brs.stream_rewind(0, finalized.header.number).await?; assert_next!(stream, 32..=36); assert_next!(stream, 27..=31);