Skip to content

Commit 6f036a4

Browse files
authored
Merge branch 'OpenZeppelin:main' into feat/benchmarks
2 parents 6a1e923 + 18c8654 commit 6f036a4

File tree

17 files changed

+564
-523
lines changed

17 files changed

+564
-523
lines changed

README.md

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ event-scanner = "0.9.0-alpha"
5959
Create an event stream for the given event filters registered with the `EventScanner`:
6060

6161
```rust
62-
use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}, sol_types::SolEvent};
62+
use alloy::{network::Ethereum, providers::ProviderBuilder, sol_types::SolEvent};
6363
use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder};
6464
use tokio_stream::StreamExt;
6565
use tracing::{error, info};
@@ -125,8 +125,8 @@ Once configured, connect using:
125125
This will connect the `EventScanner` and allow you to create event streams and start scanning in various [modes](#scanning-modes).
126126

127127
```rust
128-
use alloy::providers::{Provider, ProviderBuilder};
129-
use event_scanner::robust_provider::RobustProviderBuilder;
128+
use alloy::providers::ProviderBuilder;
129+
use event_scanner::{EventScannerBuilder, robust_provider::RobustProviderBuilder};
130130

131131
// Connect to provider (example with WebSocket)
132132
let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
@@ -135,14 +135,16 @@ let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?;
135135
let scanner = EventScannerBuilder::live()
136136
.max_block_range(500) // Optional: set max blocks per read (default: 1000)
137137
.block_confirmations(12) // Optional: set block confirmations (default: 12)
138-
.connect(provider.clone());
138+
.connect(provider.clone())
139+
.await?;
139140

140141
// Historical block range mode
141142
let scanner = EventScannerBuilder::historic()
142143
.from_block(1_000_000)
143144
.to_block(2_000_000)
144145
.max_block_range(500)
145-
.connect(provider.clone());
146+
.connect(provider.clone())
147+
.await?;
146148

147149
// we can also wrap the provider in a RobustProvider
148150
// for more advanced configurations like retries and fallbacks
@@ -153,20 +155,23 @@ let scanner = EventScannerBuilder::latest(100)
153155
// .from_block(1_000_000) // Optional: set start of search range
154156
// .to_block(2_000_000) // Optional: set end of search range
155157
.max_block_range(500)
156-
.connect(robust_provider.clone());
158+
.connect(robust_provider.clone())
159+
.await?;
157160

158161
// Sync from block then switch to live mode
159162
let scanner = EventScannerBuilder::sync()
160163
.from_block(100)
161164
.max_block_range(500)
162165
.block_confirmations(12)
163-
.connect(robust_provider.clone());
166+
.connect(robust_provider.clone())
167+
.await?;
164168

165169
// Sync the latest 60 events then switch to live mode
166170
let scanner = EventScannerBuilder::sync()
167171
.from_latest(60)
168172
.block_confirmations(12)
169-
.connect(robust_provider);
173+
.connect(robust_provider)
174+
.await?;
170175
```
171176

172177
Invoking `scanner.start()` starts the scanner in the specified mode.
@@ -176,25 +181,28 @@ Invoking `scanner.start()` starts the scanner in the specified mode.
176181
Create an `EventFilter` for each event stream you wish to process. The filter specifies the contract address where events originated, and event signatures (tip: you can use the value stored in `SolEvent::SIGNATURE`).
177182

178183
```rust
184+
use alloy::sol_types::SolEvent;
185+
use event_scanner::EventFilter;
186+
179187
// Track a SPECIFIC event from a SPECIFIC contract
180188
let specific_filter = EventFilter::new()
181-
.contract_address(*counter_contract.address())
182-
.event(Counter::CountIncreased::SIGNATURE);
189+
.contract_address(*my_contract.address())
190+
.event(MyContract::SomeEvent::SIGNATURE);
183191

184192
// Track multiple events from a SPECIFIC contract
185193
let specific_filter = EventFilter::new()
186-
.contract_address(*counter_contract.address())
187-
.event(Counter::CountIncreased::SIGNATURE)
188-
.event(Counter::CountDecreased::SIGNATURE);
194+
.contract_address(*my_contract.address())
195+
.event(MyContract::SomeEvent::SIGNATURE)
196+
.event(MyContract::OtherEvent::SIGNATURE);
189197

190198
// Track a SPECIFIC event from ALL contracts
191199
let specific_filter = EventFilter::new()
192-
.event(Counter::CountIncreased::SIGNATURE);
200+
.event(MyContract::SomeEvent::SIGNATURE);
193201

194202
// Track ALL events from SPECIFIC contracts
195203
let all_contract_events_filter = EventFilter::new()
196-
.contract_address(*counter_contract.address())
197-
.contract_address(*other_counter_contract.address());
204+
.contract_address(*my_contract.address())
205+
.contract_address(*other_contract.address());
198206

199207
// Track ALL events from ALL contracts
200208
let all_events_filter = EventFilter::new();
@@ -211,17 +219,17 @@ Batch builder examples:
211219
```rust
212220
// Multiple contract addresses at once
213221
let multi_addr = EventFilter::new()
214-
.contract_addresses([*counter_contract.address(), *other_counter_contract.address()]);
222+
.contract_addresses([*my_contract.address(), *other_contract.address()]);
215223

216224
// Multiple event names at once
217225
let multi_events = EventFilter::new()
218-
.events([Counter::CountIncreased::SIGNATURE, Counter::CountDecreased::SIGNATURE]);
226+
.events([MyContract::SomeEvent::SIGNATURE, MyContract::OtherEvent::SIGNATURE]);
219227

220228
// Multiple event signature hashes at once
221229
let multi_sigs = EventFilter::new()
222230
.event_signatures([
223-
Counter::CountIncreased::SIGNATURE_HASH,
224-
Counter::CountDecreased::SIGNATURE_HASH,
231+
MyContract::SomeEvent::SIGNATURE_HASH,
232+
MyContract::OtherEvent::SIGNATURE_HASH,
225233
]);
226234
```
227235

@@ -303,6 +311,9 @@ async fn example() -> anyhow::Result<()> {
303311
.subscription_timeout(Duration::from_secs(120))
304312
.build()
305313
.await?;
314+
315+
// ...
316+
306317
Ok(())
307318
}
308319
```

src/block_range_scanner/builder.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use alloy::network::Network;
2+
3+
use crate::{
4+
ScannerError,
5+
block_range_scanner::{
6+
DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, RingBufferCapacity,
7+
scanner::BlockRangeScanner,
8+
},
9+
robust_provider::IntoRobustProvider,
10+
};
11+
12+
/// Builder/configuration for the block-range streaming service.
13+
#[derive(Clone, Debug)]
14+
pub struct BlockRangeScannerBuilder {
15+
/// Maximum number of blocks per streamed range.
16+
pub max_block_range: u64,
17+
/// How many past block hashes to keep in memory for reorg detection.
18+
///
19+
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
20+
pub past_blocks_storage_capacity: RingBufferCapacity,
21+
pub buffer_capacity: usize,
22+
}
23+
24+
impl Default for BlockRangeScannerBuilder {
25+
fn default() -> Self {
26+
Self::new()
27+
}
28+
}
29+
30+
impl BlockRangeScannerBuilder {
31+
/// Creates a scanner with default configuration.
32+
#[must_use]
33+
pub fn new() -> Self {
34+
Self {
35+
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
36+
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
37+
buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY,
38+
}
39+
}
40+
41+
/// Sets the maximum number of blocks per streamed range.
42+
///
43+
/// This controls batching for historical scans and for catch-up in live/sync scanners.
44+
///
45+
/// Must be greater than 0.
46+
#[must_use]
47+
pub fn max_block_range(mut self, max_block_range: u64) -> Self {
48+
self.max_block_range = max_block_range;
49+
self
50+
}
51+
52+
/// Sets how many past block hashes to keep in memory for reorg detection.
53+
///
54+
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
55+
#[must_use]
56+
pub fn past_blocks_storage_capacity(
57+
mut self,
58+
past_blocks_storage_capacity: RingBufferCapacity,
59+
) -> Self {
60+
self.past_blocks_storage_capacity = past_blocks_storage_capacity;
61+
self
62+
}
63+
64+
/// Sets the stream buffer capacity.
65+
///
66+
/// Controls the maximum number of messages that can be buffered in the stream
67+
/// before backpressure is applied.
68+
///
69+
/// # Arguments
70+
///
71+
/// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0)
72+
#[must_use]
73+
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
74+
self.buffer_capacity = buffer_capacity;
75+
self
76+
}
77+
78+
/// Connects to an existing provider
79+
///
80+
/// # Errors
81+
///
82+
/// Returns an error if the provider connection fails.
83+
pub async fn connect<N: Network>(
84+
self,
85+
provider: impl IntoRobustProvider<N>,
86+
) -> Result<BlockRangeScanner<N>, ScannerError> {
87+
if self.max_block_range == 0 {
88+
return Err(ScannerError::InvalidMaxBlockRange);
89+
}
90+
if self.buffer_capacity == 0 {
91+
return Err(ScannerError::InvalidBufferCapacity);
92+
}
93+
let provider = provider.into_robust_provider().await?;
94+
Ok(BlockRangeScanner {
95+
provider,
96+
max_block_range: self.max_block_range,
97+
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
98+
buffer_capacity: self.buffer_capacity,
99+
})
100+
}
101+
}

src/block_range_scanner/common.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
use std::ops::RangeInclusive;
2+
13
use tokio::sync::mpsc;
24
use tokio_stream::StreamExt;
35

46
use crate::{
5-
ScannerError,
6-
block_range_scanner::{BlockScannerResult, RangeIterator, reorg_handler::ReorgHandler},
7+
ScannerError, ScannerMessage,
8+
block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
79
robust_provider::{RobustProvider, RobustSubscription, subscription},
8-
types::{Notification, TryStream},
10+
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
911
};
1012
use alloy::{
1113
consensus::BlockHeader,
@@ -14,6 +16,39 @@ use alloy::{
1416
primitives::BlockNumber,
1517
};
1618

19+
/// Default maximum number of blocks per streamed range.
20+
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
21+
22+
/// Default confirmation depth used by scanners that accept a `block_confirmations` setting.
23+
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
24+
25+
/// Default per-stream buffer size used by scanners.
26+
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
27+
28+
/// The result type yielded by block-range streams.
29+
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;
30+
31+
/// Convenience alias for a streamed block-range message.
32+
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;
33+
34+
impl From<RangeInclusive<BlockNumber>> for Message {
35+
fn from(range: RangeInclusive<BlockNumber>) -> Self {
36+
Message::Data(range)
37+
}
38+
}
39+
40+
impl PartialEq<RangeInclusive<BlockNumber>> for Message {
41+
fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
42+
if let Message::Data(range) = self { range.eq(other) } else { false }
43+
}
44+
}
45+
46+
impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumber> {
47+
fn into_scanner_message_result(self) -> BlockScannerResult {
48+
Ok(Message::Data(self))
49+
}
50+
}
51+
1752
#[allow(clippy::too_many_arguments)]
1853
pub(crate) async fn stream_live_blocks<N: Network>(
1954
stream_start: BlockNumber,

src/block_range_scanner/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
mod builder;
2+
mod common;
3+
mod range_iterator;
4+
mod reorg_handler;
5+
mod rewind_handler;
6+
mod ring_buffer;
7+
mod scanner;
8+
mod sync_handler;
9+
10+
pub use builder::BlockRangeScannerBuilder;
11+
pub use common::BlockScannerResult;
12+
pub use ring_buffer::RingBufferCapacity;
13+
pub use scanner::BlockRangeScanner;
14+
15+
pub use common::{
16+
DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
17+
};

0 commit comments

Comments
 (0)