Skip to content

Commit 0900807

Browse files
committed
test: add new tests + unfinished reorg test
1 parent e353599 commit 0900807

File tree

5 files changed

+120
-9
lines changed

5 files changed

+120
-9
lines changed

src/block_range_scanner.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@
6767
6868
use std::{cmp::Ordering, ops::RangeInclusive, sync::Arc};
6969

70+
#[cfg(test)]
71+
use std::sync::LazyLock;
72+
#[cfg(test)]
73+
use tokio::sync::Mutex;
7074
use tokio::sync::{mpsc, oneshot};
7175
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
7276

@@ -323,6 +327,14 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
323327
}
324328
}
325329

330+
#[cfg(test)]
331+
static TEST_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
332+
333+
#[cfg(test)]
334+
async fn proceed_if_test_lock_acquired() {
335+
_ = TEST_LOCK.lock().await;
336+
}
337+
326338
struct Service<N: Network> {
327339
config: Config,
328340
provider: RootProvider<N>,
@@ -624,6 +636,9 @@ impl<N: Network> Service<N> {
624636
let range_iter = block_range.rev().step_by(blocks_read_per_epoch as usize);
625637

626638
for batch_end in range_iter {
639+
#[cfg(test)]
640+
proceed_if_test_lock_acquired().await;
641+
627642
let batch_start =
628643
batch_end.saturating_sub(blocks_read_per_epoch as u64 - 1).max(stream_end);
629644

tests/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ pub async fn setup_scanner(
8484

8585
#[allow(clippy::missing_errors_doc)]
8686
#[allow(clippy::missing_panics_doc)]
87-
pub async fn reorg_with_new_txs<P>(
87+
pub async fn reorg_with_new_count_incr_txs<P>(
8888
provider: RootProvider,
8989
contract: TestCounter::TestCounterInstance<Arc<P>>,
9090
num_initial_events: u64,

tests/historic_to_live/reorg.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tokio_stream::StreamExt;
99

1010
use event_scanner::{event_scanner::EventScannerMessage, types::ScannerStatus};
1111

12-
use crate::common::{TestSetup, reorg_with_new_txs, setup_scanner};
12+
use crate::common::{TestSetup, reorg_with_new_count_incr_txs, setup_scanner};
1313

1414
#[tokio::test]
1515
async fn block_confirmations_mitigate_reorgs_historic_to_live() -> anyhow::Result<()> {
@@ -32,7 +32,7 @@ async fn block_confirmations_mitigate_reorgs_historic_to_live() -> anyhow::Resul
3232
let reorg_depth = 2u64;
3333
let same_block = false;
3434

35-
let all_tx_hashes = reorg_with_new_txs(
35+
let all_tx_hashes = reorg_with_new_count_incr_txs(
3636
provider.clone(),
3737
contract.clone(),
3838
num_initial_events,

tests/latest_events/basic.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,99 @@ async fn scan_latest_multiple_listeners_to_same_event_receive_same_results() ->
199199

200200
Ok(())
201201
}
202+
203+
#[tokio::test]
204+
async fn scan_latest_different_filters_receive_different_results() -> anyhow::Result<()> {
205+
let setup = setup_scanner(None, None, None).await?;
206+
let contract = setup.contract;
207+
let mut client = setup.client;
208+
209+
// First listener for CountDecreased
210+
let filter_inc = EventFilter::new()
211+
.with_contract_address(*contract.address())
212+
.with_event(TestCounter::CountIncreased::SIGNATURE);
213+
let mut stream_inc = client.create_event_stream(filter_inc);
214+
215+
// Second listener for CountDecreased
216+
let filter_dec = EventFilter::new()
217+
.with_contract_address(*contract.address())
218+
.with_event(TestCounter::CountDecreased::SIGNATURE);
219+
let mut stream_dec = client.create_event_stream(filter_dec);
220+
221+
// Produce 5 increases, then 2 decreases
222+
let mut inc_hashes: Vec<FixedBytes<32>> = Vec::new();
223+
for _ in 0..5u8 {
224+
let r = contract.increase().send().await?.get_receipt().await?;
225+
inc_hashes.push(r.transaction_hash);
226+
}
227+
let mut dec_hashes: Vec<FixedBytes<32>> = Vec::new();
228+
for _ in 0..2u8 {
229+
let r = contract.decrease().send().await?.get_receipt().await?;
230+
dec_hashes.push(r.transaction_hash);
231+
}
232+
233+
// Ask for latest 3 across the full range: each filtered listener should receive their own last
234+
// 3 events
235+
client.scan_latest(3, BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?;
236+
237+
let logs_inc = collect_events(&mut stream_inc).await;
238+
let logs_dec = collect_events(&mut stream_dec).await;
239+
240+
// Should be different sequences and lengths match the requested count (or fewer if not enough)
241+
assert_eq!(logs_inc.len(), 3);
242+
assert_eq!(logs_dec.len(), 2); // only 2 decreases exist
243+
244+
// Validate increases: expect counts 3,4,5 and the corresponding tx hashes from inc_hashes[2..5]
245+
let expected_hashes_inc = inc_hashes[2..5].to_vec();
246+
assert_ordering(logs_inc, 3, expected_hashes_inc, contract.address());
247+
248+
// Validate decreases: expect counts 4,3 (after two decreases)
249+
let mut expected_count_dec = U256::from(4);
250+
for (log, &expected_hash) in logs_dec.iter().zip(dec_hashes.iter()) {
251+
let ev = log.log_decode::<TestCounter::CountDecreased>()?;
252+
assert_eq!(&ev.address(), contract.address());
253+
assert_eq!(ev.transaction_hash.unwrap(), expected_hash);
254+
assert_eq!(ev.inner.newCount, expected_count_dec);
255+
expected_count_dec -= ONE;
256+
}
257+
258+
Ok(())
259+
}
260+
261+
#[tokio::test]
262+
async fn scan_latest_ignores_reorg_and_returns_canonical_latest() -> anyhow::Result<()> {
263+
let setup = setup_scanner(None, None, None).await?;
264+
let contract = setup.contract;
265+
let provider = setup.provider;
266+
let client = setup.client;
267+
let mut stream = setup.stream;
268+
269+
// Create 4 events, then reorg last 2 blocks with 2 new txs
270+
let _initial_hashes = crate::common::reorg_with_new_count_incr_txs(
271+
provider.clone(),
272+
contract.clone(),
273+
4, // num_initial_events
274+
2, // num_new_events (these will be the canonical latest)
275+
2, // reorg depth
276+
false, // place new events in separate blocks
277+
)
278+
.await?;
279+
280+
// Now query latest 2 from full range
281+
client.scan_latest(2, BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?;
282+
283+
let logs = collect_events(&mut stream).await;
284+
assert_eq!(logs.len(), 2);
285+
286+
// After reorg, counts should be 3 and 4 (the chain kept the first two increments; then two new
287+
// increments) Validate exact address, tx hashes ordering via decode and count values.
288+
let mut expected_count = U256::from(3u64);
289+
for log in &logs {
290+
let ev = log.log_decode::<TestCounter::CountIncreased>()?;
291+
assert_eq!(&ev.address(), contract.address());
292+
assert_eq!(ev.inner.newCount, expected_count);
293+
expected_count += ONE;
294+
}
295+
296+
Ok(())
297+
}

tests/live_mode/reorg.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use tokio_stream::StreamExt;
44

55
use tokio::{sync::Mutex, time::timeout};
66

7-
use crate::common::{TestSetup, reorg_with_new_txs, setup_scanner};
7+
use crate::common::{TestSetup, reorg_with_new_count_incr_txs, setup_scanner};
88
use alloy::{eips::BlockNumberOrTag, providers::ext::AnvilApi};
99
use event_scanner::{event_scanner::EventScannerMessage, types::ScannerStatus};
1010

@@ -20,7 +20,7 @@ async fn reorg_rescans_events_within_same_block() -> anyhow::Result<()> {
2020
let reorg_depth = 5;
2121
let same_block = true;
2222

23-
let expected_event_tx_hashes = reorg_with_new_txs(
23+
let expected_event_tx_hashes = reorg_with_new_count_incr_txs(
2424
provider,
2525
contract,
2626
num_initial_events,
@@ -83,7 +83,7 @@ async fn reorg_rescans_events_with_ascending_blocks() -> anyhow::Result<()> {
8383
// add events in ascending blocks from reorg point
8484
let same_block = false;
8585

86-
let expected_event_tx_hashes = reorg_with_new_txs(
86+
let expected_event_tx_hashes = reorg_with_new_count_incr_txs(
8787
provider,
8888
contract,
8989
num_initial_events,
@@ -145,7 +145,7 @@ async fn reorg_depth_one() -> anyhow::Result<()> {
145145
let num_new_events = 1;
146146
let same_block = true;
147147

148-
let expected_event_tx_hashes = reorg_with_new_txs(
148+
let expected_event_tx_hashes = reorg_with_new_count_incr_txs(
149149
provider,
150150
contract,
151151
num_initial_events,
@@ -207,7 +207,7 @@ async fn reorg_depth_two() -> anyhow::Result<()> {
207207
let reorg_depth = 2;
208208

209209
let same_block = true;
210-
let expected_event_tx_hashes = reorg_with_new_txs(
210+
let expected_event_tx_hashes = reorg_with_new_count_incr_txs(
211211
provider,
212212
contract,
213213
num_initial_events,
@@ -273,7 +273,7 @@ async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> {
273273
let reorg_depth = 2_u64;
274274
let same_block = true;
275275

276-
let all_tx_hashes = reorg_with_new_txs(
276+
let all_tx_hashes = reorg_with_new_count_incr_txs(
277277
provider.clone(),
278278
contract,
279279
num_initial_events,

0 commit comments

Comments
 (0)