Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Event Scanner is a Rust library for monitoring EVM-based smart contract events.
The library exposes two primary layers:

- `EventScannerBuilder` / `EventScanner` – the main module the application will interact with.
- `BlockScanner` – lower-level component that streams block ranges, handles reorg, batching, and provider subscriptions. This is exposed to the user but has many edge cases which will be documented in the future. For now interact with this via the `EventScanner`
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already noted that this is in alpha, and the block range scanner is as stable as the event scanner, so seems redundant to recommend this.

- `BlockRangeScanner` – lower-level component that streams block ranges, handles reorg, batching, and provider subscriptions.

Callbacks implement the `EventCallback` trait. They are executed through a `CallbackStrategy` that performs retries when necessary before reporting failures.

Expand Down
18 changes: 9 additions & 9 deletions src/block_scanner.rs → src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
//!
//! ```rust,no_run
//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
//! use event_scanner::block_scanner::BlockScannerError;
//! use event_scanner::block_range_scanner::BlockScannerError;
//! use std::ops::Range;
//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
//!
//! use alloy::transports::http::reqwest::Url;
//! use event_scanner::block_scanner::{BlockScanner, BlockScannerClient};
//! use event_scanner::block_range_scanner::{BlockRangeScanner, BlockScannerClient};
//! use tokio::time::Duration;
//! use tracing::{error, info};
//!
Expand All @@ -17,7 +17,7 @@
//! tracing_subscriber::fmt::init();
//!
//! // Configuration
//! let block_scanner = BlockScanner::new()
//! let block_range_scanner = BlockRangeScanner::new()
//! .with_blocks_read_per_epoch(1000)
//! .with_reorg_rewind_depth(5)
//! .with_retry_interval(Duration::from_secs(12))
Expand All @@ -26,7 +26,7 @@
//! .await?;
//!
//! // Create client to send subscribe command to block scanner
//! let subscription_client: BlockScannerClient = block_scanner.run()?;
//! let subscription_client: BlockScannerClient = block_range_scanner.run()?;
//!
//! let mut receiver: ReceiverStream<Result<Range<BlockNumber>, BlockScannerError>> =
//! subscription_client
Expand Down Expand Up @@ -187,20 +187,20 @@ struct Config {
block_confirmations: u64,
}

pub struct BlockScanner {
pub struct BlockRangeScanner {
blocks_read_per_epoch: usize,
reorg_rewind_depth: u64,
retry_interval: Duration,
block_confirmations: u64,
}

impl Default for BlockScanner {
impl Default for BlockRangeScanner {
fn default() -> Self {
Self::new()
}
}

impl BlockScanner {
impl BlockRangeScanner {
#[must_use]
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -778,7 +778,7 @@ mod tests {
async fn live_mode_processes_all_blocks() -> anyhow::Result<()> {
let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?;

let sub_client = BlockScanner::new()
let client = BlockRangeScanner::new()
.with_blocks_read_per_epoch(3)
.with_reorg_rewind_depth(5)
.with_retry_interval(Duration::from_secs(1))
Expand All @@ -790,7 +790,7 @@ mod tests {
let expected_blocks = 10;

let mut receiver =
sub_client.subscribe(BlockNumberOrTag::Latest, None).await?.take(expected_blocks);
client.subscribe(BlockNumberOrTag::Latest, None).await?.take(expected_blocks);

let mut block_range_start = 0;

Expand Down
31 changes: 17 additions & 14 deletions src/event_scanner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use crate::{
block_scanner::{BlockScanner, BlockScannerError, ConnectedBlockScanner},
block_range_scanner::{BlockRangeScanner, BlockScannerError, ConnectedBlockScanner},
callback::strategy::{CallbackStrategy, StateSyncAwareStrategy},
types::EventFilter,
};
Expand All @@ -18,7 +18,7 @@ use tokio_stream::StreamExt;
use tracing::{error, info, warn};

pub struct EventScannerBuilder {
block_scanner: BlockScanner,
block_range_scanner: BlockRangeScanner,
tracked_events: Vec<EventFilter>,
callback_strategy: Arc<dyn CallbackStrategy>,
}
Expand All @@ -34,7 +34,7 @@ impl EventScannerBuilder {
/// Creates a new builder with default block scanner and callback strategy.
pub fn new() -> Self {
Self {
block_scanner: BlockScanner::new(),
block_range_scanner: BlockRangeScanner::new(),
tracked_events: Vec::new(),
callback_strategy: Self::get_default_callback_strategy(),
}
Expand Down Expand Up @@ -64,28 +64,31 @@ impl EventScannerBuilder {
/// Configures how many blocks are read per epoch during a historical sync.
#[must_use]
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
self.block_scanner = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
self.block_range_scanner =
self.block_range_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
self
}

/// Sets the depth to rewind when a reorg is detected.
#[must_use]
pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
self.block_scanner = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
self.block_range_scanner =
self.block_range_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
self
}

/// Adjusts the retry interval when reconnecting to the provider.
#[must_use]
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
self.block_scanner = self.block_scanner.with_retry_interval(retry_interval);
self.block_range_scanner = self.block_range_scanner.with_retry_interval(retry_interval);
self
}

/// Configures how many confirmations are required before processing a block (used for reorgs).
#[must_use]
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
self.block_scanner = self.block_scanner.with_block_confirmations(block_confirmations);
self.block_range_scanner =
self.block_range_scanner.with_block_confirmations(block_confirmations);
self
}

Expand All @@ -98,9 +101,9 @@ impl EventScannerBuilder {
self,
ws_url: Url,
) -> Result<EventScanner<N>, BlockScannerError> {
let block_scanner = self.block_scanner.connect_ws(ws_url).await?;
let block_range_scanner = self.block_range_scanner.connect_ws(ws_url).await?;
Ok(EventScanner {
block_scanner,
block_range_scanner,
tracked_events: self.tracked_events,
callback_strategy: self.callback_strategy,
})
Expand All @@ -115,9 +118,9 @@ impl EventScannerBuilder {
self,
ipc_path: impl Into<String>,
) -> Result<EventScanner<N>, BlockScannerError> {
let block_scanner = self.block_scanner.connect_ipc(ipc_path.into()).await?;
let block_range_scanner = self.block_range_scanner.connect_ipc(ipc_path.into()).await?;
Ok(EventScanner {
block_scanner,
block_range_scanner,
tracked_events: self.tracked_events,
callback_strategy: self.callback_strategy,
})
Expand All @@ -131,7 +134,7 @@ impl EventScannerBuilder {
}

pub struct EventScanner<N: Network> {
block_scanner: ConnectedBlockScanner<N>,
block_range_scanner: ConnectedBlockScanner<N>,
tracked_events: Vec<EventFilter>,
callback_strategy: Arc<dyn CallbackStrategy>,
}
Expand Down Expand Up @@ -180,7 +183,7 @@ impl<N: Network> EventScanner<N> {
event_channels.insert(unique_event, sender);
}

let client = self.block_scanner.run()?;
let client = self.block_range_scanner.run()?;
let mut stream = client.subscribe(start_height, end_height).await?;

while let Some(range) = stream.next().await {
Expand Down Expand Up @@ -235,7 +238,7 @@ impl<N: Network> EventScanner<N> {
.from_block(from_block)
.to_block(to_block);

match self.block_scanner.provider().get_logs(&filter).await {
match self.block_range_scanner.provider().get_logs(&filter).await {
Ok(logs) => {
if logs.is_empty() {
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod block_scanner;
pub mod block_range_scanner;
pub mod callback;
pub mod event_scanner;
pub mod types;
Expand Down