Skip to content

Commit 78d5a17

Browse files
authored
feat: Add "fetch latest X events" stream function (#121)
1 parent de7c59b commit 78d5a17

File tree

22 files changed

+1355
-154
lines changed

22 files changed

+1355
-154
lines changed

CONTRIBUTING.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ cargo build --locked --all-targets --all-features
6060
Run the test suite (we recommend nextest):
6161

6262
```bash
63-
cargo nextest run
63+
cargo nextest run --features test-utils
6464
# or
65-
cargo test
65+
cargo test --features test-utils
6666
```
6767

6868
Run examples:
@@ -123,7 +123,7 @@ cargo clippy --all-targets --all-features -- -D warnings -D clippy::pedantic
123123
typos
124124
125125
# Tests (prefer nextest)
126-
cargo nextest run
126+
cargo nextest run --features test-utils
127127
```
128128

129129
---
@@ -160,7 +160,7 @@ Key implementation details, trade-offs, and alternatives considered.
160160
- [ ] `cargo +nightly fmt --all --check`
161161
- [ ] `cargo clippy --all-targets --all-features -- -D warnings -D clippy::pedantic`
162162
- [ ] `typos`
163-
- [ ] Tests pass (`cargo nextest run`)
163+
- [ ] Tests pass (`cargo nextest run --features test-utils`)
164164
- [ ] Docs/README/examples updated (if applicable)
165165
```
166166

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
[workspace]
2-
members = [".", "examples/historical_scanning", "examples/simple_counter"]
2+
members = [
3+
".",
4+
"examples/historical_scanning",
5+
"examples/simple_counter",
6+
"examples/latest_events_scanning",
7+
]
38
resolver = "2"
49

510
[workspace.package]
@@ -63,3 +68,6 @@ tracing-subscriber.workspace = true
6368

6469
[lints]
6570
workspace = true
71+
72+
[features]
73+
test-utils = []

README.md

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
Event Scanner is a Rust library for streaming EVM-based smart contract events. It is built on top of the [`alloy`](https://github.com/alloy-rs/alloy) ecosystem and focuses on in-memory scanning without a backing database. Applications provide event filters; the scanner takes care of fetching historical ranges, bridging into live streaming mode, all whilst delivering the events as streams of data.
1212

1313
---
14+
1415

1516
## Table of Contents
1617

@@ -21,7 +22,7 @@ Event Scanner is a Rust library for streaming EVM-based smart contract events. I
2122
- [Building a Scanner](#building-a-scanner)
2223
- [Defining Event Filters](#defining-event-filters)
2324
- [Scanning Modes](#scanning-modes)
24-
- [Working with Callbacks](#working-with-callbacks)
25+
- [Scanning Latest Events](#scanning-latest-events)
2526
- [Examples](#examples)
2627
- [Testing](#testing)
2728

@@ -32,6 +33,7 @@ Event Scanner is a Rust library for streaming EVM-based smart contract events. I
3233
- **Historical replay** – stream events from past block ranges.
3334
- **Live subscriptions** – stay up to date with latest events via WebSocket or IPC transports.
3435
- **Hybrid flow** – automatically transition from historical catch-up into streaming mode.
36+
- **Latest events fetch** – one-shot rewind to collect the most recent matching logs.
3537
- **Composable filters** – register one or many contract + event signature pairs.
3638
- **No database** – processing happens in-memory; persistence is left to the host application.
3739

@@ -137,20 +139,71 @@ The flexibility provided by `EventFilter` allows you to build sophisticated even
137139

138140
### Scanning Modes
139141

140-
- **Live mode** `start_scanner(BlockNumberOrTag::Latest, None)` subscribes to new blocks only.
141-
- **Historical mode** `start_scanner(BlockNumberOrTag::Number(start), Some(BlockNumberOrTag::Number(end)))`, scanner fetches events from a historical block range.
142-
- **Historical → Live** `start_scanner(BlockNumberOrTag::Number(start), None)` replays from `start` to current head, then streams future blocks.
142+
- **Live mode** - `start_scanner(BlockNumberOrTag::Latest, None)` subscribes to new blocks only. On detecting a reorg, the scanner emits `ScannerStatus::ReorgDetected` and recalculates the confirmed window, streaming logs from the corrected confirmed block range.
143+
- **Historical mode** - `start_scanner(BlockNumberOrTag::Number(start), Some(BlockNumberOrTag::Number(end)))`, scanner fetches events from a historical block range. While syncing ranges, the scanner verifies continuity. If a reorg is detected, it rewinds by `with_reorg_rewind_depth` blocks and resumes forward syncing.
144+
- **Historical → Live** - `start_scanner(BlockNumberOrTag::Number(start), None)` replays from `start` to current head, then streams future blocks. Reorgs are handled as per the particular mode phase the scanner is in (historical or live).
143145

144146
For now modes are deduced from the `start` and `end` parameters. In the future, we might add explicit commands to select the mode.
145147

146148
See the integration tests under `tests/live_mode`, `tests/historic_mode`, and `tests/historic_to_live` for concrete examples.
147149

150+
### Scanning Latest Events
151+
152+
`scan_latest` collects the most recent matching events for each registered stream.
153+
154+
- It does not enter live mode; it scans a block range and then returns.
155+
- Each registered stream receives at most `count` logs in a single message, chronologically ordered.
156+
157+
Basic usage:
158+
159+
```rust
160+
use alloy::{eips::BlockNumberOrTag, network::Ethereum};
161+
use event_scanner::{EventFilter, event_scanner::{EventScanner, EventScannerMessage}};
162+
use tokio_stream::StreamExt;
163+
164+
async fn latest_example(ws_url: alloy::transports::http::reqwest::Url, addr: alloy::primitives::Address) -> eyre::Result<()> {
165+
let mut client = EventScanner::new().connect_ws::<Ethereum>(ws_url).await?;
166+
167+
let filter = EventFilter::new().with_contract_address(addr);
168+
let mut stream = client.create_event_stream(filter);
169+
170+
// Collect the latest 10 events across Earliest..=Latest
171+
client.scan_latest(10).await?;
172+
173+
// Expect a single message with up to 10 logs, then the stream ends
174+
while let Some(msg) = stream.next().await {
175+
if let EventScannerMessage::Data(logs) = msg {
176+
println!("Latest logs: {}", logs.len());
177+
}
178+
}
179+
180+
Ok(())
181+
}
182+
```
183+
184+
Restricting to a specific block range:
185+
186+
```rust
187+
// Collect the latest 5 events between blocks [1_000_000, 1_100_000]
188+
client
189+
.scan_latest_in_range(5, BlockNumberOrTag::Number(1_000_000), BlockNumberOrTag::Number(1_100_000))
190+
.await?;
191+
```
192+
193+
The scanner periodically checks the tip to detect reorgs. On reorg, the scanner emits `ScannerStatus::ReorgDetected`, resets to the updated tip, and restarts the scan. Final delivery to log listeners is in chronological order.
194+
195+
Notes:
196+
197+
- Ensure you create streams via `create_event_stream()` before calling `scan_latest*` so listeners are registered.
198+
<!-- TODO: uncomment once implemented - The function returns after delivering the messages; to continuously stream new blocks, use `scan_latest_then_live`. -->
199+
148200
---
149201

150202
## Examples
151203

152204
- `examples/simple_counter` – minimal live-mode scanner
153205
- `examples/historical_scanning` – demonstrates replaying from genesis (block 0) before continuing streaming latest blocks
206+
- `examples/latest_events_scanning` – demonstrates scanning the latest events
154207

155208
Run an example with:
156209

@@ -166,10 +219,11 @@ Both examples spin up a local `anvil` instance, deploy a demo counter contract,
166219

167220
## Testing
168221

169-
Integration tests cover live, historical, and hybrid flows:
170222
(We recommend using [nextest](https://crates.io/crates/cargo-nextest) to run the tests)
171223

224+
Integration tests cover all modes:
225+
172226
```bash
173-
cargo nextest run
227+
cargo nextest run --features test-utils
174228
```
175229

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "latest_events_scanning"
3+
version.workspace = true
4+
edition.workspace = true
5+
license.workspace = true
6+
repository.workspace = true
7+
publish = false
8+
9+
[[bin]]
10+
name = "latest_events_scanning"
11+
path = "main.rs"
12+
13+
[dependencies]
14+
alloy.workspace = true
15+
alloy-node-bindings.workspace = true
16+
tokio.workspace = true
17+
tokio-stream.workspace = true
18+
anyhow.workspace = true
19+
tracing.workspace = true
20+
tracing-subscriber.workspace = true
21+
event-scanner = { path = "../.." }
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent};
2+
use alloy_node_bindings::Anvil;
3+
use event_scanner::{
4+
EventFilter,
5+
event_scanner::{EventScanner, EventScannerMessage},
6+
};
7+
8+
use tokio_stream::StreamExt;
9+
use tracing::{error, info};
10+
use tracing_subscriber::EnvFilter;
11+
12+
sol! {
13+
#[allow(missing_docs)]
14+
#[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033")]
15+
contract Counter {
16+
uint256 public count;
17+
18+
event CountIncreased(uint256 newCount);
19+
event CountDecreased(uint256 newCount);
20+
21+
function increase() public {
22+
count += 1;
23+
emit CountIncreased(count);
24+
}
25+
26+
function decrease() public {
27+
require(count > 0, "Count cannot be negative");
28+
count -= 1;
29+
emit CountDecreased(count);
30+
}
31+
32+
function getCount() public view returns (uint256) {
33+
return count;
34+
}
35+
}
36+
}
37+
38+
#[tokio::main]
39+
async fn main() -> anyhow::Result<()> {
40+
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();
41+
42+
let anvil = Anvil::new().block_time_f64(0.5).try_spawn()?;
43+
let wallet = anvil.wallet();
44+
let provider =
45+
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;
46+
let counter_contract = Counter::deploy(provider).await?;
47+
48+
let contract_address = counter_contract.address();
49+
50+
let increase_filter = EventFilter::new()
51+
.with_contract_address(*contract_address)
52+
.with_event(Counter::CountIncreased::SIGNATURE);
53+
54+
let mut client = EventScanner::new().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
55+
56+
let mut stream = client.create_event_stream(increase_filter);
57+
58+
for _ in 0..10 {
59+
_ = counter_contract.increase().send().await?;
60+
}
61+
62+
tokio::spawn(async move {
63+
client.scan_latest(5).await.expect("failed to start scanner");
64+
});
65+
66+
// only the last 5 events will be streamed
67+
while let Some(message) = stream.next().await {
68+
match message {
69+
EventScannerMessage::Data(logs) => {
70+
for log in logs {
71+
info!("Callback successfully executed with event {:?}", log.inner.data);
72+
}
73+
}
74+
EventScannerMessage::Error(e) => {
75+
error!("Received error: {}", e);
76+
}
77+
EventScannerMessage::Status(info) => {
78+
info!("Received info: {:?}", info);
79+
}
80+
}
81+
}
82+
83+
Ok(())
84+
}

0 commit comments

Comments
 (0)