Skip to content

Commit 4fde7b1

Browse files
committed
test: refactor
1 parent 3ed4a60 commit 4fde7b1

File tree

3 files changed

+54
-76
lines changed

3 files changed

+54
-76
lines changed

tests/latest_events/basic.rs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,9 @@ async fn latest_scanner_exact_count_returns_last_events_in_order() -> anyhow::Re
1717
let scanner = setup.scanner;
1818
let mut stream = setup.stream;
1919

20-
// Produce 8 events
21-
contract.increase().send().await?.watch().await?;
22-
contract.increase().send().await?.watch().await?;
23-
contract.increase().send().await?.watch().await?;
24-
contract.increase().send().await?.watch().await?;
25-
contract.increase().send().await?.watch().await?;
26-
contract.increase().send().await?.watch().await?;
27-
contract.increase().send().await?.watch().await?;
28-
contract.increase().send().await?.watch().await?;
20+
for _ in 0..8 {
21+
contract.increase().send().await?.watch().await?;
22+
}
2923

3024
// Ask for the latest 5
3125
scanner.start().await?;
@@ -278,22 +272,17 @@ async fn latest_scanner_mixed_events_and_filters_return_correct_streams() -> any
278272
}
279273

280274
#[tokio::test]
281-
async fn latest_scanner_cross_contract_filtering() -> anyhow::Result<()> {
275+
async fn latest_scanner_ignores_non_tracked_contract() -> anyhow::Result<()> {
282276
// Manual setup to deploy two contracts
283-
let count = 5;
284-
let setup = setup_latest_scanner(None, None, count, None, None).await?;
277+
let setup = setup_latest_scanner(None, None, 5, None, None).await?;
285278
let provider = setup.provider;
286-
let mut scanner = setup.scanner;
279+
let scanner = setup.scanner;
287280

288-
let contract_a = deploy_counter(provider.clone()).await?;
289-
let contract_b = deploy_counter(provider.clone()).await?;
281+
let contract_a = setup.contract;
282+
let contract_b = deploy_counter(provider).await?;
290283

291284
// Listener only for contract A CountIncreased
292-
let filter_a = EventFilter::new()
293-
.contract_address(*contract_a.address())
294-
.event(TestCounter::CountIncreased::SIGNATURE);
295-
296-
let mut stream_a = scanner.subscribe(filter_a);
285+
let mut stream_a = setup.stream;
297286

298287
// Emit interleaved events from A and B: A(1), B(1), A(2), B(2), A(3)
299288
contract_a.increase().send().await?.watch().await?;

tests/live/performance.rs

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,32 @@
1-
use std::{
2-
sync::{
3-
Arc,
4-
atomic::{AtomicUsize, Ordering},
5-
},
6-
time::Duration,
7-
};
1+
use alloy::primitives::U256;
2+
use event_scanner::{assert_empty, assert_next};
83

9-
use event_scanner::Message;
10-
use tokio::time::timeout;
11-
use tokio_stream::StreamExt;
12-
13-
use crate::common::{TestCounter, setup_live_scanner};
4+
use crate::common::{LiveScannerSetup, TestCounter::CountIncreased, setup_live_scanner};
145

156
#[tokio::test]
167
async fn high_event_volume_no_loss() -> anyhow::Result<()> {
17-
let setup = setup_live_scanner(Some(0.05), None, 0).await?;
18-
let contract = setup.contract.clone();
19-
20-
let expected_event_count = 100;
21-
22-
let scanner = setup.scanner;
23-
24-
let mut stream = setup.stream.take(expected_event_count);
8+
let LiveScannerSetup { contract, provider: _p, scanner, mut stream, anvil: _a } =
9+
setup_live_scanner(None, None, 0).await?;
2510

2611
scanner.start().await?;
2712

28-
for _ in 0..expected_event_count {
29-
contract.increase().send().await?.watch().await?;
30-
}
31-
32-
let event_count = Arc::new(AtomicUsize::new(0));
33-
let event_count_clone = Arc::clone(&event_count);
34-
let event_counting = async move {
35-
let mut expected_new_count = 1;
36-
while let Some(Message::Data(logs)) = stream.next().await {
37-
event_count_clone.fetch_add(logs.len(), Ordering::SeqCst);
38-
39-
for log in logs {
40-
let TestCounter::CountIncreased { newCount } = log.log_decode().unwrap().inner.data;
41-
assert_eq!(newCount, expected_new_count);
42-
expected_new_count += 1;
43-
}
13+
tokio::spawn(async move {
14+
for _ in 0..100 {
15+
contract
16+
.increase()
17+
.send()
18+
.await
19+
.expect("should send")
20+
.watch()
21+
.await
22+
.expect("should confirm");
4423
}
45-
};
24+
});
4625

47-
_ = timeout(Duration::from_secs(60), event_counting).await;
48-
49-
assert_eq!(event_count.load(Ordering::SeqCst), expected_event_count);
26+
for new_count in 1..=100 {
27+
assert_next!(stream, &[CountIncreased { newCount: U256::from(new_count) }]);
28+
}
29+
assert_empty!(stream);
5030

5131
Ok(())
5232
}

tests/sync/from_block.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use alloy::{
66
};
77
use event_scanner::{ScannerStatus, assert_empty, assert_next};
88

9-
use crate::common::{TestCounter, setup_sync_scanner};
9+
use crate::common::{SyncScannerSetup, TestCounter, setup_sync_scanner};
1010

1111
#[tokio::test]
1212
async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> {
@@ -75,29 +75,37 @@ async fn sync_from_future_block_waits_until_minted() -> anyhow::Result<()> {
7575
Ok(())
7676
}
7777

78-
#[test_log::test(tokio::test)]
78+
#[tokio::test]
7979
async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> {
8080
// any reorg ≤ 5 should be invisible to consumers
81-
let setup = setup_sync_scanner(None, None, BlockNumberOrTag::Earliest, 5).await?;
82-
let provider = setup.provider;
83-
let contract = setup.contract;
84-
let scanner = setup.scanner;
85-
let mut stream = setup.stream;
81+
let SyncScannerSetup { provider, contract, scanner, mut stream, anvil: _anvil } =
82+
setup_sync_scanner(None, None, BlockNumberOrTag::Earliest, 5).await?;
8683

8784
// mine some initial "historic" blocks
88-
contract.increase().send().await?.watch().await?;
89-
provider.anvil_mine(Some(5), None).await?;
85+
for _ in 0..7 {
86+
contract.increase().send().await?.watch().await?;
87+
}
9088

9189
scanner.start().await?;
9290

9391
// emit "live" events
94-
for _ in 0..4 {
92+
for _ in 0..2 {
9593
contract.increase().send().await?.watch().await?;
9694
}
9795

98-
// assert only the first events has enough confirmations to be streamed
99-
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(1) }]);
96+
// assert historic events are streamed in a batch
97+
assert_next!(
98+
stream,
99+
&[
100+
TestCounter::CountIncreased { newCount: U256::from(1) },
101+
TestCounter::CountIncreased { newCount: U256::from(2) }
102+
]
103+
);
104+
// switching to "live" phase
100105
assert_next!(stream, ScannerStatus::SwitchingToLive);
106+
// assert confirmed live events are streamed separately
107+
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(3) }]);
108+
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(4) }]);
101109
let stream = assert_empty!(stream);
102110

103111
// Perform a shallow reorg on the live tail
@@ -116,13 +124,14 @@ async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> {
116124
provider.anvil_mine(Some(10), None).await?;
117125

118126
// no `ReorgDetected` should be emitted
119-
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(2) }]);
120-
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(3) }]);
127+
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(5) }]);
128+
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(6) }]);
129+
assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(7) }]);
121130
assert_next!(
122131
stream,
123132
&[
124-
TestCounter::CountIncreased { newCount: U256::from(4) },
125-
TestCounter::CountIncreased { newCount: U256::from(5) }
133+
TestCounter::CountIncreased { newCount: U256::from(8) },
134+
TestCounter::CountIncreased { newCount: U256::from(9) }
126135
]
127136
);
128137
assert_empty!(stream);

0 commit comments

Comments
 (0)