Skip to content

Commit 40b63ce

Browse files
authored
test: Live Reorg Tests Cleanup + Reliability Improvement (#150)
1 parent dbf7401 commit 40b63ce

File tree

6 files changed

+175
-331
lines changed

6 files changed

+175
-331
lines changed

src/block_range_scanner.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,6 @@ impl BlockRangeScanner {
207207
}
208208

209209
/// Connects to an existing provider
210-
///
211-
/// # Errors
212-
///
213-
/// Returns an error if the connection fails
214210
#[must_use]
215211
pub fn connect<N: Network>(self, provider: RootProvider<N>) -> ConnectedBlockRangeScanner<N> {
216212
let robust_provider = RobustProvider::new(provider)

src/test_utils/macros.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
#[macro_export]
22
macro_rules! assert_next {
33
($stream: expr, $expected: expr) => {
4-
let message = tokio_stream::StreamExt::next(&mut $stream).await;
4+
assert_next!($stream, $expected, timeout = 5)
5+
};
6+
($stream: expr, $expected: expr, timeout = $secs: expr) => {
7+
let message = tokio::time::timeout(
8+
std::time::Duration::from_secs($secs),
9+
tokio_stream::StreamExt::next(&mut $stream),
10+
)
11+
.await
12+
.expect("timed out");
513
if let Some(msg) = message {
614
assert_eq!(msg, $expected)
715
} else {
@@ -13,7 +21,15 @@ macro_rules! assert_next {
1321
#[macro_export]
1422
macro_rules! assert_closed {
1523
($stream: expr) => {
16-
let message = tokio_stream::StreamExt::next(&mut $stream).await;
24+
assert_closed!($stream, timeout = 5)
25+
};
26+
($stream: expr, timeout = $secs: expr) => {
27+
let message = tokio::time::timeout(
28+
std::time::Duration::from_secs($secs),
29+
tokio_stream::StreamExt::next(&mut $stream),
30+
)
31+
.await
32+
.expect("timed out");
1733
assert!(message.is_none())
1834
};
1935
}

tests/common.rs

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
#![allow(clippy::missing_panics_doc)]
33
#![allow(missing_docs)]
44

5-
use std::sync::Arc;
6-
75
use alloy::{
86
eips::BlockNumberOrTag,
97
network::Ethereum,
@@ -54,7 +52,7 @@ where
5452
P: Provider<Ethereum> + Clone,
5553
{
5654
pub provider: RootProvider,
57-
pub contract: TestCounter::TestCounterInstance<Arc<P>>,
55+
pub contract: TestCounter::TestCounterInstance<P>,
5856
pub scanner: S,
5957
pub stream: ReceiverStream<Message>,
6058
pub anvil: AnvilInstance,
@@ -72,12 +70,12 @@ pub async fn setup_common(
7270
) -> anyhow::Result<(
7371
AnvilInstance,
7472
RootProvider,
75-
TestCounter::TestCounterInstance<Arc<RootProvider>>,
73+
TestCounter::TestCounterInstance<RootProvider>,
7674
EventFilter,
7775
)> {
7876
let anvil = spawn_anvil(block_interval)?;
7977
let provider = build_provider(&anvil).await?;
80-
let contract = deploy_counter(Arc::new(provider.clone())).await?;
78+
let contract = deploy_counter(provider.clone()).await?;
8179

8280
let default_filter = EventFilter::new()
8381
.contract_address(*contract.address())
@@ -95,10 +93,8 @@ pub async fn setup_live_scanner(
9593
) -> anyhow::Result<LiveScannerSetup<impl Provider<Ethereum> + Clone>> {
9694
let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
9795

98-
let mut scanner = EventScannerBuilder::live()
99-
.block_confirmations(confirmations)
100-
.connect_ws(anvil.ws_endpoint_url())
101-
.await?;
96+
let mut scanner =
97+
EventScannerBuilder::live().block_confirmations(confirmations).connect(provider.clone());
10298

10399
let stream = scanner.subscribe(filter);
104100

@@ -116,8 +112,7 @@ pub async fn setup_sync_scanner(
116112
let mut scanner = EventScannerBuilder::sync()
117113
.from_block(from)
118114
.block_confirmations(confirmations)
119-
.connect_ws(anvil.ws_endpoint_url())
120-
.await?;
115+
.connect(provider.clone());
121116

122117
let stream = scanner.subscribe(filter);
123118

@@ -135,8 +130,7 @@ pub async fn setup_sync_from_latest_scanner(
135130
let mut scanner = EventScannerBuilder::sync()
136131
.from_latest(latest)
137132
.block_confirmations(confirmations)
138-
.connect_ws(anvil.ws_endpoint_url())
139-
.await?;
133+
.connect(provider.clone());
140134

141135
let stream = scanner.subscribe(filter);
142136

@@ -151,11 +145,8 @@ pub async fn setup_historic_scanner(
151145
) -> anyhow::Result<HistoricScannerSetup<impl Provider<Ethereum> + Clone>> {
152146
let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?;
153147

154-
let mut scanner = EventScannerBuilder::historic()
155-
.from_block(from)
156-
.to_block(to)
157-
.connect_ws(anvil.ws_endpoint_url())
158-
.await?;
148+
let mut scanner =
149+
EventScannerBuilder::historic().from_block(from).to_block(to).connect(provider.clone());
159150

160151
let stream = scanner.subscribe(filter);
161152

@@ -187,7 +178,7 @@ pub async fn setup_latest_scanner(
187178

188179
pub async fn reorg_with_new_count_incr_txs<P>(
189180
provider: RootProvider,
190-
contract: TestCounter::TestCounterInstance<Arc<P>>,
181+
contract: TestCounter::TestCounterInstance<P>,
191182
num_initial_events: u64,
192183
num_new_events: u64,
193184
reorg_depth: u64,
@@ -261,13 +252,14 @@ pub fn spawn_anvil(block_time: Option<f64>) -> anyhow::Result<AnvilInstance> {
261252

262253
pub async fn build_provider(anvil: &AnvilInstance) -> anyhow::Result<RootProvider> {
263254
let wallet = anvil.wallet().expect("anvil should return a default wallet");
264-
let provider = ProviderBuilder::new().wallet(wallet).connect(anvil.endpoint().as_str()).await?;
255+
let provider =
256+
ProviderBuilder::new().wallet(wallet).connect(anvil.ws_endpoint_url().as_str()).await?;
265257
Ok(provider.root().to_owned())
266258
}
267259

268260
pub async fn deploy_counter<P>(provider: P) -> anyhow::Result<TestCounter::TestCounterInstance<P>>
269261
where
270-
P: alloy::providers::Provider<Ethereum> + Clone,
262+
P: alloy::providers::Provider<Ethereum>,
271263
{
272264
let contract = TestCounter::deploy(provider).await?;
273265
Ok(contract)
@@ -283,7 +275,7 @@ pub(crate) trait TestCounterExt {
283275
) -> anyhow::Result<LogMetadata<TestCounter::CountDecreased>>;
284276
}
285277

286-
impl<P: Provider + Clone> TestCounterExt for TestCounter::TestCounterInstance<Arc<P>> {
278+
impl<P: Provider + Clone> TestCounterExt for TestCounter::TestCounterInstance<P> {
287279
async fn increase_and_get_meta(
288280
&self,
289281
) -> anyhow::Result<LogMetadata<TestCounter::CountIncreased>> {

tests/latest_events/basic.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::sync::Arc;
2-
31
use alloy::{
42
eips::BlockNumberOrTag,
53
network::Ethereum,
@@ -255,8 +253,8 @@ async fn latest_scanner_cross_contract_filtering() -> anyhow::Result<()> {
255253
let provider = setup.provider;
256254
let mut scanner = setup.scanner;
257255

258-
let contract_a = deploy_counter(Arc::new(provider.clone())).await?;
259-
let contract_b = deploy_counter(Arc::new(provider.clone())).await?;
256+
let contract_a = deploy_counter(provider.clone()).await?;
257+
let contract_b = deploy_counter(provider.clone()).await?;
260258

261259
// Listener only for contract A CountIncreased
262260
let filter_a = EventFilter::new()

tests/live/basic.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async fn multiple_contracts_same_event_isolate_callbacks() -> anyhow::Result<()>
6565
let setup = setup_live_scanner(Some(0.1), None, 0).await?;
6666
let provider = setup.provider.clone();
6767
let a = setup.contract.clone();
68-
let b = deploy_counter(Arc::new(provider.clone())).await?;
68+
let b = deploy_counter(provider.clone()).await?;
6969

7070
let a_filter = EventFilter::new()
7171
.contract_address(*a.address())

0 commit comments

Comments
 (0)