Skip to content

Commit 0bfd0c9

Browse files
LeoPatOZ0xNeshi
andauthored
Reorg Rewind optimisation (#231)
Co-authored-by: 0xNeshi <[email protected]>
1 parent df3fdee commit 0bfd0c9

File tree

9 files changed

+629
-81
lines changed

9 files changed

+629
-81
lines changed

src/block_range_scanner.rs

Lines changed: 120 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,13 @@ use tokio_stream::wrappers::ReceiverStream;
6969
use crate::{
7070
ScannerError, ScannerMessage,
7171
block_range_scanner::sync_handler::SyncHandler,
72-
robust_provider::{IntoRobustProvider, RobustProvider, provider::Error as RobustProviderError},
72+
robust_provider::{IntoRobustProvider, RobustProvider},
7373
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
7474
};
7575

7676
use alloy::{
7777
consensus::BlockHeader,
78-
eips::BlockId,
78+
eips::{BlockId, BlockNumberOrTag},
7979
network::{BlockResponse, Network, primitives::HeaderResponse},
8080
primitives::BlockNumber,
8181
};
@@ -442,12 +442,6 @@ impl<N: Network> Service<N> {
442442
}
443443

444444
/// Streams blocks in reverse order from `from` to `to`.
445-
///
446-
/// The `from` block is assumed to be greater than or equal to the `to` block.
447-
///
448-
/// # Errors
449-
///
450-
/// Returns an error if the stream fails
451445
async fn stream_rewind(
452446
from: N::BlockResponse,
453447
to: N::BlockResponse,
@@ -464,8 +458,22 @@ impl<N: Network> Service<N> {
464458
let from = tip.header().number();
465459
let to = to.header().number();
466460

461+
let finalized_block = match provider.get_block_by_number(BlockNumberOrTag::Finalized).await
462+
{
463+
Ok(block) => block,
464+
Err(e) => {
465+
error!(error = %e, "Failed to get finalized block");
466+
_ = sender.try_stream(e).await;
467+
return;
468+
}
469+
};
470+
467471
// we're iterating in reverse
468472
let mut batch_from = from;
473+
let finalized_number = finalized_block.header().number();
474+
475+
// only check reorg if our tip is after the finalized block
476+
let check_reorg = tip.header().number() > finalized_number;
469477

470478
while batch_from >= to {
471479
let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
@@ -486,51 +494,92 @@ impl<N: Network> Service<N> {
486494
break;
487495
}
488496

489-
let reorged_opt = match reorg_handler.check(&tip).await {
490-
Ok(opt) => {
491-
info!(block_number = %from, hash = %tip.header().hash(), "Reorg detected");
492-
opt
493-
}
494-
Err(e) => {
495-
error!(error = %e, "Terminal RPC call error, shutting down");
496-
_ = sender.try_stream(e).await;
497-
return;
498-
}
499-
};
500-
501-
// For now we only care if a reorg occurred, not which block it was.
502-
// Once we optimize 'latest' mode to update only the reorged logs, we will need the
503-
// exact common ancestor.
504-
if reorged_opt.is_some() {
505-
info!(block_number = %from, hash = %tip.header().hash(), "Reorg detected");
506-
507-
if !sender.try_stream(Notification::ReorgDetected).await {
508-
break;
509-
}
510-
511-
// restart rewind
512-
batch_from = from;
513-
// store the updated end block hash
514-
tip = match provider.get_block_by_number(from.into()).await {
515-
Ok(block) => block,
516-
Err(RobustProviderError::BlockNotFound(_)) => {
517-
panic!("Block with number '{from}' should exist post-reorg");
518-
}
497+
if check_reorg {
498+
let reorg = match reorg_handler.check(&tip).await {
499+
Ok(opt) => opt,
519500
Err(e) => {
520501
error!(error = %e, "Terminal RPC call error, shutting down");
521502
_ = sender.try_stream(e).await;
522503
return;
523504
}
524505
};
525-
} else {
526-
// `batch_to` is always greater than `to`, so `batch_to - 1` is always a valid
527-
// unsigned integer
528-
batch_from = batch_to - 1;
506+
507+
if let Some(common_ancestor) = reorg &&
508+
!Self::handle_reorg_rescan(
509+
&mut tip,
510+
common_ancestor,
511+
max_block_range,
512+
sender,
513+
provider,
514+
)
515+
.await
516+
{
517+
return;
518+
}
529519
}
520+
521+
batch_from = batch_to - 1;
530522
}
531523

532524
info!(batch_count = batch_count, "Rewind completed");
533525
}
526+
527+
/// Handles re-scanning of reorged blocks.
528+
///
529+
/// Returns `true` on success, `false` if stream closed or terminal error occurred.
530+
async fn handle_reorg_rescan(
531+
tip: &mut N::BlockResponse,
532+
common_ancestor: N::BlockResponse,
533+
max_block_range: u64,
534+
sender: &mpsc::Sender<BlockScannerResult>,
535+
provider: &RobustProvider<N>,
536+
) -> bool {
537+
let tip_number = tip.header().number();
538+
let common_ancestor = common_ancestor.header().number();
539+
info!(
540+
block_number = %tip_number,
541+
hash = %tip.header().hash(),
542+
common_ancestor = %common_ancestor,
543+
"Reorg detected"
544+
);
545+
546+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
547+
return false;
548+
}
549+
550+
// Get the new tip block (same height as original tip, but new hash)
551+
*tip = match provider.get_block_by_number(tip_number.into()).await {
552+
Ok(block) => block,
553+
Err(e) => {
554+
if matches!(e, crate::robust_provider::Error::BlockNotFound(_)) {
555+
error!("Unexpected error: pre-reorg chain tip should exist on a reorged chain");
556+
} else {
557+
error!(error = %e, "Terminal RPC call error, shutting down");
558+
}
559+
_ = sender.try_stream(e).await;
560+
return false;
561+
}
562+
};
563+
564+
// Re-scan only the affected range (from common_ancestor + 1 up to tip)
565+
let rescan_from = common_ancestor + 1;
566+
567+
let mut rescan_batch_start = rescan_from;
568+
while rescan_batch_start <= tip_number {
569+
let rescan_batch_end = (rescan_batch_start + max_block_range - 1).min(tip_number);
570+
571+
if !sender.try_stream(rescan_batch_start..=rescan_batch_end).await {
572+
return false;
573+
}
574+
575+
if rescan_batch_end == tip_number {
576+
break;
577+
}
578+
rescan_batch_start = rescan_batch_end + 1;
579+
}
580+
581+
true
582+
}
534583
}
535584

536585
pub struct BlockRangeScannerClient {
@@ -643,14 +692,41 @@ impl BlockRangeScannerClient {
643692

644693
/// Streams blocks in reverse order from `start_id` to `end_id`.
645694
///
695+
/// The `start_id` block is assumed to be greater than or equal to the `end_id` block.
696+
/// Blocks are streamed in batches, where each batch is ordered from lower to higher
697+
/// block numbers (chronological order within each batch), but batches themselves
698+
/// progress from newer to older blocks.
699+
///
646700
/// # Arguments
647701
///
648-
/// * `start_id` - The starting block id (defaults to Latest if None).
649-
/// * `end_id` - The ending block id (defaults to Earliest if None).
702+
/// * `start_id` - The starting block id (higher block number).
703+
/// * `end_id` - The ending block id (lower block number).
704+
///
705+
/// # Reorg Handling
706+
///
707+
/// Reorg checks are only performed when the specified block range tip is above the
708+
/// current finalized block height. When a reorg is detected:
709+
///
710+
/// 1. A [`Notification::ReorgDetected`] is emitted with the common ancestor block
711+
/// 2. The scanner fetches the new tip block at the same height
712+
/// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to
713+
/// the new tip)
714+
/// 4. The reverse scan continues from where it left off
715+
///
716+
/// If the range tip is at or below the finalized block, no reorg checks are
717+
/// performed since finalized blocks cannot be reorganized.
718+
///
719+
/// # Note
720+
///
721+
/// The reason reorged blocks are streamed in chronological order is to make it easier to handle
722+
/// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks
723+
/// to the result collection, which must maintain chronological order.
650724
///
651725
/// # Errors
652726
///
653727
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
728+
///
729+
/// [latest mode]: crate::EventScannerBuilder::latest
654730
pub async fn rewind(
655731
&self,
656732
start_id: impl Into<BlockId>,

src/block_range_scanner/common.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,12 @@ async fn handle_reorg_detected<N: Network>(
251251
state: &mut LiveStreamingState<N>,
252252
sender: &mpsc::Sender<BlockScannerResult>,
253253
) -> bool {
254-
if !sender.try_stream(Notification::ReorgDetected).await {
254+
let ancestor_num = common_ancestor.header().number();
255+
256+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await {
255257
return false;
256258
}
257259

258-
let ancestor_num = common_ancestor.header().number();
259-
260260
// Reset streaming position based on common ancestor
261261
if ancestor_num < stream_start {
262262
// Reorg went before our starting point - restart from stream_start
@@ -431,11 +431,11 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
431431
};
432432

433433
next_start_block = if let Some(common_ancestor) = reorged_opt {
434-
if !sender.try_stream(Notification::ReorgDetected).await {
434+
let common_ancestor = common_ancestor.header().number();
435+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
435436
return None;
436437
}
437-
438-
min_common_ancestor.max(common_ancestor.header().number()) + 1
438+
(common_ancestor + 1).max(min_common_ancestor)
439439
} else {
440440
batch_end_num + 1
441441
};

0 commit comments

Comments
 (0)