Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
21c4a6e
feat: add common ancestor to notif
LeoPatOZ Dec 4, 2025
5ef900b
fix: update test for new reorg notif
LeoPatOZ Dec 4, 2025
c956b43
feat: update common fn with new ancestor block notif
LeoPatOZ Dec 4, 2025
4bc5b1f
feat: add reorg streaming and log removal logic
LeoPatOZ Dec 4, 2025
567ddda
ref: move reorg check to internal
LeoPatOZ Dec 4, 2025
31c0e5a
feat: finalized tip reorg opt
LeoPatOZ Dec 4, 2025
5af945c
format: common rs
LeoPatOZ Dec 5, 2025
4737724
Merge branch 'main' into reorg-log-opt
LeoPatOZ Dec 5, 2025
e0286f2
ref: simplify if else
LeoPatOZ Dec 5, 2025
8f204d5
test: latest roerg logic
LeoPatOZ Dec 5, 2025
637fa87
doc: add note about possible opt
LeoPatOZ Dec 5, 2025
3633605
doc: fix comment
LeoPatOZ Dec 5, 2025
11c71aa
Merge branch 'main' into reorg-log-opt
LeoPatOZ Dec 5, 2025
aa6c884
ref: ignore reorg tests for now (add comment why)
LeoPatOZ Dec 8, 2025
c27b70d
ref: remove from in handle reorg
LeoPatOZ Dec 8, 2025
c74e6d5
fix: only send reorg range but continue rewind from previous batch to
LeoPatOZ Dec 8, 2025
e357412
ref: remove retain
LeoPatOZ Dec 8, 2025
b0eaf4b
ref: better reorg block included test + naming
LeoPatOZ Dec 8, 2025
87ad933
ref: use skip_while
LeoPatOZ Dec 8, 2025
3be4d69
feat: maintain proper log ordering + tests
LeoPatOZ Dec 8, 2025
c7584ae
Merge branch 'main' into reorg-log-opt
LeoPatOZ Dec 8, 2025
d8a7fb0
Merge branch 'reorg-log-opt' into maintain-log-order-rewind
LeoPatOZ Dec 8, 2025
81875e4
doc: add comment about reorg handling
LeoPatOZ Dec 8, 2025
d8ffb21
Merge branch 'reorg-log-opt' into maintain-log-order-rewind
LeoPatOZ Dec 9, 2025
ea42f53
ref: use range end instead + remove clone
LeoPatOZ Dec 9, 2025
b216fa7
comment: explain ignored test in comment
LeoPatOZ Dec 9, 2025
ffe122e
test: add BRS tests (ignored until ack channel)
LeoPatOZ Dec 9, 2025
f695d40
Merge branch 'reorg-log-opt' into maintain-log-order-rewind
LeoPatOZ Dec 9, 2025
3370ebe
feat: reverse rewind range to maintain event order
LeoPatOZ Dec 10, 2025
67293ce
feat: add batch iterator helper
LeoPatOZ Dec 10, 2025
1de069d
ref: use for loop
LeoPatOZ Dec 10, 2025
500405e
fix: small bug with batch start
LeoPatOZ Dec 10, 2025
26c0fce
Merge branch 'main' into abstract-loop
LeoPatOZ Dec 11, 2025
12de3ea
fix: merge
LeoPatOZ Dec 11, 2025
f396bc9
fix: doc
LeoPatOZ Dec 11, 2025
3abc3ad
fix: fmt
LeoPatOZ Dec 11, 2025
b20e9b7
Merge branch 'main' into abstract-loop
LeoPatOZ Dec 15, 2025
7a5692c
Merge branch 'main' into abstract-loop
LeoPatOZ Dec 15, 2025
51a29e0
ref: remove example
LeoPatOZ Dec 15, 2025
47134df
ref: use typestate pattern
LeoPatOZ Dec 15, 2025
e38ed39
Merge branch 'main' into abstract-loop
LeoPatOZ Dec 15, 2025
01b0706
ref: remove dead code clippy
LeoPatOZ Dec 15, 2025
1b23f99
feat: add total batches and remove option current
LeoPatOZ Dec 16, 2025
17cbcca
ref: remove tests
LeoPatOZ Dec 16, 2025
a9b4e43
ref: remove unused fns
LeoPatOZ Dec 16, 2025
526ba75
ref: use pub const
LeoPatOZ Dec 16, 2025
2131781
feat: add size hint
LeoPatOZ Dec 16, 2025
217626b
Merge branch 'main' into abstract-loop
LeoPatOZ Dec 16, 2025
3cd38d1
revert: exact size + fix size_hint
LeoPatOZ Dec 17, 2025
606cf4b
ref: use consitent end logic
LeoPatOZ Dec 17, 2025
628aae6
feat: size hint for foward
LeoPatOZ Dec 17, 2025
41469b9
ref: batch --> range
LeoPatOZ Dec 18, 2025
e76aded
remove reference to taiko constant values
0xNeshi Dec 18, 2025
cc39627
add newline between DEFAULT_MAX_BLOCK_RANGE and DEFAULT_BLOCK_CONFIRM…
0xNeshi Dec 18, 2025
6c326f6
Merge branch 'main' into abstract-loop
LeoPatOZ Dec 18, 2025
aced50f
Merge branch 'main' into abstract-loop
0xNeshi Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 11 additions & 34 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,21 @@ use alloy::{
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::BlockNumber,
};
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};

mod common;
mod range_iterator;
mod reorg_handler;
mod ring_buffer;
mod sync_handler;

pub(crate) use range_iterator::RangeIterator;

use reorg_handler::ReorgHandler;
pub use ring_buffer::RingBufferCapacity;

pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19

pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
Expand Down Expand Up @@ -472,8 +475,6 @@ impl<N: Network> Service<N> {
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
) {
let mut batch_count = 0;

// for checking whether reorg occurred
let mut tip = from;

Expand All @@ -490,29 +491,15 @@ impl<N: Network> Service<N> {
}
};

// we're iterating in reverse
let mut batch_from = from;
let finalized_number = finalized_block.header().number();

// only check reorg if our tip is after the finalized block
let check_reorg = tip.header().number() > finalized_number;

while batch_from >= to {
let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);

let mut iter = RangeIterator::reverse(from, to, max_block_range);
for range in &mut iter {
// stream the range regularly, i.e. from smaller block number to greater
if !sender.try_stream(batch_to..=batch_from).await {
break;
}

batch_count += 1;
if batch_count % 10 == 0 {
debug!(batch_count = batch_count, "Processed rewind batches");
}

// check early if end of stream achieved to avoid subtraction overflow when `to
// == 0`
if batch_to == to {
if !sender.try_stream(range).await {
break;
}

Expand All @@ -539,11 +526,9 @@ impl<N: Network> Service<N> {
return;
}
}

batch_from = batch_to - 1;
}

info!(batch_count = batch_count, "Rewind completed");
info!(batch_count = iter.batch_count(), "Rewind completed");
}

/// Handles re-scanning of reorged blocks.
Expand Down Expand Up @@ -586,18 +571,10 @@ impl<N: Network> Service<N> {
// Re-scan only the affected range (from common_ancestor + 1 up to tip)
let rescan_from = common_ancestor + 1;

let mut rescan_batch_start = rescan_from;
while rescan_batch_start <= tip_number {
let rescan_batch_end = (rescan_batch_start + max_block_range - 1).min(tip_number);

if !sender.try_stream(rescan_batch_start..=rescan_batch_end).await {
for batch in RangeIterator::forward(rescan_from, tip_number, max_block_range) {
if !sender.try_stream(batch).await {
return false;
}

if rescan_batch_end == tip_number {
break;
}
rescan_batch_start = rescan_batch_end + 1;
}

true
Expand Down
51 changes: 22 additions & 29 deletions src/block_range_scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use tokio_stream::StreamExt;

use crate::{
ScannerError,
block_range_scanner::{BlockScannerResult, reorg_handler::ReorgHandler},
block_range_scanner::{BlockScannerResult, RangeIterator, reorg_handler::ReorgHandler},
robust_provider::{RobustProvider, RobustSubscription, subscription},
types::{Notification, TryStream},
};
Expand All @@ -13,7 +13,7 @@ use alloy::{
network::{BlockResponse, Network},
primitives::BlockNumber,
};
use tracing::{debug, error, info, warn};
use tracing::{error, info, warn};

#[allow(clippy::too_many_arguments)]
pub(crate) async fn stream_live_blocks<N: Network>(
Expand Down Expand Up @@ -347,18 +347,17 @@ pub(crate) async fn stream_historical_range<N: Network>(
};

// no reorg check for finalized blocks
let mut batch_start = start;
let finalized_batch_end = finalized.min(end);
while batch_start <= finalized_batch_end {
let batch_end = batch_start.saturating_add(max_block_range - 1).min(finalized_batch_end);

if !sender.try_stream(batch_start..=batch_end).await {
for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) {
if !sender.try_stream(range).await {
return None; // channel closed
}

batch_start = batch_end + 1;
}

// If start > finalized_batch_end, the loop above was empty and we should
// continue from start. Otherwise, continue from after finalized_batch_end.
let batch_start = start.max(finalized_batch_end + 1);

// covers case when `end <= finalized`
if batch_start > end {
return Some(()); // we're done
Expand Down Expand Up @@ -392,35 +391,31 @@ pub(crate) async fn stream_historical_range<N: Network>(
/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
min_common_ancestor: BlockNumber,
mut next_start_block: BlockNumber,
next_start_block: BlockNumber,
end: BlockNumber,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
) -> Option<N::BlockResponse> {
let mut batch_count = 0;
let mut last_batch_end: Option<N::BlockResponse> = None;
let mut iter = RangeIterator::forward(next_start_block, end, max_block_range);

loop {
let batch_end_num = next_start_block.saturating_add(max_block_range - 1).min(end);
while let Some(batch) = iter.next() {
let batch_end_num = *batch.end();
let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
Ok(block) => block,
Err(e) => {
error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
error!(batch_start = batch.start(), batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
_ = sender.try_stream(e).await;
return None;
}
};

if !sender.try_stream(next_start_block..=batch_end_num).await {
if !sender.try_stream(batch).await {
return None; // channel closed
}

batch_count += 1;
if batch_count % 10 == 0 {
debug!(batch_count = batch_count, "Processed historical batches");
}

let reorged_opt = match reorg_handler.check(&batch_end).await {
Ok(opt) => opt,
Err(e) => {
Expand All @@ -430,19 +425,17 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
}
};

next_start_block = if let Some(common_ancestor) = reorged_opt {
if let Some(common_ancestor) = reorged_opt {
let common_ancestor = common_ancestor.header().number();
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
return None;
}
(common_ancestor + 1).max(min_common_ancestor)
} else {
batch_end_num + 1
};

if next_start_block > end {
info!(batch_count = batch_count, "Historical sync completed");
return Some(batch_end);
iter.reset_to((common_ancestor + 1).max(min_common_ancestor));
}

last_batch_end = Some(batch_end);
}

info!(batch_count = iter.batch_count(), "Historical sync completed");
last_batch_end
}
Loading