Skip to content

Commit cdaed6e

Browse files
authored
Make Event Scanner Stream Events Back to the Caller Instead of Invoking Callbacks (#63)
1 parent 05ee9c6 commit cdaed6e

File tree

27 files changed

+429
-1051
lines changed

27 files changed

+429
-1051
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Thanks for your interest in contributing! This guide explains how to set up your
66

77
## Project Overview
88

9-
`event-scanner` is a Rust library for monitoring EVM-based smart contract events. It is built on the `alloy` ecosystem and provides in-memory scanning with retry-aware callback execution. See `README.md` for features, usage, examples, and testing notes.
9+
`event-scanner` is a Rust library for monitoring and streaming EVM-based smart contract events. It is built on the `alloy` ecosystem and provides in-memory scanning. See `README.md` for features, usage, examples, and testing notes.
1010

1111
- Workspace manifest: `Cargo.toml`
1212
- Library code: `src/`

Cargo.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,12 @@ async fn run_scanner(ws_url: alloy::transports::http::reqwest::Url, contract: al
8181
.with_event(MyContract::SomeEvent::SIGNATURE)
8282
.with_callback(Arc::new(CounterCallback { processed: Arc::new(AtomicUsize::new(0)) }));
8383

84-
let mut scanner = EventScannerBuilder::new()
84+
let mut client = EventScannerBuilder::new()
8585
.with_event_filter(filter)
8686
.connect_ws::<Ethereum>(ws_url)
8787
.await?;
8888

89-
scanner.start(BlockNumberOrTag::Latest, None).await?;
89+
client.start_scanner(BlockNumberOrTag::Latest, None).await?;
9090
Ok(())
9191
}
9292
```

examples/historical_scanning/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ path = "main.rs"
1414
alloy.workspace = true
1515
alloy-node-bindings.workspace = true
1616
tokio.workspace = true
17+
tokio-stream.workspace = true
1718
anyhow.workspace = true
18-
async-trait.workspace = true
1919
tracing.workspace = true
2020
tracing-subscriber.workspace = true
21-
hex.workspace = true
2221
event-scanner = { path = "../.." }

examples/historical_scanning/main.rs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,16 @@
1-
use std::{sync::Arc, time::Duration};
1+
use std::time::Duration;
22

33
use alloy::{
4-
eips::BlockNumberOrTag, network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol,
5-
sol_types::SolEvent,
4+
eips::BlockNumberOrTag, network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent,
65
};
76
use alloy_node_bindings::Anvil;
8-
use async_trait::async_trait;
9-
use event_scanner::{EventCallback, EventFilter, event_scanner::EventScannerBuilder};
7+
use event_scanner::{EventFilter, event_scanner::EventScanner};
108

119
use tokio::time::sleep;
10+
use tokio_stream::StreamExt;
1211
use tracing::info;
1312
use tracing_subscriber::EnvFilter;
1413

15-
struct CounterCallback;
16-
17-
#[async_trait]
18-
impl EventCallback for CounterCallback {
19-
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
20-
info!("Callback successfully executed with event {:?}", log.inner.data);
21-
Ok(())
22-
}
23-
}
24-
2514
sol! {
2615
#[allow(missing_docs)]
2716
#[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033")]
@@ -61,18 +50,22 @@ async fn main() -> anyhow::Result<()> {
6150

6251
let increase_filter = EventFilter::new()
6352
.with_contract_address(*contract_address)
64-
.with_event(Counter::CountIncreased::SIGNATURE)
65-
.with_callback(Arc::new(CounterCallback));
53+
.with_event(Counter::CountIncreased::SIGNATURE);
6654

6755
let _ = counter_contract.increase().send().await?.get_receipt().await?;
6856

69-
let mut scanner = EventScannerBuilder::new()
70-
.with_event_filter(increase_filter)
71-
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
72-
.await?;
57+
let mut client = EventScanner::new().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
58+
59+
let mut stream = client.create_event_stream(increase_filter);
7360

7461
sleep(Duration::from_secs(10)).await;
75-
scanner.start(BlockNumberOrTag::Number(0), None).await.expect("failed to start scanner");
62+
client.start_scanner(BlockNumberOrTag::Number(0), None).await.expect("failed to start scanner");
63+
64+
while let Some(Ok(logs)) = stream.next().await {
65+
for log in logs {
66+
info!("Callback successfully executed with event {:?}", log.inner.data);
67+
}
68+
}
7669

7770
Ok(())
7871
}

examples/simple_counter/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ path = "main.rs"
1414
alloy.workspace = true
1515
alloy-node-bindings.workspace = true
1616
tokio.workspace = true
17+
tokio-stream.workspace = true
1718
anyhow.workspace = true
18-
async-trait.workspace = true
1919
tracing.workspace = true
2020
tracing-subscriber.workspace = true
21-
hex.workspace = true
2221
event-scanner = { path = "../.." }

examples/simple_counter/main.rs

Lines changed: 35 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,42 @@
1-
use std::{sync::Arc, time::Duration};
1+
use std::time::Duration;
22

33
use alloy::{
4-
eips::BlockNumberOrTag, network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol,
5-
sol_types::SolEvent,
4+
eips::BlockNumberOrTag, network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent,
65
};
76
use alloy_node_bindings::Anvil;
8-
use async_trait::async_trait;
9-
use event_scanner::{EventCallback, EventFilter, event_scanner::EventScannerBuilder};
7+
use event_scanner::{EventFilter, event_scanner::EventScanner};
108

119
use tokio::time::sleep;
10+
use tokio_stream::StreamExt;
1211
use tracing::info;
1312
use tracing_subscriber::EnvFilter;
1413

15-
struct CounterCallback;
16-
17-
#[async_trait]
18-
impl EventCallback for CounterCallback {
19-
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
20-
info!("Callback successfully executed with event {:?}", log.inner.data);
21-
Ok(())
22-
}
23-
}
24-
2514
sol! {
2615
#[allow(missing_docs)]
2716
#[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033")]
28-
contract Counter {
29-
uint256 public count;
17+
contract Counter {
18+
uint256 public count;
3019

31-
event CountIncreased(uint256 newCount);
32-
event CountDecreased(uint256 newCount);
20+
event CountIncreased(uint256 newCount);
21+
event CountDecreased(uint256 newCount);
3322

34-
function increase() public {
35-
count += 1;
36-
emit CountIncreased(count);
37-
}
23+
function increase() public {
24+
count += 1;
25+
emit CountIncreased(count);
26+
}
3827

39-
function decrease() public {
40-
require(count > 0, "Count cannot be negative");
41-
count -= 1;
42-
emit CountDecreased(count);
43-
}
28+
function decrease() public {
29+
require(count > 0, "Count cannot be negative");
30+
count -= 1;
31+
emit CountDecreased(count);
32+
}
4433

45-
function getCount() public view returns (uint256) {
46-
return count;
34+
function getCount() public view returns (uint256) {
35+
return count;
36+
}
4737
}
4838
}
49-
}
39+
5040
#[tokio::main]
5141
async fn main() -> anyhow::Result<()> {
5242
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();
@@ -61,16 +51,17 @@ async fn main() -> anyhow::Result<()> {
6151

6252
let increase_filter = EventFilter::new()
6353
.with_contract_address(*contract_address)
64-
.with_event(Counter::CountIncreased::SIGNATURE)
65-
.with_callback(Arc::new(CounterCallback));
54+
.with_event(Counter::CountIncreased::SIGNATURE);
55+
56+
let mut client = EventScanner::new().connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
6657

67-
let mut scanner = EventScannerBuilder::new()
68-
.with_event_filter(increase_filter)
69-
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
70-
.await?;
58+
let mut stream = client.create_event_stream(increase_filter);
7159

7260
let task_1 = tokio::spawn(async move {
73-
scanner.start(BlockNumberOrTag::Latest, None).await.expect("failed to start scanner");
61+
client
62+
.start_scanner(BlockNumberOrTag::Latest, None)
63+
.await
64+
.expect("failed to start scanner");
7465
});
7566

7667
let task_2 = tokio::spawn(async move {
@@ -83,6 +74,12 @@ async fn main() -> anyhow::Result<()> {
8374
}
8475
});
8576

77+
while let Some(Ok(logs)) = stream.next().await {
78+
for log in logs {
79+
info!("Callback successfully executed with event {:?}", log.inner.data);
80+
}
81+
}
82+
8683
task_1.await.ok();
8784
task_2.await.ok();
8885

src/block_range_scanner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
8989
const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
9090
// const BACK_OFF_MAX_RETRIES: u64 = 5;
9191

92-
const MAX_BUFFERED_MESSAGES: usize = 50000;
92+
pub const MAX_BUFFERED_MESSAGES: usize = 50000;
9393

9494
const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
9595

@@ -291,7 +291,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
291291
/// # Errors
292292
///
293293
/// Returns an error if the subscription service fails to start.
294-
pub fn run(&self) -> anyhow::Result<BlockRangeScannerClient> {
294+
pub fn run(&self) -> Result<BlockRangeScannerClient, Error> {
295295
let (service, cmd_tx) = Service::new(self.config.clone(), self.provider.clone());
296296
tokio::spawn(async move {
297297
service.run().await;
@@ -1017,7 +1017,7 @@ mod tests {
10171017
while let Some(result) = receiver.next().await {
10181018
match result {
10191019
Ok(range) => {
1020-
println!("Received block range: [{range:?}]");
1020+
info!("Received block range: [{range:?}]");
10211021
if block_range_start == 0 {
10221022
block_range_start = *range.start();
10231023
}

src/callback/mod.rs

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/callback/strategy/fixed_retry.rs

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)