@@ -1766,9 +1766,15 @@ fn stream_new_blocks<
17661766 let block_stream =
17671767 central_source. stream_new_blocks( header_marker, up_to) . fuse( ) ;
17681768 pin_mut!( block_stream) ;
1769+ let mut next_expected_block = header_marker;
17691770 while let Some ( maybe_block) = block_stream. next( ) . await {
17701771 let ( block_number, block, signature) = maybe_block?;
1772+ if block_number != next_expected_block {
1773+ warn!( "Gap detected in block stream. Expected {}, found {}. Discarding and retrying..." , next_expected_block, block_number) ;
1774+ break ;
1775+ }
17711776 yield SyncEvent :: BlockAvailable { block_number, block , signature } ;
1777+ next_expected_block = next_expected_block. unchecked_next( ) ;
17721778 }
17731779 }
17741780 }
@@ -1801,20 +1807,28 @@ fn stream_new_state_diffs<TCentralSource: CentralSourceTrait + Sync + Send>(
18011807 central_source. stream_state_updates( state_marker, up_to) . fuse( ) ;
18021808 pin_mut!( state_diff_stream) ;
18031809
1810+ let mut next_expected_block = state_marker;
18041811 while let Some ( maybe_state_diff) = state_diff_stream. next( ) . await {
18051812 let (
18061813 block_number,
18071814 block_hash,
18081815 mut state_diff,
18091816 deployed_contract_class_definitions,
18101817 ) = maybe_state_diff?;
1818+
1819+ if block_number != next_expected_block {
1820+ warn!( "Gap detected in state diff stream. Expected {}, found {}. Discarding and retrying..." , next_expected_block, block_number) ;
1821+ break ;
1822+ }
1823+
18111824 sort_state_diff( & mut state_diff) ;
18121825 yield SyncEvent :: StateDiffAvailable {
18131826 block_number,
18141827 block_hash,
18151828 state_diff,
18161829 deployed_contract_class_definitions,
18171830 } ;
1831+ next_expected_block = next_expected_block. unchecked_next( ) ;
18181832 }
18191833 }
18201834 }
0 commit comments