Skip to content

Commit 939f9f6

Browse files
LeoPatOZ0xNeshi
andauthored
Migrate event scanner tests (#20)
Co-authored-by: 0xNeshi <[email protected]>
1 parent 85f1c3f commit 939f9f6

File tree

16 files changed

+897
-76
lines changed

16 files changed

+897
-76
lines changed

examples/historical_scanning/main.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,10 @@ async fn main() -> anyhow::Result<()> {
6767

6868
let _ = counter_contract.increase().send().await?.get_receipt().await?;
6969

70-
let mut builder = EventScannerBuilder::new();
71-
72-
builder.with_event_filter(increase_filter);
73-
74-
let mut scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
70+
let mut scanner = EventScannerBuilder::new()
71+
.with_event_filter(increase_filter)
72+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
73+
.await?;
7574

7675
sleep(Duration::from_secs(10)).await;
7776
scanner.start(BlockNumberOrTag::Number(0), None).await.expect("failed to start scanner");

examples/simple_counter/main.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,12 @@ async fn main() -> anyhow::Result<()> {
6565
callback: Arc::new(CounterCallback),
6666
};
6767

68-
let mut builder = EventScannerBuilder::new();
69-
70-
builder.with_event_filter(increase_filter);
71-
72-
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
68+
let mut scanner = EventScannerBuilder::new()
69+
.with_event_filter(increase_filter)
70+
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
71+
.await?;
7372

7473
let task_1 = tokio::spawn(async move {
75-
let mut scanner = scanner;
7674
scanner.start(BlockNumberOrTag::Latest, None).await.expect("failed to start scanner");
7775
});
7876

src/block_scanner.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -212,25 +212,25 @@ impl BlockScanner {
212212
}
213213

214214
#[must_use]
215-
pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self {
215+
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
216216
self.blocks_read_per_epoch = blocks_read_per_epoch;
217217
self
218218
}
219219

220220
#[must_use]
221-
pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self {
221+
pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
222222
self.reorg_rewind_depth = reorg_rewind_depth;
223223
self
224224
}
225225

226226
#[must_use]
227-
pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self {
227+
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
228228
self.retry_interval = retry_interval;
229229
self
230230
}
231231

232232
#[must_use]
233-
pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self {
233+
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
234234
self.block_confirmations = block_confirmations;
235235
self
236236
}
@@ -241,7 +241,7 @@ impl BlockScanner {
241241
///
242242
/// Returns an error if the connection fails
243243
pub async fn connect_ws<N: Network>(
244-
&self,
244+
self,
245245
ws_url: Url,
246246
) -> TransportResult<ConnectedBlockScanner<N>> {
247247
let provider =
@@ -263,7 +263,7 @@ impl BlockScanner {
263263
///
264264
/// Returns an error if the connection fails
265265
pub async fn connect_ipc<N: Network>(
266-
&self,
266+
self,
267267
ipc_path: String,
268268
) -> TransportResult<ConnectedBlockScanner<N>> {
269269
let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);

src/event_scanner.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
use alloy::{
99
eips::BlockNumberOrTag,
1010
network::Network,
11+
primitives::Address,
1112
providers::Provider,
1213
rpc::types::{Filter, Log},
1314
transports::http::reqwest::Url,
@@ -38,38 +39,45 @@ impl EventScannerBuilder {
3839
}
3940
}
4041

41-
pub fn with_event_filter(&mut self, filter: EventFilter) -> &mut Self {
42+
#[must_use]
43+
pub fn with_event_filter(mut self, filter: EventFilter) -> Self {
4244
self.tracked_events.push(filter);
4345
self
4446
}
4547

46-
pub fn with_event_filters(&mut self, filters: Vec<EventFilter>) -> &mut Self {
48+
#[must_use]
49+
pub fn with_event_filters(mut self, filters: Vec<EventFilter>) -> Self {
4750
self.tracked_events.extend(filters);
4851
self
4952
}
5053

51-
pub fn with_callback_strategy(&mut self, strategy: Arc<dyn CallbackStrategy>) -> &mut Self {
54+
#[must_use]
55+
pub fn with_callback_strategy(mut self, strategy: Arc<dyn CallbackStrategy>) -> Self {
5256
self.callback_strategy = strategy;
5357
self
5458
}
5559

56-
pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self {
57-
let _ = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
60+
#[must_use]
61+
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
62+
self.block_scanner = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
5863
self
5964
}
6065

61-
pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self {
62-
let _ = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
66+
#[must_use]
67+
pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
68+
self.block_scanner = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
6369
self
6470
}
6571

66-
pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self {
67-
let _ = self.block_scanner.with_retry_interval(retry_interval);
72+
#[must_use]
73+
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
74+
self.block_scanner = self.block_scanner.with_retry_interval(retry_interval);
6875
self
6976
}
7077

71-
pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self {
72-
let _ = self.block_scanner.with_block_confirmations(block_confirmations);
78+
#[must_use]
79+
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
80+
self.block_scanner = self.block_scanner.with_block_confirmations(block_confirmations);
7381
self
7482
}
7583

@@ -119,6 +127,12 @@ pub struct EventScanner<N: Network> {
119127
callback_strategy: Arc<dyn CallbackStrategy>,
120128
}
121129

130+
#[derive(Hash, Eq, PartialEq)]
131+
struct EventIdentifier {
132+
contract_address: Address,
133+
event: String,
134+
}
135+
122136
impl<N: Network> EventScanner<N> {
123137
/// Starts the scanner
124138
///
@@ -130,29 +144,31 @@ impl<N: Network> EventScanner<N> {
130144
start_height: BlockNumberOrTag,
131145
end_height: Option<BlockNumberOrTag>,
132146
) -> anyhow::Result<()> {
133-
let mut event_channels: HashMap<String, mpsc::Sender<Log>> = HashMap::new();
147+
let mut event_channels: HashMap<EventIdentifier, mpsc::Sender<Log>> = HashMap::new();
134148

135149
for filter in &self.tracked_events {
136-
let event_name = filter.event.clone();
150+
let unique_event = EventIdentifier {
151+
contract_address: filter.contract_address,
152+
event: filter.event.clone(),
153+
};
137154

138-
if event_channels.contains_key(&event_name) {
155+
if event_channels.contains_key(&unique_event) {
139156
continue;
140157
}
141158

142159
// TODO: configurable buffer size / smaller buffer ?
143160
let (sender, receiver) = mpsc::channel::<Log>(1024);
144161

145-
let event_name_clone = event_name.clone();
146162
let callback = filter.callback.clone();
147163
let strategy = self.callback_strategy.clone();
148164
Self::spawn_event_callback_task_executors(
149165
receiver,
150166
callback,
151167
strategy,
152-
event_name_clone,
168+
filter.event.clone(),
153169
);
154170

155-
event_channels.insert(event_name, sender);
171+
event_channels.insert(unique_event, sender);
156172
}
157173

158174
let client = self.block_scanner.run()?;
@@ -199,7 +215,7 @@ impl<N: Network> EventScanner<N> {
199215
&self,
200216
from_block: u64,
201217
to_block: u64,
202-
event_channels: &HashMap<String, mpsc::Sender<Log>>,
218+
event_channels: &HashMap<EventIdentifier, mpsc::Sender<Log>>,
203219
) -> anyhow::Result<()> {
204220
for event_filter in &self.tracked_events {
205221
let filter = Filter::new()
@@ -222,7 +238,12 @@ impl<N: Network> EventScanner<N> {
222238
"found logs for event in block range"
223239
);
224240

225-
if let Some(sender) = event_channels.get(&event_filter.event) {
241+
let event_identifier = EventIdentifier {
242+
contract_address: event_filter.contract_address,
243+
event: event_filter.event.clone(),
244+
};
245+
246+
if let Some(sender) = event_channels.get(&event_identifier) {
226247
for log in logs {
227248
if let Err(e) = sender.send(log).await {
228249
warn!(event = %event_filter.event, error = %e, "failed to enqueue log for processing");

tests/common/mock_callbacks.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use std::{
2+
sync::{
3+
Arc,
4+
atomic::{AtomicU64, AtomicUsize, Ordering},
5+
},
6+
time::Duration,
7+
};
8+
9+
use alloy::{rpc::types::Log, sol_types::SolEvent};
10+
use async_trait::async_trait;
11+
use event_scanner::EventCallback;
12+
use tokio::{sync::Mutex, time::sleep};
13+
14+
use crate::common::TestCounter;
15+
16+
pub struct BasicCounterCallback {
17+
pub count: Arc<AtomicUsize>,
18+
}
19+
20+
#[async_trait]
21+
impl EventCallback for BasicCounterCallback {
22+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
23+
self.count.fetch_add(1, Ordering::SeqCst);
24+
Ok(())
25+
}
26+
}
27+
28+
pub struct SlowProcessorCallback {
29+
pub delay_ms: u64,
30+
pub processed: Arc<AtomicUsize>,
31+
}
32+
33+
#[async_trait]
34+
impl EventCallback for SlowProcessorCallback {
35+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
36+
sleep(Duration::from_millis(self.delay_ms)).await;
37+
self.processed.fetch_add(1, Ordering::SeqCst);
38+
Ok(())
39+
}
40+
}
41+
42+
/// A callback that fails `max_fail_times` attempts before succeeding once.
43+
pub struct FlakyCallback {
44+
pub attempts: Arc<AtomicUsize>,
45+
pub successes: Arc<AtomicUsize>,
46+
pub max_fail_times: usize,
47+
}
48+
49+
#[async_trait]
50+
impl EventCallback for FlakyCallback {
51+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
52+
let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1;
53+
if attempt <= self.max_fail_times {
54+
anyhow::bail!("intentional failure on attempt {attempt}");
55+
}
56+
self.successes.fetch_add(1, Ordering::SeqCst);
57+
Ok(())
58+
}
59+
}
60+
61+
// A callback that always fails and records attempts.
62+
pub struct AlwaysFailingCallback {
63+
pub attempts: Arc<AtomicU64>,
64+
}
65+
66+
#[async_trait]
67+
impl EventCallback for AlwaysFailingCallback {
68+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
69+
self.attempts.fetch_add(1, Ordering::SeqCst);
70+
anyhow::bail!("always failing callback")
71+
}
72+
}
73+
74+
// Captures block numbers in the order they are processed.
75+
pub struct BlockOrderingCallback {
76+
pub blocks: Arc<Mutex<Vec<u64>>>,
77+
}
78+
79+
#[async_trait]
80+
impl EventCallback for BlockOrderingCallback {
81+
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
82+
let mut guard = self.blocks.lock().await;
83+
if let Some(n) = log.block_number {
84+
guard.push(n);
85+
}
86+
Ok(())
87+
}
88+
}
89+
90+
// Captures decoded CountIncreased `newCount` values to verify callback/event ordering.
91+
pub struct EventOrderingCallback {
92+
pub counts: Arc<Mutex<Vec<u64>>>,
93+
}
94+
95+
#[async_trait]
96+
impl EventCallback for EventOrderingCallback {
97+
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
98+
if let Some(&TestCounter::CountIncreased::SIGNATURE_HASH) = log.topic0() {
99+
let TestCounter::CountIncreased { newCount } = log.log_decode()?.inner.data;
100+
let mut guard = self.counts.lock().await;
101+
guard.push(newCount.try_into().unwrap());
102+
}
103+
Ok(())
104+
}
105+
}

tests/common/mod.rs

Lines changed: 9 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,7 @@
1-
use std::{
2-
sync::{
3-
Arc,
4-
atomic::{AtomicUsize, Ordering},
5-
},
6-
time::Duration,
7-
};
8-
9-
use alloy::{network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol};
1+
use alloy::{network::Ethereum, providers::ProviderBuilder, sol};
102
use alloy_node_bindings::{Anvil, AnvilInstance};
11-
use async_trait::async_trait;
12-
use event_scanner::EventCallback;
13-
use tokio::time::sleep;
3+
4+
pub mod mock_callbacks;
145

156
// Shared test contract used across integration tests
167
sol! {
@@ -39,36 +30,13 @@ sol! {
3930
}
4031
}
4132

42-
pub struct EventCounter {
43-
pub count: Arc<AtomicUsize>,
44-
}
45-
46-
#[async_trait]
47-
impl EventCallback for EventCounter {
48-
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
49-
self.count.fetch_add(1, Ordering::SeqCst);
50-
Ok(())
51-
}
52-
}
53-
54-
pub struct SlowProcessor {
55-
pub delay_ms: u64,
56-
pub processed: Arc<AtomicUsize>,
57-
}
58-
59-
#[async_trait]
60-
impl EventCallback for SlowProcessor {
61-
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
62-
sleep(Duration::from_millis(self.delay_ms)).await;
63-
self.processed.fetch_add(1, Ordering::SeqCst);
64-
Ok(())
65-
}
66-
}
67-
68-
pub fn spawn_anvil(block_time_secs: u64) -> anyhow::Result<AnvilInstance> {
69-
Ok(Anvil::new().block_time(block_time_secs).try_spawn()?)
33+
#[allow(clippy::missing_errors_doc)]
34+
pub fn spawn_anvil(block_time_secs: f64) -> anyhow::Result<AnvilInstance> {
35+
Ok(Anvil::new().block_time_f64(block_time_secs).try_spawn()?)
7036
}
7137

38+
#[allow(clippy::missing_errors_doc)]
39+
#[allow(clippy::missing_panics_doc)]
7240
pub async fn build_provider(
7341
anvil: &AnvilInstance,
7442
) -> anyhow::Result<impl alloy::providers::Provider<Ethereum> + Clone> {
@@ -77,6 +45,7 @@ pub async fn build_provider(
7745
Ok(provider)
7846
}
7947

48+
#[allow(clippy::missing_errors_doc)]
8049
pub async fn deploy_counter<P>(provider: P) -> anyhow::Result<TestCounter::TestCounterInstance<P>>
8150
where
8251
P: alloy::providers::Provider<Ethereum> + Clone,

0 commit comments

Comments
 (0)