@@ -19,23 +19,47 @@ macro_rules! assert_next {
1919}
2020
2121#[ macro_export]
22- macro_rules! assert_next_any {
22+ macro_rules! assert_event_sequence {
2323 ( $stream: expr, $expected_options: expr) => {
24- assert_next_any !( $stream, $expected_options, timeout = 5 )
24+ assert_event_sequence !( $stream, $expected_options, timeout = 5 )
2525 } ;
2626 ( $stream: expr, $expected_options: expr, timeout = $secs: expr) => {
27- let message = tokio:: time:: timeout(
28- std:: time:: Duration :: from_secs( $secs) ,
29- tokio_stream:: StreamExt :: next( & mut $stream) ,
30- )
31- . await
32- . expect( "timed out" ) ;
27+ let expected_options = $expected_options;
28+ if expected_options. is_empty( ) {
29+ panic!( "assert_event_sequence! called with empty array. Use assert_empty! macro instead to check for no pending messages." ) ;
30+ }
3331
34- if let Some ( data) = message {
35- let matched = $expected_options. iter( ) . any( |expected| data == * expected) ;
36- assert!( matched, "Expected one of:\n {:#?}\n \n Got:\n {:#?}" , $expected_options, data) ;
37- } else {
38- panic!( "Expected one of {:?}, but channel was closed" , $expected_options)
32+ let mut remaining = expected_options. iter( ) ;
33+ let start = std:: time:: Instant :: now( ) ;
34+ let timeout_duration = std:: time:: Duration :: from_secs( $secs) ;
35+
36+ while let Some ( expected) = remaining. next( ) {
37+ let elapsed = start. elapsed( ) ;
38+ if elapsed >= timeout_duration {
39+ panic!( "Timed out waiting for events. Still expecting: {:#?}" , remaining) ;
40+ }
41+
42+ let time_left = timeout_duration - elapsed;
43+ let message =
44+ tokio:: time:: timeout( time_left, tokio_stream:: StreamExt :: next( & mut $stream) )
45+ . await
46+ . expect( "timed out waiting for next batch" ) ;
47+
48+ match message {
49+ Some ( $crate:: ScannerMessage :: Data ( batch) ) => {
50+ let mut batch = batch. iter( ) ;
51+ let event = batch. next( ) . expect( "Streamed batch should not be empty" ) ;
52+ assert_eq!( & alloy:: sol_types:: SolEvent :: encode_log_data( expected) , event. data( ) , "Unexpected event: {:#?}\n Expected: {:#?}\n Remaining: {:#?}" , event, expected, remaining) ;
53+ while let Some ( event) = batch. next( ) {
54+ let expected = remaining. next( ) . unwrap_or_else( || panic!( "Received more events than expected, current: {:#?}\n Streamed batch: {:#?}" , event, batch) ) ;
55+ assert_eq!( & alloy:: sol_types:: SolEvent :: encode_log_data( expected) , event. data( ) , "Unexpected event: {:#?}\n Expected: {:#?}\n Remaining: {:#?}" , event, expected, remaining) ;
56+ }
57+ }
58+ Some ( other) => {
59+ panic!( "Expected ScannerMessage::Data, got: {:#?}" , other) ;
60+ }
61+ None => { panic!( "Stream closed while still expecting: {:#?}" , remaining) ; }
62+ }
3963 }
4064 } ;
4165}
0 commit comments