1+ use alloy:: primitives:: LogData ;
2+ use tokio_stream:: Stream ;
3+
4+ use crate :: Message ;
5+
16#[ macro_export]
27macro_rules! assert_next {
38 ( $stream: expr, $expected: expr) => {
@@ -22,48 +27,96 @@ macro_rules! assert_next {
2227
2328#[ macro_export]
2429macro_rules! assert_event_sequence {
25- ( $stream: expr, $expected_options: expr) => {
26- assert_event_sequence!( $stream, $expected_options, timeout = 5 )
30+ // owned slices just pass to the borrowed slices variant
31+ ( $stream: expr, [ $( $event: expr) ,+ $( , ) ?] ) => {
32+ assert_event_sequence!( $stream, & [ $( $event) ,+] , timeout = 5 )
2733 } ;
28- ( $stream: expr, $expected_options: expr, timeout = $secs: expr) => {
29- let expected_options = $expected_options;
30- if expected_options. is_empty( ) {
31- panic!( "assert_event_sequence! called with empty array. Use assert_empty! macro instead to check for no pending messages." ) ;
32- }
34+ ( $stream: expr, [ $( $event: expr) ,+ $( , ) ?] , timeout = $secs: expr) => {
35+ assert_event_sequence!( $stream, & [ $( $event) ,+] , timeout = $secs)
36+ } ;
37+ // borrowed slices
38+ ( $stream: expr, & [ $( $event: expr) ,+ $( , ) ?] ) => {
39+ assert_event_sequence!( $stream, & [ $( $event) ,+] , timeout = 5 )
40+ } ;
41+ ( $stream: expr, & [ $( $event: expr) ,+ $( , ) ?] , timeout = $secs: expr) => {
42+ let expected_options = & [
43+ $(
44+ {
45+ let event = & $event;
46+ let encoded_data = alloy:: sol_types:: SolEvent :: encode_log_data( event) ;
47+ let debug_string = format!( "{:#?}" , event) ;
48+ ( encoded_data, debug_string)
49+ }
50+ ) ,+
51+ ] ;
3352
34- let mut remaining = expected_options. iter( ) ;
35- let start = std:: time:: Instant :: now( ) ;
36- let timeout_duration = std:: time:: Duration :: from_secs( $secs) ;
53+ $crate:: test_utils:: macros:: assert_event_sequence( & mut $stream, expected_options, $secs) . await
54+ } ;
55+ // variables and non-slice expressions
56+ ( $stream: expr, $events: expr) => {
57+ assert_event_sequence!( $stream, $events, timeout = 5 )
58+ } ;
59+ ( $stream: expr, $events: expr, timeout = $secs: expr) => {
60+ let expected_options = $events. iter( ) . map( |e| ( alloy:: sol_types:: SolEvent :: encode_log_data( e) , format!( "{e:#?}" ) ) ) . collect:: <Vec <_>>( ) ;
61+ $crate:: test_utils:: macros:: assert_event_sequence( & mut $stream, expected_options. iter( ) , $secs) . await
62+ } ;
63+ }
3764
38- while let Some ( expected) = remaining. next( ) {
39- let elapsed = start. elapsed( ) ;
40- if elapsed >= timeout_duration {
41- panic!( "Timed out waiting for events. Still expecting: {:#?}" , remaining) ;
42- }
65+ pub async fn assert_event_sequence < S : Stream < Item = Message > + Unpin > (
66+ stream : & mut S ,
67+ expected_options : impl IntoIterator < Item = & ( LogData , String ) > ,
68+ timeout_secs : u64 ,
69+ ) {
70+ let mut remaining = expected_options. into_iter ( ) ;
71+ let start = std:: time:: Instant :: now ( ) ;
72+ let timeout_duration = std:: time:: Duration :: from_secs ( timeout_secs) ;
4373
44- let time_left = timeout_duration - elapsed;
45- let message =
46- tokio:: time:: timeout( time_left, tokio_stream:: StreamExt :: next( & mut $stream) )
47- . await
48- . expect( "timed out waiting for next batch" ) ;
74+ while let Some ( expected) = remaining. next ( ) {
75+ let elapsed = start. elapsed ( ) ;
76+ if elapsed >= timeout_duration {
77+ panic ! (
78+ "Timed out waiting for events. Still expecting: {:#?}" ,
79+ remaining. collect:: <Vec <_>>( )
80+ ) ;
81+ }
4982
50- match message {
51- Some ( $crate:: ScannerMessage :: Data ( batch) ) => {
52- let mut batch = batch. iter( ) ;
53- let event = batch. next( ) . expect( "Streamed batch should not be empty" ) ;
54- assert_eq!( & alloy:: sol_types:: SolEvent :: encode_log_data( expected) , event. data( ) , "Unexpected event: {:#?}\n Expected: {:#?}\n Remaining: {:#?}" , event, expected, remaining) ;
55- while let Some ( event) = batch. next( ) {
56- let expected = remaining. next( ) . unwrap_or_else( || panic!( "Received more events than expected, current: {:#?}\n Streamed batch: {:#?}" , event, batch) ) ;
57- assert_eq!( & alloy:: sol_types:: SolEvent :: encode_log_data( expected) , event. data( ) , "Unexpected event: {:#?}\n Expected: {:#?}\n Remaining: {:#?}" , event, expected, remaining) ;
58- }
59- }
60- Some ( other) => {
61- panic!( "Expected ScannerMessage::Data, got: {:#?}" , other) ;
83+ let time_left = timeout_duration - elapsed;
84+ let message = tokio:: time:: timeout ( time_left, tokio_stream:: StreamExt :: next ( stream) )
85+ . await
86+ . expect ( "timed out waiting for next batch" ) ;
87+
88+ match message {
89+ Some ( Message :: Data ( batch) ) => {
90+ let mut batch = batch. iter ( ) ;
91+ let event = batch. next ( ) . expect ( "Streamed batch should not be empty" ) ;
92+ assert_eq ! (
93+ & expected. 0 ,
94+ event. data( ) ,
95+ "Unexpected event: {:#?}\n Expected: {}\n Remaining: {:#?}" ,
96+ event,
97+ expected. 1 ,
98+ remaining. collect:: <Vec <_>>( )
99+ ) ;
100+ while let Some ( event) = batch. next ( ) {
101+ let expected = remaining. next ( ) . unwrap_or_else ( || panic ! ( "Received more events than expected, current: {:#?}\n Streamed batch: {:#?}" , event, batch) ) ;
102+ assert_eq ! (
103+ & expected. 0 ,
104+ event. data( ) ,
105+ "Unexpected event: {:#?}\n Expected: {}\n Remaining: {:#?}" ,
106+ event,
107+ expected. 1 ,
108+ remaining. collect:: <Vec <_>>( )
109+ ) ;
62110 }
63- None => { panic!( "Stream closed while still expecting: {:#?}" , remaining) ; }
111+ }
112+ Some ( other) => {
113+ panic ! ( "Expected Message::Data, got: {:#?}" , other) ;
114+ }
115+ None => {
116+ panic ! ( "Stream closed while still expecting: {:#?}" , remaining. collect:: <Vec <_>>( ) ) ;
64117 }
65118 }
66- } ;
119+ }
67120}
68121
69122#[ macro_export]
0 commit comments