@@ -512,6 +512,7 @@ mod tests {
512512 use alloy_node_bindings:: Anvil ;
513513 use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
514514 use tokio:: time:: sleep;
515+ use tokio_stream:: StreamExt ;
515516
516517 fn test_provider ( timeout : u64 , max_retries : usize , min_delay : u64 ) -> RobustProvider {
517518 RobustProvider {
@@ -718,4 +719,48 @@ mod tests {
718719
719720 Ok ( ( ) )
720721 }
722+
723+ #[ tokio:: test]
724+ async fn test_stream_with_failover ( ) -> anyhow:: Result < ( ) > {
725+ let mut anvil_1 = Some ( Anvil :: new ( ) . block_time ( 1 ) . try_spawn ( ) ?) ;
726+
727+ let ws_provider = ProviderBuilder :: new ( )
728+ . connect ( anvil_1. as_ref ( ) . unwrap ( ) . ws_endpoint_url ( ) . as_str ( ) )
729+ . await ?;
730+
731+ let anvil_2 = Anvil :: new ( ) . block_time ( 1 ) . try_spawn ( ) ?;
732+
733+ let ws_provider_2 =
734+ ProviderBuilder :: new ( ) . connect ( anvil_2. ws_endpoint_url ( ) . as_str ( ) ) . await ?;
735+
736+ let robust = RobustProviderBuilder :: fragile ( ws_provider. clone ( ) )
737+ . fallback ( ws_provider_2)
738+ . subscription_timeout ( Duration :: from_secs ( 3 ) )
739+ . build ( )
740+ . await ?;
741+
742+ let subscription = robust. subscribe_blocks ( ) . await ?;
743+ let mut stream = subscription. into_stream ( ) ;
744+
745+ while let Some ( result) = stream. next ( ) . await {
746+ let Ok ( block) = result else {
747+ break ;
748+ } ;
749+
750+ let block_number = block. number ( ) ;
751+
752+ // At block 10, drop the primary provider to test failover
753+ if block_number == 10 &&
754+ let Some ( anvil) = anvil_1. take ( )
755+ {
756+ drop ( anvil) ;
757+ }
758+
759+ if block_number >= 20 {
760+ break ;
761+ }
762+ }
763+
764+ Ok ( ( ) )
765+ }
721766}
0 commit comments