Skip to content

Commit 159c33f

Browse files
committed
fix test and bug
c
1 parent 499b683 commit 159c33f

File tree

5 files changed

+115
-62
lines changed

5 files changed

+115
-62
lines changed

src/event_scanner.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
use alloy::{
99
eips::BlockNumberOrTag,
1010
network::Network,
11+
primitives::Address,
1112
providers::Provider,
1213
rpc::types::{Filter, Log},
1314
transports::http::reqwest::Url,
@@ -119,6 +120,12 @@ pub struct EventScanner<N: Network> {
119120
callback_strategy: Arc<dyn CallbackStrategy>,
120121
}
121122

123+
#[derive(Hash, Eq, PartialEq)]
124+
struct EventIdentifier {
125+
contract_address: Address,
126+
event: String,
127+
}
128+
122129
impl<N: Network> EventScanner<N> {
123130
/// Starts the scanner
124131
///
@@ -130,29 +137,31 @@ impl<N: Network> EventScanner<N> {
130137
start_height: BlockNumberOrTag,
131138
end_height: Option<BlockNumberOrTag>,
132139
) -> anyhow::Result<()> {
133-
let mut event_channels: HashMap<String, mpsc::Sender<Log>> = HashMap::new();
140+
let mut event_channels: HashMap<EventIdentifier, mpsc::Sender<Log>> = HashMap::new();
134141

135142
for filter in &self.tracked_events {
136-
let event_name = filter.event.clone();
143+
let unique_event = EventIdentifier {
144+
contract_address: filter.contract_address,
145+
event: filter.event.clone(),
146+
};
137147

138-
if event_channels.contains_key(&event_name) {
148+
if event_channels.contains_key(&unique_event) {
139149
continue;
140150
}
141151

142152
// TODO: configurable buffer size / smaller buffer ?
143153
let (sender, receiver) = mpsc::channel::<Log>(1024);
144154

145-
let event_name_clone = event_name.clone();
146155
let callback = filter.callback.clone();
147156
let strategy = self.callback_strategy.clone();
148157
Self::spawn_event_callback_task_executors(
149158
receiver,
150159
callback,
151160
strategy,
152-
event_name_clone,
161+
filter.event.clone(),
153162
);
154163

155-
event_channels.insert(event_name, sender);
164+
event_channels.insert(unique_event, sender);
156165
}
157166

158167
let client = self.block_scanner.run()?;
@@ -199,7 +208,7 @@ impl<N: Network> EventScanner<N> {
199208
&self,
200209
from_block: u64,
201210
to_block: u64,
202-
event_channels: &HashMap<String, mpsc::Sender<Log>>,
211+
event_channels: &HashMap<EventIdentifier, mpsc::Sender<Log>>,
203212
) -> anyhow::Result<()> {
204213
for event_filter in &self.tracked_events {
205214
let filter = Filter::new()
@@ -222,7 +231,12 @@ impl<N: Network> EventScanner<N> {
222231
"found logs for event in block range"
223232
);
224233

225-
if let Some(sender) = event_channels.get(&event_filter.event) {
234+
let event_identifier = EventIdentifier {
235+
contract_address: event_filter.contract_address,
236+
event: event_filter.event.clone(),
237+
};
238+
239+
if let Some(sender) = event_channels.get(&event_identifier) {
226240
for log in logs {
227241
if let Err(e) = sender.send(log).await {
228242
warn!(event = %event_filter.event, error = %e, "failed to enqueue log for processing");

tests/live_mode/basic.rs

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
common::{TestCounter, build_provider, deploy_counter, spawn_anvil},
1111
mock_callbacks::BasicCounterCallback,
1212
};
13-
use alloy::{network::Ethereum, providers::WsConnect, sol_types::SolEvent};
13+
use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent};
1414
use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter};
1515
use tokio::time::sleep;
1616

@@ -30,10 +30,14 @@ async fn basic_single_event_scanning() -> anyhow::Result<()> {
3030
callback,
3131
};
3232

33-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
34-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
33+
let mut builder = EventScannerBuilder::new();
34+
builder.with_event_filter(filter);
35+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
3536

36-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
37+
let scanner_handle = tokio::spawn(async move {
38+
let mut scanner = scanner;
39+
scanner.start(BlockNumberOrTag::Latest, None).await
40+
});
3741

3842
for _ in 0..5 {
3943
let _ = contract.increase().send().await?.get_receipt().await?;
@@ -69,15 +73,18 @@ async fn multiple_contracts_same_event_isolate_callbacks() -> anyhow::Result<()>
6973
callback: b_cb,
7074
};
7175

72-
let builder = EventScannerBuilder::<Ethereum>::new()
73-
.with_event_filter(a_filter)
74-
.with_event_filter(b_filter);
75-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
76-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
76+
let mut builder = EventScannerBuilder::new();
77+
builder.with_event_filters(vec![a_filter, b_filter]);
78+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
79+
let scanner_handle = tokio::spawn(async move {
80+
let mut scanner = scanner;
81+
scanner.start(BlockNumberOrTag::Latest, None).await
82+
});
7783

7884
for _ in 0..3 {
7985
let _ = a.increase().send().await?.get_receipt().await?;
8086
}
87+
8188
for _ in 0..2 {
8289
let _ = b.increase().send().await?.get_receipt().await?;
8390
}
@@ -113,12 +120,14 @@ async fn multiple_events_same_contract() -> anyhow::Result<()> {
113120
callback: decrease_cb,
114121
};
115122

116-
let builder = EventScannerBuilder::<Ethereum>::new()
117-
.with_event_filter(increase_filter)
118-
.with_event_filter(decrease_filter);
119-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
123+
let mut builder = EventScannerBuilder::new();
124+
builder.with_event_filters(vec![increase_filter, decrease_filter]);
125+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
120126

121-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
127+
let scanner_handle = tokio::spawn(async move {
128+
let mut scanner = scanner;
129+
scanner.start(BlockNumberOrTag::Latest, None).await
130+
});
122131

123132
for i in 0..6 {
124133
let _ = contract.increase().send().await?.get_receipt().await?;
@@ -127,7 +136,7 @@ async fn multiple_events_same_contract() -> anyhow::Result<()> {
127136
}
128137
}
129138

130-
sleep(Duration::from_millis(200)).await;
139+
sleep(Duration::from_millis(1500)).await;
131140
scanner_handle.abort();
132141

133142
assert_eq!(increase_count.load(Ordering::SeqCst), 6);
@@ -151,9 +160,13 @@ async fn signature_matching_ignores_irrelevant_events() -> anyhow::Result<()> {
151160
callback,
152161
};
153162

154-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
155-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
156-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
163+
let mut builder = EventScannerBuilder::new();
164+
builder.with_event_filter(filter);
165+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
166+
let scanner_handle = tokio::spawn(async move {
167+
let mut scanner = scanner;
168+
scanner.start(BlockNumberOrTag::Latest, None).await
169+
});
157170

158171
for _ in 0..3 {
159172
let _ = contract.increase().send().await?.get_receipt().await?;
@@ -180,9 +193,13 @@ async fn live_filters_malformed_signature_graceful() -> anyhow::Result<()> {
180193
callback,
181194
};
182195

183-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
184-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
185-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
196+
let mut builder = EventScannerBuilder::new();
197+
builder.with_event_filter(filter);
198+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
199+
let scanner_handle = tokio::spawn(async move {
200+
let mut scanner = scanner;
201+
scanner.start(BlockNumberOrTag::Latest, None).await
202+
});
186203

187204
for _ in 0..3 {
188205
let _ = contract.increase().send().await?.get_receipt().await?;

tests/live_mode/callbacks.rs

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ use crate::{
1010
common::{TestCounter, build_provider, deploy_counter, spawn_anvil},
1111
mock_callbacks::{AlwaysFailingCallback, FlakyCallback, SlowProcessorCallback},
1212
};
13-
use alloy::{network::Ethereum, providers::WsConnect, sol_types::SolEvent};
13+
use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent};
1414
use event_scanner::{
15+
CallbackStrategy, EventFilter,
16+
callback_strategy::{FixedRetryConfig, FixedRetryStrategy},
1517
event_scanner::EventScannerBuilder,
16-
types::{CallbackConfig, EventFilter},
1718
};
1819
use tokio::time::sleep;
1920

@@ -32,10 +33,14 @@ async fn callbacks_slow_processing_does_not_drop_events() -> anyhow::Result<()>
3233
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
3334
callback,
3435
};
35-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
36-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
36+
let mut builder = EventScannerBuilder::new();
37+
builder.with_event_filter(filter);
38+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
3739

38-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
40+
let scanner_handle = tokio::spawn(async move {
41+
let mut scanner = scanner;
42+
scanner.start(BlockNumberOrTag::Latest, None).await
43+
});
3944

4045
for _ in 0..3 {
4146
let _ = contract.increase().send().await?.get_receipt().await?;
@@ -69,12 +74,17 @@ async fn callbacks_failure_then_retry_success() -> anyhow::Result<()> {
6974
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
7075
callback,
7176
};
72-
let cfg = CallbackConfig { max_attempts: 3, delay_ms: 50 };
73-
74-
let builder =
75-
EventScannerBuilder::<Ethereum>::new().with_event_filter(filter).with_callback_config(cfg);
76-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
77-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
77+
let cfg = FixedRetryConfig { max_attempts: 3, delay_ms: 50 };
78+
79+
let mut builder = EventScannerBuilder::new();
80+
builder.with_event_filter(filter);
81+
let strategy: Arc<dyn CallbackStrategy> = Arc::new(FixedRetryStrategy::new(cfg));
82+
builder.with_callback_strategy(strategy);
83+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
84+
let scanner_handle = tokio::spawn(async move {
85+
let mut scanner = scanner;
86+
scanner.start(BlockNumberOrTag::Latest, None).await
87+
});
7888

7989
let _ = contract.increase().send().await?.get_receipt().await?;
8090
sleep(Duration::from_millis(300)).await;
@@ -99,11 +109,16 @@ async fn callbacks_always_failing_respects_max_attempts() -> anyhow::Result<()>
99109
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
100110
callback,
101111
};
102-
let cfg = CallbackConfig { max_attempts: 2, delay_ms: 20 };
103-
let builder =
104-
EventScannerBuilder::<Ethereum>::new().with_event_filter(filter).with_callback_config(cfg);
105-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
106-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
112+
let cfg = FixedRetryConfig { max_attempts: 2, delay_ms: 20 };
113+
let mut builder = EventScannerBuilder::new();
114+
builder.with_event_filter(filter);
115+
let strategy: Arc<dyn CallbackStrategy> = Arc::new(FixedRetryStrategy::new(cfg));
116+
builder.with_callback_strategy(strategy);
117+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
118+
let scanner_handle = tokio::spawn(async move {
119+
let mut scanner = scanner;
120+
scanner.start(BlockNumberOrTag::Latest, None).await
121+
});
107122

108123
let _ = contract.increase().send().await?.get_receipt().await?;
109124
sleep(Duration::from_millis(200)).await;

tests/live_mode/ordering.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
common,
55
mock_callbacks::{BlockOrderingCallback, EventOrderingCallback},
66
};
7-
use alloy::{network::Ethereum, providers::WsConnect, sol_types::SolEvent};
7+
use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent};
88
use common::{TestCounter, build_provider, deploy_counter, spawn_anvil};
99
use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter};
1010
use tokio::time::sleep;
@@ -23,9 +23,13 @@ async fn callback_occurs_in_order() -> anyhow::Result<()> {
2323
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
2424
callback,
2525
};
26-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
27-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
28-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
26+
let mut builder = EventScannerBuilder::new();
27+
builder.with_event_filter(filter);
28+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
29+
let scanner_handle = tokio::spawn(async move {
30+
let mut scanner = scanner;
31+
scanner.start(BlockNumberOrTag::Latest, None).await
32+
});
2933

3034
for _ in 0..5 {
3135
let _ = contract.increase().send().await?.get_receipt().await?;
@@ -42,12 +46,10 @@ async fn callback_occurs_in_order() -> anyhow::Result<()> {
4246

4347
#[tokio::test]
4448
async fn blocks_and_events_arrive_in_order() -> anyhow::Result<()> {
45-
// Mine a block every second and batch 5 txs per block
4649
let anvil = spawn_anvil(1)?;
4750
let provider = build_provider(&anvil).await?;
4851
let contract = deploy_counter(provider.clone()).await?;
4952

50-
// Capture block numbers in callback order
5153
let blocks = Arc::new(tokio::sync::Mutex::new(Vec::<u64>::new()));
5254
let callback = Arc::new(BlockOrderingCallback { blocks: blocks.clone() });
5355

@@ -56,18 +58,19 @@ async fn blocks_and_events_arrive_in_order() -> anyhow::Result<()> {
5658
event: TestCounter::CountIncreased::SIGNATURE.to_owned(),
5759
callback,
5860
};
59-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
60-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
61-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
61+
let mut builder = EventScannerBuilder::new();
62+
builder.with_event_filter(filter);
63+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
64+
let scanner_handle = tokio::spawn(async move {
65+
let mut scanner = scanner;
66+
scanner.start(BlockNumberOrTag::Latest, None).await
67+
});
6268

63-
// 5 blocks, 5 events per block
6469
for _ in 0..5 {
6570
let _pending = contract.increase().send().await?;
66-
// Wait for the next block to be mined (block_time = 1s)
6771
sleep(Duration::from_millis(1200)).await;
6872
}
6973

70-
// Give scanner time to drain channel
7174
sleep(Duration::from_millis(800)).await;
7275
scanner_handle.abort();
7376

tests/live_mode/performance.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
time::Duration,
77
};
88

9-
use alloy::{network::Ethereum, providers::WsConnect, sol_types::SolEvent};
9+
use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent};
1010
use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter};
1111
use tokio::time::sleep;
1212

@@ -29,15 +29,19 @@ async fn high_event_volume_no_loss() -> anyhow::Result<()> {
2929
callback,
3030
};
3131

32-
let builder = EventScannerBuilder::<Ethereum>::new().with_event_filter(filter);
33-
let mut scanner = builder.connect_ws(WsConnect::new(anvil.ws_endpoint_url())).await?;
34-
let scanner_handle = tokio::spawn(async move { scanner.start().await });
32+
let mut builder = EventScannerBuilder::new();
33+
builder.with_event_filter(filter);
34+
let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
35+
let scanner_handle = tokio::spawn(async move {
36+
let mut scanner = scanner;
37+
scanner.start(BlockNumberOrTag::Latest, None).await
38+
});
3539

3640
for _ in 0..100 {
3741
let _ = contract.increase().send().await?.get_receipt().await?;
3842
}
3943

40-
sleep(Duration::from_millis(800)).await;
44+
sleep(Duration::from_millis(1000)).await;
4145
scanner_handle.abort();
4246

4347
assert_eq!(count.load(Ordering::SeqCst), 100);

0 commit comments

Comments
 (0)