@@ -170,23 +170,11 @@ pub enum Command {
170170 Unsubscribe {
171171 response : oneshot:: Sender < Result < ( ) , BlockRangeScannerError > > ,
172172 } ,
173- GetStatus {
174- response : oneshot:: Sender < ServiceStatus > ,
175- } ,
176173 Shutdown {
177174 response : oneshot:: Sender < Result < ( ) , BlockRangeScannerError > > ,
178175 } ,
179176}
180177
181- #[ derive( Debug , Clone ) ]
182- pub struct ServiceStatus {
183- pub is_subscribed : bool ,
184- pub last_synced_block : BlockHashAndNumber ,
185- pub websocket_connected : bool ,
186- pub processed_count : u64 ,
187- pub error_count : u64 ,
188- }
189-
190178#[ derive( Default , Debug , Clone ) ]
191179pub struct BlockHashAndNumber {
192180 pub hash : BlockHash ,
@@ -404,10 +392,6 @@ impl<N: Network> Service<N> {
404392 self . handle_unsubscribe ( ) ;
405393 let _ = response. send ( Ok ( ( ) ) ) ;
406394 }
407- Command :: GetStatus { response } => {
408- let status = self . get_status ( ) ;
409- let _ = response. send ( status) ;
410- }
411395 Command :: Shutdown { response } => {
412396 self . shutdown = true ;
413397 self . handle_unsubscribe ( ) ;
@@ -590,8 +574,11 @@ impl<N: Network> Service<N> {
590574 Ok ( ws_stream) => {
591575 info ! ( "WebSocket connected for live blocks" ) ;
592576
593- let cur = expected_next_block;
594- let mut stream = ws_stream. into_stream ( ) . skip_while ( |header| header. number ( ) < cur) ;
577+ // ensure we start streaming only after the expected_next_block cutoff
578+ let cutoff = expected_next_block;
579+ let mut stream =
580+ ws_stream. into_stream ( ) . skip_while ( |header| header. number ( ) < cutoff) ;
581+
595582 while let Some ( incoming_block) = stream. next ( ) . await {
596583 let incoming_block_num = incoming_block. number ( ) ;
597584 info ! ( block_number = incoming_block_num, "Received block header" ) ;
@@ -737,16 +724,6 @@ impl<N: Network> Service<N> {
737724 }
738725 }
739726
740- fn get_status ( & self ) -> ServiceStatus {
741- ServiceStatus {
742- is_subscribed : self . subscriber . is_some ( ) ,
743- last_synced_block : self . current . clone ( ) ,
744- websocket_connected : self . websocket_connected ,
745- processed_count : self . processed_count ,
746- error_count : self . error_count ,
747- }
748- }
749-
750727 fn ensure_no_subscriber ( & self ) -> Result < ( ) , BlockRangeScannerError > {
751728 if self . subscriber . is_some ( ) {
752729 return Err ( BlockRangeScannerError :: MultipleSubscribers ) ;
@@ -884,24 +861,6 @@ impl BlockRangeScannerClient {
884861 response_rx. await . map_err ( |_| BlockRangeScannerError :: ServiceShutdown ) ?
885862 }
886863
887- /// Returns the current status of the subscription service.
888- ///
889- /// # Errors
890- ///
891- /// * `BlockRangeScannerError::ServiceShutdown` - if the service is already shutting down.
892- pub async fn get_status ( & self ) -> Result < ServiceStatus , BlockRangeScannerError > {
893- let ( response_tx, response_rx) = oneshot:: channel ( ) ;
894-
895- let command = Command :: GetStatus { response : response_tx } ;
896-
897- self . command_sender
898- . send ( command)
899- . await
900- . map_err ( |_| BlockRangeScannerError :: ServiceShutdown ) ?;
901-
902- response_rx. await . map_err ( |_| BlockRangeScannerError :: ServiceShutdown )
903- }
904-
905864 /// Shuts down the subscription service and unsubscribes the current subscriber.
906865 ///
907866 /// # Errors
@@ -978,33 +937,6 @@ mod tests {
978937 assert_eq ! ( scanner. block_confirmations, block_confirmations) ;
979938 }
980939
981- #[ test]
982- fn service_status_reflects_internal_state ( ) {
983- let asserter = Asserter :: new ( ) ;
984- let provider = mocked_provider ( asserter) ;
985- let ( mut service, _cmd) = Service :: new ( test_config ( ) , provider) ;
986-
987- let processed_count = 7 ;
988- let error_count = 2 ;
989- service. processed_count = processed_count;
990- service. error_count = error_count;
991- let hash = keccak256 ( b"random" ) ;
992- let block_number = 99 ;
993- service. current = BlockHashAndNumber { hash, number : block_number } ;
994- service. websocket_connected = true ;
995- service. subscriber = Some ( mpsc:: channel ( 1 ) . 0 ) ;
996-
997- let status = service. get_status ( ) ;
998-
999- assert ! ( status. is_subscribed) ;
1000- assert ! ( status. websocket_connected) ;
1001- assert_eq ! ( status. processed_count, processed_count) ;
1002- assert_eq ! ( status. error_count, error_count) ;
1003- let last = status. last_synced_block ;
1004- assert_eq ! ( last. number, block_number) ;
1005- assert_eq ! ( last. hash, hash) ;
1006- }
1007-
1008940 #[ tokio:: test]
1009941 async fn send_to_subscriber_increments_processed_count ( ) -> anyhow:: Result < ( ) > {
1010942 let asserter = Asserter :: new ( ) ;
0 commit comments