@@ -852,9 +852,21 @@ impl<N: Network> Service<N> {
852852 match Self :: get_block_subscription ( & provider) . await {
853853 Ok ( sub) => {
854854 let mut stream = sub. into_stream ( ) ;
855- while let Some ( h) = stream. next ( ) . await {
856- println ! ( "h {}" , h. number( ) ) ;
857- if live_block_num_sender. send ( h. number ( ) ) . await . is_err ( ) {
855+ let mut last_seen: Option < BlockNumber > = None ;
856+
857+ while let Some ( incoming_block) = stream. next ( ) . await {
858+ let incoming_block_num = incoming_block. number ( ) ;
859+ // Emit only non-increasing heads at/below end_num (reorg signals)
860+ let emit = match last_seen {
861+ Some ( prev) => {
862+ incoming_block_num <= prev && incoming_block_num <= end_num
863+ }
864+ None => false ,
865+ } ;
866+
867+ last_seen = Some ( incoming_block_num) ;
868+
869+ if emit && live_block_num_sender. send ( incoming_block_num) . await . is_err ( ) {
858870 warn ! ( "Downstream channel closed, stopping live header monitor" ) ;
859871 break ;
860872 }
@@ -876,21 +888,19 @@ impl<N: Network> Service<N> {
876888 mut rx : mpsc:: Receiver < BlockNumber > ,
877889 end_num : BlockNumber ,
878890 ) {
879- let Ok ( mut min_live_block ) = rx. try_recv ( ) else {
891+ let Ok ( first ) = rx. try_recv ( ) else {
880892 return ;
881893 } ;
882894
883- while let Ok ( live_block) = rx. try_recv ( ) {
884- if live_block < min_live_block {
885- min_live_block = live_block;
895+ // Live monitor forwards only non-increasing heads at/below end_num (reorg signals).
896+ // Determine earliest affected block and re-emit [reorg_start..=end_num].
897+ let mut reorg_start = first;
898+ while let Ok ( reorged_blocks) = rx. try_recv ( ) {
899+ if reorged_blocks < reorg_start {
900+ reorg_start = reorged_blocks;
886901 }
887902 }
888903
889- if min_live_block >= end_num {
890- return ;
891- }
892-
893- let reorg_start = min_live_block;
894904 let max_read = self . config . blocks_read_per_epoch as u64 ;
895905
896906 self . send_to_subscriber ( BlockRangeMessage :: Status ( ScannerStatus :: ReorgDetected ) ) . await ;
0 commit comments