Commit 9b8e077

0xNeshi and LeoPatOZ authored
Docs improvements (#261)
Co-authored-by: Leo <[email protected]>
1 parent d9c3aeb commit 9b8e077

19 files changed: +526 -107 lines changed

README.md

Lines changed: 7 additions & 3 deletions
@@ -230,11 +230,15 @@ let multi_sigs = EventFilter::new()
230230
The scanner delivers three types of messages through the event stream:
231231

232232
- **`Message::Data(Vec<Log>)`** – Contains a batch of matching event logs. Each log includes the raw event data, transaction hash, block number, and other metadata.
233-
- **`Message::Notification(Notification)`** – Notifications from the scanner:
234-
- **`ScannerError`** – Errors indicating that the scanner has encountered issues (e.g., RPC failures, connection problems)
233+
- **`Message::Notification(Notification)`** – Notifications from the scanner.
234+
- **`ScannerError`** – Errors indicating that the scanner has encountered issues (e.g., RPC failures, connection problems, or a lagging consumer).
235235

236236
Always handle all message types in your stream processing loop to ensure robust error handling and proper reorg detection.
237237

238+
Notes:
239+
240+
- Ordering is guaranteed only within a single subscription stream. There is no global ordering guarantee across multiple subscriptions.
241+
- When the scanner detects a reorg, it emits `Notification::ReorgDetected`. Consumers should assume the same events might be delivered more than once around reorgs (i.e. benign duplicates are possible). Depending on the application's needs, this could be handled via idempotency/deduplication or by rolling back application state on reorg notifications.
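
One way to tolerate the benign duplicates described in the notes above is to make processing idempotent by remembering which logs were already handled. The following is a minimal sketch, not the crate's API: `LogKey` and `Deduplicator` are hypothetical names, and the key (transaction hash plus log index) is an assumption chosen to keep the example self-contained.

```rust
use std::collections::HashSet;

/// Hypothetical identity for a log: transaction hash plus log index.
/// This is a stand-in for whatever stable key the application derives from a real log.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LogKey {
    pub tx_hash: [u8; 32],
    pub log_index: u64,
}

/// Remembers which log keys have already been processed.
#[derive(Default)]
pub struct Deduplicator {
    seen: HashSet<LogKey>,
}

impl Deduplicator {
    /// Returns the keys from `batch` that were not seen before, in their original order,
    /// and records them as seen.
    pub fn filter_new(&mut self, batch: &[LogKey]) -> Vec<LogKey> {
        batch
            .iter()
            .copied()
            // `HashSet::insert` returns `true` only when the value was not already present.
            .filter(|key| self.seen.insert(*key))
            .collect()
    }
}
```

Calling `filter_new` on every `Message::Data` batch before applying it drops logs that were already delivered. The set in this sketch grows without bound, and a log index can change when a transaction is re-included after a reorg, so a real application would likely prune old entries and pick a key that suits its own data.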
238242

239243
### Scanning Modes
240244

@@ -248,7 +252,7 @@ Always handle all message types in your stream processing loop to ensure robust
248252

249253
- Set `max_block_range` based on your RPC provider's limits (e.g., Alchemy, Infura may limit queries to 2000 blocks). Default is 1000 blocks.
250254
- The modes come with sensible defaults; for example, not specifying a start block for historic mode automatically sets it to the genesis block.
251-
- For live mode, if the WebSocket subscription lags significantly (e.g., >2000 blocks), ranges are automatically capped to prevent RPC errors.
255+
- In live mode, if the block subscription lags and the scanner needs to catch up by querying past blocks, catch-up queries are performed in ranges bounded by `max_block_range` to respect provider limits.
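
To illustrate what "ranges bounded by `max_block_range`" can look like, here is a minimal sketch that splits an inclusive block span into consecutive chunks. `split_into_ranges` is a hypothetical helper, not the scanner's internal iterator, and it assumes the bound means "at most `max_block_range` blocks per inclusive range".

```rust
use std::ops::RangeInclusive;

/// Splits the inclusive span `start..=end` into consecutive inclusive ranges,
/// each covering at most `max_block_range` blocks.
/// Returns an empty vector when `start > end`.
pub fn split_into_ranges(
    start: u64,
    end: u64,
    max_block_range: u64,
) -> Vec<RangeInclusive<u64>> {
    assert!(max_block_range > 0, "max_block_range must be greater than 0");

    let mut ranges = Vec::new();
    let mut from = start;
    while from <= end {
        // An inclusive range `from..=to` holds `to - from + 1` blocks.
        let to = from.saturating_add(max_block_range - 1).min(end);
        ranges.push(from..=to);
        if to == u64::MAX {
            // Avoid overflowing `to + 1` at the top of the numeric range.
            break;
        }
        from = to + 1;
    }
    ranges
}
```

For example, catching up from block 100 to block 2599 with a limit of 1000 gives three queries: `100..=1099`, `1100..=2099`, and `2100..=2599`.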
252256

253257
---
254258

src/block_range_scanner.rs

Lines changed: 70 additions & 9 deletions
@@ -1,4 +1,28 @@
1-
//! Example usage:
1+
//! Block-range streaming service.
2+
//!
3+
//! This module provides a lower-level primitive used by [`crate::EventScanner`]: it streams
4+
//! contiguous block number ranges (inclusive) and emits [`crate::Notification`] values for
5+
//! certain state transitions (e.g. reorg detection).
6+
//!
7+
//! [`BlockRangeScanner`] is useful when you want to build your own log-fetching pipeline on top of
8+
//! range streaming, or when you need direct access to the scanner's batching and reorg-detection
9+
//! behavior.
10+
//!
11+
//! # Output stream
12+
//!
13+
//! Streams returned by [`BlockRangeScannerClient`] yield [`BlockScannerResult`] items:
14+
//!
15+
//! - `Ok(ScannerMessage::Data(range))` for a block range to process.
16+
//! - `Ok(ScannerMessage::Notification(_))` for scanner notifications.
17+
//! - `Err(ScannerError)` for errors.
18+
//!
19+
//! # Ordering
20+
//!
21+
//! Range messages are streamed in chronological order within a single stream (lower block number
22+
//! to higher block number). On reorgs, the scanner may re-emit previously-seen ranges for the
23+
//! affected blocks.
24+
//!
25+
//! # Example usage:
226
//!
327
//! ```rust,no_run
428
//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
@@ -91,18 +115,19 @@ pub(crate) use range_iterator::RangeIterator;
91115
use reorg_handler::ReorgHandler;
92116
pub use ring_buffer::RingBufferCapacity;
93117

118+
/// Default maximum number of blocks per streamed range.
94119
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
95120

121+
/// Default confirmation depth used by scanners that accept a `block_confirmations` setting.
96122
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
97123

124+
/// Default per-stream buffer size used by scanners.
98125
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
99126

100-
// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
101-
// is considered final)
102-
pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64;
103-
127+
/// The result type yielded by block-range streams.
104128
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;
105129

130+
/// Convenience alias for a streamed block-range message.
106131
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;
107132

108133
impl From<RangeInclusive<BlockNumber>> for Message {
@@ -123,9 +148,14 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
123148
}
124149
}
125150

126-
#[derive(Clone)]
151+
/// Builder/configuration for the block-range streaming service.
152+
#[derive(Clone, Debug)]
127153
pub struct BlockRangeScanner {
154+
/// Maximum number of blocks per streamed range.
128155
pub max_block_range: u64,
156+
/// How many past block hashes to keep in memory for reorg detection.
157+
///
158+
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
129159
pub past_blocks_storage_capacity: RingBufferCapacity,
130160
pub buffer_capacity: usize,
131161
}
@@ -137,6 +167,7 @@ impl Default for BlockRangeScanner {
137167
}
138168

139169
impl BlockRangeScanner {
170+
/// Creates a scanner with default configuration.
140171
#[must_use]
141172
pub fn new() -> Self {
142173
Self {
@@ -146,12 +177,20 @@ impl BlockRangeScanner {
146177
}
147178
}
148179

180+
/// Sets the maximum number of blocks per streamed range.
181+
///
182+
/// This controls batching for historical scans and for catch-up in live/sync scanners.
183+
///
184+
/// Must be greater than 0.
149185
#[must_use]
150186
pub fn max_block_range(mut self, max_block_range: u64) -> Self {
151187
self.max_block_range = max_block_range;
152188
self
153189
}
154190

191+
/// Sets how many past block hashes to keep in memory for reorg detection.
192+
///
193+
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
155194
#[must_use]
156195
pub fn past_blocks_storage_capacity(
157196
mut self,
@@ -161,6 +200,14 @@ impl BlockRangeScanner {
161200
self
162201
}
163202

203+
/// Sets the stream buffer capacity.
204+
///
205+
/// Controls the maximum number of messages that can be buffered in the stream
206+
/// before backpressure is applied.
207+
///
208+
/// # Arguments
209+
///
210+
/// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0)
164211
#[must_use]
165212
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
166213
self.buffer_capacity = buffer_capacity;
@@ -192,6 +239,11 @@ impl BlockRangeScanner {
192239
}
193240
}
194241

242+
/// A [`BlockRangeScanner`] connected to a provider.
243+
///
244+
/// Use [`ConnectedBlockRangeScanner::run`] to start the background service and obtain a
245+
/// [`BlockRangeScannerClient`].
246+
#[derive(Debug)]
195247
pub struct ConnectedBlockRangeScanner<N: Network> {
196248
provider: RobustProvider<N>,
197249
max_block_range: u64,
@@ -200,7 +252,7 @@ pub struct ConnectedBlockRangeScanner<N: Network> {
200252
}
201253

202254
impl<N: Network> ConnectedBlockRangeScanner<N> {
203-
/// Returns the `RobustProvider`
255+
/// Returns the underlying [`RobustProvider`].
204256
#[must_use]
205257
pub fn provider(&self) -> &RobustProvider<N> {
206258
&self.provider
@@ -230,25 +282,30 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
230282
}
231283
}
232284

285+
/// Commands accepted by the internal block-range service.
233286
#[derive(Debug)]
234-
pub enum Command {
287+
enum Command {
288+
/// Start a live stream.
235289
StreamLive {
236290
sender: mpsc::Sender<BlockScannerResult>,
237291
block_confirmations: u64,
238292
response: oneshot::Sender<Result<(), ScannerError>>,
239293
},
294+
/// Start a historical range stream.
240295
StreamHistorical {
241296
sender: mpsc::Sender<BlockScannerResult>,
242297
start_id: BlockId,
243298
end_id: BlockId,
244299
response: oneshot::Sender<Result<(), ScannerError>>,
245300
},
301+
/// Start a stream that catches up from `start_id` and then transitions to live streaming.
246302
StreamFrom {
247303
sender: mpsc::Sender<BlockScannerResult>,
248304
start_id: BlockId,
249305
block_confirmations: u64,
250306
response: oneshot::Sender<Result<(), ScannerError>>,
251307
},
308+
/// Start a reverse stream (newer to older), in batches.
252309
Rewind {
253310
sender: mpsc::Sender<BlockScannerResult>,
254311
start_id: BlockId,
@@ -267,6 +324,7 @@ struct Service<N: Network> {
267324
}
268325

269326
impl<N: Network> Service<N> {
327+
/// Creates a new background service instance and its command channel.
270328
pub fn new(
271329
provider: RobustProvider<N>,
272330
max_block_range: u64,
@@ -580,6 +638,9 @@ impl<N: Network> Service<N> {
580638
}
581639
}
582640

641+
/// Client for requesting block-range streams from the background service.
642+
///
643+
/// Each method returns a new stream whose items are [`BlockScannerResult`] values.
583644
pub struct BlockRangeScannerClient {
584645
command_sender: mpsc::Sender<Command>,
585646
buffer_capacity: usize,
@@ -593,7 +654,7 @@ impl BlockRangeScannerClient {
593654
/// * `command_sender` - The sender for sending commands to the subscription service.
594655
/// * `buffer_capacity` - The capacity for buffering messages in the stream.
595656
#[must_use]
596-
pub fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
657+
fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
597658
Self { command_sender, buffer_capacity }
598659
}
599660

src/block_range_scanner/ring_buffer.rs

Lines changed: 14 additions & 0 deletions
@@ -1,8 +1,19 @@
11
use std::collections::VecDeque;
22

3+
/// Configuration for how many past block hashes to retain for reorg detection.
4+
///
5+
/// This type is re-exported as `PastBlocksStorageCapacity` from the crate root.
36
#[derive(Copy, Clone, Debug)]
47
pub enum RingBufferCapacity {
8+
/// Keep at most `n` items.
9+
///
10+
/// A value of `0` disables storing past block hashes and effectively disables reorg
11+
/// detection.
512
Limited(usize),
13+
/// Keep an unbounded number of items.
14+
///
15+
/// WARNING: This can lead to unbounded memory growth over long-running processes.
16+
/// Avoid using this in production deployments without an external bound.
617
Infinite,
718
}
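
The capacity semantics documented above (a limit of `0` stores nothing, a positive limit evicts the oldest item, and the unbounded variant never evicts) can be sketched as follows. `Capacity` and `BoundedBuffer` are hypothetical stand-ins written for illustration; the `push` logic here is an assumption and is not taken from this commit.

```rust
use std::collections::VecDeque;

/// Hypothetical mirror of the two capacity options described above.
#[derive(Copy, Clone, Debug)]
pub enum Capacity {
    Limited(usize),
    Infinite,
}

/// A buffer that keeps the newest items within the configured capacity.
pub struct BoundedBuffer<T> {
    inner: VecDeque<T>,
    capacity: Capacity,
}

impl<T> BoundedBuffer<T> {
    pub fn new(capacity: Capacity) -> Self {
        Self { inner: VecDeque::new(), capacity }
    }

    /// Adds an item as the newest element, evicting the oldest one if the limit is reached.
    pub fn push(&mut self, item: T) {
        match self.capacity {
            // A limit of zero means nothing is ever stored.
            Capacity::Limited(0) => {}
            Capacity::Limited(limit) => {
                if self.inner.len() == limit {
                    self.inner.pop_front();
                }
                self.inner.push_back(item);
            }
            // Unbounded: memory use grows with every push.
            Capacity::Infinite => self.inner.push_back(item),
        }
    }

    /// Returns a reference to the newest element, if any.
    pub fn back(&self) -> Option<&T> {
        self.inner.back()
    }

    /// Returns the number of stored elements.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
}
```

With `Capacity::Limited(0)` the buffer stays empty, so there is never a stored block hash to compare against, which is why that setting effectively disables reorg detection.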
819

@@ -56,14 +67,17 @@ impl<T> RingBuffer<T> {
5667
}
5768
}
5869

70+
/// Removes and returns the newest element from the buffer.
5971
pub fn pop_back(&mut self) -> Option<T> {
6072
self.inner.pop_back()
6173
}
6274

75+
/// Returns a reference to the newest element in the buffer.
6376
pub fn back(&self) -> Option<&T> {
6477
self.inner.back()
6578
}
6679

80+
/// Clears all elements currently stored in the buffer.
6781
pub fn clear(&mut self) {
6882
self.inner.clear();
6983
}

src/error.rs

Lines changed: 24 additions & 0 deletions
@@ -8,37 +8,61 @@ use thiserror::Error;
88

99
use crate::{robust_provider::provider::Error as RobustProviderError, types::ScannerResult};
1010

11+
/// Errors emitted by the scanner.
12+
///
13+
/// `ScannerError` values can be returned by builder `connect()` methods and are also yielded by
14+
/// subscription streams (as `Err(ScannerError)` items).
15+
///
16+
/// All errors except [`ScannerError::Lagged`] are terminal and will halt further stream processing.
1117
#[derive(Error, Debug, Clone)]
1218
pub enum ScannerError {
19+
/// The underlying RPC transport returned an error.
1320
#[error("RPC error: {0}")]
1421
RpcError(Arc<RpcError<TransportErrorKind>>),
1522

23+
/// The internal service has shut down and can no longer process commands.
1624
#[error("Service is shutting down")]
1725
ServiceShutdown,
1826

27+
/// A requested block (by number, hash or tag) could not be retrieved.
1928
#[error("Block not found, Block Id: {0}")]
2029
BlockNotFound(BlockId),
2130

31+
/// A timeout elapsed while waiting for an RPC response.
2232
#[error("Operation timed out")]
2333
Timeout,
2434

35+
/// A configured block parameter exceeds the latest known block.
2536
#[error("{0} {1} exceeds the latest block {2}")]
2637
BlockExceedsLatest(&'static str, u64, u64),
2738

39+
/// The requested event count is invalid (must be greater than zero).
2840
#[error("Event count must be greater than 0")]
2941
InvalidEventCount,
3042

43+
/// The configured maximum block range is invalid (must be greater than zero).
3144
#[error("Max block range must be greater than 0")]
3245
InvalidMaxBlockRange,
3346

47+
/// The configured stream buffer capacity is invalid (must be greater than zero).
3448
#[error("Stream buffer capacity must be greater than 0")]
3549
InvalidBufferCapacity,
3650

51+
/// The configured maximum number of concurrent fetches is invalid (must be greater than
52+
/// zero).
3753
#[error("Max concurrent fetches must be greater than 0")]
3854
InvalidMaxConcurrentFetches,
3955

56+
/// A block subscription ended (for example, the underlying WebSocket subscription closed).
4057
#[error("Subscription closed")]
4158
SubscriptionClosed,
59+
60+
/// A subscription consumer could not keep up and some internal messages were skipped.
61+
///
62+
/// The contained value is the number of skipped messages reported by the underlying channel.
63+
/// After emitting this error, the subscription stream may continue with newer items.
64+
#[error("Subscription lagged")]
65+
Lagged(u64),
4266
}
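
The distinction documented above, where `Lagged` is recoverable and every other error is terminal, might be handled in a consumer loop roughly as follows. This is a sketch under stated assumptions: `StreamError` is a hypothetical, trimmed-down stand-in for `ScannerError`, `is_terminal` and `consume` are illustrative helpers that do not exist in the crate, and a plain iterator stands in for the async stream.

```rust
/// Hypothetical stand-in for `ScannerError` with only three variants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamError {
    Timeout,
    SubscriptionClosed,
    /// The consumer fell behind; the value is the number of skipped messages.
    Lagged(u64),
}

impl StreamError {
    /// Hypothetical helper: every variant except `Lagged` ends the stream.
    pub fn is_terminal(&self) -> bool {
        !matches!(self, StreamError::Lagged(_))
    }
}

/// Drains stream items until the first terminal error.
/// Returns the processed payloads and the total number of skipped messages.
pub fn consume<I>(items: I) -> (Vec<u64>, u64)
where
    I: IntoIterator<Item = Result<u64, StreamError>>,
{
    let mut processed = Vec::new();
    let mut skipped = 0;
    for item in items {
        match item {
            Ok(value) => processed.push(value),
            // Non-terminal: record the gap and keep reading newer items.
            Err(StreamError::Lagged(count)) => skipped += count,
            // Terminal: stop processing this stream.
            Err(_) => break,
        }
    }
    (processed, skipped)
}
```

An application that cannot tolerate gaps might treat a non-zero skipped count as a signal to re-scan the affected range rather than simply continuing.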
4367

4468
impl From<RobustProviderError> for ScannerError {

src/event_scanner/listener.rs

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
11
use crate::event_scanner::{EventScannerResult, filter::EventFilter};
22
use tokio::sync::mpsc::Sender;
33

4-
#[derive(Clone)]
4+
#[derive(Clone, Debug)]
55
pub(crate) struct EventListener {
66
pub filter: EventFilter,
77
pub sender: Sender<EventScannerResult>,

src/event_scanner/message.rs

Lines changed: 7 additions & 0 deletions
@@ -5,7 +5,14 @@ use crate::{
55
types::{IntoScannerResult, ScannerResult},
66
};
77

8+
/// The item type yielded by event subscription streams.
9+
///
10+
/// This is a [`ScannerMessage`] whose data payload is a batch of [`Log`] values.
811
pub type Message = ScannerMessage<Vec<Log>>;
12+
13+
/// The `Result` type yielded by event subscription streams.
14+
///
15+
/// Successful items are [`Message`] values; failures are [`crate::ScannerError`].
916
pub type EventScannerResult = ScannerResult<Vec<Log>>;
1017

1118
impl From<Vec<Log>> for Message {

src/event_scanner/mod.rs

Lines changed: 10 additions & 0 deletions
@@ -1,3 +1,13 @@
1+
//! High-level event scanner API.
2+
//!
3+
//! This module re-exports the primary types used for scanning EVM logs:
4+
//!
5+
//! - [`EventScanner`] and [`EventScannerBuilder`] for constructing and running scanners.
6+
//! - [`EventFilter`] for defining which contract addresses and event signatures to match.
7+
//! - [`Message`] / [`EventScannerResult`] for consuming subscription streams.
8+
//!
9+
//! Mode marker types (e.g. [`Live`], [`Historic`]) are also re-exported.
10+
111
mod filter;
212
mod listener;
313
mod message;
