Skip to content

Commit 3d76ef5

Browse files
committed
udpate tests and add block ordering
1 parent 11c1c04 commit 3d76ef5

File tree

7 files changed

+180
-159
lines changed

7 files changed

+180
-159
lines changed

tests/common/mock_callbacks.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use std::sync::Arc;
2+
use std::sync::atomic::{AtomicUsize, Ordering};
3+
use std::time::Duration;
4+
5+
use alloy::rpc::types::Log;
6+
use alloy::sol_types::SolEvent;
7+
use async_trait::async_trait;
8+
use event_scanner::EventCallback;
9+
use tokio::sync::Mutex;
10+
use tokio::time::sleep;
11+
12+
use crate::common::TestCounter;
13+
14+
pub struct BasicCounterCallback {
15+
pub count: Arc<AtomicUsize>,
16+
}
17+
18+
#[async_trait]
19+
impl EventCallback for BasicCounterCallback {
20+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
21+
self.count.fetch_add(1, Ordering::SeqCst);
22+
Ok(())
23+
}
24+
}
25+
26+
pub struct SlowProcessorCallback {
27+
pub delay_ms: u64,
28+
pub processed: Arc<AtomicUsize>,
29+
}
30+
31+
#[async_trait]
32+
impl EventCallback for SlowProcessorCallback {
33+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
34+
sleep(Duration::from_millis(self.delay_ms)).await;
35+
self.processed.fetch_add(1, Ordering::SeqCst);
36+
Ok(())
37+
}
38+
}
39+
40+
/// A callback that fails `fail_times` attempts before succeeding once.
41+
pub struct FlakyCallback {
42+
pub attempts: Arc<AtomicUsize>,
43+
pub successes: Arc<AtomicUsize>,
44+
pub fail_times: usize,
45+
}
46+
47+
#[async_trait]
48+
impl EventCallback for FlakyCallback {
49+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
50+
let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1;
51+
if attempt <= self.fail_times {
52+
anyhow::bail!("intentional failure on attempt {attempt}");
53+
}
54+
self.successes.fetch_add(1, Ordering::SeqCst);
55+
Ok(())
56+
}
57+
}
58+
59+
// A callback that always fails and records attempts.
60+
pub struct AlwaysFailingCallback {
61+
pub attempts: Arc<AtomicUsize>,
62+
}
63+
64+
#[async_trait]
65+
impl EventCallback for AlwaysFailingCallback {
66+
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
67+
self.attempts.fetch_add(1, Ordering::SeqCst);
68+
anyhow::bail!("always failing callback")
69+
}
70+
}
71+
72+
// Captures block numbers in the order they are processed.
73+
pub struct BlockOrderingCallback {
74+
pub blocks: Arc<Mutex<Vec<u64>>>,
75+
}
76+
77+
#[async_trait]
78+
impl EventCallback for BlockOrderingCallback {
79+
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
80+
let mut guard = self.blocks.lock().await;
81+
if let Some(n) = log.block_number {
82+
guard.push(n);
83+
}
84+
Ok(())
85+
}
86+
}
87+
88+
// Captures decoded CountIncreased `newCount` values to verify callback/event ordering.
89+
pub struct EventOrderingCallback {
90+
pub counts: Arc<Mutex<Vec<u64>>>,
91+
}
92+
93+
#[async_trait]
94+
impl EventCallback for EventOrderingCallback {
95+
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
96+
if let Some(&TestCounter::CountIncreased::SIGNATURE_HASH) = log.topic0() {
97+
let TestCounter::CountIncreased { newCount } = log.log_decode()?.inner.data;
98+
let mut guard = self.counts.lock().await;
99+
guard.push(newCount.try_into().unwrap());
100+
}
101+
Ok(())
102+
}
103+
}

tests/common/mod.rs

Lines changed: 3 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,7 @@
1-
use std::{
2-
sync::{
3-
Arc,
4-
atomic::{AtomicUsize, Ordering},
5-
},
6-
time::Duration,
7-
};
8-
9-
use alloy::{
10-
network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent,
11-
};
1+
use alloy::{network::Ethereum, providers::ProviderBuilder, sol};
122
use alloy_node_bindings::{Anvil, AnvilInstance};
13-
use async_trait::async_trait;
14-
use event_scanner::EventCallback;
15-
use tokio::sync::Mutex;
16-
use tokio::time::sleep;
3+
4+
pub mod mock_callbacks;
175

186
// Shared test contract used across integration tests
197
sol! {
@@ -42,97 +30,6 @@ sol! {
4230
}
4331
}
4432

45-
pub struct EventCounter {
46-
pub count: Arc<AtomicUsize>,
47-
}
48-
49-
#[async_trait]
50-
impl EventCallback for EventCounter {
51-
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
52-
self.count.fetch_add(1, Ordering::SeqCst);
53-
Ok(())
54-
}
55-
}
56-
57-
pub struct SlowProcessor {
58-
pub delay_ms: u64,
59-
pub processed: Arc<AtomicUsize>,
60-
}
61-
62-
#[async_trait]
63-
impl EventCallback for SlowProcessor {
64-
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
65-
sleep(Duration::from_millis(self.delay_ms)).await;
66-
self.processed.fetch_add(1, Ordering::SeqCst);
67-
Ok(())
68-
}
69-
}
70-
71-
/// A callback that fails `fail_times` attempts before succeeding once.
72-
pub struct FlakyCallback {
73-
pub attempts: Arc<AtomicUsize>,
74-
pub successes: Arc<AtomicUsize>,
75-
pub fail_times: usize,
76-
}
77-
78-
#[async_trait]
79-
impl EventCallback for FlakyCallback {
80-
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
81-
let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1;
82-
if attempt <= self.fail_times {
83-
anyhow::bail!("intentional failure on attempt {attempt}");
84-
}
85-
self.successes.fetch_add(1, Ordering::SeqCst);
86-
Ok(())
87-
}
88-
}
89-
90-
// A callback that always fails and records attempts.
91-
pub struct AlwaysFailingCallback {
92-
pub attempts: Arc<AtomicUsize>,
93-
}
94-
95-
#[async_trait]
96-
impl EventCallback for AlwaysFailingCallback {
97-
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
98-
self.attempts.fetch_add(1, Ordering::SeqCst);
99-
anyhow::bail!("always failing callback")
100-
}
101-
}
102-
103-
// Captures block numbers in the order they are processed.
104-
pub struct OrderingCapture {
105-
pub blocks: Arc<Mutex<Vec<u64>>>,
106-
}
107-
108-
#[async_trait]
109-
impl EventCallback for OrderingCapture {
110-
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
111-
let mut guard = self.blocks.lock().await;
112-
if let Some(n) = log.block_number {
113-
guard.push(n);
114-
}
115-
Ok(())
116-
}
117-
}
118-
119-
// Captures decoded CountIncreased `newCount` values to verify callback/event ordering.
120-
pub struct OrderingCaptureCount {
121-
pub counts: Arc<Mutex<Vec<u64>>>,
122-
}
123-
124-
#[async_trait]
125-
impl EventCallback for OrderingCaptureCount {
126-
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
127-
if let Some(&TestCounter::CountIncreased::SIGNATURE_HASH) = log.topic0() {
128-
let TestCounter::CountIncreased { newCount } = log.log_decode()?.inner.data;
129-
let mut guard = self.counts.lock().await;
130-
guard.push(newCount.try_into().unwrap());
131-
}
132-
Ok(())
133-
}
134-
}
135-
13633
#[allow(clippy::missing_errors_doc)]
13734
pub fn spawn_anvil(block_time_secs: u64) -> anyhow::Result<AnvilInstance> {
13835
Ok(Anvil::new().block_time(block_time_secs).try_spawn()?)

tests/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
pub mod common;
22
pub mod live_mode;
3+
4+
pub use common::mock_callbacks;

tests/live_mode/basic.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use std::{
66
time::Duration,
77
};
88

9-
use crate::common::{EventCounter, TestCounter, build_provider, deploy_counter, spawn_anvil};
9+
use crate::{
10+
common::{TestCounter, build_provider, deploy_counter, spawn_anvil},
11+
mock_callbacks::BasicCounterCallback,
12+
};
1013
use alloy::{network::Ethereum, providers::WsConnect, sol_types::SolEvent};
1114
use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter};
1215
use tokio::time::sleep;
@@ -19,7 +22,7 @@ async fn basic_single_event_scanning() -> anyhow::Result<()> {
1922
let contract_address = *contract.address();
2023

2124
let event_count = Arc::new(AtomicUsize::new(0));
22-
let callback = Arc::new(EventCounter { count: event_count.clone() });
25+
let callback = Arc::new(BasicCounterCallback { count: event_count.clone() });
2326

2427
let filter = EventFilter {
2528
contract_address,
@@ -52,8 +55,8 @@ async fn multiple_contracts_same_event_isolate_callbacks() -> anyhow::Result<()>
5255

5356
let a_count = Arc::new(AtomicUsize::new(0));
5457
let b_count = Arc::new(AtomicUsize::new(0));
55-
let a_cb = Arc::new(EventCounter { count: a_count.clone() });
56-
let b_cb = Arc::new(EventCounter { count: b_count.clone() });
58+
let a_cb = Arc::new(BasicCounterCallback { count: a_count.clone() });
59+
let b_cb = Arc::new(BasicCounterCallback { count: b_count.clone() });
5760

5861
let a_filter = EventFilter {
5962
contract_address: *a.address(),
@@ -96,8 +99,8 @@ async fn multiple_events_same_contract() -> anyhow::Result<()> {
9699

97100
let increase_count = Arc::new(AtomicUsize::new(0));
98101
let decrease_count = Arc::new(AtomicUsize::new(0));
99-
let increase_cb = Arc::new(EventCounter { count: increase_count.clone() });
100-
let decrease_cb = Arc::new(EventCounter { count: decrease_count.clone() });
102+
let increase_cb = Arc::new(BasicCounterCallback { count: increase_count.clone() });
103+
let decrease_cb = Arc::new(BasicCounterCallback { count: decrease_count.clone() });
101104

102105
let increase_filter = EventFilter {
103106
contract_address,
@@ -139,7 +142,7 @@ async fn signature_matching_ignores_irrelevant_events() -> anyhow::Result<()> {
139142
let contract = deploy_counter(provider).await?;
140143

141144
let count = Arc::new(AtomicUsize::new(0));
142-
let callback = Arc::new(EventCounter { count: count.clone() });
145+
let callback = Arc::new(BasicCounterCallback { count: count.clone() });
143146

144147
// Subscribe to CountDecreased but only emit CountIncreased
145148
let filter = EventFilter {
@@ -170,7 +173,7 @@ async fn live_filters_malformed_signature_graceful() -> anyhow::Result<()> {
170173
let contract = deploy_counter(provider).await?;
171174

172175
let count = Arc::new(AtomicUsize::new(0));
173-
let callback = Arc::new(EventCounter { count: count.clone() });
176+
let callback = Arc::new(BasicCounterCallback { count: count.clone() });
174177
let filter = EventFilter {
175178
contract_address: *contract.address(),
176179
event: "invalid-sig".to_string(),

tests/live_mode/callbacks.rs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,20 @@
1-
use std::{sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration};
2-
1+
use std::{
2+
sync::{
3+
Arc,
4+
atomic::{AtomicUsize, Ordering},
5+
},
6+
time::Duration,
7+
};
8+
9+
use crate::{
10+
common::{TestCounter, build_provider, deploy_counter, spawn_anvil},
11+
mock_callbacks::{AlwaysFailingCallback, FlakyCallback, SlowProcessorCallback},
12+
};
313
use alloy::{network::Ethereum, providers::WsConnect, sol_types::SolEvent};
4-
use crate::common::{TestCounter, build_provider, deploy_counter, spawn_anvil, SlowProcessor, FlakyCallback, AlwaysFailingCallback};
5-
use event_scanner::{types::{EventFilter, CallbackConfig}, event_scanner::EventScannerBuilder};
14+
use event_scanner::{
15+
event_scanner::EventScannerBuilder,
16+
types::{CallbackConfig, EventFilter},
17+
};
618
use tokio::time::sleep;
719

820
#[tokio::test]
@@ -13,9 +25,13 @@ async fn callbacks_slow_processing_does_not_drop_events() -> anyhow::Result<()>
1325
let contract_address = *contract.address();
1426

1527
let processed = Arc::new(AtomicUsize::new(0));
16-
let callback = Arc::new(SlowProcessor { delay_ms: 100, processed: processed.clone() });
28+
let callback = Arc::new(SlowProcessorCallback { delay_ms: 100, processed: processed.clone() });
1729

18-
let filter = EventFilter { contract_address, event: TestCounter::CountIncreased::SIGNATURE.to_owned(), callback };
30+
let filter = EventFilter {
31+
contract_address,
32+
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
33+
callback,
34+
};
1935
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
2036
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
2137

@@ -42,12 +58,21 @@ async fn callbacks_failure_then_retry_success() -> anyhow::Result<()> {
4258

4359
let attempts = Arc::new(AtomicUsize::new(0));
4460
let successes = Arc::new(AtomicUsize::new(0));
45-
let callback = Arc::new(FlakyCallback { attempts: attempts.clone(), successes: successes.clone(), fail_times: 2 });
46-
47-
let filter = EventFilter { contract_address: *contract.address(), event: TestCounter::CountIncreased::SIGNATURE.to_owned(), callback };
61+
let callback = Arc::new(FlakyCallback {
62+
attempts: attempts.clone(),
63+
successes: successes.clone(),
64+
fail_times: 2,
65+
});
66+
67+
let filter = EventFilter {
68+
contract_address: *contract.address(),
69+
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
70+
callback,
71+
};
4872
let cfg = CallbackConfig { max_attempts: 3, delay_ms: 50 };
4973

50-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter).with_callback_config(cfg);
74+
let builder =
75+
EventScannerBuilder::<Ethereum>::new().with_event_filter(filter).with_callback_config(cfg);
5176
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
5277
let scanner_handle = tokio::spawn(async move { scanner.start().await });
5378

@@ -69,9 +94,14 @@ async fn callbacks_always_failing_respects_max_attempts() -> anyhow::Result<()>
6994
let attempts = Arc::new(AtomicUsize::new(0));
7095
let callback = Arc::new(AlwaysFailingCallback { attempts: attempts.clone() });
7196

72-
let filter = EventFilter { contract_address: *contract.address(), event: TestCounter::CountIncreased::SIGNATURE.to_owned(), callback };
97+
let filter = EventFilter {
98+
contract_address: *contract.address(),
99+
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
100+
callback,
101+
};
73102
let cfg = CallbackConfig { max_attempts: 2, delay_ms: 20 };
74-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter).with_callback_config(cfg);
103+
let builder =
104+
EventScannerBuilder::<Ethereum>::new().with_event_filter(filter).with_callback_config(cfg);
75105
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
76106
let scanner_handle = tokio::spawn(async move { scanner.start().await });
77107

0 commit comments

Comments
 (0)