diff --git a/src/event_scanner/scanner/block_range_handler.rs b/src/event_scanner/block_range_handler.rs similarity index 98% rename from src/event_scanner/scanner/block_range_handler.rs rename to src/event_scanner/block_range_handler.rs index 9a44c02b..52405e7c 100644 --- a/src/event_scanner/scanner/block_range_handler.rs +++ b/src/event_scanner/block_range_handler.rs @@ -44,6 +44,11 @@ pub trait BlockRangeHandler { /// This handler fetches logs per listener and forwards each non-empty result immediately. It is /// used by scanner modes that operate as continuous streams (e.g. historic/live scanning), where /// incremental delivery is preferred over collecting a fixed-size window. +/// +/// # Concurrency +/// +/// The `max_concurrent_fetches` limit applies **per listener**, not globally. With N listeners +/// and a limit of M, up to N × M concurrent RPC requests may be in-flight simultaneously. #[derive(Debug)] pub struct StreamHandler { provider: RobustProvider, @@ -178,6 +183,11 @@ impl BlockRangeHandler for StreamHandler { /// /// During reorg recovery it prepends newly fetched logs so that the newest-first buffer remains /// correctly ordered. +/// +/// # Concurrency +/// +/// The `max_concurrent_fetches` limit applies **per listener**, not globally. With N listeners +/// and a limit of M, up to N × M concurrent RPC requests may be in-flight simultaneously. #[derive(Debug)] pub struct LatestEventsHandler { provider: RobustProvider, diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/builder.rs similarity index 67% rename from src/event_scanner/scanner/mod.rs rename to src/event_scanner/builder.rs index 111c6503..e75a1762 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/builder.rs @@ -1,60 +1,19 @@ -//! Scanner builders and mode marker types. +//! Builder pattern for constructing [`EventScanner`] instances. //! -//! This module defines [`EventScannerBuilder`] and the mode marker types used to configure an -//! [`EventScanner`]. Calling [`EventScannerBuilder::historic`], [`EventScannerBuilder::live`], -//! [`EventScannerBuilder::latest`] or [`EventScannerBuilder::sync`] selects a mode and exposes the -//! mode-specific configuration methods. -//! -//! # Streams -//! -//! Consumers register event subscriptions via [`EventScanner::subscribe`]. Each subscription -//! produces an independent stream of [`EventScannerResult`]. -//! -//! ## Ordering -//! -//! Ordering is preserved *per subscription stream*. There is no global ordering guarantee across -//! different subscriptions. -//! -//! ## Backpressure and lag -//! -//! Subscription streams are buffered. If a consumer processes events too slowly and the -//! internal buffer fills up, the stream yields [`ScannerError::Lagged`] and some events -//! may be skipped. -//! -//! # Reorgs and finality -//! -//! When scanning non-finalized blocks, the scanner may detect chain reorganizations and will emit -//! [`Notification::ReorgDetected`]. Consumers should assume the same events might be delivered more -//! than once around reorgs (i.e. benign duplicates are possible). -//! -//! In live mode, `block_confirmations` delays emission so that shallow reorganizations that do not -//! affect the confirmed boundary do not trigger reorg notifications. -//! -//! [`Notification::ReorgDetected`]: crate::Notification::ReorgDetected +//! This module provides [`EventScannerBuilder`] which allows configuring an event scanner +//! in different modes (historic, live, latest, or sync) with various options before connecting +//! to a provider. + use alloy::{ eips::{BlockId, BlockNumberOrTag}, - network::{Ethereum, Network}, + network::Network, }; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; use crate::{ - block_range_scanner::{ - BlockRangeScanner, BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, - RingBufferCapacity, - }, - error::ScannerError, - event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener}, - robust_provider::IntoRobustProvider, + BlockRangeScannerBuilder, DEFAULT_BLOCK_CONFIRMATIONS, EventScanner, RingBufferCapacity, + ScannerError, robust_provider::IntoRobustProvider, }; -pub mod block_range_handler; - -mod historic; -mod latest; -mod live; -mod sync; - /// Default number of maximum concurrent fetches for each scanner mode. pub const DEFAULT_MAX_CONCURRENT_FETCHES: usize = 24; @@ -69,7 +28,6 @@ pub struct Unspecified; pub struct Historic { pub(crate) from_block: BlockId, pub(crate) to_block: BlockId, - /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } @@ -79,7 +37,6 @@ pub struct Historic { #[derive(Debug)] pub struct Live { pub(crate) block_confirmations: u64, - /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } @@ -92,7 +49,6 @@ pub struct LatestEvents { pub(crate) from_block: BlockId, pub(crate) to_block: BlockId, pub(crate) block_confirmations: u64, - /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } @@ -109,7 +65,6 @@ pub struct Synchronize; pub struct SyncFromLatestEvents { pub(crate) count: usize, pub(crate) block_confirmations: u64, - /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } @@ -123,7 +78,6 @@ pub struct SyncFromLatestEvents { pub struct SyncFromBlock { pub(crate) from_block: BlockId, pub(crate) block_confirmations: u64, - /// Controls how many log-fetching RPC requests can run in parallel during the scan. pub(crate) max_concurrent_fetches: usize, } @@ -146,29 +100,6 @@ impl Default for Live { } } -/// An event scanner configured in mode `Mode` and bound to network `N`. -/// -/// Create an instance via [`EventScannerBuilder`], register subscriptions with -/// [`EventScanner::subscribe`], then start the scanner with the mode-specific `start()` method. -/// -/// # Starting the scanner -/// -/// All scanner modes follow the same general startup pattern: -/// -/// - **Register subscriptions first**: call [`EventScanner::subscribe`] before starting the scanner -/// with `start()`. The scanner sends events only to subscriptions that have already been -/// registered. -/// - **Non-blocking start**: `start()` returns immediately after spawning background tasks. -/// Subscription streams yield events asynchronously. -/// - **Errors after startup**: most runtime failures are delivered through subscription streams as -/// [`ScannerError`] items, rather than being returned from `start()`. -#[derive(Debug)] -pub struct EventScanner { - config: Mode, - block_range_scanner: BlockRangeScanner, - listeners: Vec, -} - /// Builder for constructing an [`EventScanner`] in a particular mode. #[derive(Default, Debug)] pub struct EventScannerBuilder { @@ -188,10 +119,10 @@ impl EventScannerBuilder { /// # /// # async fn example() -> Result<(), Box> { /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); + /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// # let provider = RobustProviderBuilder::new(provider).build().await?; /// // Stream all events from genesis to latest block - /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; - /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; - /// let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?; + /// let mut scanner = EventScannerBuilder::historic().connect(provider).await?; /// /// let filter = EventFilter::new().contract_address(contract_address); /// let subscription = scanner.subscribe(filter); @@ -212,13 +143,13 @@ impl EventScannerBuilder { /// # use event_scanner::{EventScannerBuilder, robust_provider::RobustProviderBuilder}; /// # /// # async fn example() -> Result<(), Box> { + /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// # let provider = RobustProviderBuilder::new(provider).build().await?; /// // Stream events between blocks [1_000_000, 2_000_000] - /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; - /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::historic() /// .from_block(1_000_000) /// .to_block(2_000_000) - /// .connect(robust_provider) + /// .connect(provider) /// .await?; /// # Ok(()) /// # } @@ -262,12 +193,12 @@ impl EventScannerBuilder { /// # /// # async fn example() -> Result<(), Box> { /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); + /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// # let provider = RobustProviderBuilder::new(provider).build().await?; /// // Stream new events as they arrive - /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; - /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::live() /// .block_confirmations(20) - /// .connect(robust_provider) + /// .connect(provider) /// .await?; /// /// let filter = EventFilter::new().contract_address(contract_address); @@ -355,10 +286,10 @@ impl EventScannerBuilder { /// # /// # async fn example() -> Result<(), Box> { /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); + /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// # let provider = RobustProviderBuilder::new(provider).build().await?; /// // Collect the latest 10 events across Earliest..=Latest - /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; - /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; - /// let mut scanner = EventScannerBuilder::latest(10).connect(robust_provider).await?; + /// let mut scanner = EventScannerBuilder::latest(10).connect(provider).await?; /// /// let filter = EventFilter::new().contract_address(contract_address); /// let subscription = scanner.subscribe(filter); @@ -380,13 +311,13 @@ impl EventScannerBuilder { /// # use event_scanner::{EventScannerBuilder, robust_provider::RobustProviderBuilder}; /// # /// # async fn example() -> Result<(), Box> { + /// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// # let provider = RobustProviderBuilder::new(provider).build().await?; /// // Collect the latest 5 events between blocks [1_000_000, 1_100_000] - /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; - /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::latest(5) /// .from_block(1_000_000) /// .to_block(1_100_000) - /// .connect(robust_provider) + /// .connect(provider) /// .await?; /// # Ok(()) /// # } @@ -562,139 +493,17 @@ impl EventScannerBuilder { /// Builds the scanner by connecting to an existing provider. /// /// This is a shared method used internally by scanner-specific `connect()` methods. - async fn build( + pub(crate) async fn build( self, provider: impl IntoRobustProvider, ) -> Result, ScannerError> { let block_range_scanner = self.block_range_scanner.connect::(provider).await?; - Ok(EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() }) - } -} - -/// A subscription to scanner events that requires proof the scanner has started. -/// -/// Created by [`EventScanner::subscribe()`](crate::EventScanner::subscribe), this type holds the -/// underlying stream but prevents access until [`stream()`](EventSubscription::stream) is called -/// with a valid [`StartProof`]. -/// -/// This pattern ensures at compile time that [`EventScanner::start()`](crate::EventScanner::start) -/// is called before attempting to read from the event stream. -/// -/// # Example -/// -/// ```ignore -/// let mut scanner = EventScannerBuilder::live().connect(provider).await?; -/// -/// // Create subscription (cannot access stream yet) -/// let subscription = scanner.subscribe(filter); -/// -/// // Start scanner and get proof -/// let proof = scanner.start().await?; -/// -/// // Now access the stream with the proof -/// let mut stream = subscription.stream(&proof); -/// -/// while let Some(msg) = stream.next().await { -/// // process events -/// } -/// ``` -pub struct EventSubscription { - inner: ReceiverStream, -} - -impl EventSubscription { - /// Creates a new subscription wrapping the given stream. - pub(crate) fn new(inner: ReceiverStream) -> Self { - Self { inner } - } - - /// Access the event stream. - /// - /// Requires a reference to a [`StartProof`] as proof that the scanner - /// has been started. The proof is obtained by calling - /// `EventScanner::start()`. - /// - /// # Arguments - /// - /// * `_proof` - Proof that the scanner has been started - #[must_use] - pub fn stream(self, _proof: &StartProof) -> ReceiverStream { - self.inner - } -} - -impl EventScanner { - /// Returns the configured stream buffer capacity. - #[must_use] - pub fn buffer_capacity(&self) -> usize { - self.block_range_scanner.buffer_capacity() - } - - /// Registers an event subscription and returns its stream. - /// - /// Each call creates a separate subscription stream with its own buffer. - /// - /// # Ordering - /// - /// Ordering is guaranteed only within a single returned stream. There is no ordering - /// guarantee across streams created by multiple calls to this method. - /// - /// # Errors - /// - /// The stream yields [`ScannerError`] values on failures. In particular, if a consumer cannot - /// keep up and internal buffers lag, the stream yields [`ScannerError::Lagged`]. - /// - /// # Notes - /// - /// For scanner to properly stream events, register all subscriptions before calling `start()`. - #[must_use] - pub fn subscribe(&mut self, filter: EventFilter) -> EventSubscription { - let (sender, receiver) = - mpsc::channel::(self.block_range_scanner.buffer_capacity()); - self.listeners.push(EventListener { filter, sender }); - EventSubscription::new(ReceiverStream::new(receiver)) - } -} - -/// Proof that the scanner has been started. -/// -/// This proof is returned by `EventScanner::start()` and must be passed to -/// [`EventSubscription::stream()`] to access the event stream. This ensures at compile -/// time that the scanner is started before attempting to read events. -/// -/// # Example -/// -/// ```ignore -/// let mut scanner = EventScannerBuilder::sync().from_block(0).connect(provider).await?; -/// let subscription = scanner.subscribe(filter); -/// -/// // Start the scanner and get the proof -/// let proof = scanner.start().await?; -/// -/// // Now we can access the stream -/// let mut stream = subscription.stream(&proof); -/// ``` -#[derive(Debug, Clone)] -pub struct StartProof { - /// Private field prevents construction outside this crate - _private: (), -} - -impl StartProof { - /// Creates a new start proof. - #[must_use] - pub(crate) fn new() -> Self { - Self { _private: () } + Ok(EventScanner::new(self.config, block_range_scanner)) } } #[cfg(test)] mod tests { - use alloy::{ - providers::{RootProvider, mock::Asserter}, - rpc::client::RpcClient, - }; - use crate::block_range_scanner::DEFAULT_STREAM_BUFFER_CAPACITY; use super::*; @@ -736,46 +545,4 @@ mod tests { assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY); } - - #[tokio::test] - async fn test_historic_event_stream_listeners_vector_updates() -> anyhow::Result<()> { - let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = EventScannerBuilder::historic().build(provider).await?; - - assert!(scanner.listeners.is_empty()); - - let _stream1 = scanner.subscribe(EventFilter::new()); - assert_eq!(scanner.listeners.len(), 1); - - let _stream2 = scanner.subscribe(EventFilter::new()); - let _stream3 = scanner.subscribe(EventFilter::new()); - assert_eq!(scanner.listeners.len(), 3); - - Ok(()) - } - - #[tokio::test] - async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> { - let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = EventScannerBuilder::historic().build(provider.clone()).await?; - - let _ = scanner.subscribe(EventFilter::new()); - let sender = &scanner.listeners[0].sender; - assert_eq!(sender.capacity(), scanner.block_range_scanner.buffer_capacity()); - - let custom_capacity = 1000; - - let mut scanner = EventScannerBuilder::historic() - .buffer_capacity(custom_capacity) - .build(provider) - .await?; - - assert_eq!(scanner.block_range_scanner.buffer_capacity(), custom_capacity); - - let _ = scanner.subscribe(EventFilter::new()); - let sender = &scanner.listeners[0].sender; - assert_eq!(sender.capacity(), custom_capacity); - - Ok(()) - } } diff --git a/src/event_scanner/filter.rs b/src/event_scanner/filter.rs index 31ea8778..cf3482fd 100644 --- a/src/event_scanner/filter.rs +++ b/src/event_scanner/filter.rs @@ -1,3 +1,8 @@ +//! Event filtering for blockchain log queries. +//! +//! This module provides [`EventFilter`] which allows specifying contract addresses and event +//! signatures to filter logs when scanning the blockchain. + use std::fmt::{Debug, Display}; use alloy::{ diff --git a/src/event_scanner/listener.rs b/src/event_scanner/listener.rs index 700f6908..1b7d4b5c 100644 --- a/src/event_scanner/listener.rs +++ b/src/event_scanner/listener.rs @@ -1,3 +1,8 @@ +//! Internal event listener for distributing scanned logs to subscribers. +//! +//! This module defines [`EventListener`] which pairs an event filter with a channel sender +//! to deliver matching logs to subscription streams. + use crate::event_scanner::{EventScannerResult, filter::EventFilter}; use tokio::sync::mpsc::Sender; diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index eb200a3e..37aac353 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,3 +1,8 @@ +//! Types yielded by event subscription streams. +//! +//! Defines [`Message`] and [`EventScannerResult`], along with conversions and comparisons +//! for working with logs. + use alloy::{rpc::types::Log, sol_types::SolEvent}; use crate::{ diff --git a/src/event_scanner/mod.rs b/src/event_scanner/mod.rs index deaa51f8..87b652af 100644 --- a/src/event_scanner/mod.rs +++ b/src/event_scanner/mod.rs @@ -1,21 +1,50 @@ -//! High-level event scanner API. +//! Event scanner, its builders, and scanner mode marker types. //! -//! This module re-exports the primary types used for scanning EVM logs: +//! This module defines [`EventScannerBuilder`] and the mode marker types used to configure an +//! [`EventScanner`]. Calling [`EventScannerBuilder::historic`], [`EventScannerBuilder::live`], +//! [`EventScannerBuilder::latest`] or [`EventScannerBuilder::sync`] selects a mode and exposes the +//! mode-specific configuration methods. //! -//! - [`EventScanner`] and [`EventScannerBuilder`] for constructing and running scanners. -//! - [`EventFilter`] for defining which contract addresses and event signatures to match. -//! - [`Message`] / [`EventScannerResult`] for consuming subscription streams. +//! # Streams //! -//! Mode marker types (e.g. [`Live`], [`Historic`]) are also re-exported. +//! Consumers register event subscriptions via [`EventScanner::subscribe`]. Each subscription +//! produces an independent stream of [`EventScannerResult`]. +//! +//! ## Ordering +//! +//! Ordering is preserved *per subscription stream*. There is no global ordering guarantee across +//! different subscriptions. +//! +//! ## Backpressure and lag +//! +//! Subscription streams are buffered. If a consumer processes events too slowly and the +//! internal buffer fills up, the stream yields [`ScannerError::Lagged`] and some events +//! may be skipped. +//! +//! # Reorgs and finality +//! +//! When scanning non-finalized blocks, the scanner may detect chain reorganizations and will emit +//! [`Notification::ReorgDetected`]. Consumers should assume the same events might be delivered more +//! than once around reorgs (i.e. benign duplicates are possible). +//! +//! In live mode, `block_confirmations` delays emission so that shallow reorganizations that do not +//! affect the confirmed boundary do not trigger reorg notifications. +//! +//! [`Notification::ReorgDetected`]: crate::Notification::ReorgDetected + +pub mod block_range_handler; +mod builder; mod filter; mod listener; mod message; +mod modes; mod scanner; +pub use builder::{ + DEFAULT_MAX_CONCURRENT_FETCHES, EventScannerBuilder, Historic, LatestEvents, Live, + SyncFromBlock, SyncFromLatestEvents, Unspecified, +}; pub use filter::EventFilter; pub use message::{EventScannerResult, Message}; -pub use scanner::{ - DEFAULT_MAX_CONCURRENT_FETCHES, EventScanner, EventScannerBuilder, EventSubscription, Historic, - LatestEvents, Live, StartProof, SyncFromBlock, SyncFromLatestEvents, block_range_handler, -}; +pub use scanner::{EventScanner, EventSubscription, StartProof}; diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/modes/historic.rs similarity index 96% rename from src/event_scanner/scanner/historic.rs rename to src/event_scanner/modes/historic.rs index 73c82a4b..4da93252 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/modes/historic.rs @@ -1,15 +1,21 @@ +//! Scans events from a fixed block range. +//! +//! Streams events from a specified block range in chronological order. +//! See [`EventScannerBuilder::historic`] for usage details. + use alloy::{ consensus::BlockHeader, eips::BlockId, network::{BlockResponse, Network}, }; -use super::block_range_handler::StreamHandler; use crate::{ - EventScannerBuilder, ScannerError, + ScannerError, event_scanner::{ StartProof, - scanner::{EventScanner, Historic, block_range_handler::BlockRangeHandler}, + block_range_handler::{BlockRangeHandler, StreamHandler}, + builder::{EventScannerBuilder, Historic}, + scanner::EventScanner, }, robust_provider::IntoRobustProvider, }; @@ -45,11 +51,14 @@ impl EventScannerBuilder { /// Increasing this value can improve throughput by issuing multiple RPC /// requests concurrently, at the cost of higher load on the provider. /// + /// **Note**: This limit applies **per listener**. With N listeners and a limit of M, + /// up to N × M concurrent RPC requests may be in-flight simultaneously. + /// /// Must be greater than 0. /// /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default]. /// - /// [default]: crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES + /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES #[must_use] pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self { self.config.max_concurrent_fetches = max_concurrent_fetches; @@ -170,7 +179,7 @@ impl EventScanner { mod tests { use crate::{ block_range_scanner::{DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY}, - event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, + event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES, }; use super::*; diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/modes/latest.rs similarity index 96% rename from src/event_scanner/scanner/latest.rs rename to src/event_scanner/modes/latest.rs index b3720f41..5aeea742 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/modes/latest.rs @@ -1,3 +1,8 @@ +//! Collects the most recent matching events. +//! +//! Performs a reverse scan to collect a specified number of the most recent matching events. +//! See [`EventScannerBuilder::latest`] for usage details. + use alloy::{ consensus::BlockHeader, eips::BlockId, @@ -5,10 +10,11 @@ use alloy::{ }; use crate::{ - EventScannerBuilder, ScannerError, + ScannerError, event_scanner::{ - EventScanner, LatestEvents, StartProof, - scanner::block_range_handler::{BlockRangeHandler, LatestEventsHandler}, + EventScanner, StartProof, + block_range_handler::{BlockRangeHandler, LatestEventsHandler}, + builder::{EventScannerBuilder, LatestEvents}, }, robust_provider::IntoRobustProvider, }; @@ -55,11 +61,14 @@ impl EventScannerBuilder { /// Higher values can increase throughput by issuing multiple RPC requests /// concurrently, at the expense of more load on the provider. /// + /// **Note**: This limit applies **per listener**. With N listeners and a limit of M, + /// up to N × M concurrent RPC requests may be in-flight simultaneously. + /// /// Must be greater than 0. /// /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default]. /// - /// [default]: crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES + /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES #[must_use] pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self { self.config.max_concurrent_fetches = max_concurrent_fetches; @@ -184,7 +193,7 @@ mod tests { block_range_scanner::{ DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, }, - event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, + event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES, }; use super::*; diff --git a/src/event_scanner/scanner/live.rs b/src/event_scanner/modes/live.rs similarity index 91% rename from src/event_scanner/scanner/live.rs rename to src/event_scanner/modes/live.rs index e2571c96..9af65c06 100644 --- a/src/event_scanner/scanner/live.rs +++ b/src/event_scanner/modes/live.rs @@ -1,13 +1,16 @@ +//! Streams events from newly produced blocks in real-time. +//! +//! Continuously monitors the blockchain and yields events as new blocks are confirmed. +//! See [`EventScannerBuilder::live`] for usage details. + use alloy::network::Network; use crate::{ - EventScannerBuilder, ScannerError, + ScannerError, event_scanner::{ EventScanner, StartProof, - scanner::{ - Live, - block_range_handler::{BlockRangeHandler, StreamHandler}, - }, + block_range_handler::{BlockRangeHandler, StreamHandler}, + builder::{EventScannerBuilder, Live}, }, robust_provider::IntoRobustProvider, }; @@ -33,11 +36,14 @@ impl EventScannerBuilder { /// processing multiple block ranges in parallel. Increasing the value /// improves throughput at the expense of higher load on the provider. /// + /// **Note**: This limit applies **per listener**. With N listeners and a limit of M, + /// up to N × M concurrent RPC requests may be in-flight simultaneously. + /// /// Must be greater than 0. /// /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default]. /// - /// [default]: crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES + /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES #[must_use] pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self { self.config.max_concurrent_fetches = max_concurrent_fetches; @@ -110,7 +116,7 @@ mod tests { block_range_scanner::{ DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, }, - event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, + event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES, }; use super::*; diff --git a/src/event_scanner/modes/mod.rs b/src/event_scanner/modes/mod.rs new file mode 100644 index 00000000..4fe9483e --- /dev/null +++ b/src/event_scanner/modes/mod.rs @@ -0,0 +1,9 @@ +//! Scanner mode implementations. +//! +//! This module contains the implementation details for each scanner mode (historic, live, latest, +//! and sync). + +mod historic; +mod latest; +mod live; +mod sync; diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/modes/sync/from_block.rs similarity index 91% rename from src/event_scanner/scanner/sync/from_block.rs rename to src/event_scanner/modes/sync/from_block.rs index d594f06f..eb2c9eff 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/modes/sync/from_block.rs @@ -1,10 +1,19 @@ +//! Syncs from a starting block, then transitions to live streaming. +//! +//! Streams events from a specified starting block to the present, then automatically +//! continues with live streaming. See [`EventScannerBuilder::sync().from_block()`][from_block] +//! for usage details. +//! +//! [from_block]: crate::EventScannerBuilder#method.from_block-2 + use alloy::{eips::BlockId, network::Network}; use crate::{ - EventScannerBuilder, ScannerError, + ScannerError, event_scanner::{ - EventScanner, StartProof, SyncFromBlock, - scanner::block_range_handler::{BlockRangeHandler, StreamHandler}, + EventScanner, StartProof, + block_range_handler::{BlockRangeHandler, StreamHandler}, + builder::{EventScannerBuilder, SyncFromBlock}, }, robust_provider::IntoRobustProvider, }; @@ -27,11 +36,14 @@ impl EventScannerBuilder { /// Higher values can improve throughput by issuing multiple RPC requests /// concurrently, at the cost of additional load on the provider. /// + /// **Note**: This limit applies **per listener**. With N listeners and a limit of M, + /// up to N × M concurrent RPC requests may be in-flight simultaneously. + /// /// Must be greater than 0. /// /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default]. /// - /// [default]: crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES + /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES #[must_use] pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self { self.config.max_concurrent_fetches = max_concurrent_fetches; @@ -120,7 +132,7 @@ mod tests { block_range_scanner::{ DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, }, - event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, + event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES, }; use super::*; diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/modes/sync/from_latest.rs similarity index 92% rename from src/event_scanner/scanner/sync/from_latest.rs rename to src/event_scanner/modes/sync/from_latest.rs index 24ffc74f..95f7054c 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/modes/sync/from_latest.rs @@ -1,13 +1,19 @@ +//! Collects recent events, then transitions to live streaming. +//! +//! Collects a specified number of the most recent events, then automatically continues +//! with live streaming. See [`EventScannerBuilder::sync().from_latest()`][from_latest] +//! for usage details. +//! +//! [from_latest]: crate::EventScannerBuilder::from_latest + use alloy::{eips::BlockNumberOrTag, network::Network}; use crate::{ - EventScannerBuilder, ScannerError, + ScannerError, event_scanner::{ EventScanner, StartProof, - scanner::{ - SyncFromLatestEvents, - block_range_handler::{BlockRangeHandler, LatestEventsHandler, StreamHandler}, - }, + block_range_handler::{BlockRangeHandler, LatestEventsHandler, StreamHandler}, + builder::{EventScannerBuilder, SyncFromLatestEvents}, }, robust_provider::IntoRobustProvider, types::TryStream, @@ -31,11 +37,14 @@ impl EventScannerBuilder { /// Increasing this value can improve catch-up throughput by issuing multiple /// RPC requests concurrently, at the cost of additional load on the provider. /// + /// **Note**: This limit applies **per listener**. With N listeners and a limit of M, + /// up to N × M concurrent RPC requests may be in-flight simultaneously. + /// /// Must be greater than 0. /// /// Defaults to [`DEFAULT_MAX_CONCURRENT_FETCHES`][default]. /// - /// [default]: crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES + /// [default]: crate::event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES #[must_use] pub fn max_concurrent_fetches(mut self, max_concurrent_fetches: usize) -> Self { self.config.max_concurrent_fetches = max_concurrent_fetches; @@ -174,7 +183,7 @@ mod tests { block_range_scanner::{ DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, }, - event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES, + event_scanner::builder::DEFAULT_MAX_CONCURRENT_FETCHES, }; use super::*; diff --git a/src/event_scanner/scanner/sync/mod.rs b/src/event_scanner/modes/sync/mod.rs similarity index 96% rename from src/event_scanner/scanner/sync/mod.rs rename to src/event_scanner/modes/sync/mod.rs index 3032b34f..8750b9c6 100644 --- a/src/event_scanner/scanner/sync/mod.rs +++ b/src/event_scanner/modes/sync/mod.rs @@ -1,11 +1,16 @@ +//! Catches up on historical events, then transitions to live streaming. +//! +//! Two sync variants are available: +//! - [`from_block`]: Syncs from a specific block then transitions to live mode +//! - [`from_latest`]: Collects the latest N events then transitions to live mode + use alloy::eips::BlockId; pub(crate) mod from_block; pub(crate) mod from_latest; -use crate::{ - EventScannerBuilder, - event_scanner::scanner::{SyncFromBlock, SyncFromLatestEvents, Synchronize}, +use crate::event_scanner::builder::{ + EventScannerBuilder, SyncFromBlock, SyncFromLatestEvents, Synchronize, }; impl EventScannerBuilder { diff --git a/src/event_scanner/scanner.rs b/src/event_scanner/scanner.rs new file mode 100644 index 00000000..f8ec7a41 --- /dev/null +++ b/src/event_scanner/scanner.rs @@ -0,0 +1,242 @@ +//! Core scanner types and subscription management. +//! +//! This module defines [`EventScanner`], [`EventSubscription`], and [`StartProof`] which together +//! provide the main interface for subscribing to and streaming blockchain events. + +use alloy::network::{Ethereum, Network}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use crate::{ + block_range_scanner::BlockRangeScanner, + event_scanner::{ + EventScannerResult, Unspecified, filter::EventFilter, listener::EventListener, + }, +}; + +/// An event scanner configured in mode `Mode` and bound to network `N`. +/// +/// Create an instance via [`EventScannerBuilder`](crate::EventScannerBuilder), register +/// subscriptions with [`EventScanner::subscribe`], then start the scanner with the mode-specific +/// `start()` method. +/// +/// # Starting the scanner +/// +/// All scanner modes follow the same general startup pattern: +/// +/// - **Register subscriptions first**: call [`EventScanner::subscribe`] before starting the scanner +/// with `start()`. The scanner sends events only to subscriptions that have already been +/// registered. +/// - **Non-blocking start**: `start()` returns immediately after spawning background tasks. +/// Subscription streams yield events asynchronously. +/// - **Errors after startup**: most runtime failures are delivered through subscription streams as +/// [`ScannerError`](crate::ScannerError) items, rather than being returned from `start()`. +#[derive(Debug)] +pub struct EventScanner { + pub(crate) config: Mode, + pub(crate) block_range_scanner: BlockRangeScanner, + pub(crate) listeners: Vec, +} + +impl EventScanner { + pub fn new(config: Mode, block_range_scanner: BlockRangeScanner) -> Self { + Self { config, block_range_scanner, listeners: Vec::new() } + } +} + +/// A subscription to scanner events that requires proof the scanner has started. +/// +/// Created by [`EventScanner::subscribe()`](crate::EventScanner::subscribe), this type holds the +/// underlying stream but prevents access until [`stream()`](EventSubscription::stream) is called +/// with a valid [`StartProof`]. +/// +/// This pattern ensures at compile time that `EventScanner::start()` is called before attempting to +/// read from the event stream. +/// +/// # Example +/// +/// ```no_run +/// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; +/// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; +/// # use tokio_stream::StreamExt; +/// # +/// # async fn example() -> Result<(), Box> { +/// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); +/// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; +/// # let provider = RobustProviderBuilder::new(provider).build().await?; +/// let mut scanner = EventScannerBuilder::live().connect(provider).await?; +/// +/// let filter = EventFilter::new().contract_address(contract_address); +/// +/// // Create subscription (cannot access stream yet) +/// let subscription = scanner.subscribe(filter); +/// +/// // Start scanner and get proof of it starting +/// let proof = scanner.start().await?; +/// +/// // Now access the stream with the proof +/// let mut stream = subscription.stream(&proof); +/// +/// while let Some(msg) = stream.next().await { +/// // process events +/// } +/// # Ok(()) +/// # } +/// ``` +pub struct EventSubscription { + inner: ReceiverStream, +} + +impl EventSubscription { + /// Creates a new subscription wrapping the given stream. + pub(crate) fn new(inner: ReceiverStream) -> Self { + Self { inner } + } + + /// Access the event stream. + /// + /// Requires a reference to a [`StartProof`] as proof that the scanner + /// has been started. The proof is obtained by calling + /// `EventScanner::start()`. + /// + /// # Arguments + /// + /// * `_proof` - Proof that the scanner has been started + #[must_use] + pub fn stream(self, _proof: &StartProof) -> ReceiverStream { + self.inner + } +} + +impl EventScanner { + /// Returns the configured stream buffer capacity. + #[must_use] + pub fn buffer_capacity(&self) -> usize { + self.block_range_scanner.buffer_capacity() + } + + /// Registers an event subscription. + /// + /// Each call creates a separate subscription with its own buffer. + /// + /// # Ordering + /// + /// Ordering is guaranteed only within a single returned subscription. There is no ordering + /// guarantee across subscriptions created by multiple calls to this method. + #[must_use] + pub fn subscribe(&mut self, filter: EventFilter) -> EventSubscription { + let (sender, receiver) = + mpsc::channel::(self.block_range_scanner.buffer_capacity()); + self.listeners.push(EventListener { filter, sender }); + EventSubscription::new(ReceiverStream::new(receiver)) + } +} + +/// Proof that the scanner has been started. +/// +/// This proof is returned by `EventScanner::start()` and must be passed to +/// [`EventSubscription::stream()`] to access the event stream. This ensures at compile +/// time that the scanner is started before attempting to read events. +/// +/// # Example +/// +/// ```no_run +/// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; +/// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; +/// # use tokio_stream::StreamExt; +/// # +/// # async fn example() -> Result<(), Box> { +/// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); +/// # let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; +/// # let provider = RobustProviderBuilder::new(provider).build().await?; +/// let mut scanner = EventScannerBuilder::sync().from_block(0).connect(provider).await?; +/// +/// let filter = EventFilter::new().contract_address(contract_address); +/// +/// // Create subscription (cannot access stream yet) +/// let subscription = scanner.subscribe(filter); +/// +/// // Start scanner and get proof of it starting +/// let proof = scanner.start().await?; +/// +/// // Now access the stream with the proof +/// let mut stream = subscription.stream(&proof); +/// +/// while let Some(msg) = stream.next().await { +/// // process events +/// } +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug, Clone)] +pub struct StartProof { + /// Private field prevents construction outside this crate + _private: (), +} + +impl StartProof { + /// Creates a new start proof. + #[must_use] + pub(crate) fn new() -> Self { + Self { _private: () } + } +} + +#[cfg(test)] +mod tests { + use alloy::{ + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; + + use crate::{BlockRangeScannerBuilder, Historic}; + + use super::*; + + #[tokio::test] + async fn test_historic_event_stream_listeners_vector_updates() -> anyhow::Result<()> { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let brs = BlockRangeScannerBuilder::new().connect(provider).await?; + + let mut scanner = EventScanner::new(Historic::default(), brs); + + assert!(scanner.listeners.is_empty()); + + let _stream1 = scanner.subscribe(EventFilter::new()); + assert_eq!(scanner.listeners.len(), 1); + + let _stream2 = scanner.subscribe(EventFilter::new()); + let _stream3 = scanner.subscribe(EventFilter::new()); + assert_eq!(scanner.listeners.len(), 3); + + Ok(()) + } + + #[tokio::test] + async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + + let brs = BlockRangeScannerBuilder::new().connect(provider.clone()).await?; + + let mut scanner = EventScanner::new(Historic::default(), brs); + + let _ = scanner.subscribe(EventFilter::new()); + let sender = &scanner.listeners[0].sender; + assert_eq!(sender.capacity(), scanner.block_range_scanner.buffer_capacity()); + + let custom_capacity = 1000; + + let brs = BlockRangeScannerBuilder::new() + .buffer_capacity(custom_capacity) + .connect(provider) + .await?; + + let mut scanner = EventScanner::new(Historic::default(), brs); + + let _ = scanner.subscribe(EventFilter::new()); + let sender = &scanner.listeners[0].sender; + assert_eq!(sender.capacity(), custom_capacity); + + Ok(()) + } +}