Skip to content

Commit c89da66

Browse files
authored
test: fix reorg_rescans_events_within_same_block (#147)
1 parent 3c4ebb1 commit c89da66

File tree

2 files changed

+55
-58
lines changed

2 files changed

+55
-58
lines changed

src/robust_provider.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ use std::{future::Future, sync::Arc, time::Duration};
33
use alloy::{
44
eips::{BlockId, BlockNumberOrTag},
55
network::Network,
6-
providers::{Provider, RootProvider},
6+
providers::{Provider, RootProvider, ext::AnvilApi},
77
pubsub::Subscription,
8-
rpc::types::{Filter, Log},
9-
transports::{RpcError, TransportErrorKind},
8+
rpc::types::{Filter, Log, anvil::ReorgOptions},
9+
transports::{RpcError, TransportErrorKind, TransportResult},
1010
};
1111
use backon::{ExponentialBuilder, Retryable};
1212
use thiserror::Error;
@@ -138,6 +138,10 @@ impl<N: Network> RobustProvider<N> {
138138
result
139139
}
140140

141+
pub async fn anvil_reorg(&self, options: ReorgOptions) -> TransportResult<()> {
142+
self.provider.anvil_reorg(options).await
143+
}
144+
141145
/// Fetch a block by hash with retry and timeout.
142146
///
143147
/// # Errors

tests/live_mode/reorg.rs

Lines changed: 48 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,67 +4,60 @@ use tokio_stream::StreamExt;
44

55
use tokio::{sync::Mutex, time::timeout};
66

7-
use crate::common::{LiveScannerSetup, reorg_with_new_count_incr_txs, setup_live_scanner};
8-
use alloy::providers::ext::AnvilApi;
9-
use event_scanner::{Message, ScannerStatus};
7+
use crate::common::{
8+
LiveScannerSetup, TestCounter::CountIncreased, reorg_with_new_count_incr_txs, setup_common,
9+
setup_live_scanner,
10+
};
11+
use alloy::{
12+
primitives::U256,
13+
providers::{Provider, ext::AnvilApi},
14+
rpc::types::anvil::{ReorgOptions, TransactionData},
15+
};
16+
use event_scanner::{
17+
EventScannerBuilder, Message, ScannerStatus, assert_empty, assert_next,
18+
robust_provider::RobustProvider,
19+
};
1020

1121
#[tokio::test]
1222
async fn reorg_rescans_events_within_same_block() -> anyhow::Result<()> {
13-
let LiveScannerSetup { provider: _provider, contract, scanner, mut stream, anvil } =
14-
setup_live_scanner(Option::Some(0.1), Option::None, 0).await?;
23+
let (_anvil, provider, contract, filter) = setup_common(None, None).await?;
24+
let provider = RobustProvider::new(provider.root().clone());
25+
let mut scanner = EventScannerBuilder::live().block_confirmations(0).connect(provider.clone());
26+
let mut stream = scanner.subscribe(filter);
1527

1628
scanner.start().await?;
1729

18-
let num_initial_events = 5;
19-
let num_new_events = 3;
20-
let reorg_depth = 5;
21-
let same_block = true;
22-
23-
let expected_event_tx_hashes = reorg_with_new_count_incr_txs(
24-
&anvil,
25-
contract,
26-
num_initial_events,
27-
num_new_events,
28-
reorg_depth,
29-
same_block,
30-
)
31-
.await?;
32-
33-
let event_block_count = Arc::new(Mutex::new(Vec::new()));
34-
let event_block_count_clone = Arc::clone(&event_block_count);
35-
36-
let reorg_detected = Arc::new(Mutex::new(false));
37-
let reorg_detected_clone = reorg_detected.clone();
38-
39-
let event_counting = async move {
40-
while let Some(message) = stream.next().await {
41-
match message {
42-
Message::Data(logs) => {
43-
let mut guard = event_block_count_clone.lock().await;
44-
for log in logs {
45-
if let Some(n) = log.transaction_hash {
46-
guard.push(n);
47-
}
48-
}
49-
}
50-
Message::Error(e) => {
51-
panic!("panic with error {e}");
52-
}
53-
Message::Status(status) => {
54-
if matches!(status, ScannerStatus::ReorgDetected) {
55-
*reorg_detected_clone.lock().await = true;
56-
}
57-
}
58-
}
59-
}
60-
};
61-
62-
let _ = timeout(Duration::from_secs(5), event_counting).await;
63-
64-
let final_blocks: Vec<_> = event_block_count.lock().await.clone();
65-
assert!(*reorg_detected.lock().await);
66-
assert_eq!(final_blocks.len() as u64, num_initial_events + num_new_events);
67-
assert_eq!(final_blocks, expected_event_tx_hashes);
30+
// emit initial events
31+
for _ in 0..5 {
32+
contract.increase().send().await?.watch().await?;
33+
}
34+
35+
// assert initial events are emitted as expected
36+
assert_next!(stream, &[CountIncreased { newCount: U256::from(1) }]);
37+
assert_next!(stream, &[CountIncreased { newCount: U256::from(2) }]);
38+
assert_next!(stream, &[CountIncreased { newCount: U256::from(3) }]);
39+
assert_next!(stream, &[CountIncreased { newCount: U256::from(4) }]);
40+
assert_next!(stream, &[CountIncreased { newCount: U256::from(5) }]);
41+
let mut stream = assert_empty!(stream);
42+
43+
// reorg the chain
44+
let tx_block_pairs = (0..3)
45+
.map(|_| (TransactionData::JSON(contract.increase().into_transaction_request()), 0))
46+
.collect();
47+
48+
provider.anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?;
49+
50+
// assert expected messages post-reorg
51+
assert_next!(stream, ScannerStatus::ReorgDetected);
52+
assert_next!(
53+
stream,
54+
&[
55+
CountIncreased { newCount: U256::from(2) },
56+
CountIncreased { newCount: U256::from(3) },
57+
CountIncreased { newCount: U256::from(4) },
58+
]
59+
);
60+
assert_empty!(stream);
6861

6962
Ok(())
7063
}

0 commit comments

Comments
 (0)