@@ -965,14 +965,38 @@ mod tests {
965965 } ;
966966 use alloy_node_bindings:: Anvil ;
967967 use serde_json:: { Value , json} ;
968- use tokio:: { sync:: mpsc, time:: timeout} ;
968+ use tokio:: {
969+ sync:: mpsc:: { self , Receiver } ,
970+ time:: timeout,
971+ } ;
969972 use tokio_stream:: StreamExt ;
970973
971974 use super :: * ;
972975
976+ // Trait to enable receiver-type-agnostic range receival
977+ trait RangeReceiver {
978+ async fn next_range ( & mut self ) -> Option < BlockRangeMessage > ;
979+ }
980+
981+ impl RangeReceiver for ReceiverStream < BlockRangeMessage > {
982+ async fn next_range ( & mut self ) -> Option < BlockRangeMessage > {
983+ self . next ( ) . await
984+ }
985+ }
986+
987+ impl RangeReceiver for Receiver < BlockRangeMessage > {
988+ async fn next_range ( & mut self ) -> Option < BlockRangeMessage > {
989+ self . recv ( ) . await
990+ }
991+ }
992+
973993 macro_rules! assert_next_range {
974- ( $stream: expr, $range: expr) => {
975- let next = $stream. next( ) . await ;
994+ ( $recv: expr, None ) => {
995+ let next = $recv. next_range( ) . await ;
996+ assert!( next. is_none( ) ) ;
997+ } ;
998+ ( $recv: expr, $range: expr) => {
999+ let next = $recv. next_range( ) . await ;
9761000 if let Some ( BlockRangeMessage :: Data ( range) ) = next {
9771001 assert_eq!( $range, range) ;
9781002 } else {
@@ -1393,27 +1417,27 @@ mod tests {
13931417
13941418 // ranges where each batch is of max blocks per epoch size
13951419 let mut stream = client. stream_historical ( 0 , 19 ) . await ?;
1396- assert_next_range ! ( stream, ( 0 ..=4 ) ) ;
1397- assert_next_range ! ( stream, ( 5 ..=9 ) ) ;
1398- assert_next_range ! ( stream, ( 10 ..=14 ) ) ;
1399- assert_next_range ! ( stream, ( 15 ..=19 ) ) ;
1400- assert ! ( stream. next ( ) . await . is_none ( ) ) ;
1420+ assert_next_range ! ( stream, 0 ..=4 ) ;
1421+ assert_next_range ! ( stream, 5 ..=9 ) ;
1422+ assert_next_range ! ( stream, 10 ..=14 ) ;
1423+ assert_next_range ! ( stream, 15 ..=19 ) ;
1424+ assert_next_range ! ( stream, None ) ;
14011425
14021426 // ranges where last batch is smaller than blocks per epoch
14031427 let mut stream = client. stream_historical ( 93 , 99 ) . await ?;
1404- assert_next_range ! ( stream, ( 93 ..=97 ) ) ;
1405- assert_next_range ! ( stream, ( 98 ..=99 ) ) ;
1406- assert ! ( stream. next ( ) . await . is_none ( ) ) ;
1428+ assert_next_range ! ( stream, 93 ..=97 ) ;
1429+ assert_next_range ! ( stream, 98 ..=99 ) ;
1430+ assert_next_range ! ( stream, None ) ;
14071431
14081432 // range where blocks per epoch is larger than the number of blocks in the range
14091433 let mut stream = client. stream_historical ( 3 , 5 ) . await ?;
1410- assert_next_range ! ( stream, ( 3 ..=5 ) ) ;
1411- assert ! ( stream. next ( ) . await . is_none ( ) ) ;
1434+ assert_next_range ! ( stream, 3 ..=5 ) ;
1435+ assert_next_range ! ( stream, None ) ;
14121436
14131437 // single item range
14141438 let mut stream = client. stream_historical ( 3 , 3 ) . await ?;
1415- assert_next_range ! ( stream, ( 3 ..=3 ) ) ;
1416- assert ! ( stream. next ( ) . await . is_none ( ) ) ;
1439+ assert_next_range ! ( stream, 3 ..=3 ) ;
1440+ assert_next_range ! ( stream, None ) ;
14171441
14181442 // range where blocks per epoch is larger than the number of blocks on chain
14191443 let client = BlockRangeScanner :: new ( )
@@ -1423,12 +1447,12 @@ mod tests {
14231447 . run ( ) ?;
14241448
14251449 let mut stream = client. stream_historical ( 0 , 20 ) . await ?;
1426- assert_next_range ! ( stream, ( 0 ..=20 ) ) ;
1427- assert ! ( stream. next ( ) . await . is_none ( ) ) ;
1450+ assert_next_range ! ( stream, 0 ..=20 ) ;
1451+ assert_next_range ! ( stream, None ) ;
14281452
14291453 let mut stream = client. stream_historical ( 0 , 99 ) . await ?;
1430- assert_next_range ! ( stream, ( 0 ..=99 ) ) ;
1431- assert ! ( stream. next ( ) . await . is_none ( ) ) ;
1454+ assert_next_range ! ( stream, 0 ..=99 ) ;
1455+ assert_next_range ! ( stream, None ) ;
14321456
14331457 Ok ( ( ) )
14341458 }
0 commit comments