Skip to content

Commit 151d691

Browse files
pepebndcLeoPatOZ0xNeshi
authored
Make Eventfilter fields optional (#60)
Co-authored-by: Leo <[email protected]> Co-authored-by: Nenad <[email protected]>
1 parent 72a78f7 commit 151d691

File tree

15 files changed

+421
-88
lines changed

15 files changed

+421
-88
lines changed

README.md

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,10 @@ impl EventCallback for CounterCallback {
7474
}
7575

7676
async fn run_scanner(ws_url: alloy::transports::http::reqwest::Url, contract: alloy::primitives::Address) -> anyhow::Result<()> {
77-
let filter = EventFilter {
78-
contract_address: contract,
79-
event: MyContract::SomeEvent::SIGNATURE.to_owned(),
80-
callback: Arc::new(CounterCallback { processed: Arc::new(AtomicUsize::new(0)) }),
81-
};
77+
let filter = EventFilter::new()
78+
.with_contract_address(contract)
79+
.with_event(MyContract::SomeEvent::SIGNATURE)
80+
.with_callback(Arc::new(CounterCallback { processed: Arc::new(AtomicUsize::new(0)) }));
8281

8382
let mut scanner = EventScannerBuilder::new()
8483
.with_event_filter(filter)
@@ -111,16 +110,67 @@ Once configured, connect using either `connect_ws::<Ethereum>(ws_url)` or `conne
111110

112111
Create an `EventFilter` for each contract/event pair you want to track. The filter bundles the contract address, the event signature (from `SolEvent::SIGNATURE`), and an `Arc<dyn EventCallback + Send + Sync>`.
113112

113+
Both `contract_address` and `event` fields are optional, allowing for flexible event tracking.
114+
115+
You can construct EventFilters using either the builder pattern (recommended) or direct struct construction:
116+
117+
### Builder Pattern (Recommended)
118+
119+
```rust
120+
// Track a specific event from a specific contract
121+
let specific_filter = EventFilter::new()
122+
.with_contract_address(*counter_contract.address())
123+
.with_event(Counter::CountIncreased::SIGNATURE)
124+
.with_callback(Arc::new(CounterCallback));
125+
126+
// Track ALL events from a specific contract
127+
let all_contract_events_filter = EventFilter::new()
128+
.with_contract_address(*counter_contract.address())
129+
.with_callback(Arc::new(AllEventsCallback));
130+
131+
// Track ALL events from ALL contracts in the block range
132+
let all_events_filter = EventFilter::new()
133+
.with_callback(Arc::new(GlobalEventsCallback));
134+
```
135+
136+
### Direct Struct Construction
137+
114138
```rust
115-
let filter = EventFilter {
116-
contract_address: *counter_contract.address(),
117-
event: Counter::CountIncreased::SIGNATURE.to_owned(),
139+
// Track a specific event from a specific contract (traditional usage)
140+
let specific_filter = EventFilter {
141+
contract_address: Some(*counter_contract.address()),
142+
event: Some(Counter::CountIncreased::SIGNATURE.to_owned()),
118143
callback: Arc::new(CounterCallback),
119144
};
145+
146+
// Track ALL events from a specific contract
147+
let all_contract_events_filter = EventFilter {
148+
contract_address: Some(*counter_contract.address()),
149+
event: None, // Will track all events from this contract
150+
callback: Arc::new(AllEventsCallback),
151+
};
152+
153+
// Track ALL events from ALL contracts in the block range
154+
let all_events_filter = EventFilter {
155+
contract_address: None, // Will track events from all contracts
156+
event: None, // Will track all event types
157+
callback: Arc::new(GlobalEventsCallback),
158+
};
120159
```
121160

122161
Register multiple filters by calling either `with_event_filter` repeatedly or `with_event_filters` once.
123162

163+
#### Use Cases for Optional Fields
164+
165+
The optional `contract_address` and `event` fields enable several powerful use cases:
166+
167+
- **Track all events from a specific contract**: Set `contract_address` but leave `event` as `None`
168+
- **Track all events across all contracts**: Set both `contract_address` and `event` as `None`
169+
- **Track specific events from specific contracts**: Set both fields (traditional usage)
170+
- **Mixed filtering**: Use multiple filters with different optional field combinations
171+
172+
This flexibility allows you to build sophisticated event monitoring systems that can track events at different granularities depending on your application's needs.
173+
124174

125175
### Scanning Modes
126176

examples/historical_scanning/main.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,10 @@ async fn main() -> anyhow::Result<()> {
5959

6060
let contract_address = counter_contract.address();
6161

62-
let increase_filter = EventFilter {
63-
contract_address: *contract_address,
64-
event: Counter::CountIncreased::SIGNATURE.to_owned(),
65-
callback: Arc::new(CounterCallback),
66-
};
62+
let increase_filter = EventFilter::new()
63+
.with_contract_address(*contract_address)
64+
.with_event(Counter::CountIncreased::SIGNATURE)
65+
.with_callback(Arc::new(CounterCallback));
6766

6867
let _ = counter_contract.increase().send().await?.get_receipt().await?;
6968

examples/simple_counter/main.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,10 @@ async fn main() -> anyhow::Result<()> {
5959

6060
let contract_address = counter_contract.address();
6161

62-
let increase_filter = EventFilter {
63-
contract_address: *contract_address,
64-
event: Counter::CountIncreased::SIGNATURE.to_owned(),
65-
callback: Arc::new(CounterCallback),
66-
};
62+
let increase_filter = EventFilter::new()
63+
.with_contract_address(*contract_address)
64+
.with_event(Counter::CountIncreased::SIGNATURE)
65+
.with_callback(Arc::new(CounterCallback));
6766

6867
let mut scanner = EventScannerBuilder::new()
6968
.with_event_filter(increase_filter)

src/event_filter.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::sync::Arc;
2+
3+
use alloy::primitives::Address;
4+
5+
use crate::callback::EventCallback;
6+
7+
#[derive(Clone)]
8+
pub struct EventFilter {
9+
/// Contract address to filter events from. If None, events from all contracts will be tracked.
10+
pub contract_address: Option<Address>,
11+
/// Human-readable event signature, e.g. "Transfer(address,address,uint256)".
12+
/// If None, all events from the specified contract(s) will be tracked.
13+
pub event: Option<String>,
14+
pub callback: Arc<dyn EventCallback + Send + Sync>,
15+
}
16+
17+
impl EventFilter {
18+
/// Creates a new `EventFilter` builder.
19+
///
20+
/// # Examples
21+
///
22+
/// ```rust
23+
/// use event_scanner::{EventFilter, EventCallback};
24+
/// use std::sync::Arc;
25+
/// use alloy::primitives::Address;
26+
///
27+
/// # struct MyCallback;
28+
/// # #[async_trait::async_trait]
29+
/// # impl EventCallback for MyCallback {
30+
/// # async fn on_event(&self, _log: &alloy::rpc::types::Log) -> anyhow::Result<()> { Ok(()) }
31+
/// # }
32+
/// # async fn example() -> anyhow::Result<()> {
33+
/// let contract_address = Address::ZERO;
34+
/// let callback = Arc::new(MyCallback);
35+
/// let filter = EventFilter::new()
36+
/// .with_contract_address(contract_address)
37+
/// .with_event("Transfer(address,address,uint256)")
38+
/// .with_callback(callback);
39+
/// # Ok(())
40+
/// # }
41+
/// ```
42+
#[must_use]
43+
#[allow(clippy::new_ret_no_self)]
44+
pub fn new() -> EventFilterBuilder {
45+
EventFilterBuilder::default()
46+
}
47+
}
48+
49+
/// Builder for constructing `EventFilter` instances with optional fields.
50+
#[derive(Default)]
51+
pub struct EventFilterBuilder {
52+
contract_address: Option<Address>,
53+
event: Option<String>,
54+
}
55+
56+
impl EventFilterBuilder {
57+
/// Sets the contract address to filter events from.
58+
/// If not set, events from all contracts will be tracked.
59+
#[must_use]
60+
pub fn with_contract_address(mut self, contract_address: Address) -> Self {
61+
self.contract_address = Some(contract_address);
62+
self
63+
}
64+
65+
/// Sets the event signature to filter specific events.
66+
/// If not set, all events from the specified contract(s) will be tracked.
67+
#[must_use]
68+
pub fn with_event(mut self, event: impl Into<String>) -> Self {
69+
self.event = Some(event.into());
70+
self
71+
}
72+
73+
/// Sets the callback for processing events and builds the `EventFilter`.
74+
///
75+
/// # Panics
76+
///
77+
/// Panics if the callback is not set, as it's required for event processing.
78+
#[must_use]
79+
pub fn with_callback(self, callback: Arc<dyn EventCallback + Send + Sync>) -> EventFilter {
80+
EventFilter { contract_address: self.contract_address, event: self.event, callback }
81+
}
82+
}

src/event_scanner.rs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc};
33
use crate::{
44
block_range_scanner::{self, BlockRangeScanner, ConnectedBlockRangeScanner},
55
callback::strategy::{CallbackStrategy, StateSyncAwareStrategy},
6-
types::EventFilter,
6+
event_filter::EventFilter,
77
};
88
use alloy::{
99
eips::BlockNumberOrTag,
@@ -134,8 +134,8 @@ pub struct EventScanner<N: Network> {
134134

135135
#[derive(Hash, Eq, PartialEq)]
136136
struct EventIdentifier {
137-
contract_address: Address,
138-
event: String,
137+
contract_address: Option<Address>,
138+
event: Option<String>,
139139
}
140140

141141
impl<N: Network> EventScanner<N> {
@@ -165,12 +165,8 @@ impl<N: Network> EventScanner<N> {
165165

166166
let callback = filter.callback.clone();
167167
let strategy = self.callback_strategy.clone();
168-
Self::spawn_event_callback_task_executors(
169-
receiver,
170-
callback,
171-
strategy,
172-
filter.event.clone(),
173-
);
168+
let event_name = filter.event.clone().unwrap_or_else(|| "all events".to_string());
169+
Self::spawn_event_callback_task_executors(receiver, callback, strategy, event_name);
174170

175171
event_channels.insert(unique_event, sender);
176172
}
@@ -225,20 +221,31 @@ impl<N: Network> EventScanner<N> {
225221
event_channels: &HashMap<EventIdentifier, mpsc::Sender<Log>>,
226222
) -> anyhow::Result<()> {
227223
for event_filter in &self.tracked_events {
228-
let filter = Filter::new()
229-
.address(event_filter.contract_address)
230-
.event(event_filter.event.as_str())
231-
.from_block(from_block)
232-
.to_block(to_block);
224+
let mut filter = Filter::new().from_block(from_block).to_block(to_block);
225+
226+
// Add contract address filter if specified
227+
if let Some(contract_address) = event_filter.contract_address {
228+
filter = filter.address(contract_address);
229+
}
230+
231+
// Add event signature filter if specified
232+
if let Some(event_signature) = &event_filter.event {
233+
filter = filter.event(event_signature.as_str());
234+
}
233235

234236
match self.block_range_scanner.provider().get_logs(&filter).await {
235237
Ok(logs) => {
236238
if logs.is_empty() {
237239
continue;
238240
}
241+
let contract_display = event_filter
242+
.contract_address
243+
.map_or_else(|| "all contracts".to_string(), |addr| format!("{addr:?}"));
244+
let event_display = event_filter.event.as_deref().map_or("all events", |s| s);
245+
239246
info!(
240-
contract = ?event_filter.contract_address,
241-
event = %event_filter.event,
247+
contract = %contract_display,
248+
event = %event_display,
242249
log_count = logs.len(),
243250
from_block,
244251
to_block,
@@ -253,17 +260,34 @@ impl<N: Network> EventScanner<N> {
253260
if let Some(sender) = event_channels.get(&event_identifier) {
254261
for log in logs {
255262
if let Err(e) = sender.send(log).await {
256-
warn!(event = %event_filter.event, error = %e, "failed to enqueue log for processing");
263+
let contract_display = event_filter.contract_address.map_or_else(
264+
|| "all contracts".to_string(),
265+
|addr| format!("{addr:?}"),
266+
);
267+
let event_display =
268+
event_filter.event.as_deref().map_or("all events", |s| s);
269+
warn!(contract = %contract_display, event = %event_display, error = %e, "failed to enqueue log for processing");
257270
}
258271
}
259272
} else {
260-
warn!(event = %event_filter.event, "no channel found for event type");
273+
let contract_display = event_filter.contract_address.map_or_else(
274+
|| "all contracts".to_string(),
275+
|addr| format!("{addr:?}"),
276+
);
277+
let event_display =
278+
event_filter.event.as_deref().map_or("all events", |s| s);
279+
warn!(contract = %contract_display, event = %event_display, "no channel found for event type");
261280
}
262281
}
263282
Err(e) => {
283+
let contract_display = event_filter
284+
.contract_address
285+
.map_or_else(|| "all contracts".to_string(), |addr| format!("{addr:?}"));
286+
let event_display = event_filter.event.as_deref().map_or("all events", |s| s);
287+
264288
error!(
265-
contract = ?event_filter.contract_address,
266-
event = %event_filter.event,
289+
contract = %contract_display,
290+
event = %event_display,
267291
error = %e,
268292
from_block,
269293
to_block,

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub mod block_range_scanner;
22
pub mod callback;
3+
pub mod event_filter;
34
pub mod event_scanner;
4-
pub mod types;
55

66
pub use crate::{
77
callback::{
@@ -11,5 +11,5 @@ pub use crate::{
1111
StateSyncConfig,
1212
},
1313
},
14-
types::EventFilter,
14+
event_filter::{EventFilter, EventFilterBuilder},
1515
};

src/types.rs

Lines changed: 0 additions & 13 deletions
This file was deleted.

tests/historic_mode/basic.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
};
88

99
use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent};
10-
use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter};
10+
use event_scanner::{event_filter::EventFilter, event_scanner::EventScannerBuilder};
1111
use tokio::time::{sleep, timeout};
1212

1313
use crate::{
@@ -26,8 +26,8 @@ async fn processes_events_within_specified_historical_range() -> anyhow::Result<
2626
let callback = Arc::new(BasicCounterCallback { count: Arc::clone(&event_count) });
2727

2828
let filter = EventFilter {
29-
contract_address,
30-
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
29+
contract_address: Some(contract_address),
30+
event: Some(TestCounter::CountIncreased::SIGNATURE.to_owned()),
3131
callback,
3232
};
3333

tests/historic_to_live/basic.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent};
4-
use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter};
4+
use event_scanner::{event_filter::EventFilter, event_scanner::EventScannerBuilder};
55
use tokio::time::{Duration, sleep, timeout};
66

77
use crate::{
@@ -31,8 +31,8 @@ async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> {
3131
let callback = Arc::new(EventOrderingCallback { counts: Arc::clone(&event_new_counts) });
3232

3333
let filter = EventFilter {
34-
contract_address,
35-
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
34+
contract_address: Some(contract_address),
35+
event: Some(TestCounter::CountIncreased::SIGNATURE.to_owned()),
3636
callback,
3737
};
3838

0 commit comments

Comments
 (0)