@@ -733,13 +733,147 @@ impl BlockRangeScannerClient {
733733
734734#[ cfg( test) ]
735735mod tests {
736- use alloy:: network:: Ethereum ;
736+ use alloy:: {
737+ eips:: BlockNumberOrTag ,
738+ network:: Ethereum ,
739+ primitives:: { B256 , keccak256} ,
740+ rpc:: {
741+ client:: RpcClient ,
742+ types:: { Block as RpcBlock , Header , Transaction } ,
743+ } ,
744+ transports:: mock:: Asserter ,
745+ } ;
737746 use alloy_node_bindings:: Anvil ;
738-
747+ use serde_json:: { Value , json} ;
748+ use tokio:: sync:: mpsc;
739749 use tokio_stream:: StreamExt ;
740750
741751 use super :: * ;
742752
753+ fn test_config ( ) -> Config {
754+ Config { blocks_read_per_epoch : 5 , reorg_rewind_depth : 5 , block_confirmations : 0 }
755+ }
756+
757+ fn mocked_provider ( asserter : Asserter ) -> RootProvider < Ethereum > {
758+ RootProvider :: new ( RpcClient :: mocked ( asserter) )
759+ }
760+
761+ fn mock_block ( number : u64 , hash : B256 ) -> RpcBlock < Transaction , Header > {
762+ let mut block: RpcBlock < Transaction , Header > = RpcBlock :: default ( ) ;
763+ block. header . hash = hash;
764+ block. header . number = number;
765+ block
766+ }
767+
768+ #[ test]
769+ fn block_range_scanner_defaults_match_constants ( ) {
770+ let scanner = BlockRangeScanner :: new ( ) ;
771+
772+ assert_eq ! ( scanner. blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH ) ;
773+ assert_eq ! ( scanner. reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH ) ;
774+ assert_eq ! ( scanner. block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS ) ;
775+ }
776+
777+ #[ test]
778+ fn builder_methods_update_configuration ( ) {
779+ let blocks_read_per_epoch = 42 ;
780+ let reorg_rewind_depth = 12 ;
781+ let block_confirmations = 7 ;
782+
783+ let scanner = BlockRangeScanner :: new ( )
784+ . with_blocks_read_per_epoch ( blocks_read_per_epoch)
785+ . with_reorg_rewind_depth ( reorg_rewind_depth)
786+ . with_block_confirmations ( block_confirmations) ;
787+
788+ assert_eq ! ( scanner. blocks_read_per_epoch, blocks_read_per_epoch) ;
789+ assert_eq ! ( scanner. block_confirmations, block_confirmations) ;
790+ }
791+
792+ #[ test]
793+ fn service_status_reflects_internal_state ( ) {
794+ let asserter = Asserter :: new ( ) ;
795+ let provider = mocked_provider ( asserter) ;
796+ let ( mut service, _cmd) = Service :: new ( test_config ( ) , provider) ;
797+
798+ let processed_count = 7 ;
799+ let error_count = 2 ;
800+ service. processed_count = processed_count;
801+ service. error_count = error_count;
802+ let hash = keccak256 ( b"random" ) ;
803+ let block_number = 99 ;
804+ service. current = Some ( BlockHashAndNumber { hash, number : block_number } ) ;
805+ service. websocket_connected = true ;
806+ service. subscriber = Some ( mpsc:: channel ( 1 ) . 0 ) ;
807+
808+ let status = service. get_status ( ) ;
809+
810+ assert ! ( status. is_subscribed) ;
811+ assert ! ( status. websocket_connected) ;
812+ assert_eq ! ( status. processed_count, processed_count) ;
813+ assert_eq ! ( status. error_count, error_count) ;
814+ let last = status. last_synced_block . expect ( "last synced block is set" ) ;
815+ assert_eq ! ( last. number, block_number) ;
816+ assert_eq ! ( last. hash, hash) ;
817+ }
818+
819+ #[ tokio:: test]
820+ async fn send_to_subscriber_increments_processed_count ( ) -> anyhow:: Result < ( ) > {
821+ let asserter = Asserter :: new ( ) ;
822+ let provider = mocked_provider ( asserter) ;
823+ let ( mut service, _cmd) = Service :: new ( test_config ( ) , provider) ;
824+
825+ let ( tx, mut rx) = mpsc:: channel ( 1 ) ;
826+ service. subscriber = Some ( tx) ;
827+
828+ let expected_range = 10 ..=11 ;
829+ service. send_to_subscriber ( Ok ( expected_range. clone ( ) ) ) . await ;
830+
831+ assert_eq ! ( service. processed_count, 1 ) ;
832+ assert ! ( service. subscriber. is_some( ) ) ;
833+
834+ let received = rx. recv ( ) . await . expect ( "range received" ) ?;
835+ assert_eq ! ( received, expected_range) ;
836+
837+ Ok ( ( ) )
838+ }
839+
840+ #[ tokio:: test]
841+ async fn send_to_subscriber_removes_closed_channel ( ) -> anyhow:: Result < ( ) > {
842+ let asserter = Asserter :: new ( ) ;
843+ let provider = mocked_provider ( asserter) ;
844+ let ( mut service, _cmd) = Service :: new ( test_config ( ) , provider) ;
845+
846+ let ( tx, rx) = mpsc:: channel ( 1 ) ;
847+ service. websocket_connected = true ;
848+ service. subscriber = Some ( tx) ;
849+ // channel is closed
850+ drop ( rx) ;
851+
852+ service. send_to_subscriber ( Ok ( 15 ..=15 ) ) . await ;
853+
854+ assert ! ( service. subscriber. is_none( ) ) ;
855+ assert ! ( !service. websocket_connected) ;
856+ assert_eq ! ( service. processed_count, 0 ) ;
857+
858+ Ok ( ( ) )
859+ }
860+
861+ #[ test]
862+ fn handle_unsubscribe_clears_subscriber ( ) {
863+ let asserter = Asserter :: new ( ) ;
864+ let provider = mocked_provider ( asserter) ;
865+ let ( mut service, _cmd) = Service :: new ( test_config ( ) , provider) ;
866+
867+ let ( tx, _rx) = mpsc:: channel ( 1 ) ;
868+ service. websocket_connected = true ;
869+ service. subscriber = Some ( tx) ;
870+
871+ service. handle_unsubscribe ( ) ;
872+
873+ assert ! ( service. subscriber. is_none( ) ) ;
874+ assert ! ( !service. websocket_connected) ;
875+ }
876+
743877 #[ tokio:: test]
744878 async fn live_mode_processes_all_blocks ( ) -> anyhow:: Result < ( ) > {
745879 let anvil = Anvil :: new ( ) . block_time_f64 ( 0.01 ) . try_spawn ( ) ?;
@@ -779,4 +913,81 @@ mod tests {
779913
780914 Ok ( ( ) )
781915 }
916+
917+ #[ tokio:: test]
918+ async fn rewinds_on_detected_reorg ( ) -> anyhow:: Result < ( ) > {
919+ let asserter = Asserter :: new ( ) ;
920+ let provider = mocked_provider ( asserter. clone ( ) ) ;
921+
922+ let mut config = test_config ( ) ;
923+ config. reorg_rewind_depth = 6 ;
924+ let ( mut service, _cmd) = Service :: new ( config. clone ( ) , provider) ;
925+
926+ let original_height = 10 ;
927+ let original_hash = keccak256 ( b"original block" ) ;
928+ let original_block = mock_block ( original_height, original_hash) ;
929+ service. current =
930+ Some ( BlockHashAndNumber :: from_header :: < Ethereum > ( original_block. header ( ) ) ) ;
931+
932+ let expected_rewind_height = original_height - config. reorg_rewind_depth ;
933+ let expected_rewind_hash = keccak256 ( b"rewound block" ) ;
934+ let rewound_block = mock_block ( expected_rewind_height, expected_rewind_hash) ;
935+
936+ // Mock provider responses for reorg detection and rewind:
937+ // 1. get_block_by_hash(original_hash) -> None (block not found = reorg detected)
938+ asserter. push_success ( & Value :: Null ) ;
939+ // 2. get_block_number() -> 12 (current chain head is at 12)
940+ asserter. push_success ( & json ! ( format!( "0x{:x}" , original_height + 2 ) ) ) ;
941+ // 3. get_block_by_number(expected_rewind_height) -> rewound_block
942+ asserter. push_success ( & rewound_block) ;
943+
944+ service. ensure_current_not_reorged ( ) . await ?;
945+
946+ let current = service. current . expect ( "current block should be set after rewind" ) ;
947+ assert_eq ! ( current. number, expected_rewind_height, "should rewind by reorg_rewind_depth" ) ;
948+ assert_eq ! ( current. hash, expected_rewind_hash, "should use hash of block at rewind height" ) ;
949+
950+ Ok ( ( ) )
951+ }
952+
953+ #[ tokio:: test]
954+ async fn buffered_messages_trim_ranges_prior_to_cutoff ( ) -> anyhow:: Result < ( ) > {
955+ let ( buffer_tx, buffer_rx) = mpsc:: channel ( 8 ) ;
956+ buffer_tx. send ( 40 ..=44 ) . await . unwrap ( ) ;
957+ buffer_tx. send ( 45 ..=54 ) . await . unwrap ( ) ;
958+ buffer_tx. send ( 60 ..=61 ) . await . unwrap ( ) ;
959+ drop ( buffer_tx) ;
960+
961+ let ( out_tx, mut out_rx) = mpsc:: channel ( 8 ) ;
962+
963+ Service :: < Ethereum > :: process_buffered_messages ( buffer_rx, out_tx, 50 ) . await ;
964+
965+ let mut forwarded = Vec :: new ( ) ;
966+ while let Some ( result) = out_rx. recv ( ) . await {
967+ forwarded. push ( result. unwrap ( ) ) ;
968+ }
969+
970+ assert_eq ! ( forwarded, vec![ 50 ..=54 , 60 ..=61 ] ) ;
971+
972+ Ok ( ( ) )
973+ }
974+
975+ #[ tokio:: test]
976+ async fn forwards_errors_to_subscribers ( ) -> anyhow:: Result < ( ) > {
977+ let asserter = Asserter :: new ( ) ;
978+ let provider = mocked_provider ( asserter) ;
979+ let ( mut service, _cmd) = Service :: new ( test_config ( ) , provider) ;
980+
981+ let ( tx, mut rx) = mpsc:: channel ( 1 ) ;
982+ service. subscriber = Some ( tx) ;
983+
984+ service. send_to_subscriber ( Err ( Error :: WebSocketConnectionFailed ( 4 ) ) ) . await ;
985+
986+ match rx. recv ( ) . await . expect ( "subscriber should stay open" ) {
987+ Err ( Error :: WebSocketConnectionFailed ( attempts) ) => assert_eq ! ( attempts, 4 ) ,
988+ other => panic ! ( "unexpected message: {other:?}" ) ,
989+ }
990+
991+ Ok ( ( ) )
992+ }
782993}
0 commit comments