Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9010ae3
move common_ancestor related docs above the field
0xNeshi Dec 16, 2025
0dbc70e
fix: stream Lagged errors up to the caller
0xNeshi Dec 16, 2025
c989923
add missing mod docs to src/lib.rs
0xNeshi Dec 16, 2025
ed0e2aa
add missing mod docs to src/event_scanner/scanner/mod.rs
0xNeshi Dec 16, 2025
9fc1e1f
add missing mod and doc comments
0xNeshi Dec 16, 2025
83a448f
add missing pub API docs
0xNeshi Dec 16, 2025
d1fc0c6
clippy: inline listener in spawn fns
0xNeshi Dec 16, 2025
cfe2a5c
docs: clarify why handle_stream waits on consumer (thread) sets
0xNeshi Dec 16, 2025
eb28119
Revert "clippy: inline listener in spawn fns"
0xNeshi Dec 16, 2025
78bcabf
Revert "Revert "clippy: inline listener in spawn fns""
0xNeshi Dec 16, 2025
943f365
Revert "remove block_confirmations from latest events"
0xNeshi Dec 16, 2025
64a8b64
Revert "fix: stream Lagged errors up to the caller"
0xNeshi Dec 16, 2025
352eba1
Revert "Revert "fix: stream Lagged errors up to the caller""
0xNeshi Dec 16, 2025
996f9ba
remove unused DEFAULT_REORG_REWIND_DEPTH const
0xNeshi Dec 17, 2025
a732564
docs: Apply suggestions from code review
0xNeshi Dec 17, 2025
2eedfe9
docs: reword duplicate event possibility description
0xNeshi Dec 17, 2025
60e5b54
docs: move start-related docs to EventScanner + list specific possibl…
0xNeshi Dec 17, 2025
9e3e4e4
ref: rename generic param 'M' to 'Mode'
0xNeshi Dec 17, 2025
85e417f
docs: link to mode function creators in Mode docs
0xNeshi Dec 17, 2025
8bdc3a5
clippy
0xNeshi Dec 17, 2025
e384058
ref: try_operation_with_failover
0xNeshi Dec 18, 2025
acda174
specify exact errors in robust provider error docs
0xNeshi Dec 18, 2025
7d48e5a
expose try_operation_with_failover publicly
0xNeshi Dec 18, 2025
ec63456
ref: move derive below doc comment on Synchronize struct
0xNeshi Dec 18, 2025
e441e2e
Revert "expose try_operation_with_failover publicly"
0xNeshi Dec 18, 2025
7937616
ref: revert try_operation_with_failover ref
0xNeshi Dec 18, 2025
be1b48b
fix conflicts
0xNeshi Dec 18, 2025
309b952
merge: fix conflict
0xNeshi Dec 18, 2025
cad3330
merge: fix conflicts
0xNeshi Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,15 @@ let multi_sigs = EventFilter::new()
The scanner delivers three types of messages through the event stream:

- **`Message::Data(Vec<Log>)`** – Contains a batch of matching event logs. Each log includes the raw event data, transaction hash, block number, and other metadata.
- **`Message::Notification(Notification)`** – Notifications from the scanner:
- **`ScannerError`** – Errors indicating that the scanner has encountered issues (e.g., RPC failures, connection problems)
- **`Message::Notification(Notification)`** – Notifications from the scanner.
- **`ScannerError`** – Errors indicating that the scanner has encountered issues (e.g., RPC failures, connection problems, or a lagging consumer).

Always handle all message types in your stream processing loop to ensure robust error handling and proper reorg detection.

Notes:

- Ordering is guaranteed only within a single subscription stream. There is no global ordering guarantee across multiple subscriptions.
- When the scanner detects a reorg, it emits `Notification::ReorgDetected`. Consumers should assume the same events might be delivered more than once around reorgs (i.e. benign duplicates are possible). Depending on the application's needs, this could be handled via idempotency/deduplication or by rolling back application state on reorg notifications.

### Scanning Modes

Expand All @@ -248,7 +252,7 @@ Always handle all message types in your stream processing loop to ensure robust

- Set `max_block_range` based on your RPC provider's limits (e.g., Alchemy, Infura may limit queries to 2000 blocks). Default is 1000 blocks.
- The modes come with sensible defaults; for example, not specifying a start block for historic mode automatically sets it to the genesis block.
- For live mode, if the WebSocket subscription lags significantly (e.g., >2000 blocks), ranges are automatically capped to prevent RPC errors.
- In live mode, if the block subscription lags and the scanner needs to catch up by querying past blocks, catch-up queries are performed in ranges bounded by `max_block_range` to respect provider limits.

---

Expand Down
79 changes: 70 additions & 9 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
//! Example usage:
//! Block-range streaming service.
//!
//! This module provides a lower-level primitive used by [`crate::EventScanner`]: it streams
//! contiguous block number ranges (inclusive) and emits [`crate::Notification`] values for
//! certain state transitions (e.g. reorg detection).
//!
//! [`BlockRangeScanner`] is useful when you want to build your own log-fetching pipeline on top of
//! range streaming, or when you need direct access to the scanner's batching and reorg-detection
//! behavior.
//!
//! # Output stream
//!
//! Streams returned by [`BlockRangeScannerClient`] yield [`BlockScannerResult`] items:
//!
//! - `Ok(ScannerMessage::Data(range))` for a block range to process.
//! - `Ok(ScannerMessage::Notification(_))` for scanner notifications.
//! - `Err(ScannerError)` for errors.
//!
//! # Ordering
//!
//! Range messages are streamed in chronological order within a single stream (lower block number
//! to higher block number). On reorgs, the scanner may re-emit previously-seen ranges for the
//! affected blocks.
//!
//! # Example usage:
//!
//! ```rust,no_run
//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
Expand Down Expand Up @@ -91,18 +115,19 @@ pub(crate) use range_iterator::RangeIterator;
use reorg_handler::ReorgHandler;
pub use ring_buffer::RingBufferCapacity;

/// Default maximum number of blocks per streamed range.
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;

/// Default confirmation depth used by scanners that accept a `block_confirmations` setting.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Default per-stream buffer size used by scanners.
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;

// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
// is considered final)
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;

/// The result type yielded by block-range streams.
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;

/// Convenience alias for a streamed block-range message.
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;

impl From<RangeInclusive<BlockNumber>> for Message {
Expand All @@ -123,9 +148,14 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
}
}

#[derive(Clone)]
/// Builder/configuration for the block-range streaming service.
#[derive(Clone, Debug)]
pub struct BlockRangeScanner {
/// Maximum number of blocks per streamed range.
pub max_block_range: u64,
/// How many past block hashes to keep in memory for reorg detection.
///
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
pub past_blocks_storage_capacity: RingBufferCapacity,
pub buffer_capacity: usize,
}
Expand All @@ -137,6 +167,7 @@ impl Default for BlockRangeScanner {
}

impl BlockRangeScanner {
/// Creates a scanner with default configuration.
#[must_use]
pub fn new() -> Self {
Self {
Expand All @@ -146,12 +177,20 @@ impl BlockRangeScanner {
}
}

/// Sets the maximum number of blocks per streamed range.
///
/// This controls batching for historical scans and for catch-up in live/sync scanners.
///
/// Must be greater than 0.
#[must_use]
pub fn max_block_range(mut self, max_block_range: u64) -> Self {
self.max_block_range = max_block_range;
self
}

/// Sets how many past block hashes to keep in memory for reorg detection.
///
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
#[must_use]
pub fn past_blocks_storage_capacity(
mut self,
Expand All @@ -161,6 +200,14 @@ impl BlockRangeScanner {
self
}

/// Sets the stream buffer capacity.
///
/// Controls the maximum number of messages that can be buffered in the stream
/// before backpressure is applied.
///
/// # Arguments
///
/// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0)
#[must_use]
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
self.buffer_capacity = buffer_capacity;
Expand Down Expand Up @@ -192,6 +239,11 @@ impl BlockRangeScanner {
}
}

/// A [`BlockRangeScanner`] connected to a provider.
///
/// Use [`ConnectedBlockRangeScanner::run`] to start the background service and obtain a
/// [`BlockRangeScannerClient`].
#[derive(Debug)]
pub struct ConnectedBlockRangeScanner<N: Network> {
provider: RobustProvider<N>,
max_block_range: u64,
Expand All @@ -200,7 +252,7 @@ pub struct ConnectedBlockRangeScanner<N: Network> {
}

impl<N: Network> ConnectedBlockRangeScanner<N> {
/// Returns the `RobustProvider`
/// Returns the underlying [`RobustProvider`].
#[must_use]
pub fn provider(&self) -> &RobustProvider<N> {
&self.provider
Expand Down Expand Up @@ -230,25 +282,30 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
}
}

/// Commands accepted by the internal block-range service.
#[derive(Debug)]
pub enum Command {
enum Command {
/// Start a live stream.
StreamLive {
sender: mpsc::Sender<BlockScannerResult>,
block_confirmations: u64,
response: oneshot::Sender<Result<(), ScannerError>>,
},
/// Start a historical range stream.
StreamHistorical {
sender: mpsc::Sender<BlockScannerResult>,
start_id: BlockId,
end_id: BlockId,
response: oneshot::Sender<Result<(), ScannerError>>,
},
/// Start a stream that catches up from `start_id` and then transitions to live streaming.
StreamFrom {
sender: mpsc::Sender<BlockScannerResult>,
start_id: BlockId,
block_confirmations: u64,
response: oneshot::Sender<Result<(), ScannerError>>,
},
/// Start a reverse stream (newer to older), in batches.
Rewind {
sender: mpsc::Sender<BlockScannerResult>,
start_id: BlockId,
Expand All @@ -267,6 +324,7 @@ struct Service<N: Network> {
}

impl<N: Network> Service<N> {
/// Creates a new background service instance and its command channel.
pub fn new(
provider: RobustProvider<N>,
max_block_range: u64,
Expand Down Expand Up @@ -580,6 +638,9 @@ impl<N: Network> Service<N> {
}
}

/// Client for requesting block-range streams from the background service.
///
/// Each method returns a new stream whose items are [`BlockScannerResult`] values.
pub struct BlockRangeScannerClient {
command_sender: mpsc::Sender<Command>,
buffer_capacity: usize,
Expand All @@ -593,7 +654,7 @@ impl BlockRangeScannerClient {
/// * `command_sender` - The sender for sending commands to the subscription service.
/// * `buffer_capacity` - The capacity for buffering messages in the stream.
#[must_use]
pub fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
Self { command_sender, buffer_capacity }
}

Expand Down
14 changes: 14 additions & 0 deletions src/block_range_scanner/ring_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use std::collections::VecDeque;

/// Configuration for how many past block hashes to retain for reorg detection.
///
/// This type is re-exported as `PastBlocksStorageCapacity` from the crate root.
#[derive(Copy, Clone, Debug)]
pub enum RingBufferCapacity {
/// Keep at most `n` items.
///
/// A value of `0` disables storing past block hashes and effectively disables reorg
/// detection.
Limited(usize),
/// Keep an unbounded number of items.
///
/// WARNING: This can lead to unbounded memory growth over long-running processes.
/// Avoid using this in production deployments without an external bound.
Infinite,
}

Expand Down Expand Up @@ -56,14 +67,17 @@ impl<T> RingBuffer<T> {
}
}

/// Removes and returns the newest element from the buffer.
pub fn pop_back(&mut self) -> Option<T> {
self.inner.pop_back()
}

/// Returns a reference to the newest element in the buffer.
pub fn back(&self) -> Option<&T> {
self.inner.back()
}

/// Clears all elements currently stored in the buffer.
pub fn clear(&mut self) {
self.inner.clear();
}
Expand Down
24 changes: 24 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,61 @@ use thiserror::Error;

use crate::{robust_provider::provider::Error as RobustProviderError, types::ScannerResult};

/// Errors emitted by the scanner.
///
/// `ScannerError` values can be returned by builder `connect()` methods and are also yielded by
/// subscription streams (as `Err(ScannerError)` items).
///
/// All errors except [`ScannerError::Lagged`] are terminal and will halt further stream processing.
#[derive(Error, Debug, Clone)]
pub enum ScannerError {
/// The underlying RPC transport returned an error.
#[error("RPC error: {0}")]
RpcError(Arc<RpcError<TransportErrorKind>>),

/// The internal service has shut down and can no longer process commands.
#[error("Service is shutting down")]
ServiceShutdown,

/// A requested block (by number, hash or tag) could not be retrieved.
#[error("Block not found, Block Id: {0}")]
BlockNotFound(BlockId),

/// A timeout elapsed while waiting for an RPC response.
#[error("Operation timed out")]
Timeout,

/// A configured block parameter exceeds the latest known block.
#[error("{0} {1} exceeds the latest block {2}")]
BlockExceedsLatest(&'static str, u64, u64),

/// The requested event count is invalid (must be greater than zero).
#[error("Event count must be greater than 0")]
InvalidEventCount,

/// The configured maximum block range is invalid (must be greater than zero).
#[error("Max block range must be greater than 0")]
InvalidMaxBlockRange,

/// The configured stream buffer capacity is invalid (must be greater than zero).
#[error("Stream buffer capacity must be greater than 0")]
InvalidBufferCapacity,

/// The configured maximum number of concurrent fetches is invalid (must be greater than
/// zero).
#[error("Max concurrent fetches must be greater than 0")]
InvalidMaxConcurrentFetches,

/// A block subscription ended (for example, the underlying WebSocket subscription closed).
#[error("Subscription closed")]
SubscriptionClosed,

/// A subscription consumer could not keep up and some internal messages were skipped.
///
/// The contained value is the number of skipped messages reported by the underlying channel.
/// After emitting this error, the subscription stream may continue with newer items.
#[error("Subscription lagged")]
Lagged(u64),
}

impl From<RobustProviderError> for ScannerError {
Expand Down
2 changes: 1 addition & 1 deletion src/event_scanner/listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::event_scanner::{EventScannerResult, filter::EventFilter};
use tokio::sync::mpsc::Sender;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) struct EventListener {
pub filter: EventFilter,
pub sender: Sender<EventScannerResult>,
Expand Down
7 changes: 7 additions & 0 deletions src/event_scanner/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ use crate::{
types::{IntoScannerResult, ScannerResult},
};

/// The item type yielded by event subscription streams.
///
/// This is a [`ScannerMessage`] whose data payload is a batch of [`Log`] values.
pub type Message = ScannerMessage<Vec<Log>>;

/// The `Result` type yielded by event subscription streams.
///
/// Successful items are [`Message`] values; failures are [`crate::ScannerError`].
pub type EventScannerResult = ScannerResult<Vec<Log>>;

impl From<Vec<Log>> for Message {
Expand Down
10 changes: 10 additions & 0 deletions src/event_scanner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
//! High-level event scanner API.
//!
//! This module re-exports the primary types used for scanning EVM logs:
//!
//! - [`EventScanner`] and [`EventScannerBuilder`] for constructing and running scanners.
//! - [`EventFilter`] for defining which contract addresses and event signatures to match.
//! - [`Message`] / [`EventScannerResult`] for consuming subscription streams.
//!
//! Mode marker types (e.g. [`Live`], [`Historic`]) are also re-exported.

mod filter;
mod listener;
mod message;
Expand Down
Loading