Skip to content

Commit 87b7362

Browse files
LeoPatOZ0xNeshi
andauthored
Abstract block looping to seperate crate (#247)
Co-authored-by: Nenad <[email protected]>
1 parent 1e89606 commit 87b7362

File tree

4 files changed

+372
-64
lines changed

4 files changed

+372
-64
lines changed

src/block_range_scanner.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -79,18 +79,21 @@ use alloy::{
7979
network::{BlockResponse, Network, primitives::HeaderResponse},
8080
primitives::BlockNumber,
8181
};
82-
use tracing::{debug, error, info, warn};
82+
use tracing::{error, info, warn};
8383

8484
mod common;
85+
mod range_iterator;
8586
mod reorg_handler;
8687
mod ring_buffer;
8788
mod sync_handler;
8889

90+
pub(crate) use range_iterator::RangeIterator;
91+
8992
use reorg_handler::ReorgHandler;
9093
pub use ring_buffer::RingBufferCapacity;
9194

9295
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
93-
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
96+
9497
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
9598

9699
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
@@ -472,8 +475,6 @@ impl<N: Network> Service<N> {
472475
provider: &RobustProvider<N>,
473476
reorg_handler: &mut ReorgHandler<N>,
474477
) {
475-
let mut batch_count = 0;
476-
477478
// for checking whether reorg occurred
478479
let mut tip = from;
479480

@@ -490,29 +491,15 @@ impl<N: Network> Service<N> {
490491
}
491492
};
492493

493-
// we're iterating in reverse
494-
let mut batch_from = from;
495494
let finalized_number = finalized_block.header().number();
496495

497496
// only check reorg if our tip is after the finalized block
498497
let check_reorg = tip.header().number() > finalized_number;
499498

500-
while batch_from >= to {
501-
let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
502-
499+
let mut iter = RangeIterator::reverse(from, to, max_block_range);
500+
for range in &mut iter {
503501
// stream the range regularly, i.e. from smaller block number to greater
504-
if !sender.try_stream(batch_to..=batch_from).await {
505-
break;
506-
}
507-
508-
batch_count += 1;
509-
if batch_count % 10 == 0 {
510-
debug!(batch_count = batch_count, "Processed rewind batches");
511-
}
512-
513-
// check early if end of stream achieved to avoid subtraction overflow when `to
514-
// == 0`
515-
if batch_to == to {
502+
if !sender.try_stream(range).await {
516503
break;
517504
}
518505

@@ -539,11 +526,9 @@ impl<N: Network> Service<N> {
539526
return;
540527
}
541528
}
542-
543-
batch_from = batch_to - 1;
544529
}
545530

546-
info!(batch_count = batch_count, "Rewind completed");
531+
info!(batch_count = iter.batch_count(), "Rewind completed");
547532
}
548533

549534
/// Handles re-scanning of reorged blocks.
@@ -586,18 +571,10 @@ impl<N: Network> Service<N> {
586571
// Re-scan only the affected range (from common_ancestor + 1 up to tip)
587572
let rescan_from = common_ancestor + 1;
588573

589-
let mut rescan_batch_start = rescan_from;
590-
while rescan_batch_start <= tip_number {
591-
let rescan_batch_end = (rescan_batch_start + max_block_range - 1).min(tip_number);
592-
593-
if !sender.try_stream(rescan_batch_start..=rescan_batch_end).await {
574+
for batch in RangeIterator::forward(rescan_from, tip_number, max_block_range) {
575+
if !sender.try_stream(batch).await {
594576
return false;
595577
}
596-
597-
if rescan_batch_end == tip_number {
598-
break;
599-
}
600-
rescan_batch_start = rescan_batch_end + 1;
601578
}
602579

603580
true

src/block_range_scanner/common.rs

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use tokio_stream::StreamExt;
33

44
use crate::{
55
ScannerError,
6-
block_range_scanner::{BlockScannerResult, reorg_handler::ReorgHandler},
6+
block_range_scanner::{BlockScannerResult, RangeIterator, reorg_handler::ReorgHandler},
77
robust_provider::{RobustProvider, RobustSubscription, subscription},
88
types::{Notification, TryStream},
99
};
@@ -13,7 +13,7 @@ use alloy::{
1313
network::{BlockResponse, Network},
1414
primitives::BlockNumber,
1515
};
16-
use tracing::{debug, error, info, warn};
16+
use tracing::{error, info, warn};
1717

1818
#[allow(clippy::too_many_arguments)]
1919
pub(crate) async fn stream_live_blocks<N: Network>(
@@ -347,18 +347,17 @@ pub(crate) async fn stream_historical_range<N: Network>(
347347
};
348348

349349
// no reorg check for finalized blocks
350-
let mut batch_start = start;
351350
let finalized_batch_end = finalized.min(end);
352-
while batch_start <= finalized_batch_end {
353-
let batch_end = batch_start.saturating_add(max_block_range - 1).min(finalized_batch_end);
354-
355-
if !sender.try_stream(batch_start..=batch_end).await {
351+
for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) {
352+
if !sender.try_stream(range).await {
356353
return None; // channel closed
357354
}
358-
359-
batch_start = batch_end + 1;
360355
}
361356

357+
// If start > finalized_batch_end, the loop above was empty and we should
358+
// continue from start. Otherwise, continue from after finalized_batch_end.
359+
let batch_start = start.max(finalized_batch_end + 1);
360+
362361
// covers case when `end <= finalized`
363362
if batch_start > end {
364363
return Some(()); // we're done
@@ -392,35 +391,31 @@ pub(crate) async fn stream_historical_range<N: Network>(
392391
/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
393392
pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
394393
min_common_ancestor: BlockNumber,
395-
mut next_start_block: BlockNumber,
394+
next_start_block: BlockNumber,
396395
end: BlockNumber,
397396
max_block_range: u64,
398397
sender: &mpsc::Sender<BlockScannerResult>,
399398
provider: &RobustProvider<N>,
400399
reorg_handler: &mut ReorgHandler<N>,
401400
) -> Option<N::BlockResponse> {
402-
let mut batch_count = 0;
401+
let mut last_batch_end: Option<N::BlockResponse> = None;
402+
let mut iter = RangeIterator::forward(next_start_block, end, max_block_range);
403403

404-
loop {
405-
let batch_end_num = next_start_block.saturating_add(max_block_range - 1).min(end);
404+
while let Some(batch) = iter.next() {
405+
let batch_end_num = *batch.end();
406406
let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
407407
Ok(block) => block,
408408
Err(e) => {
409-
error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
409+
error!(batch_start = batch.start(), batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
410410
_ = sender.try_stream(e).await;
411411
return None;
412412
}
413413
};
414414

415-
if !sender.try_stream(next_start_block..=batch_end_num).await {
415+
if !sender.try_stream(batch).await {
416416
return None; // channel closed
417417
}
418418

419-
batch_count += 1;
420-
if batch_count % 10 == 0 {
421-
debug!(batch_count = batch_count, "Processed historical batches");
422-
}
423-
424419
let reorged_opt = match reorg_handler.check(&batch_end).await {
425420
Ok(opt) => opt,
426421
Err(e) => {
@@ -430,19 +425,17 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
430425
}
431426
};
432427

433-
next_start_block = if let Some(common_ancestor) = reorged_opt {
428+
if let Some(common_ancestor) = reorged_opt {
434429
let common_ancestor = common_ancestor.header().number();
435430
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
436431
return None;
437432
}
438-
(common_ancestor + 1).max(min_common_ancestor)
439-
} else {
440-
batch_end_num + 1
441-
};
442-
443-
if next_start_block > end {
444-
info!(batch_count = batch_count, "Historical sync completed");
445-
return Some(batch_end);
433+
iter.reset_to((common_ancestor + 1).max(min_common_ancestor));
446434
}
435+
436+
last_batch_end = Some(batch_end);
447437
}
438+
439+
info!(batch_count = iter.batch_count(), "Historical sync completed");
440+
last_batch_end
448441
}

0 commit comments

Comments
 (0)