Skip to content

Commit f36fee3

Browse files
authored
Merge branch 'main' into from-to-accept-hash
2 parents c43b3b8 + 5682aa8 commit f36fee3

File tree

9 files changed

+509
-390
lines changed

9 files changed

+509
-390
lines changed

src/test_utils/macros.rs

Lines changed: 205 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
use alloy::primitives::LogData;
2+
use tokio_stream::Stream;
3+
4+
use crate::Message;
5+
16
#[macro_export]
27
macro_rules! assert_next {
38
($stream: expr, $expected: expr) => {
@@ -13,7 +18,7 @@ macro_rules! assert_next {
1318
if let Some(msg) = message {
1419
assert_eq!(msg, $expected)
1520
} else {
16-
panic!("Expected {:?}, got: {message:?}", $expected)
21+
panic!("Expected {:?}, but channel was closed", $expected)
1722
}
1823
};
1924
}
@@ -43,6 +48,173 @@ macro_rules! assert_empty {
4348
}};
4449
}
4550

51+
/// Asserts that a stream emits a specific sequence of events in order.
52+
///
53+
/// This macro consumes messages from a stream and verifies that the provided events are emitted
54+
/// in the exact order specified, regardless of how they are batched. The stream may emit events
55+
/// across multiple batches or all at once—the macro handles both cases. It ensures no unexpected
56+
/// events appear between the expected ones and that the sequence completes exactly as specified.
57+
///
58+
/// The macro accepts events of any type implementing [`SolEvent`](alloy::sol_types::SolEvent).
59+
/// Events are compared by their encoded log data, allowing flexible matching across different
60+
/// batch configurations while maintaining strict ordering requirements.
61+
///
62+
/// # Examples
63+
///
64+
/// ```no_run
65+
/// # use alloy::sol;
66+
/// sol! {
67+
/// event CountIncreased(uint256 newCount);
68+
/// }
69+
///
70+
/// #[tokio::test]
71+
/// async fn test_event_order() {
72+
/// // scanner setup...
73+
///
74+
/// let mut stream = scanner.subscribe(EventFilter::new().contract_address(contract_address));
75+
///
76+
/// // Assert these two events are emitted in order
77+
/// assert_event_sequence!(
78+
/// stream,
79+
/// &[
80+
/// CountIncreased { newCount: U256::from(1) },
81+
/// CountIncreased { newCount: U256::from(2) },
82+
/// ]
83+
/// );
84+
/// }
85+
/// ```
86+
///
87+
/// The assertion passes whether events arrive in separate batches or together:
88+
/// * **Separate batches**: `[Event1]`, then `[Event2]`
89+
/// * **Single batch**: `[Event1, Event2]`
90+
///
91+
/// # Panics
92+
///
93+
/// * **Timeout**: The stream doesn't produce the next expected event within the timeout period
94+
/// (default 5 seconds, configurable via `timeout = N` parameter).
95+
/// * **Wrong event**: The stream emits a different event than the next expected one in the
96+
/// sequence.
97+
/// * **Extra events**: The stream emits more events than expected after the sequence completes.
98+
/// * **Stream closed early**: The stream ends before all expected events are emitted.
99+
/// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or `Status`)
100+
/// when an event is expected.
101+
/// * **Empty sequence**: The macro is called with an empty event collection (use `assert_empty!`
102+
/// instead).
103+
///
104+
/// On panic, the error message includes the remaining expected events for debugging.
105+
#[macro_export]
106+
macro_rules! assert_event_sequence {
107+
// owned slices just pass to the borrowed slices variant
108+
($stream: expr, [$($event:expr),+ $(,)?]) => {
109+
assert_event_sequence!($stream, &[$($event),+], timeout = 5)
110+
};
111+
($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {
112+
assert_event_sequence!($stream, &[$($event),+], timeout = $secs)
113+
};
114+
// borrowed slices
115+
($stream: expr, &[$($event:expr),+ $(,)?]) => {
116+
assert_event_sequence!($stream, &[$($event),+], timeout = 5)
117+
};
118+
($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {
119+
let expected_options = &[$(alloy::sol_types::SolEvent::encode_log_data(&$event)),+];
120+
121+
$crate::test_utils::macros::assert_event_sequence(&mut $stream, expected_options, $secs).await
122+
};
123+
// variables and non-slice expressions
124+
($stream: expr, $events: expr) => {
125+
assert_event_sequence!($stream, $events, timeout = 5)
126+
};
127+
($stream: expr, $events: expr, timeout = $secs: expr) => {
128+
let expected_options = $events.iter().map(alloy::sol_types::SolEvent::encode_log_data).collect::<Vec<_>>();
129+
if expected_options.is_empty() {
130+
panic!("error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages.")
131+
}
132+
$crate::test_utils::macros::assert_event_sequence(&mut $stream, expected_options.iter(), $secs).await
133+
};
134+
}
135+
136+
/// Same as [`assert_event_sequence!`], but invokes [`assert_empty!`] at the end.
137+
#[macro_export]
138+
macro_rules! assert_event_sequence_final {
139+
// owned slices
140+
($stream: expr, [$($event:expr),+ $(,)?]) => {{
141+
assert_event_sequence_final!($stream, &[$($event),+])
142+
}};
143+
($stream: expr, [$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
144+
assert_event_sequence_final!($stream, &[$($event),+], timeout = $secs)
145+
}};
146+
// borrowed slices
147+
($stream: expr, &[$($event:expr),+ $(,)?]) => {{
148+
assert_event_sequence_final!($stream, &[$($event),+], timeout = 5)
149+
}};
150+
($stream: expr, &[$($event:expr),+ $(,)?], timeout = $secs: expr) => {{
151+
$crate::assert_event_sequence!($stream, &[$($event),+], timeout = $secs);
152+
$crate::assert_empty!($stream)
153+
}};
154+
// variables and non-slice expressions
155+
($stream: expr, $events: expr) => {{
156+
assert_event_sequence_final!($stream, $events, timeout = 5)
157+
}};
158+
($stream: expr, $events: expr, timeout = $secs: expr) => {{
159+
$crate::assert_event_sequence!($stream, $events, timeout = $secs);
160+
$crate::assert_empty!($stream)
161+
}};
162+
}
163+
164+
#[allow(clippy::missing_panics_doc)]
165+
pub async fn assert_event_sequence<S: Stream<Item = Message> + Unpin>(
166+
stream: &mut S,
167+
expected_options: impl IntoIterator<Item = &LogData>,
168+
timeout_secs: u64,
169+
) {
170+
let mut remaining = expected_options.into_iter();
171+
let start = std::time::Instant::now();
172+
let timeout_duration = std::time::Duration::from_secs(timeout_secs);
173+
174+
while let Some(expected) = remaining.next() {
175+
let elapsed = start.elapsed();
176+
177+
assert!(
178+
elapsed < timeout_duration,
179+
"Timed out waiting for events. Still expecting: {:#?}",
180+
remaining.collect::<Vec<_>>()
181+
);
182+
183+
let time_left = timeout_duration - elapsed;
184+
let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream))
185+
.await
186+
.expect("timed out waiting for next batch");
187+
188+
match message {
189+
Some(Message::Data(batch)) => {
190+
let mut batch = batch.iter();
191+
let event = batch.next().expect("Streamed batch should not be empty");
192+
assert_eq!(
193+
expected,
194+
event.data(),
195+
"\nRemaining: {:#?}\n",
196+
remaining.collect::<Vec<_>>()
197+
);
198+
while let Some(event) = batch.next() {
199+
let expected = remaining.next().unwrap_or_else(|| panic!("Received more events than expected.\nNext event: {:#?}\nStreamed remaining: {batch:#?}", event.data()));
200+
assert_eq!(
201+
expected,
202+
event.data(),
203+
"\nRemaining: {:#?}\n",
204+
remaining.collect::<Vec<_>>()
205+
);
206+
}
207+
}
208+
Some(other) => {
209+
panic!("Expected Message::Data, got: {other:#?}");
210+
}
211+
None => {
212+
panic!("Stream closed while still expecting: {:#?}", remaining.collect::<Vec<_>>());
213+
}
214+
}
215+
}
216+
}
217+
46218
/// Asserts that a stream of block ranges completely covers an expected block range.
47219
///
48220
/// This macro consumes messages from a stream and verifies that the block ranges received
@@ -154,3 +326,35 @@ macro_rules! assert_range_coverage {
154326
}
155327
}};
156328
}
329+
330+
#[cfg(test)]
331+
mod tests {
332+
use alloy::sol;
333+
use tokio::sync::mpsc;
334+
use tokio_stream::wrappers::ReceiverStream;
335+
336+
sol! {
337+
#[derive(Debug)]
338+
event Transfer(address indexed from, address indexed to, uint256 value);
339+
}
340+
341+
#[tokio::test]
342+
#[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
343+
async fn assert_event_sequence_macro_with_empty_vec() {
344+
let (_tx, rx) = mpsc::channel(10);
345+
let mut stream = ReceiverStream::new(rx);
346+
347+
let empty_vec: Vec<Transfer> = Vec::new();
348+
assert_event_sequence!(stream, empty_vec);
349+
}
350+
351+
#[tokio::test]
352+
#[should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages."]
353+
async fn assert_event_sequence_macro_with_empty_slice() {
354+
let (_tx, rx) = mpsc::channel(10);
355+
let mut stream = ReceiverStream::new(rx);
356+
357+
let empty_vec: &[Transfer] = &[];
358+
assert_event_sequence!(stream, empty_vec);
359+
}
360+
}

tests/historic/basic.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,9 @@ use crate::common::{TestCounter, setup_historic_scanner};
55

66
#[tokio::test]
77
async fn processes_events_within_specified_historical_range() -> anyhow::Result<()> {
8-
let setup = setup_historic_scanner(
9-
Some(0.1),
10-
None,
11-
BlockNumberOrTag::Earliest,
12-
BlockNumberOrTag::Latest,
13-
)
14-
.await?;
8+
let setup =
9+
setup_historic_scanner(None, None, BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest)
10+
.await?;
1511
let contract = setup.contract;
1612
let scanner = setup.scanner;
1713
let mut stream = setup.stream;

tests/latest_events/basic.rs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::common::{TestCounter, deploy_counter, setup_common, setup_latest_scan
66
use event_scanner::{EventFilter, EventScannerBuilder, assert_closed, assert_next};
77

88
#[tokio::test]
9-
async fn latest_scanner_exact_count_returns_last_events_in_order() -> anyhow::Result<()> {
9+
async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> {
1010
let count = 5;
1111
let setup = setup_latest_scanner(None, None, count, None, None).await?;
1212
let contract = setup.contract;
@@ -36,7 +36,7 @@ async fn latest_scanner_exact_count_returns_last_events_in_order() -> anyhow::Re
3636
}
3737

3838
#[tokio::test]
39-
async fn latest_scanner_fewer_available_than_count_returns_all() -> anyhow::Result<()> {
39+
async fn fewer_available_than_count_returns_all() -> anyhow::Result<()> {
4040
let count = 5;
4141
let setup = setup_latest_scanner(None, None, count, None, None).await?;
4242
let contract = setup.contract;
@@ -64,7 +64,7 @@ async fn latest_scanner_fewer_available_than_count_returns_all() -> anyhow::Resu
6464
}
6565

6666
#[tokio::test]
67-
async fn latest_scanner_no_events_returns_empty() -> anyhow::Result<()> {
67+
async fn no_events_returns_empty() -> anyhow::Result<()> {
6868
let count = 5;
6969
let setup = setup_latest_scanner(None, None, count, None, None).await?;
7070
let scanner = setup.scanner;
@@ -81,7 +81,7 @@ async fn latest_scanner_no_events_returns_empty() -> anyhow::Result<()> {
8181
}
8282

8383
#[tokio::test]
84-
async fn latest_scanner_respects_range_subset() -> anyhow::Result<()> {
84+
async fn respects_range_subset() -> anyhow::Result<()> {
8585
let (_anvil, provider, contract, default_filter) = setup_common(None, None).await?;
8686
// Mine 6 events, one per tx (auto-mined), then manually mint 2 empty blocks to widen range
8787
contract.increase().send().await?.watch().await?;
@@ -118,8 +118,7 @@ async fn latest_scanner_respects_range_subset() -> anyhow::Result<()> {
118118
}
119119

120120
#[tokio::test]
121-
async fn latest_scanner_multiple_listeners_to_same_event_receive_same_results() -> anyhow::Result<()>
122-
{
121+
async fn multiple_listeners_to_same_event_receive_same_results() -> anyhow::Result<()> {
123122
let count = 5;
124123
let setup = setup_latest_scanner(None, None, count, None, None).await?;
125124
let contract = setup.contract;
@@ -161,7 +160,7 @@ async fn latest_scanner_multiple_listeners_to_same_event_receive_same_results()
161160
}
162161

163162
#[tokio::test]
164-
async fn latest_scanner_different_filters_receive_different_results() -> anyhow::Result<()> {
163+
async fn different_filters_receive_different_results() -> anyhow::Result<()> {
165164
let count = 3;
166165
let setup = setup_latest_scanner(None, None, count, None, None).await?;
167166
let contract = setup.contract;
@@ -216,7 +215,7 @@ async fn latest_scanner_different_filters_receive_different_results() -> anyhow:
216215
}
217216

218217
#[tokio::test]
219-
async fn latest_scanner_mixed_events_and_filters_return_correct_streams() -> anyhow::Result<()> {
218+
async fn mixed_events_and_filters_return_correct_streams() -> anyhow::Result<()> {
220219
let count = 2;
221220
let setup = setup_latest_scanner(None, None, count, None, None).await?;
222221
let contract = setup.contract;
@@ -229,17 +228,11 @@ async fn latest_scanner_mixed_events_and_filters_return_correct_streams() -> any
229228
.event(TestCounter::CountDecreased::SIGNATURE);
230229
let mut stream_dec = scanner.subscribe(filter_dec);
231230

232-
// Sequence: inc(1), inc(2), dec(1), inc(2), dec(1)
233-
// inc -> 1
234-
contract.increase().send().await?.watch().await?;
235-
// inc -> 2
236-
contract.increase().send().await?.watch().await?;
237-
// dec -> 1
238-
contract.decrease().send().await?.watch().await?;
239-
// inc -> 2
240-
contract.increase().send().await?.watch().await?;
241-
// dec -> 1
242-
contract.decrease().send().await?.watch().await?;
231+
contract.increase().send().await?.watch().await?; // inc(1)
232+
contract.increase().send().await?.watch().await?; // inc(2)
233+
contract.decrease().send().await?.watch().await?; // dec(1)
234+
contract.increase().send().await?.watch().await?; // inc(2)
235+
contract.decrease().send().await?.watch().await?; // dec(1)
243236

244237
scanner.start().await?;
245238

@@ -265,7 +258,7 @@ async fn latest_scanner_mixed_events_and_filters_return_correct_streams() -> any
265258
}
266259

267260
#[tokio::test]
268-
async fn latest_scanner_ignores_non_tracked_contract() -> anyhow::Result<()> {
261+
async fn ignores_non_tracked_contract() -> anyhow::Result<()> {
269262
// Manual setup to deploy two contracts
270263
let setup = setup_latest_scanner(None, None, 5, None, None).await?;
271264
let provider = setup.provider;
@@ -300,7 +293,7 @@ async fn latest_scanner_ignores_non_tracked_contract() -> anyhow::Result<()> {
300293
}
301294

302295
#[tokio::test]
303-
async fn latest_scanner_large_gaps_and_empty_ranges() -> anyhow::Result<()> {
296+
async fn large_gaps_and_empty_ranges() -> anyhow::Result<()> {
304297
// Manual setup to mine empty blocks
305298
let (_anvil, provider, contract, default_filter) = setup_common(None, None).await?;
306299

@@ -337,7 +330,7 @@ async fn latest_scanner_large_gaps_and_empty_ranges() -> anyhow::Result<()> {
337330
}
338331

339332
#[tokio::test]
340-
async fn latest_scanner_boundary_range_single_block() -> anyhow::Result<()> {
333+
async fn boundary_range_single_block() -> anyhow::Result<()> {
341334
let (_anvil, provider, contract, default_filter) = setup_common(None, None).await?;
342335

343336
contract.increase().send().await?.watch().await?;

0 commit comments

Comments
 (0)