@@ -128,10 +128,7 @@ pub enum Error {
128128 #[ error( "WebSocket connection failed after {0} attempts" ) ]
129129 WebSocketConnectionFailed ( usize ) ,
130130
131- #[ error( "End of block batch" ) ]
132- Eof ,
133-
134- #[ error( "Reorg detected" ) ]
131+ #[ error( "Reorg Detected" ) ]
135132 ReorgDetected ,
136133}
137134
@@ -465,7 +462,9 @@ impl<N: Network> Service<N> {
465462
466463 self . sync_historical_data ( start_block, end_block) . await ?;
467464
468- info ! ( "Successfully synced historical data" ) ;
465+ _ = self . subscriber . take ( ) ;
466+
467+ info ! ( "Successfully synced historical data, closing the stream" ) ;
469468
470469 Ok ( ( ) )
471470 }
@@ -573,12 +572,6 @@ impl<N: Network> Service<N> {
573572
574573 info ! ( batch_count = batch_count, "Historical sync completed" ) ;
575574
576- if let Some ( sender) = & self . subscriber &&
577- sender. send ( Err ( Error :: Eof ) ) . await . is_err ( )
578- {
579- warn ! ( "Subscriber channel closed, cleaning up" ) ;
580- }
581-
582575 Ok ( ( ) )
583576 }
584577
@@ -599,14 +592,11 @@ impl<N: Network> Service<N> {
599592 info ! ( block_number = incoming_block_num, "Received block header" ) ;
600593
601594 if incoming_block_num < expected_next_block {
595+ warn ! ( "Reorg detected: sending forked range" ) ;
602596 if sender. send ( Err ( Error :: ReorgDetected ) ) . await . is_err ( ) {
603597 warn ! ( "Downstream channel closed, stopping live blocks task" ) ;
604598 return ;
605599 }
606- warn ! ( "Reorg detected: sending forked range" ) ;
607- // TODO: should we send the incoming block range or incoming block num -
608- // reorg depth? The incoming block should be the
609- // latest block from the reorg point so no real need
610600
611601 // resets cursor to incoming block num
612602 expected_next_block = incoming_block_num;
0 commit comments