Skip to content

Commit 47c54ac

Browse files
committed
chore: merge main
2 parents c108cc2 + a1d995f commit 47c54ac

File tree

3 files changed

+125
-46
lines changed

3 files changed

+125
-46
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,4 @@ jobs:
5454

5555
# https://github.com/rust-lang/cargo/issues/6669
5656
- name: Run doc tests
57-
run: cargo test --locked --doc
57+
run: cargo test --locked --doc --all-features --no-fail-fast

src/test_utils/macros.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,115 @@ macro_rules! assert_empty {
4242
tokio_stream::wrappers::ReceiverStream::new(inner)
4343
}};
4444
}
45+
46+
/// Asserts that a stream of block ranges completely covers an expected block range.
47+
///
48+
/// This macro consumes messages from a stream and verifies that the block ranges received
49+
/// sequentially cover the entire `expected_range` without gaps or overlaps. Each streamed
50+
/// range must start exactly where the previous one ended, and all ranges must fit within
51+
/// the expected bounds.
52+
///
53+
/// The macro expects the stream to yield `Message::Data(range)` variants containing
54+
/// `RangeInclusive<u64>` values representing block ranges. It tracks coverage by ensuring
55+
/// each new range starts at the next expected block number and doesn't exceed the end of
56+
/// the expected range. Once the entire range is covered, the assertion succeeds.
57+
///
58+
/// # Example
59+
///
60+
/// ```rust
61+
/// use event_scanner::{assert_range_coverage, block_range_scanner::Message};
62+
/// use tokio::sync::mpsc;
63+
/// use tokio_stream::wrappers::ReceiverStream;
64+
///
65+
/// #[tokio::test]
66+
/// async fn test_scanner_covers_range() {
67+
/// let (tx, rx) = mpsc::channel(10);
68+
/// let mut stream = ReceiverStream::new(rx);
69+
///
70+
/// // Simulate a scanner that splits blocks 100-199 into chunks
71+
/// tokio::spawn(async move {
72+
/// tx.send(Message::Data(100..=149)).await.unwrap();
73+
/// tx.send(Message::Data(150..=199)).await.unwrap();
74+
/// });
75+
///
76+
/// // Assert that the stream covers blocks 100-199
77+
/// assert_range_coverage!(stream, 100..=199);
78+
/// }
79+
/// ```
80+
///
81+
/// # Panics
82+
///
83+
/// * **Timeout**: The stream doesn't produce the next expected range within the timeout period
84+
/// (default 5 seconds, configurable via `timeout = N` parameter).
85+
/// * **Gap or overlap**: A streamed range doesn't start exactly at the next expected block number,
86+
/// indicating a gap or overlap in coverage.
87+
/// * **Out of bounds**: A streamed range extends beyond the end of the expected range.
88+
/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or `Status`)
89+
/// when a block range is expected.
90+
/// * **Stream closed early**: The stream ends before the entire expected range is covered.
91+
///
92+
/// On panic, the error message includes the expected remaining range and all previously
93+
/// streamed ranges for debugging.
94+
#[macro_export]
95+
macro_rules! assert_range_coverage {
96+
($stream: expr, $expected_range: expr) => {
97+
assert_range_coverage!($stream, $expected_range, timeout = 5)
98+
};
99+
($stream: expr, $expected_range: expr, timeout = $secs: expr) => {{
100+
fn bounds<R: ::std::ops::RangeBounds<u64>>(range: &R) -> (u64, u64) {
101+
let start_bound = match range.start_bound() {
102+
::std::ops::Bound::Unbounded => 0,
103+
::std::ops::Bound::Excluded(&x) => x + 1,
104+
::std::ops::Bound::Included(&x) => x,
105+
};
106+
let end_bound = match range.end_bound() {
107+
::std::ops::Bound::Unbounded => u64::MAX,
108+
::std::ops::Bound::Excluded(&x) => x - 1,
109+
::std::ops::Bound::Included(&x) => x,
110+
};
111+
(start_bound, end_bound)
112+
}
113+
114+
let original_bounds = bounds(&$expected_range);
115+
let (mut start, end) = original_bounds;
116+
117+
let start_time = ::std::time::Instant::now();
118+
let timeout_duration = ::std::time::Duration::from_secs($secs);
119+
120+
// log all streamed ranges on failures
121+
let mut streamed_ranges = vec![];
122+
123+
while start <= end {
124+
let elapsed = start_time.elapsed();
125+
126+
assert!(elapsed < timeout_duration, "Timed out. Still expecting: {:#?}", start..=end,);
127+
128+
let time_left = timeout_duration - elapsed;
129+
let message =
130+
tokio::time::timeout(time_left, tokio_stream::StreamExt::next(&mut $stream))
131+
.await
132+
.expect("Timed out waiting for the next block range");
133+
134+
match message {
135+
Some( $crate::block_range_scanner::Message::Data(range)) => {
136+
let (streamed_start, streamed_end) = bounds(&range);
137+
streamed_ranges.push(range.clone());
138+
assert!(
139+
start == streamed_start && streamed_end <= end,
140+
"Unexpected range bounds, expected max. range: {:#?}, got: {:#?}\nPrevious streams:\n{:#?}",
141+
start..=end,
142+
range,
143+
streamed_ranges,
144+
);
145+
start = streamed_end + 1;
146+
}
147+
Some(other) => {
148+
panic!("Expected a block range, got: {other:#?}");
149+
}
150+
None => {
151+
panic!("Stream closed without covering range: {:#?}", start..=end);
152+
}
153+
}
154+
}
155+
}};
156+
}

tests/block_range_scanner.rs

Lines changed: 12 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use alloy::{
55
};
66
use alloy_node_bindings::Anvil;
77
use event_scanner::{
8-
ScannerError, ScannerStatus, assert_closed, assert_empty, assert_next,
8+
ScannerError, ScannerStatus, assert_closed, assert_empty, assert_next, assert_range_coverage,
99
block_range_scanner::BlockRangeScanner,
1010
};
1111

@@ -22,11 +22,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh
2222

2323
provider.anvil_mine(Some(5), None).await?;
2424

25-
assert_next!(stream, 1..=1);
26-
assert_next!(stream, 2..=2);
27-
assert_next!(stream, 3..=3);
28-
assert_next!(stream, 4..=4);
29-
assert_next!(stream, 5..=5);
25+
assert_range_coverage!(stream, 1..=5);
3026
let mut stream = assert_empty!(stream);
3127

3228
provider.anvil_mine(Some(1), None).await?;
@@ -40,11 +36,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh
4036

4137
provider.anvil_mine(Some(5), None).await?;
4238

43-
assert_next!(stream, 6..=6);
44-
assert_next!(stream, 7..=7);
45-
assert_next!(stream, 8..=8);
46-
assert_next!(stream, 9..=9);
47-
assert_next!(stream, 10..=10);
39+
assert_range_coverage!(stream, 6..=10);
4840
let mut stream = assert_empty!(stream);
4941

5042
provider.anvil_mine(Some(1), None).await?;
@@ -75,11 +67,7 @@ async fn live_with_block_confirmations_always_emits_genesis_block() -> anyhow::R
7567

7668
provider.anvil_mine(Some(5), None).await?;
7769

78-
assert_next!(stream, 1..=1);
79-
assert_next!(stream, 2..=2);
80-
assert_next!(stream, 3..=3);
81-
assert_next!(stream, 4..=4);
82-
assert_next!(stream, 5..=5);
70+
assert_range_coverage!(stream, 1..=5);
8371
let mut stream = assert_empty!(stream);
8472

8573
provider.anvil_mine(Some(1), None).await?;
@@ -133,26 +121,18 @@ async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Re
133121
// assert initial block ranges immediately to avoid Anvil race condition:
134122
//
135123
// when a reorg happens after anvil_mine, Anvil occasionally first streams a non-zero block
136-
// number, which makes it impossible to deterministically assert the next expected block range
124+
// number, which makes it impossible to deterministically assert the first expected block range
137125
// streamed by the scanner
138-
assert_next!(stream, 0..=0);
139-
assert_next!(stream, 1..=1);
140-
assert_next!(stream, 2..=2);
141-
assert_next!(stream, 3..=3);
142-
assert_next!(stream, 4..=4);
143-
assert_next!(stream, 5..=5);
126+
assert_range_coverage!(stream, 0..=5);
127+
let mut stream = assert_empty!(stream);
144128

145129
// reorg less blocks than the block_confirmation config
146130
provider.anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs: vec![] }).await?;
147131
// mint additional blocks so the scanner processes reorged blocks
148132
provider.anvil_mine(Some(5), None).await?;
149133

150134
// no ReorgDetected should be emitted
151-
assert_next!(stream, 6..=6);
152-
assert_next!(stream, 7..=7);
153-
assert_next!(stream, 8..=8);
154-
assert_next!(stream, 9..=9);
155-
assert_next!(stream, 10..=10);
135+
assert_range_coverage!(stream, 6..=10);
156136
assert_empty!(stream);
157137

158138
Ok(())
@@ -173,16 +153,10 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<
173153
// assert initial block ranges immediately to avoid Anvil race condition:
174154
//
175155
// when a reorg happens after anvil_mine, Anvil occasionally first streams a non-zero block
176-
// number, which makes it impossible to deterministically assert the next expected block range
156+
// number, which makes it impossible to deterministically assert the first expected block range
177157
// streamed by the scanner
178-
assert_next!(stream, 0..=0);
179-
assert_next!(stream, 1..=1);
180-
assert_next!(stream, 2..=2);
181-
assert_next!(stream, 3..=3);
182-
assert_next!(stream, 4..=4);
183-
assert_next!(stream, 5..=5);
184-
assert_next!(stream, 6..=6);
185-
assert_next!(stream, 7..=7);
158+
assert_range_coverage!(stream, 0..=7);
159+
let mut stream = assert_empty!(stream);
186160

187161
// reorg more blocks than the block_confirmation config
188162
provider.anvil_reorg(ReorgOptions { depth: 8, tx_block_pairs: vec![] }).await?;
@@ -191,14 +165,7 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result<
191165
provider.anvil_mine(Some(3), None).await?;
192166

193167
assert_next!(stream, ScannerStatus::ReorgDetected);
194-
assert_next!(stream, 3..=3);
195-
assert_next!(stream, 4..=4);
196-
assert_next!(stream, 5..=5);
197-
assert_next!(stream, 6..=6);
198-
assert_next!(stream, 7..=7);
199-
assert_next!(stream, 8..=8);
200-
assert_next!(stream, 9..=9);
201-
assert_next!(stream, 10..=10);
168+
assert_range_coverage!(stream, 3..=10);
202169
assert_empty!(stream);
203170

204171
Ok(())

0 commit comments

Comments
 (0)