Skip to content

Commit b18762f

Browse files
LeoPatOZ0xNeshi
andauthored
feat: API Overhaul (#108)
Co-authored-by: Nenad <[email protected]>
1 parent b928ec0 commit b18762f

File tree

35 files changed

+2196
-1440
lines changed

35 files changed

+2196
-1440
lines changed

Cargo.lock

Lines changed: 28 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
members = [
33
".",
44
"examples/historical_scanning",
5-
"examples/simple_counter",
5+
"examples/live_scanning",
66
"examples/latest_events_scanning",
7+
"examples/sync_scanning"
78
]
89
resolver = "2"
910

README.md

Lines changed: 87 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
Event Scanner is a Rust library for streaming EVM-based smart contract events. It is built on top of the [`alloy`](https://github.com/alloy-rs/alloy) ecosystem and focuses on in-memory scanning without a backing database. Applications provide event filters; the scanner takes care of fetching historical ranges, bridging into live streaming mode, all whilst delivering the events as streams of data.
1212

1313
---
14-
1514

1615
## Table of Contents
1716

@@ -60,27 +59,29 @@ event-scanner = "0.3.0-alpha"
6059
Create an event stream for the given event filters registered with the `EventScanner`:
6160

6261
```rust
63-
use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent};
64-
use event_scanner::{EventFilter, EventScanner, EventScannerError, EventScannerMessage};
62+
use alloy::{network::Ethereum, sol_types::SolEvent};
63+
use event_scanner::{EventFilter, EventScanner, EventScannerMessage};
6564
use tokio_stream::StreamExt;
6665

6766
use crate::MyContract;
6867

6968
async fn run_scanner(
7069
ws_url: alloy::transports::http::reqwest::Url,
7170
contract: alloy::primitives::Address,
72-
) -> Result<(), EventScannerError> {
73-
let mut client = EventScanner::new().connect_ws::<Ethereum>(ws_url).await?;
71+
) -> Result<(), Box<dyn std::error::Error>> {
72+
// Configure scanner with custom batch size (optional)
73+
let mut scanner = EventScanner::live()
74+
.block_read_limit(500) // Process up to 500 blocks per batch
75+
.connect_ws::<Ethereum>(ws_url).await?;
7476

7577
let filter = EventFilter::new()
76-
.with_contract_address(contract)
77-
.with_event(MyContract::SomeEvent::SIGNATURE);
78+
.contract_address(contract)
79+
.event(MyContract::SomeEvent::SIGNATURE);
7880

79-
let mut stream = client.create_event_stream(filter);
81+
let mut stream = scanner.subscribe(filter);
8082

81-
tokio::spawn(async move {
82-
client.start_scanner(BlockNumberOrTag::Earliest, Some(BlockNumberOrTag::Latest)).await
83-
});
83+
// Start the scanner
84+
tokio::spawn(async move { scanner.stream().await });
8485

8586
while let Some(EventScannerMessage::Data(logs)) = stream.next().await {
8687
println!("Fetched logs: {logs:?}");
@@ -96,13 +97,43 @@ async fn run_scanner(
9697

9798
### Building a Scanner
9899

99-
`EventScanner` supports:
100+
`EventScanner` provides mode-specific constructors and a builder pattern to configure settings before connecting:
100101

101-
- `with_blocks_read_per_epoch` - how many blocks are read at a time in a single batch (taken into consideration when fetching historical blocks)
102-
- `with_reorg_rewind_depth` - how many blocks to rewind when a reorg is detected (NOTE ⚠️: still WIP)
103-
- `with_block_confirmations` - how many confirmations to wait for before considering a block final
102+
```rust
103+
// Live streaming mode
104+
let scanner = EventScanner::live()
105+
.block_read_limit(500) // Optional: set max blocks per read (default: 1000)
106+
.connect_ws::<Ethereum>(ws_url).await?;
107+
108+
// Historical scanning mode
109+
let scanner = EventScanner::historic()
110+
.block_read_limit(500)
111+
.connect_ws::<Ethereum>(ws_url).await?;
112+
113+
// Sync mode (historical + live)
114+
let scanner = EventScanner::sync()
115+
.block_read_limit(500)
116+
.connect_ws::<Ethereum>(ws_url).await?;
117+
118+
// Latest mode (recent blocks only)
119+
let scanner = EventScanner::latest()
120+
.count(100)
121+
.block_read_limit(500)
122+
.connect_ws::<Ethereum>(ws_url).await?;
123+
```
124+
125+
**Available Modes:**
126+
- `EventScanner::live()` – Streams new blocks as they arrive
127+
- `EventScanner::historic()` – Processes historical block ranges
128+
- `EventScanner::sync()` – Processes historical data then transitions to live streaming
129+
- `EventScanner::latest()` – Processes a specific number of events then optionally switches to live scanning mode
104130

105-
Once configured, connect using either `connect_ws::<Ethereum>(ws_url)` or `connect_ipc::<Ethereum>(path)`. This will `connect` the `EventScanner` and allow you to create event streams and start scanning in various [modes](#scanning-modes).
131+
**Global Configuration Options:**
132+
- `block_read_limit(usize)` – Sets the maximum number of blocks to process per read operation. This prevents RPC provider errors from overly large block range queries.
133+
- Connect with `connect_ws::<Ethereum>(url)`, `connect_ipc::<Ethereum>(path)`, or `connect(provider)`.
134+
135+
**Starting the Scanner:**
136+
Invoking `scanner.start()` starts the scanner in the specified mode.
106137

107138
### Defining Event Filters
108139

@@ -111,70 +142,74 @@ Create an `EventFilter` for each event stream you wish to process. The filter sp
111142
```rust
112143
// Track a SPECIFIC event from a SPECIFIC contract
113144
let specific_filter = EventFilter::new()
114-
.with_contract_address(*counter_contract.address())
115-
.with_event(Counter::CountIncreased::SIGNATURE);
145+
.contract_address(*counter_contract.address())
146+
.event(Counter::CountIncreased::SIGNATURE);
116147

117148
// Track a multiple events from a SPECIFIC contract
118149
let specific_filter = EventFilter::new()
119-
.with_contract_address(*counter_contract.address())
120-
.with_event(Counter::CountIncreased::SIGNATURE)
121-
.with_event(Counter::CountDecreased::SIGNATURE);
150+
.contract_address(*counter_contract.address())
151+
.event(Counter::CountIncreased::SIGNATURE)
152+
.event(Counter::CountDecreased::SIGNATURE);
122153

123154
// Track a SPECIFIC event from a ALL contracts
124155
let specific_filter = EventFilter::new()
125-
.with_event(Counter::CountIncreased::SIGNATURE);
156+
.event(Counter::CountIncreased::SIGNATURE);
126157

127158
// Track ALL events from a SPECIFIC contracts
128159
let all_contract_events_filter = EventFilter::new()
129-
.with_contract_address(*counter_contract.address())
130-
.with_contract_address(*other_counter_contract.address());
160+
.contract_address(*counter_contract.address())
161+
.contract_address(*other_counter_contract.address());
131162

132163
// Track ALL events from ALL contracts in the block range
133164
let all_events_filter = EventFilter::new();
134165
```
135166

136-
Register multiple filters by invoking `create_event_stream` repeatedly.
167+
Register multiple filters by invoking `subscribe` repeatedly.
137168

138169
The flexibility provided by `EventFilter` allows you to build sophisticated event monitoring systems that can track events at different granularities depending on your application's needs.
139170

140171
### Scanning Modes
141172

142-
- **Live mode** - `start_scanner(BlockNumberOrTag::Latest, None)` subscribes to new blocks only. On detecting a reorg, the scanner emits `ScannerStatus::ReorgDetected` and recalculates the confirmed window, streaming logs from the corrected confirmed block range.
143-
- **Historical mode** - `start_scanner(BlockNumberOrTag::Number(start), Some(BlockNumberOrTag::Number(end)))`, scanner fetches events from a historical block range. Currently no reorg logic has been implemented (NOTE ⚠️: still WIP). In the case that the end block > finalized block and you need reorg resistance, we recommend to use sync mode.
144-
- **Historical → Live** - `start_scanner(BlockNumberOrTag::Number(start), None)` replays from `start` to current head, then streams future blocks. Reorgs are handled as per the particular mode phase the scanner is in (historical or live).
173+
- **Live mode**`EventScanner::live()` creates a scanner that subscribes to new blocks as they arrive.
174+
- **Historical mode**`EventScanner::historic()` creates a scanner for processing historical block ranges.
175+
- **Sync mode**`EventScanner::sync()` creates a scanner that processes historical data then automatically transitions to live streaming.
176+
- **Latest mode**`EventScanner::latest()` creates a scanner that processes a set number of events.
145177

146-
For now modes are deduced from the `start` and `end` parameters. In the future, we might add explicit commands to select the mode.
178+
**Configuration Tips:**
179+
- Set `block_read_limit` based on your RPC provider's limits (e.g., Alchemy, Infura may limit queries to 2000 blocks)
180+
- For live mode, if the WebSocket subscription lags significantly (e.g., >2000 blocks), ranges are automatically capped to prevent RPC errors
181+
- Each mode has its own configuration options for start block, end block, confirmations, etc. where it makes sense
182+
- The modes come with sensible defaults for example not specify a start block for historic mode automatically sets the start block to the earliest one
147183

148-
See the integration tests under `tests/live_mode`, `tests/historic_mode`, and `tests/historic_to_live` for concrete examples.
184+
See integration tests under `tests/live_mode`, `tests/historic_mode`, and `tests/historic_to_live` for concrete examples.
149185

150186
### Scanning Latest Events
151187

152-
`scan_latest` collects the most recent matching events for each registered stream.
188+
Scanner mode that collects a specified number of the most recent matching events for each registered stream.
153189

154190
- It does not enter live mode; it scans a block range and then returns.
155191
- Each registered stream receives at most `count` logs in a single message, chronologically ordered.
156192

157193
Basic usage:
158194

159195
```rust
160-
use alloy::{eips::BlockNumberOrTag, network::Ethereum};
161-
use event_scanner::{EventFilter, event_scanner::{EventScanner, EventScannerMessage}};
196+
use alloy::{network::Ethereum, primitives::Address, transports::http::reqwest::Url};
197+
use event_scanner::{EventFilter, EventScanner, Message};
162198
use tokio_stream::StreamExt;
163199

164-
async fn latest_example(ws_url: alloy::transports::http::reqwest::Url, addr: alloy::primitives::Address) -> eyre::Result<()> {
165-
let mut client = EventScanner::new().connect_ws::<Ethereum>(ws_url).await?;
200+
async fn latest_events(ws_url: Url, addr: Address) -> anyhow::Result<()> {
201+
let mut scanner = EventScanner::latest().count(10).connect_ws::<Ethereum>(ws_url).await?;
202+
203+
let filter = EventFilter::new().contract_address(addr);
166204

167-
let filter = EventFilter::new().with_contract_address(addr);
168-
let mut stream = client.create_event_stream(filter);
205+
let mut stream = scanner.subscribe(filter);
169206

170207
// Collect the latest 10 events across Earliest..=Latest
171-
client.scan_latest(10).await?;
208+
scanner.start().await?;
172209

173210
// Expect a single message with up to 10 logs, then the stream ends
174-
while let Some(msg) = stream.next().await {
175-
if let EventScannerMessage::Data(logs) = msg {
176-
println!("Latest logs: {}", logs.len());
177-
}
211+
while let Some(Message::Data(logs)) = stream.next().await {
212+
println!("Latest logs: {}", logs.len());
178213
}
179214

180215
Ok(())
@@ -185,30 +220,33 @@ Restricting to a specific block range:
185220

186221
```rust
187222
// Collect the latest 5 events between blocks [1_000_000, 1_100_000]
188-
client
189-
.scan_latest_in_range(5, BlockNumberOrTag::Number(1_000_000), BlockNumberOrTag::Number(1_100_000))
223+
let mut scanner = EventScanner::latest()
224+
.count(5)
225+
.from_block(1_000_000)
226+
.to_block(1_100_000)
227+
.connect_ws::<Ethereum>(ws_url).await?;
190228
.await?;
191229
```
192230

193231
The scanner periodically checks the tip to detect reorgs. On reorg, the scanner emits `ScannerStatus::ReorgDetected`, resets to the updated tip, and restarts the scan. Final delivery to log listeners is in chronological order.
194232

195233
Notes:
196234

197-
- Ensure you create streams via `create_event_stream()` before calling `scan_latest*` so listeners are registered.
235+
- Ensure you create streams via `subscribe()` before calling `start` so listeners are registered.
198236
<!-- TODO: uncomment once implemented - The function returns after delivering the messages; to continuously stream new blocks, use `scan_latest_then_live`. -->
199237

200238
---
201239

202240
## Examples
203241

204-
- `examples/simple_counter` – minimal live-mode scanner
205-
- `examples/historical_scanning` – demonstrates replaying from genesis (block 0) before continuing streaming latest blocks
206-
- `examples/latest_events_scanning` – demonstrates scanning the latest events
242+
- `examples/live_scanning` – minimal live-mode scanner using `EventScanner::live()`
243+
- `examples/historical_scanning` – demonstrates replaying historical data using `EventScanner::historic()`
244+
- `examples/latest_events_scanning` – demonstrates scanning the latest events using `EventScanner::latest()`
207245

208246
Run an example with:
209247

210248
```bash
211-
RUST_LOG=info cargo run -p simple_counter
249+
RUST_LOG=info cargo run -p live_scanning
212250
# or
213251
RUST_LOG=info cargo run -p historical_scanning
214252
```

examples/historical_scanning/main.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,19 @@
11
use std::time::Duration;
22

3-
use alloy::{
4-
eips::BlockNumberOrTag, network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent,
5-
};
3+
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
64
use alloy_node_bindings::Anvil;
7-
use event_scanner::{
8-
EventFilter,
9-
event_scanner::{EventScanner, EventScannerMessage},
10-
};
115

6+
use event_scanner::{EventFilter, EventScanner, Message};
127
use tokio::time::sleep;
138
use tokio_stream::StreamExt;
149
use tracing::{error, info};
1510
use tracing_subscriber::EnvFilter;
1611

1712
sol! {
18-
#[allow(missing_docs)]
19-
#[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033")]
20-
contract Counter {
13+
// Built directly with solc 0.8.30+commit.73712a01.Darwin.appleclang
14+
#[sol(rpc,
15+
bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220471585b420a1ad0093820ff10129ec863f6df4bec186546249391fbc3cdbaa7c64736f6c634300081e0033"
16+
)] contract Counter {
2117
uint256 public count;
2218

2319
event CountIncreased(uint256 newCount);
@@ -39,6 +35,7 @@ contract Counter {
3935
}
4036
}
4137
}
38+
4239
#[tokio::main]
4340
async fn main() -> anyhow::Result<()> {
4441
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();
@@ -52,29 +49,30 @@ async fn main() -> anyhow::Result<()> {
5249
let contract_address = counter_contract.address();
5350

5451
let increase_filter = EventFilter::new()
55-
.with_contract_address(*contract_address)
56-
.with_event(Counter::CountIncreased::SIGNATURE);
52+
.contract_address(*contract_address)
53+
.event(Counter::CountIncreased::SIGNATURE);
5754

5855
let _ = counter_contract.increase().send().await?.get_receipt().await?;
5956

60-
let mut client = EventScanner::new().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
57+
let mut scanner =
58+
EventScanner::historic().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
6159

62-
let mut stream = client.create_event_stream(increase_filter);
60+
let mut stream = scanner.subscribe(increase_filter);
6361

6462
sleep(Duration::from_secs(10)).await;
65-
client.start_scanner(BlockNumberOrTag::Number(0), None).await.expect("failed to start scanner");
63+
scanner.start().await.expect("failed to start scanner");
6664

6765
while let Some(message) = stream.next().await {
6866
match message {
69-
EventScannerMessage::Data(logs) => {
67+
Message::Data(logs) => {
7068
for log in logs {
7169
info!("Callback successfully executed with event {:?}", log.inner.data);
7270
}
7371
}
74-
EventScannerMessage::Error(e) => {
72+
Message::Error(e) => {
7573
error!("Received error: {}", e);
7674
}
77-
EventScannerMessage::Status(info) => {
75+
Message::Status(info) => {
7876
info!("Received info: {:?}", info);
7977
}
8078
}

0 commit comments

Comments
 (0)