Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0c2e786
feat: remove command + brs client
LeoPatOZ Dec 18, 2025
f27636f
feat: remove servce shutdown error
LeoPatOZ Dec 18, 2025
56ae738
feat: remove block range scanner client
LeoPatOZ Dec 18, 2025
29f68ae
docs: remove client
LeoPatOZ Dec 18, 2025
915f6fb
feat: move block range scanner to brs folder and seperate crates
LeoPatOZ Dec 18, 2025
1b37475
fix: docs
LeoPatOZ Dec 18, 2025
4cfb3b2
feat: better rexports
LeoPatOZ Dec 18, 2025
10e5f4c
feat: add back errors
LeoPatOZ Dec 18, 2025
c62ec4b
feat: add back error comments
LeoPatOZ Dec 18, 2025
0376a4b
Merge branch 'refactor-brs' into merge-brs-crate
LeoPatOZ Dec 18, 2025
16a37b3
ref: fix doc
LeoPatOZ Dec 18, 2025
a5c6f71
fmt: fix
LeoPatOZ Dec 18, 2025
a5c2029
feat: add rewind handler
LeoPatOZ Dec 18, 2025
d0c8f57
ref: remove mut self
LeoPatOZ Dec 18, 2025
7126576
Merge branch 'main' into merge-brs-crate
LeoPatOZ Dec 19, 2025
bc91cff
ref: rename BRS to BRS Builder, and Connected BRS to BRS
LeoPatOZ Dec 19, 2025
618ac72
fix: doc
LeoPatOZ Dec 19, 2025
ad364c6
fix: readme connect methods
LeoPatOZ Dec 19, 2025
2f61cd3
ref: remove pub(crate) common
LeoPatOZ Dec 19, 2025
110b149
Merge branch 'merge-brs-crate' into feat-rewind-handler
LeoPatOZ Dec 19, 2025
e4ee534
fix: merge import
LeoPatOZ Dec 19, 2025
de59731
fix: doc
LeoPatOZ Dec 19, 2025
14d125d
fix: format
LeoPatOZ Dec 19, 2025
e2bbda2
ref: rename client to brs in test
LeoPatOZ Dec 19, 2025
7959e09
Merge branch 'merge-brs-crate' into feat-rewind-handler
LeoPatOZ Dec 19, 2025
eccffd1
fix merge conflicts
0xNeshi Dec 22, 2025
031ad1e
revert README changes
0xNeshi Dec 22, 2025
8ce7fea
ref: move rewind validation to RewindHandler
0xNeshi Dec 22, 2025
e81d87f
docs: move stream_rewind links to end of docs
0xNeshi Dec 22, 2025
7349982
docs: simplify BlockRangeScanner link
0xNeshi Dec 22, 2025
83a37a3
ref: make all start fns accept immutable self
0xNeshi Dec 22, 2025
7c04fae
test: make all BRS vars immutable
0xNeshi Dec 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/block_range_scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod builder;
mod common;
mod range_iterator;
mod reorg_handler;
mod rewind_handler;
mod ring_buffer;
mod scanner;
mod sync_handler;
Expand Down
189 changes: 189 additions & 0 deletions src/block_range_scanner/rewind_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::cmp::Ordering;

use alloy::{
consensus::BlockHeader,
eips::{BlockId, BlockNumberOrTag},
network::{BlockResponse, Network, primitives::HeaderResponse},
};
use tokio::{sync::mpsc, try_join};

use crate::{
Notification, ScannerError,
block_range_scanner::{
common::BlockScannerResult, range_iterator::RangeIterator, reorg_handler::ReorgHandler,
ring_buffer::RingBufferCapacity,
},
robust_provider::RobustProvider,
types::TryStream,
};

pub(crate) struct RewindHandler<N: Network> {
provider: RobustProvider<N>,
max_block_range: u64,
start_id: BlockId,
end_id: BlockId,
sender: mpsc::Sender<BlockScannerResult>,
reorg_handler: ReorgHandler<N>,
}

impl<N: Network> RewindHandler<N> {
pub fn new(
provider: RobustProvider<N>,
max_block_range: u64,
start_id: BlockId,
end_id: BlockId,
past_blocks_storage_capacity: RingBufferCapacity,
sender: mpsc::Sender<BlockScannerResult>,
) -> Self {
let reorg_handler = ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
Self { provider, max_block_range, start_id, end_id, sender, reorg_handler }
}

pub async fn run(self) -> Result<(), ScannerError> {
let RewindHandler {
provider,
max_block_range,
start_id,
end_id,
sender,
mut reorg_handler,
} = self;

let (start_block, end_block) =
try_join!(provider.get_block(start_id), provider.get_block(end_id))?;

// normalize block range: from (higher) -> to (lower)
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
Ordering::Greater => (start_block, end_block),
_ => (end_block, start_block),
};

tokio::spawn(async move {
Self::handle_stream_rewind(
from,
to,
max_block_range,
&sender,
&provider,
&mut reorg_handler,
)
.await;
});

Ok(())
}

/// Streams blocks in reverse order from `from` to `to`.
async fn handle_stream_rewind(
from: N::BlockResponse,
to: N::BlockResponse,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
) {
// for checking whether reorg occurred
let mut tip = from;

let from = tip.header().number();
let to = to.header().number();

let finalized_block = match provider.get_block_by_number(BlockNumberOrTag::Finalized).await
{
Ok(block) => block,
Err(e) => {
error!(error = %e, "Failed to get finalized block");
_ = sender.try_stream(e).await;
return;
}
};

let finalized_number = finalized_block.header().number();

// only check reorg if our tip is after the finalized block
let check_reorg = tip.header().number() > finalized_number;

let mut iter = RangeIterator::reverse(from, to, max_block_range);
for range in &mut iter {
// stream the range regularly, i.e. from smaller block number to greater
if !sender.try_stream(range).await {
break;
}

if check_reorg {
let reorg = match reorg_handler.check(&tip).await {
Ok(opt) => opt,
Err(e) => {
error!(error = %e, "Terminal RPC call error, shutting down");
_ = sender.try_stream(e).await;
return;
}
};

if let Some(common_ancestor) = reorg &&
!Self::handle_reorg_rescan(
&mut tip,
common_ancestor,
max_block_range,
sender,
provider,
)
.await
{
return;
}
}
}

info!(batch_count = iter.batch_count(), "Rewind completed");
}

/// Handles re-scanning of reorged blocks.
///
/// Returns `true` on success, `false` if stream closed or terminal error occurred.
async fn handle_reorg_rescan(
tip: &mut N::BlockResponse,
common_ancestor: N::BlockResponse,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
) -> bool {
let tip_number = tip.header().number();
let common_ancestor = common_ancestor.header().number();
info!(
block_number = %tip_number,
hash = %HeaderResponse::hash(tip.header()),
common_ancestor = %common_ancestor,
"Reorg detected"
);

if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
return false;
}

// Get the new tip block (same height as original tip, but new hash)
*tip = match provider.get_block_by_number(tip_number.into()).await {
Ok(block) => block,
Err(e) => {
if matches!(e, crate::robust_provider::Error::BlockNotFound(_)) {
error!("Unexpected error: pre-reorg chain tip should exist on a reorged chain");
} else {
error!(error = %e, "Terminal RPC call error, shutting down");
}
_ = sender.try_stream(e).await;
return false;
}
};

// Re-scan only the affected range (from common_ancestor + 1 up to tip)
let rescan_from = common_ancestor + 1;

for batch in RangeIterator::forward(rescan_from, tip_number, max_block_range) {
if !sender.try_stream(batch).await {
return false;
}
}

true
}
}
Loading