1+ use alloy:: primitives:: LogData ;
2+ use tokio_stream:: Stream ;
3+
4+ use crate :: Message ;
5+
16#[ macro_export]
27macro_rules! assert_next {
38 ( $stream: expr, $expected: expr) => {
@@ -13,7 +18,7 @@ macro_rules! assert_next {
1318 if let Some ( msg) = message {
1419 assert_eq!( msg, $expected)
1520 } else {
16- panic!( "Expected {:?}, got: {message:?} " , $expected)
21+ panic!( "Expected {:?}, but channel was closed " , $expected)
1722 }
1823 } ;
1924}
@@ -43,6 +48,173 @@ macro_rules! assert_empty {
4348 } } ;
4449}
4550
51+ /// Asserts that a stream emits a specific sequence of events in order.
52+ ///
53+ /// This macro consumes messages from a stream and verifies that the provided events are emitted
54+ /// in the exact order specified, regardless of how they are batched. The stream may emit events
55+ /// across multiple batches or all at once—the macro handles both cases. It ensures no unexpected
56+ /// events appear between the expected ones and that the sequence completes exactly as specified.
57+ ///
58+ /// The macro accepts events of any type implementing [`SolEvent`](alloy::sol_types::SolEvent).
59+ /// Events are compared by their encoded log data, allowing flexible matching across different
60+ /// batch configurations while maintaining strict ordering requirements.
61+ ///
62+ /// # Examples
63+ ///
64+ /// ```no_run
65+ /// # use alloy::sol;
66+ /// sol! {
67+ /// event CountIncreased(uint256 newCount);
68+ /// }
69+ ///
70+ /// #[tokio::test]
71+ /// async fn test_event_order() {
72+ /// // scanner setup...
73+ ///
74+ /// let mut stream = scanner.subscribe(EventFilter::new().contract_address(contract_address));
75+ ///
76+ /// // Assert these two events are emitted in order
77+ /// assert_event_sequence!(
78+ /// stream,
79+ /// &[
80+ /// CountIncreased { newCount: U256::from(1) },
81+ /// CountIncreased { newCount: U256::from(2) },
82+ /// ]
83+ /// );
84+ /// }
85+ /// ```
86+ ///
87+ /// The assertion passes whether events arrive in separate batches or together:
88+ /// * **Separate batches**: `[Event1]`, then `[Event2]`
89+ /// * **Single batch**: `[Event1, Event2]`
90+ ///
91+ /// # Panics
92+ ///
93+ /// * **Timeout**: The stream doesn't produce the next expected event within the timeout period
94+ /// (default 5 seconds, configurable via `timeout = N` parameter).
95+ /// * **Wrong event**: The stream emits a different event than the next expected one in the
96+ /// sequence.
97+ /// * **Extra events**: The stream emits more events than expected after the sequence completes.
98+ /// * **Stream closed early**: The stream ends before all expected events are emitted.
99+ /// * **Wrong message type**: The stream yields a non-`Data` message (e.g., `Error` or `Status`)
100+ /// when an event is expected.
101+ /// * **Empty sequence**: The macro is called with an empty event collection (use `assert_empty!`
102+ /// instead).
103+ ///
104+ /// On panic, the error message includes the remaining expected events for debugging.
105+ #[ macro_export]
106+ macro_rules! assert_event_sequence {
107+ // owned slices just pass to the borrowed slices variant
108+ ( $stream: expr, [ $( $event: expr) ,+ $( , ) ?] ) => {
109+ assert_event_sequence!( $stream, & [ $( $event) ,+] , timeout = 5 )
110+ } ;
111+ ( $stream: expr, [ $( $event: expr) ,+ $( , ) ?] , timeout = $secs: expr) => {
112+ assert_event_sequence!( $stream, & [ $( $event) ,+] , timeout = $secs)
113+ } ;
114+ // borrowed slices
115+ ( $stream: expr, & [ $( $event: expr) ,+ $( , ) ?] ) => {
116+ assert_event_sequence!( $stream, & [ $( $event) ,+] , timeout = 5 )
117+ } ;
118+ ( $stream: expr, & [ $( $event: expr) ,+ $( , ) ?] , timeout = $secs: expr) => {
119+ let expected_options = & [ $( alloy:: sol_types:: SolEvent :: encode_log_data( & $event) ) ,+] ;
120+
121+ $crate:: test_utils:: macros:: assert_event_sequence( & mut $stream, expected_options, $secs) . await
122+ } ;
123+ // variables and non-slice expressions
124+ ( $stream: expr, $events: expr) => {
125+ assert_event_sequence!( $stream, $events, timeout = 5 )
126+ } ;
127+ ( $stream: expr, $events: expr, timeout = $secs: expr) => {
128+ let expected_options = $events. iter( ) . map( alloy:: sol_types:: SolEvent :: encode_log_data) . collect:: <Vec <_>>( ) ;
129+ if expected_options. is_empty( ) {
130+ panic!( "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages." )
131+ }
132+ $crate:: test_utils:: macros:: assert_event_sequence( & mut $stream, expected_options. iter( ) , $secs) . await
133+ } ;
134+ }
135+
136+ /// Same as [`assert_event_sequence!`], but invokes [`assert_empty!`] at the end.
137+ #[ macro_export]
138+ macro_rules! assert_event_sequence_final {
139+ // owned slices
140+ ( $stream: expr, [ $( $event: expr) ,+ $( , ) ?] ) => { {
141+ assert_event_sequence_final!( $stream, & [ $( $event) ,+] )
142+ } } ;
143+ ( $stream: expr, [ $( $event: expr) ,+ $( , ) ?] , timeout = $secs: expr) => { {
144+ assert_event_sequence_final!( $stream, & [ $( $event) ,+] , timeout = $secs)
145+ } } ;
146+ // borrowed slices
147+ ( $stream: expr, & [ $( $event: expr) ,+ $( , ) ?] ) => { {
148+ assert_event_sequence_final!( $stream, & [ $( $event) ,+] , timeout = 5 )
149+ } } ;
150+ ( $stream: expr, & [ $( $event: expr) ,+ $( , ) ?] , timeout = $secs: expr) => { {
151+ $crate:: assert_event_sequence!( $stream, & [ $( $event) ,+] , timeout = $secs) ;
152+ $crate:: assert_empty!( $stream)
153+ } } ;
154+ // variables and non-slice expressions
155+ ( $stream: expr, $events: expr) => { {
156+ assert_event_sequence_final!( $stream, $events, timeout = 5 )
157+ } } ;
158+ ( $stream: expr, $events: expr, timeout = $secs: expr) => { {
159+ $crate:: assert_event_sequence!( $stream, $events, timeout = $secs) ;
160+ $crate:: assert_empty!( $stream)
161+ } } ;
162+ }
163+
164+ #[ allow( clippy:: missing_panics_doc) ]
165+ pub async fn assert_event_sequence < S : Stream < Item = Message > + Unpin > (
166+ stream : & mut S ,
167+ expected_options : impl IntoIterator < Item = & LogData > ,
168+ timeout_secs : u64 ,
169+ ) {
170+ let mut remaining = expected_options. into_iter ( ) ;
171+ let start = std:: time:: Instant :: now ( ) ;
172+ let timeout_duration = std:: time:: Duration :: from_secs ( timeout_secs) ;
173+
174+ while let Some ( expected) = remaining. next ( ) {
175+ let elapsed = start. elapsed ( ) ;
176+
177+ assert ! (
178+ elapsed < timeout_duration,
179+ "Timed out waiting for events. Still expecting: {:#?}" ,
180+ remaining. collect:: <Vec <_>>( )
181+ ) ;
182+
183+ let time_left = timeout_duration - elapsed;
184+ let message = tokio:: time:: timeout ( time_left, tokio_stream:: StreamExt :: next ( stream) )
185+ . await
186+ . expect ( "timed out waiting for next batch" ) ;
187+
188+ match message {
189+ Some ( Message :: Data ( batch) ) => {
190+ let mut batch = batch. iter ( ) ;
191+ let event = batch. next ( ) . expect ( "Streamed batch should not be empty" ) ;
192+ assert_eq ! (
193+ expected,
194+ event. data( ) ,
195+ "\n Remaining: {:#?}\n " ,
196+ remaining. collect:: <Vec <_>>( )
197+ ) ;
198+ while let Some ( event) = batch. next ( ) {
199+ let expected = remaining. next ( ) . unwrap_or_else ( || panic ! ( "Received more events than expected.\n Next event: {:#?}\n Streamed remaining: {batch:#?}" , event. data( ) ) ) ;
200+ assert_eq ! (
201+ expected,
202+ event. data( ) ,
203+ "\n Remaining: {:#?}\n " ,
204+ remaining. collect:: <Vec <_>>( )
205+ ) ;
206+ }
207+ }
208+ Some ( other) => {
209+ panic ! ( "Expected Message::Data, got: {other:#?}" ) ;
210+ }
211+ None => {
212+ panic ! ( "Stream closed while still expecting: {:#?}" , remaining. collect:: <Vec <_>>( ) ) ;
213+ }
214+ }
215+ }
216+ }
217+
46218/// Asserts that a stream of block ranges completely covers an expected block range.
47219///
48220/// This macro consumes messages from a stream and verifies that the block ranges received
@@ -154,3 +326,35 @@ macro_rules! assert_range_coverage {
154326 }
155327 } } ;
156328}
329+
330+ #[ cfg( test) ]
331+ mod tests {
332+ use alloy:: sol;
333+ use tokio:: sync:: mpsc;
334+ use tokio_stream:: wrappers:: ReceiverStream ;
335+
336+ sol ! {
337+ #[ derive( Debug ) ]
338+ event Transfer ( address indexed from, address indexed to, uint256 value) ;
339+ }
340+
341+ #[ tokio:: test]
342+ #[ should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages." ]
343+ async fn assert_event_sequence_macro_with_empty_vec ( ) {
344+ let ( _tx, rx) = mpsc:: channel ( 10 ) ;
345+ let mut stream = ReceiverStream :: new ( rx) ;
346+
347+ let empty_vec: Vec < Transfer > = Vec :: new ( ) ;
348+ assert_event_sequence ! ( stream, empty_vec) ;
349+ }
350+
351+ #[ tokio:: test]
352+ #[ should_panic = "error: assert_event_sequence! called with an empty collection. Use assert_empty! macro instead to check for no pending messages." ]
353+ async fn assert_event_sequence_macro_with_empty_slice ( ) {
354+ let ( _tx, rx) = mpsc:: channel ( 10 ) ;
355+ let mut stream = ReceiverStream :: new ( rx) ;
356+
357+ let empty_vec: & [ Transfer ] = & [ ] ;
358+ assert_event_sequence ! ( stream, empty_vec) ;
359+ }
360+ }
0 commit comments