Skip to content

Commit a1d995f

Browse files
authored
test: Create Macro That Addresses Unpredictability in Streamed Block Ranges (#189)
1 parent 6b1b2e0 commit a1d995f

File tree

3 files changed

+124
-44
lines changed

3 files changed

+124
-44
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,4 @@ jobs:
5454

5555
# https://github.com/rust-lang/cargo/issues/6669
5656
- name: Run doc tests
57-
run: cargo test --locked --doc
57+
run: cargo test --locked --doc --all-features --no-fail-fast

src/test_utils/macros.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,115 @@ macro_rules! assert_empty {
4242
tokio_stream::wrappers::ReceiverStream::new(inner)
4343
}};
4444
}
45+
46+
/// Asserts that a stream of block ranges completely covers an expected block range.
47+
///
48+
/// This macro consumes messages from a stream and verifies that the block ranges received
49+
/// sequentially cover the entire `expected_range` without gaps or overlaps. Each streamed
50+
/// range must start exactly where the previous one ended, and all ranges must fit within
51+
/// the expected bounds.
52+
///
53+
/// The macro expects the stream to yield `Message::Data(range)` variants containing
54+
/// `RangeInclusive<u64>` values representing block ranges. It tracks coverage by ensuring
55+
/// each new range starts at the next expected block number and doesn't exceed the end of
56+
/// the expected range. Once the entire range is covered, the assertion succeeds.
57+
///
58+
/// # Example
59+
///
60+
/// ```rust
61+
/// use event_scanner::{assert_range_coverage, block_range_scanner::Message};
62+
/// use tokio::sync::mpsc;
63+
/// use tokio_stream::wrappers::ReceiverStream;
64+
///
65+
/// #[tokio::test]
66+
/// async fn test_scanner_covers_range() {
67+
/// let (tx, rx) = mpsc::channel(10);
68+
/// let mut stream = ReceiverStream::new(rx);
69+
///
70+
/// // Simulate a scanner that splits blocks 100-199 into chunks
71+
/// tokio::spawn(async move {
72+
/// tx.send(Message::Data(100..=149)).await.unwrap();
73+
/// tx.send(Message::Data(150..=199)).await.unwrap();
74+
/// });
75+
///
76+
/// // Assert that the stream covers blocks 100-199
77+
/// assert_range_coverage!(stream, 100..=199);
78+
/// }
79+
/// ```
80+
///
81+
/// # Panics
82+
///
83+
/// * **Timeout**: The stream doesn't produce the next expected range within the timeout period
84+
/// (default 5 seconds, configurable via `timeout = N` parameter).
85+
/// * **Gap or overlap**: A streamed range doesn't start exactly at the next expected block number,
86+
/// indicating a gap or overlap in coverage.
87+
/// * **Out of bounds**: A streamed range extends beyond the end of the expected range.
88+
/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or `Status`)
89+
/// when a block range is expected.
90+
/// * **Stream closed early**: The stream ends before the entire expected range is covered.
91+
///
92+
/// On panic, the error message includes the expected remaining range and all previously
93+
/// streamed ranges for debugging.
94+
#[macro_export]
95+
macro_rules! assert_range_coverage {
96+
($stream: expr, $expected_range: expr) => {
97+
assert_range_coverage!($stream, $expected_range, timeout = 5)
98+
};
99+
($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
100+
fn bounds<R: ::std::ops::RangeBounds<u64>>(range: &R) -> (u64, u64) {
101+
let start_bound = match range.start_bound() {
102+
::std::ops::Bound::Unbounded => 0,
103+
::std::ops::Bound::Excluded(&x) => x + 1,
104+
::std::ops::Bound::Included(&x) => x,
105+
};
106+
let end_bound = match range.end_bound() {
107+
::std::ops::Bound::Unbounded => u64::MAX,
108+
::std::ops::Bound::Excluded(&x) => x - 1,
109+
::std::ops::Bound::Included(&x) => x,
110+
};
111+
(start_bound, end_bound)
112+
}
113+
114+
let original_bounds = bounds(&$expected_range);
115+
let (mut start, end) = original_bounds;
116+
117+
let start_time = ::std::time::Instant::now();
118+
let timeout_duration = ::std::time::Duration::from_secs($secs);
119+
120+
// log all streamed ranges on failures
121+
let mut streamed_ranges = vec![];
122+
123+
while start <= end {
124+
let elapsed = start_time.elapsed();
125+
126+
assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);
127+
128+
let time_left = timeout_duration - elapsed;
129+
let message =
130+
tokio::time::timeout(time_left, tokio_stream::StreamExt::next(&mut $stream))
131+
.await
132+
.expect("Timed out waiting for the next block range");
133+
134+
match message {
135+
Some( $crate::block_range_scanner::Message::Data(range)) => {
136+
let (streamed_start, streamed_end) = bounds(&range);
137+
streamed_ranges.push(range.clone());
138+
assert!(
139+
start == streamed_start && streamed_end <= end,
140+
"Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
141+
start..=end,
142+
range,
143+
streamed_ranges,
144+
);
145+
start = streamed_end + 1;
146+
}
147+
Some(other) => {
148+
panic!("Expected a block range, got: {other:#?}");
149+
}
150+
None => {
151+
panic!("Stream closed without covering range: {:#?}", start..=end);
152+
}
153+
}
154+
}
155+
}};
156+
}

tests/block_range_scanner.rs

Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use alloy::{
55
};
66
use alloy_node_bindings::Anvil;
77
use event_scanner::{
8-
ScannerError, ScannerStatus, assert_closed, assert_empty, assert_next,
8+
ScannerError, ScannerStatus, assert_closed, assert_empty, assert_next, assert_range_coverage,
99
block_range_scanner::BlockRangeScanner,
1010
};
1111

@@ -22,11 +22,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh
2222

2323
provider.anvil_mine(Some(5), None).await?;
2424

25-
assert_next!(stream, 1..=1);
26-
assert_next!(stream, 2..=2);
27-
assert_next!(stream, 3..=3);
28-
assert_next!(stream, 4..=4);
29-
assert_next!(stream, 5..=5);
25+
assert_range_coverage!(stream, 1..=5);
3026
let mut stream = assert_empty!(stream);
3127

3228
provider.anvil_mine(Some(1), None).await?;
@@ -40,11 +36,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh
4036

4137
provider.anvil_mine(Some(5), None).await?;
4238

43-
assert_next!(stream, 6..=6);
44-
assert_next!(stream, 7..=7);
45-
assert_next!(stream, 8..=8);
46-
assert_next!(stream, 9..=9);
47-
assert_next!(stream, 10..=10);
39+
assert_range_coverage!(stream, 6..=10);
4840
let mut stream = assert_empty!(stream);
4941

5042
provider.anvil_mine(Some(1), None).await?;
@@ -97,26 +89,18 @@ async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Re
9789
// assert initial block ranges immediately to avoid Anvil race condition:
9890
//
9991
// when a reorg happens after anvil_mine, Anvil occasionally first streams a non-zero block
100-
// number, which makes it impossible to deterministically assert the next expected block range
92+
// number, which makes it impossible to deterministically assert the first expected block range
10193
// streamed by the scanner
102-
assert_next!(stream, 0..=0);
103-
assert_next!(stream, 1..=1);
104-
assert_next!(stream, 2..=2);
105-
assert_next!(stream, 3..=3);
106-
assert_next!(stream, 4..=4);
107-
assert_next!(stream, 5..=5);
94+
assert_range_coverage!(stream, 0..=5);
95+
let mut stream = assert_empty!(stream);
10896

10997
// reorg less blocks than the block_confirmation config
11098
provider.anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs: vec![] }).await?;
11199
// mint additional blocks so the scanner processes reorged blocks
112100
provider.anvil_mine(Some(5), None).await?;
113101

114102
// no ReorgDetected should be emitted
115-
assert_next!(stream, 6..=6);
116-
assert_next!(stream, 7..=7);
117-
assert_next!(stream, 8..=8);
118-
assert_next!(stream, 9..=9);
119-
assert_next!(stream, 10..=10);
103+
assert_range_coverage!(stream, 6..=10);
120104
assert_empty!(stream);
121105

122106
Ok(())
@@ -137,34 +121,18 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<
137121
// assert initial block ranges immediately to avoid Anvil race condition:
138122
//
139123
// when a reorg happens after anvil_mine, Anvil occasionally first streams a non-zero block
140-
// number, which makes it impossible to deterministically assert the next expected block range
124+
// number, which makes it impossible to deterministically assert the first expected block range
141125
// streamed by the scanner
142-
assert_next!(stream, 0..=0);
143-
assert_next!(stream, 1..=1);
144-
assert_next!(stream, 2..=2);
145-
assert_next!(stream, 3..=3);
146-
assert_next!(stream, 4..=4);
147-
assert_next!(stream, 5..=5);
148-
assert_next!(stream, 6..=6);
149-
assert_next!(stream, 7..=7);
126+
assert_range_coverage!(stream, 0..=7);
127+
let mut stream = assert_empty!(stream);
150128

151129
// reorg more blocks than the block_confirmation config
152130
provider.anvil_reorg(ReorgOptions { depth: 8, tx_block_pairs: vec![] }).await?;
153131
// mint additional blocks
154132
provider.anvil_mine(Some(3), None).await?;
155133

156134
assert_next!(stream, ScannerStatus::ReorgDetected);
157-
assert_next!(stream, 0..=0);
158-
assert_next!(stream, 1..=1);
159-
assert_next!(stream, 2..=2);
160-
assert_next!(stream, 3..=3);
161-
assert_next!(stream, 4..=4);
162-
assert_next!(stream, 5..=5);
163-
assert_next!(stream, 6..=6);
164-
assert_next!(stream, 7..=7);
165-
assert_next!(stream, 8..=8);
166-
assert_next!(stream, 9..=9);
167-
assert_next!(stream, 10..=10);
135+
assert_range_coverage!(stream, 0..=10);
168136
assert_empty!(stream);
169137

170138
Ok(())

0 commit comments

Comments
 (0)