diff --git a/README.md b/README.md index a2e83dae..eb1a1728 100644 --- a/README.md +++ b/README.md @@ -230,11 +230,15 @@ let multi_sigs = EventFilter::new() The scanner delivers three types of messages through the event stream: - **`Message::Data(Vec<Log>)`** – Contains a batch of matching event logs. Each log includes the raw event data, transaction hash, block number, and other metadata. -- **`Message::Notification(Notification)`** – Notifications from the scanner: -- **`ScannerError`** – Errors indicating that the scanner has encountered issues (e.g., RPC failures, connection problems) +- **`Message::Notification(Notification)`** – Notifications from the scanner. +- **`Err(ScannerError)`** – Errors indicating that the scanner has encountered issues (e.g., RPC failures, connection problems, or a lagging consumer). Always handle all message types in your stream processing loop to ensure robust error handling and proper reorg detection. +Notes: + +- Ordering is guaranteed only within a single subscription stream. There is no global ordering guarantee across multiple subscriptions. +- When the scanner detects a reorg, it emits `Notification::ReorgDetected`. Consumers should assume the same events might be delivered more than once around reorgs (i.e. benign duplicates are possible). Depending on the application's needs, this could be handled via idempotency/deduplication or by rolling back application state on reorg notifications. ### Scanning Modes @@ -248,7 +252,7 @@ Always handle all message types in your stream processing loop to ensure robust - Set `max_block_range` based on your RPC provider's limits (e.g., Alchemy, Infura may limit queries to 2000 blocks). Default is 1000 blocks. - The modes come with sensible defaults; for example, not specifying a start block for historic mode automatically sets it to the genesis block. -- For live mode, if the WebSocket subscription lags significantly (e.g., >2000 blocks), ranges are automatically capped to prevent RPC errors. +- In live mode, if the block subscription lags and the scanner needs to catch up by querying past blocks, catch-up queries are performed in ranges bounded by `max_block_range` to respect provider limits. --- diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index b5e9db25..821d747f 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -1,4 +1,28 @@ -//! Example usage: +//! Block-range streaming service. +//! +//! This module provides a lower-level primitive used by [`crate::EventScanner`]: it streams +//! contiguous block number ranges (inclusive) and emits [`crate::Notification`] values for +//! certain state transitions (e.g. reorg detection). +//! +//! [`BlockRangeScanner`] is useful when you want to build your own log-fetching pipeline on top of +//! range streaming, or when you need direct access to the scanner's batching and reorg-detection +//! behavior. +//! +//! # Output stream +//! +//! Streams returned by [`BlockRangeScannerClient`] yield [`BlockScannerResult`] items: +//! +//! - `Ok(ScannerMessage::Data(range))` for a block range to process. +//! - `Ok(ScannerMessage::Notification(_))` for scanner notifications. +//! - `Err(ScannerError)` for errors. +//! +//! # Ordering +//! +//! Range messages are streamed in chronological order within a single stream (lower block number +//! to higher block number). On reorgs, the scanner may re-emit previously-seen ranges for the +//! affected blocks. +//! +//! # Example usage: //! //! ```rust,no_run //! 
use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber}; @@ -91,18 +115,19 @@ pub(crate) use range_iterator::RangeIterator; use reorg_handler::ReorgHandler; pub use ring_buffer::RingBufferCapacity; +/// Default maximum number of blocks per streamed range. pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000; +/// Default confirmation depth used by scanners that accept a `block_confirmations` setting. pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; +/// Default per-stream buffer size used by scanners. pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000; -// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block -// is considered final) -pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64; - +/// The result type yielded by block-range streams. pub type BlockScannerResult = ScannerResult>; +/// Convenience alias for a streamed block-range message. pub type Message = ScannerMessage>; impl From> for Message { @@ -123,9 +148,14 @@ impl IntoScannerResult> for RangeInclusive Self { Self { @@ -146,12 +177,20 @@ impl BlockRangeScanner { } } + /// Sets the maximum number of blocks per streamed range. + /// + /// This controls batching for historical scans and for catch-up in live/sync scanners. + /// + /// Must be greater than 0. #[must_use] pub fn max_block_range(mut self, max_block_range: u64) -> Self { self.max_block_range = max_block_range; self } + /// Sets how many past block hashes to keep in memory for reorg detection. + /// + /// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled. #[must_use] pub fn past_blocks_storage_capacity( mut self, @@ -161,6 +200,14 @@ impl BlockRangeScanner { self } + /// Sets the stream buffer capacity. + /// + /// Controls the maximum number of messages that can be buffered in the stream + /// before backpressure is applied. + /// + /// # Arguments + /// + /// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0) #[must_use] pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self { self.buffer_capacity = buffer_capacity; @@ -192,6 +239,11 @@ impl BlockRangeScanner { } } +/// A [`BlockRangeScanner`] connected to a provider. +/// +/// Use [`ConnectedBlockRangeScanner::run`] to start the background service and obtain a +/// [`BlockRangeScannerClient`]. +#[derive(Debug)] pub struct ConnectedBlockRangeScanner { provider: RobustProvider, max_block_range: u64, @@ -200,7 +252,7 @@ pub struct ConnectedBlockRangeScanner { } impl ConnectedBlockRangeScanner { - /// Returns the `RobustProvider` + /// Returns the underlying [`RobustProvider`]. #[must_use] pub fn provider(&self) -> &RobustProvider { &self.provider @@ -230,25 +282,30 @@ impl ConnectedBlockRangeScanner { } } +/// Commands accepted by the internal block-range service. #[derive(Debug)] -pub enum Command { +enum Command { + /// Start a live stream. StreamLive { sender: mpsc::Sender, block_confirmations: u64, response: oneshot::Sender>, }, + /// Start a historical range stream. StreamHistorical { sender: mpsc::Sender, start_id: BlockId, end_id: BlockId, response: oneshot::Sender>, }, + /// Start a stream that catches up from `start_id` and then transitions to live streaming. StreamFrom { sender: mpsc::Sender, start_id: BlockId, block_confirmations: u64, response: oneshot::Sender>, }, + /// Start a reverse stream (newer to older), in batches. 
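+ /// Used by the latest-events scanner to collect the most recent matching events first.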
Rewind { sender: mpsc::Sender, start_id: BlockId, @@ -267,6 +324,7 @@ struct Service { } impl Service { + /// Creates a new background service instance and its command channel. pub fn new( provider: RobustProvider, max_block_range: u64, @@ -580,6 +638,9 @@ impl Service { } } +/// Client for requesting block-range streams from the background service. +/// +/// Each method returns a new stream whose items are [`BlockScannerResult`] values. pub struct BlockRangeScannerClient { command_sender: mpsc::Sender, buffer_capacity: usize, @@ -593,7 +654,7 @@ impl BlockRangeScannerClient { /// * `command_sender` - The sender for sending commands to the subscription service. /// * `buffer_capacity` - The capacity for buffering messages in the stream. #[must_use] - pub fn new(command_sender: mpsc::Sender, buffer_capacity: usize) -> Self { + fn new(command_sender: mpsc::Sender, buffer_capacity: usize) -> Self { Self { command_sender, buffer_capacity } } diff --git a/src/block_range_scanner/ring_buffer.rs b/src/block_range_scanner/ring_buffer.rs index 6b00be69..f8aa6e89 100644 --- a/src/block_range_scanner/ring_buffer.rs +++ b/src/block_range_scanner/ring_buffer.rs @@ -1,8 +1,19 @@ use std::collections::VecDeque; +/// Configuration for how many past block hashes to retain for reorg detection. +/// +/// This type is re-exported as `PastBlocksStorageCapacity` from the crate root. #[derive(Copy, Clone, Debug)] pub enum RingBufferCapacity { + /// Keep at most `n` items. + /// + /// A value of `0` disables storing past block hashes and effectively disables reorg + /// detection. Limited(usize), + /// Keep an unbounded number of items. + /// + /// WARNING: This can lead to unbounded memory growth over long-running processes. + /// Avoid using this in production deployments without an external bound. Infinite, } @@ -56,14 +67,17 @@ impl RingBuffer { } } + /// Removes and returns the newest element from the buffer. pub fn pop_back(&mut self) -> Option { self.inner.pop_back() } + /// Returns a reference to the newest element in the buffer. pub fn back(&self) -> Option<&T> { self.inner.back() } + /// Clears all elements currently stored in the buffer. pub fn clear(&mut self) { self.inner.clear(); } diff --git a/src/error.rs b/src/error.rs index 67aef47f..26d524c0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,37 +8,61 @@ use thiserror::Error; use crate::{robust_provider::provider::Error as RobustProviderError, types::ScannerResult}; +/// Errors emitted by the scanner. +/// +/// `ScannerError` values can be returned by builder `connect()` methods and are also yielded by +/// subscription streams (as `Err(ScannerError)` items). +/// +/// All errors except [`ScannerError::Lagged`] are terminal and will halt further stream processing. #[derive(Error, Debug, Clone)] pub enum ScannerError { + /// The underlying RPC transport returned an error. #[error("RPC error: {0}")] RpcError(Arc>), + /// The internal service has shut down and can no longer process commands. #[error("Service is shutting down")] ServiceShutdown, + /// A requested block (by number, hash or tag) could not be retrieved. #[error("Block not found, Block Id: {0}")] BlockNotFound(BlockId), + /// A timeout elapsed while waiting for an RPC response. #[error("Operation timed out")] Timeout, + /// A configured block parameter exceeds the latest known block. #[error("{0} {1} exceeds the latest block {2}")] BlockExceedsLatest(&'static str, u64, u64), + /// The requested event count is invalid (must be greater than zero). 
#[error("Event count must be greater than 0")] InvalidEventCount, + /// The configured maximum block range is invalid (must be greater than zero). #[error("Max block range must be greater than 0")] InvalidMaxBlockRange, + /// The configured stream buffer capacity is invalid (must be greater than zero). #[error("Stream buffer capacity must be greater than 0")] InvalidBufferCapacity, + /// The configured maximum number of concurrent fetches is invalid (must be greater than + /// zero). #[error("Max concurrent fetches must be greater than 0")] InvalidMaxConcurrentFetches, + /// A block subscription ended (for example, the underlying WebSocket subscription closed). #[error("Subscription closed")] SubscriptionClosed, + + /// A subscription consumer could not keep up and some internal messages were skipped. + /// + /// The contained value is the number of skipped messages reported by the underlying channel. + /// After emitting this error, the subscription stream may continue with newer items. + #[error("Subscription lagged")] + Lagged(u64), } impl From for ScannerError { diff --git a/src/event_scanner/listener.rs b/src/event_scanner/listener.rs index 10562b74..c31298d3 100644 --- a/src/event_scanner/listener.rs +++ b/src/event_scanner/listener.rs @@ -1,7 +1,7 @@ use crate::event_scanner::{EventScannerResult, filter::EventFilter}; use tokio::sync::mpsc::Sender; -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct EventListener { pub filter: EventFilter, pub sender: Sender, diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index df61a21c..eb200a3e 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -5,7 +5,14 @@ use crate::{ types::{IntoScannerResult, ScannerResult}, }; +/// The item type yielded by event subscription streams. +/// +/// This is a [`ScannerMessage`] whose data payload is a batch of [`Log`] values. pub type Message = ScannerMessage>; + +/// The `Result` type yielded by event subscription streams. +/// +/// Successful items are [`Message`] values; failures are [`crate::ScannerError`]. pub type EventScannerResult = ScannerResult>; impl From> for Message { diff --git a/src/event_scanner/mod.rs b/src/event_scanner/mod.rs index 3522924c..68cab7b5 100644 --- a/src/event_scanner/mod.rs +++ b/src/event_scanner/mod.rs @@ -1,3 +1,13 @@ +//! High-level event scanner API. +//! +//! This module re-exports the primary types used for scanning EVM logs: +//! +//! - [`EventScanner`] and [`EventScannerBuilder`] for constructing and running scanners. +//! - [`EventFilter`] for defining which contract addresses and event signatures to match. +//! - [`Message`] / [`EventScannerResult`] for consuming subscription streams. +//! +//! Mode marker types (e.g. [`Live`], [`Historic`]) are also re-exported. + mod filter; mod listener; mod message; diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index 3d94f089..a755d438 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -85,7 +85,9 @@ pub(crate) async fn handle_stream( .map_err(ScannerError::from) } Ok(ScannerMessage::Notification(notification)) => Ok(notification.into()), - // No need to stop the stream on an error, because that decision is up to - // the caller. + // No need to stop the stream on an error, because there will be no more + // values received from the range stream. 
Err(e) => Err(e), }) .buffered(max_concurrent_fetches); @@ -154,7 +156,9 @@ fn spawn_log_consumers_in_stream_mode( break; } Err(RecvError::Lagged(skipped)) => { - debug!(skipped_messages = skipped, "Channel lagged"); + tx.send(Err(ScannerError::Lagged(skipped))) + .await + .expect("receiver dropped only if we exit this loop"); } } } @@ -177,10 +181,8 @@ fn spawn_log_consumers_in_collection_mode( max_concurrent_fetches: usize, ) -> JoinSet<()> { listeners.iter().cloned().fold(JoinSet::new(), |mut set, listener| { - let EventListener { filter, sender } = listener; - let provider = provider.clone(); - let base_filter = Filter::from(&filter); + let base_filter = Filter::from(&listener.filter); let mut range_rx = range_tx.subscribe(); set.spawn(async move { @@ -196,14 +198,14 @@ fn spawn_log_consumers_in_collection_mode( let mut stream = ReceiverStream::new(rx) .map(async |message| match message { Ok(ScannerMessage::Data(range)) => { - get_logs(range, &filter, &base_filter, &provider) + get_logs(range, &listener.filter, &base_filter, &provider) .await .map(Message::from) .map_err(ScannerError::from) } Ok(ScannerMessage::Notification(notification)) => Ok(notification.into()), - // No need to stop the stream on an error, because that decision is up to - // the caller. + // No need to stop the stream on an error, because there will be no more + // values received from the range stream. Err(e) => Err(e), }) .buffered(max_concurrent_fetches); @@ -259,12 +261,12 @@ fn spawn_log_consumers_in_collection_mode( } Ok(ScannerMessage::Notification(notification)) => { debug!(notification = ?notification, "Received notification"); - if !sender.try_stream(notification).await { + if !listener.sender.try_stream(notification).await { return; } } Err(e) => { - if !sender.try_stream(e).await { + if !listener.sender.try_stream(e).await { return; } } @@ -273,7 +275,7 @@ fn spawn_log_consumers_in_collection_mode( if collected.is_empty() { debug!("No logs found"); - _ = sender.try_stream(Notification::NoPastLogsFound).await; + _ = listener.sender.try_stream(Notification::NoPastLogsFound).await; return; } @@ -281,7 +283,7 @@ fn spawn_log_consumers_in_collection_mode( collected.reverse(); // restore chronological order trace!("Sending collected logs to consumer"); - _ = sender.try_stream(collected).await; + _ = listener.sender.try_stream(collected).await; }); // Receive block ranges from the broadcast channel and send them to the range processor @@ -300,7 +302,9 @@ fn spawn_log_consumers_in_collection_mode( break; } Err(RecvError::Lagged(skipped)) => { - debug!(skipped_messages = skipped, "Channel lagged"); + tx.send(Err(ScannerError::Lagged(skipped))) + .await + .expect("receiver dropped only if we exit this loop"); } } } @@ -406,6 +410,14 @@ async fn get_logs( #[cfg(test)] mod tests { + use alloy::{ + network::Ethereum, + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; + + use crate::robust_provider::RobustProviderBuilder; + use super::*; #[test] @@ -505,4 +517,58 @@ mod tests { assert_eq!(collected, vec![95, 90, 85]); } + + #[tokio::test] + async fn spawn_log_consumers_in_stream_mode_streams_lagged_error() -> anyhow::Result<()> { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let provider = RobustProviderBuilder::fragile(provider).build().await?; + + let (range_tx, _) = tokio::sync::broadcast::channel::(1); + let (sender, mut receiver) = mpsc::channel(1); + let listeners = &[EventListener { filter: EventFilter::new(), sender }]; + let max_concurrent_fetches = 1; + 
+ let _set = spawn_log_consumers_in_stream_mode( + &provider, + listeners, + &range_tx, + max_concurrent_fetches, + ); + + range_tx.send(Ok(ScannerMessage::Data(0..=1)))?; + // the next range "overfills" the channel, causing a lag + range_tx.send(Ok(ScannerMessage::Data(2..=3)))?; + + assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1)))); + + Ok(()) + } + + #[tokio::test] + async fn spawn_log_consumers_in_collection_mode_streams_lagged_error() -> anyhow::Result<()> { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let provider = RobustProviderBuilder::fragile(provider).build().await?; + + let (range_tx, _) = tokio::sync::broadcast::channel::(1); + let (sender, mut receiver) = mpsc::channel(1); + let listeners = &[EventListener { filter: EventFilter::new(), sender }]; + let count = 5; + let max_concurrent_fetches = 1; + + let _set = spawn_log_consumers_in_collection_mode( + &provider, + listeners, + &range_tx, + count, + max_concurrent_fetches, + ); + + range_tx.send(Ok(ScannerMessage::Data(2..=3)))?; + // the next range "overfills" the channel, causing a lag + range_tx.send(Ok(ScannerMessage::Data(0..=1)))?; + + assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1)))); + + Ok(()) + } } diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/scanner/historic.rs index 972e550f..8430f8c9 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/scanner/historic.rs @@ -124,19 +124,16 @@ impl EventScannerBuilder { } impl EventScanner { - /// Starts the scanner. + /// Starts the scanner in [`Historic`] mode. /// - /// # Important notes - /// - /// * Register event streams via [`scanner.subscribe(filter)`][subscribe] **before** calling - /// this function. - /// * The method returns immediately; events are delivered asynchronously. + /// See [`EventScanner`] for general startup notes. /// /// # Errors /// - /// Can error out if the service fails to start. - /// - /// [subscribe]: EventScanner::subscribe + /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. + /// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved. pub async fn start(self) -> Result<(), ScannerError> { let client = self.block_range_scanner.run()?; let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?; diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/scanner/latest.rs index d6d024ac..304a1b1b 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/scanner/latest.rs @@ -12,6 +12,17 @@ use crate::{ }; impl EventScannerBuilder { + /// Sets the number of confirmations required before a block is considered stable enough to + /// include when collecting the latest events. + /// + /// Higher values reduce the likelihood of emitting logs from blocks that are later reorged, + /// at the cost of potentially excluding very recent events. + #[must_use] + pub fn block_confirmations(mut self, confirmations: u64) -> Self { + self.config.block_confirmations = confirmations; + self + } + /// Sets the starting block for the historic scan. /// /// # Note @@ -124,19 +135,16 @@ impl EventScannerBuilder { } impl EventScanner { - /// Starts the scanner. - /// - /// # Important notes + /// Starts the scanner in [`LatestEvents`] mode. 
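+ /// + /// Collects up to `count` of the most recent matching events for each subscription and + /// delivers them in chronological order.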
/// - /// * Register event streams via [`scanner.subscribe(filter)`][subscribe] **before** calling - /// this function. - /// * The method returns immediately; events are delivered asynchronously. + /// See [`EventScanner`] for general startup notes. /// /// # Errors /// - /// Can error out if the service fails to start. - /// - /// [subscribe]: EventScanner::subscribe + /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. + /// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved. pub async fn start(self) -> Result<(), ScannerError> { let client = self.block_range_scanner.run()?; let stream = client.rewind(self.config.from_block, self.config.to_block).await?; @@ -165,7 +173,8 @@ impl EventScanner { #[cfg(test)] mod tests { use crate::{ - DEFAULT_STREAM_BUFFER_CAPACITY, block_range_scanner::DEFAULT_MAX_BLOCK_RANGE, + DEFAULT_STREAM_BUFFER_CAPACITY, + block_range_scanner::{DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE}, event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, }; @@ -183,12 +192,14 @@ mod tests { fn test_latest_scanner_builder_pattern() { let builder = EventScannerBuilder::latest(3) .max_block_range(25) + .block_confirmations(5) .from_block(BlockNumberOrTag::Number(50)) .to_block(BlockNumberOrTag::Number(150)) .max_concurrent_fetches(10) .buffer_capacity(33); assert_eq!(builder.block_range_scanner.max_block_range, 25); + assert_eq!(builder.config.block_confirmations, 5); assert_eq!(builder.config.max_concurrent_fetches, 10); assert_eq!(builder.config.count, 3); assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(50).into()); @@ -204,6 +215,7 @@ mod tests { assert_eq!(builder.config.to_block, BlockNumberOrTag::Earliest.into()); assert_eq!(builder.config.count, 10); assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES); + assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE); assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY); } @@ -215,6 +227,8 @@ mod tests { .from_block(20) .to_block(100) .to_block(200) + .block_confirmations(5) + .block_confirmations(7) .max_block_range(50) .max_block_range(60) .max_concurrent_fetches(10) @@ -225,11 +239,25 @@ mod tests { assert_eq!(builder.config.count, 3); assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(20).into()); assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into()); + assert_eq!(builder.config.block_confirmations, 7); assert_eq!(builder.config.max_concurrent_fetches, 20); assert_eq!(builder.block_range_scanner.max_block_range, 60); assert_eq!(builder.block_range_scanner.buffer_capacity, 40); } + #[tokio::test] + async fn accepts_zero_confirmations() -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn().unwrap(); + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let scanner = + EventScannerBuilder::latest(1).block_confirmations(0).connect(provider).await?; + + assert_eq!(scanner.config.block_confirmations, 0); + + Ok(()) + } + #[tokio::test] async fn test_latest_returns_error_with_zero_count() { let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); diff --git a/src/event_scanner/scanner/live.rs 
b/src/event_scanner/scanner/live.rs index 4a971032..ac3a274e 100644 --- a/src/event_scanner/scanner/live.rs +++ b/src/event_scanner/scanner/live.rs @@ -8,6 +8,11 @@ use crate::{ }; impl EventScannerBuilder { + /// Sets the number of confirmations required before a block is considered stable enough to + /// scan in live mode. + /// + /// Higher values reduce the likelihood of emitting logs from blocks that are later reorged, + /// at the cost of increased event delivery latency. #[must_use] pub fn block_confirmations(mut self, confirmations: u64) -> Self { self.config.block_confirmations = confirmations; @@ -54,19 +59,15 @@ impl EventScannerBuilder { } impl EventScanner { - /// Starts the scanner. - /// - /// # Important notes + /// Starts the scanner in [`Live`] mode. /// - /// * Register event streams via [`scanner.subscribe(filter)`][subscribe] **before** calling - /// this function. - /// * The method returns immediately; events are delivered asynchronously. + /// See [`EventScanner`] for general startup notes. /// /// # Errors /// - /// Can error out if the service fails to start. - /// - /// [subscribe]: EventScanner::subscribe + /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. pub async fn start(self) -> Result<(), ScannerError> { let client = self.block_range_scanner.run()?; let stream = client.stream_live(self.config.block_confirmations).await?; diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 92dc3585..9eec42fa 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -1,3 +1,36 @@ +//! Scanner builders and mode marker types. +//! +//! This module defines [`EventScannerBuilder`] and the mode marker types used to configure an +//! [`EventScanner`]. Calling [`EventScannerBuilder::historic`], [`EventScannerBuilder::live`], +//! [`EventScannerBuilder::latest`] or [`EventScannerBuilder::sync`] selects a mode and exposes the +//! mode-specific configuration methods. +//! +//! # Streams +//! +//! Consumers register event subscriptions via [`EventScanner::subscribe`]. Each subscription +//! produces an independent stream of [`EventScannerResult`]. +//! +//! ## Ordering +//! +//! Ordering is preserved *per subscription stream*. There is no global ordering guarantee across +//! different subscriptions. +//! +//! ## Backpressure and lag +//! +//! Subscription streams are buffered. If a consumer processes events too slowly and the +//! internal buffer fills up, the stream yields [`ScannerError::Lagged`] and some events +//! may be skipped. +//! +//! # Reorgs and finality +//! +//! When scanning non-finalized blocks, the scanner may detect chain reorganizations and will emit +//! [`Notification::ReorgDetected`]. Consumers should assume the same events might be delivered more +//! than once around reorgs (i.e. benign duplicates are possible). +//! +//! In live mode, `block_confirmations` delays emission so that shallow reorganizations that do not +//! affect the confirmed boundary do not trigger reorg notifications. +//! +//! [`Notification::ReorgDetected`]: crate::Notification::ReorgDetected use alloy::{ eips::{BlockId, BlockNumberOrTag}, network::{Ethereum, Network}, @@ -24,34 +57,68 @@ mod sync; /// Default number of maximum concurrent fetches for each scanner mode. 
pub const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 24; -#[derive(Default)] +/// Marker indicating that a scanner mode has not been selected yet. +#[derive(Default, Debug)] pub struct Unspecified; + +/// Mode marker for historical range scanning. +/// +/// For more details on this scanner mode, see [`EventScannerBuilder::historic`]. +#[derive(Debug)] pub struct Historic { pub(crate) from_block: BlockId, pub(crate) to_block: BlockId, /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } + +/// Mode marker for live streaming. +/// +/// For more details on this scanner mode, see [`EventScannerBuilder::live`]. +#[derive(Debug)] pub struct Live { pub(crate) block_confirmations: u64, /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } + +/// Mode marker for latest-events collection. +/// +/// For more details on this scanner mode, see [`EventScannerBuilder::latest`]. +#[derive(Debug)] pub struct LatestEvents { pub(crate) count: usize, pub(crate) from_block: BlockId, pub(crate) to_block: BlockId, + pub(crate) block_confirmations: u64, /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } -#[derive(Default)] + +/// Marker indicating that a sync mode must be selected. +#[derive(Default, Debug)] pub struct Synchronize; + +/// Mode marker for scanning by syncing from the specified count of latest events and then switching +/// to live mode. +/// +/// For more details on this scanner mode, see +/// [`EventScannerBuilder::sync().from_latest(count)`](crate::EventScannerBuilder::from_latest). +#[derive(Debug)] pub struct SyncFromLatestEvents { pub(crate) count: usize, pub(crate) block_confirmations: u64, /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } + +/// Mode marker for scanning by syncing from the specified block and then switching to live mode. +/// +/// For more details on this scanner mode, see +/// [`EventScannerBuilder::sync().from_block(block_id)`][sync from block]. +/// +/// [sync from block]: crate::EventScannerBuilder#method.from_block-2 +#[derive(Debug)] pub struct SyncFromBlock { pub(crate) from_block: BlockId, pub(crate) block_confirmations: u64, @@ -78,15 +145,33 @@ impl Default for Live { } } -pub struct EventScanner { - config: M, +/// An event scanner configured in mode `Mode` and bound to network `N`. +/// +/// Create an instance via [`EventScannerBuilder`], register subscriptions with +/// [`EventScanner::subscribe`], then start the scanner with the mode-specific `start()` method. +/// +/// # Starting the scanner +/// +/// All scanner modes follow the same general startup pattern: +/// +/// - **Register subscriptions first**: call [`EventScanner::subscribe`] before starting the scanner +/// with `start()`. The scanner sends events only to subscriptions that have already been +/// registered. +/// - **Non-blocking start**: `start()` returns immediately after spawning background tasks. +/// Subscription streams yield events asynchronously. +/// - **Errors after startup**: most runtime failures are delivered through subscription streams as +/// [`ScannerError`] items, rather than being returned from `start()`. 
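+///
+/// # Example
+///
+/// A minimal sketch of the live-mode flow. The endpoint URL is illustrative, `live()` is
+/// assumed to take no arguments (its mode struct has a `Default`), and the imports assume the
+/// crate is named `event_scanner` and re-exports these types at the root:
+///
+/// ```rust,no_run
+/// # use alloy::providers::ProviderBuilder;
+/// # use event_scanner::{EventFilter, EventScannerBuilder, Message};
+/// # use tokio_stream::StreamExt;
+/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+/// let provider = ProviderBuilder::new().connect_http("http://localhost:8545".parse()?);
+///
+/// // Register subscriptions *before* starting the scanner.
+/// let mut scanner = EventScannerBuilder::live().connect(provider).await?;
+/// let mut stream = scanner.subscribe(EventFilter::new());
+///
+/// // `start()` returns as soon as the background tasks are spawned.
+/// scanner.start().await?;
+///
+/// // Handle every item variant: data batches, notifications, and errors.
+/// while let Some(item) = stream.next().await {
+///     match item {
+///         Ok(Message::Data(logs)) => println!("received {} logs", logs.len()),
+///         Ok(Message::Notification(notification)) => println!("{notification:?}"),
+///         Err(error) => eprintln!("scanner error: {error}"),
+///     }
+/// }
+/// # Ok(())
+/// # }
+/// ```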
+#[derive(Debug)] +pub struct EventScanner { + config: Mode, block_range_scanner: ConnectedBlockRangeScanner, listeners: Vec, } -#[derive(Default)] -pub struct EventScannerBuilder { - pub(crate) config: M, +/// Builder for constructing an [`EventScanner`] in a particular mode. +#[derive(Default, Debug)] +pub struct EventScannerBuilder { + pub(crate) config: Mode, pub(crate) block_range_scanner: BlockRangeScanner, } @@ -378,6 +463,7 @@ impl EventScannerBuilder { count, from_block: BlockNumberOrTag::Latest.into(), to_block: BlockNumberOrTag::Earliest.into(), + block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, max_concurrent_fetches: DEFAULT_MAX_CONCURRENT_FETCHES, }, block_range_scanner: BlockRangeScanner::default(), } } @@ -413,7 +499,7 @@ } } -impl EventScannerBuilder { +impl EventScannerBuilder { /// Sets the maximum block range per event batch. /// /// Controls how the scanner splits a large block range into smaller batches for processing. @@ -473,18 +559,36 @@ impl EventScannerBuilder { async fn build( self, provider: impl IntoRobustProvider, - ) -> Result, ScannerError> { + ) -> Result, ScannerError> { let block_range_scanner = self.block_range_scanner.connect::(provider).await?; Ok(EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() }) } } -impl EventScanner { +impl EventScanner { + /// Returns the configured stream buffer capacity. #[must_use] pub fn buffer_capacity(&self) -> usize { self.block_range_scanner.buffer_capacity() } + /// Registers an event subscription and returns its stream. + /// + /// Each call creates a separate subscription stream with its own buffer. + /// + /// # Ordering + /// + /// Ordering is guaranteed only within a single returned stream. There is no ordering + /// guarantee across streams created by multiple calls to this method. + /// + /// # Errors + /// + /// The stream yields [`ScannerError`] values on failures. In particular, if a consumer cannot + /// keep up and internal buffers lag, the stream yields [`ScannerError::Lagged`]. + /// + /// # Notes + /// + /// For the scanner to stream events properly, register all subscriptions before calling `start()`. #[must_use] pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<EventScannerResult> { let (sender, receiver) = @@ -530,6 +634,7 @@ mod tests { assert_eq!(builder.config.from_block, BlockNumberOrTag::Latest.into()); assert_eq!(builder.config.to_block, BlockNumberOrTag::Earliest.into()); + assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY); } diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/scanner/sync/from_block.rs index 81e6b839..a91f2bc0 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/scanner/sync/from_block.rs @@ -10,6 +10,11 @@ use crate::{ }; impl EventScannerBuilder { + /// Sets the number of confirmations required before a block is considered stable enough to + /// scan in the live phase. + /// + /// This affects the post-sync live streaming phase; higher values reduce reorg risk at the + /// cost of increased event delivery latency. #[must_use] pub fn block_confirmations(mut self, confirmations: u64) -> Self { self.config.block_confirmations = confirmations; self } @@ -61,19 +66,16 @@ impl EventScannerBuilder { } impl EventScanner { - /// Starts the scanner. - /// - /// # Important notes + /// Starts the scanner in [`SyncFromBlock`] mode. 
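+ /// + /// The scanner first catches up from `from_block` and then switches to live streaming.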
/// - /// * Register event streams via [`scanner.subscribe(filter)`][subscribe] **before** calling - /// this function. - /// * The method returns immediately; events are delivered asynchronously. + /// See [`EventScanner`] for general startup notes. /// /// # Errors /// - /// Can error out if the service fails to start. - /// - /// [subscribe]: EventScanner::subscribe + /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. + /// * [`ScannerError::BlockNotFound`] - if `from_block` cannot be resolved. pub async fn start(self) -> Result<(), ScannerError> { let client = self.block_range_scanner.run()?; let stream = diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 5ce8f452..4a4141b0 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -14,6 +14,11 @@ use crate::{ }; impl EventScannerBuilder { + /// Sets the number of confirmations required before a block is considered stable enough to + /// scan in the live phase. + /// + /// This affects the post-sync live streaming phase; higher values reduce reorg risk at the + /// cost of increased event delivery latency. #[must_use] pub fn block_confirmations(mut self, confirmations: u64) -> Self { self.config.block_confirmations = confirmations; self } @@ -60,19 +65,15 @@ impl EventScannerBuilder { } impl EventScanner { - /// Starts the scanner. - /// - /// # Important notes + /// Starts the scanner in [`SyncFromLatestEvents`] mode. /// - /// * Register event streams via [`scanner.subscribe(filter)`][subscribe] **before** calling - /// this function. - /// * The method returns immediately; events are delivered asynchronously. + /// See [`EventScanner`] for general startup notes. /// /// # Errors /// - /// Can error out if the service fails to start. - /// - /// [subscribe]: EventScanner::subscribe + /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. #[allow(clippy::missing_panics_doc)] pub async fn start(self) -> Result<(), ScannerError> { let count = self.config.count; diff --git a/src/lib.rs b/src/lib.rs index c4faff57..54d585bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,53 @@ +//! Event-Scanner is a library for streaming EVM event logs. +//! +//! The main entry point is [`EventScanner`], built via [`EventScannerBuilder`] in one of the +//! supported modes (e.g. [`Historic`] or [`Live`]). +//! +//! After constructing a scanner, register one or more event subscriptions with +//! [`EventScanner::subscribe`], then call [`EventScanner::start`] to begin streaming. +//! +//! # Stream items +//! +//! Each subscription yields an [`EventScannerResult`]. Successful items are [`Message`] values, +//! which wrap either event batches or a [`Notification`] (see [`ScannerMessage`]). +//! +//! # Ordering +//! +//! Ordering is preserved *per subscription stream*. There is no global ordering guarantee across +//! different subscriptions. +//! +//! # Reorgs and finality +//! +//! When scanning non-finalized blocks, the scanner may detect chain reorganizations and will emit +//! [`Notification::ReorgDetected`]. 
Consumers should assume the same events might be delivered more +//! than once around reorgs (i.e. benign duplicates are possible). +//! +//! [`BlockNumberOrTag::Finalized`][finalized] is treated as the authoritative finality boundary +//! when the scanner needs one. In live mode, `block_confirmations` delays emission to reduce the +//! chance that already-emitted blocks are affected by shallow reorganizations. +//! +//! # Dedupe vs rollback +//! +//! Event-Scanner does not include a built-in deduplication utility. Depending on your +//! application, you can: +//! +//! - **Implement idempotency/deduplication** (for example, keyed by transaction hash and log index, +//! optionally including block hash). +//! - **Handle reorgs by rollback**: interpret [`Notification::ReorgDetected`] as a signal to revert +//! application state for blocks after the reported common ancestor. +//! +//! # Backpressure and lag +//! +//! Streams are buffered. If a consumer cannot keep up and an internal broadcast receiver lags, +//! the subscription stream yields [`ScannerError::Lagged`]. +//! +//! # Robust providers +//! +//! The [`robust_provider`] module provides [`robust_provider::RobustProvider`], a wrapper that can +//! retry and fail over across multiple RPC endpoints. +//! +//! [finalized]: alloy::eips::BlockNumberOrTag::Finalized + #[macro_use] mod logging; diff --git a/src/robust_provider/builder.rs b/src/robust_provider/builder.rs index ac4cc7ca..2263489f 100644 --- a/src/robust_provider/builder.rs +++ b/src/robust_provider/builder.rs @@ -20,6 +20,9 @@ pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1); /// Default subscription channel size. pub const DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY: usize = 128; +/// Builder for constructing a [`RobustProvider`]. +/// +/// Use this to configure timeouts, retry/backoff, and one or more fallback providers. pub struct RobustProviderBuilder> { primary_provider: P, fallback_providers: Vec>, diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index 87e5ab6f..269bfaf0 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -109,7 +109,11 @@ impl RobustProvider { /// /// # Errors /// - /// See [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. + /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). + /// * [`Error::BlockNotFound`] - if the block with the specified number was not found on-chain. pub async fn get_block_by_number( &self, number: BlockNumberOrTag, @@ -134,7 +138,11 @@ impl RobustProvider { /// /// # Errors /// - /// See [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. + /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). + /// * [`Error::BlockNotFound`] - if the block with the specified id was not found on-chain. pub async fn get_block(&self, id: BlockId) -> Result { info!("eth_getBlock called"); let result = self @@ -155,7 +163,10 @@ impl RobustProvider { /// /// # Errors /// - /// See [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. 
+ /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). pub async fn get_block_number(&self) -> Result { info!("eth_getBlockNumber called"); let result = self @@ -181,7 +192,11 @@ impl RobustProvider { /// /// # Errors /// - /// See [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. + /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). + /// * [`Error::BlockNotFound`] - if the block with the specified id was not found on-chain. pub async fn get_block_number_by_id(&self, block_id: BlockId) -> Result { info!("get_block_number_by_id called"); let result = self @@ -208,7 +223,10 @@ impl RobustProvider { /// /// # Errors /// - /// See [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. + /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). pub async fn get_latest_confirmed(&self, confirmations: u64) -> Result { info!(configurations = confirmations, "get_latest_confirmed called"); let latest_block = self.get_block_number().await?; @@ -222,7 +240,11 @@ impl RobustProvider { /// /// # Errors /// - /// See [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. + /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). + /// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain. pub async fn get_block_by_hash(&self, hash: BlockHash) -> Result { info!("eth_getBlockByHash called"); let result = self @@ -244,7 +266,10 @@ impl RobustProvider { /// /// # Errors /// - /// See [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. + /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). pub async fn get_logs(&self, filter: &Filter) -> Result, Error> { info!("eth_getLogs called"); let result = self @@ -271,7 +296,10 @@ impl RobustProvider { /// /// # Errors /// - /// see [retry errors](#retry-errors). + /// * [`Error::RpcError`] - if no fallback providers succeeded; contains the last error returned + /// by the last provider attempted on the last retry. + /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); let subscription = self @@ -297,7 +325,7 @@ impl RobustProvider { /// Execute `operation` with exponential backoff and a total timeout. /// - /// Wraps the retry logic with `tokio::time::timeout(self.call_timeout, ...)` so + /// Wraps the retry logic with [`tokio::time::timeout`] so /// the entire operation (including time spent inside the RPC call) cannot exceed /// `call_timeout`. /// @@ -307,14 +335,11 @@ impl RobustProvider { /// If `require_pubsub` is true, providers that don't support pubsub will be skipped. 
/// /// # Errors - /// - /// - /// * Returns [`RpcError`] with message "total operation timeout exceeded - /// and all fallback providers failed" if the overall timeout elapses and no fallback - /// providers succeed. - /// * Returns [`RpcError::Transport(TransportErrorKind::PubsubUnavailable)`] if `require_pubsub` - /// is true and all providers don't support pubsub. - /// * Propagates any [`RpcError`] from the underlying retries. + /// + /// * [`CoreError::RpcError`] - if no fallback providers succeeded; contains the last error + /// returned by the last provider attempted on the last retry. + /// * [`CoreError::Timeout`] - if the overall operation timeout elapses (i.e. exceeds + /// `call_timeout`). pub async fn try_operation_with_failover( &self, operation: F, diff --git a/src/robust_provider/provider_conversion.rs b/src/robust_provider/provider_conversion.rs index 76d3dc95..fee2d9c8 100644 --- a/src/robust_provider/provider_conversion.rs +++ b/src/robust_provider/provider_conversion.rs @@ -10,7 +10,16 @@ use alloy::{ use crate::robust_provider::{RobustProvider, RobustProviderBuilder, provider::Error}; +/// Conversion trait for types that can be turned into an Alloy [`RootProvider`]. +/// +/// This is primarily used by [`RobustProviderBuilder`] to accept different provider types and +/// connection strings. pub trait IntoRootProvider { + /// Convert `self` into a [`RootProvider`]. + /// + /// # Errors + /// + /// Returns an error if the underlying provider cannot be constructed or connected. fn into_root_provider(self) -> impl Future, Error>> + Send; } @@ -78,7 +87,13 @@ where } } +/// Conversion trait for types that can be turned into a [`RobustProvider`]. pub trait IntoRobustProvider { + /// Convert `self` into a [`RobustProvider`]. + /// + /// # Errors + /// + /// Returns an error if the primary or any fallback provider fails to connect. fn into_robust_provider(self) -> impl Future, Error>> + Send; } diff --git a/src/types.rs b/src/types.rs index bad6b438..992f458b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -44,12 +44,13 @@ pub enum Notification { /// /// **How to handle**: This is a benign race condition. Your application should be designed to /// handle duplicate logs idempotently (e.g., using transaction hashes or log indices as - /// deduplication keys). The scanner prioritizes correctness by ensuring all logs from the - /// canonical chain are delivered, even if it means occasional duplicates during reorgs. - /// - /// The `common_ancestor` field contains the block number of the last block - /// that is still valid on the canonical chain. - ReorgDetected { common_ancestor: u64 }, + /// deduplication keys). Depending on your application semantics, you may also treat this + /// notification as a signal to roll back application state derived from blocks after the + /// reported common ancestor. + ReorgDetected { + /// The block number of the last block that is still valid on the canonical chain. + common_ancestor: u64, + }, /// Emitted during the latest events phase when no matching logs are found in the /// scanned range. @@ -72,8 +73,12 @@ impl PartialEq for ScannerMessage { } } +/// A convenience `Result` type for scanner streams. +/// +/// Successful items are [`ScannerMessage`] values; failures are [`ScannerError`]. pub type ScannerResult = Result, ScannerError>; +/// Conversion helper for streaming either data, notifications, or errors. 
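+/// +/// Implemented for stream payloads (such as block ranges) and for [`Notification`], so both +/// can be forwarded through a scanner stream without manual wrapping.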
pub trait IntoScannerResult { fn into_scanner_message_result(self) -> ScannerResult; } @@ -102,6 +107,7 @@ impl IntoScannerResult for Notification { } } +/// Internal helper for attempting to forward a stream item through an `mpsc` channel. pub(crate) trait TryStream { async fn try_stream>(&self, msg: M) -> bool; }
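The "Dedupe vs rollback" section added to the crate docs suggests keying idempotency on transaction hash and log index. A minimal consumer-side sketch of that suggestion (the `SeenLogs` type is illustrative and not part of this crate; field names follow alloy's RPC `Log` type, whose metadata fields are optional):

```rust
use std::collections::HashSet;

use alloy::{primitives::TxHash, rpc::types::Log};

/// Tracks which logs have already been processed so that re-deliveries
/// around reorgs (benign duplicates) can be skipped.
#[derive(Default)]
pub struct SeenLogs {
    seen: HashSet<(TxHash, u64)>,
}

impl SeenLogs {
    /// Returns `true` the first time a log is observed and `false` for duplicates.
    pub fn first_time(&mut self, log: &Log) -> bool {
        match (log.transaction_hash, log.log_index) {
            // Key by (transaction hash, log index), as the crate docs suggest.
            (Some(tx), Some(index)) => self.seen.insert((tx, index)),
            // Pending logs may lack these fields; treat them as new.
            _ => true,
        }
    }
}
```

Keying without the block hash suppresses a log even when a reorg moved it to a different block; applications that instead roll back state on `Notification::ReorgDetected` should include `log.block_hash` in the key so re-included logs are processed again. The set also grows without bound, so long-running consumers should prune entries once their blocks pass the finality boundary.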