Skip to content

Commit 05fc91a

Browse files
committed
feat: use historic handling when streaming live range larger than 1
1 parent 667d08f commit 05fc91a

File tree

1 file changed

+34
-44
lines changed

1 file changed

+34
-44
lines changed

src/block_range_scanner.rs

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -605,26 +605,26 @@ impl<N: Network> Service<N> {
605605
sender: &mpsc::Sender<Message>,
606606
provider: &RobustProvider<N>,
607607
reorg_handler: &mut ReorgHandler<N>,
608-
) {
608+
) -> Option<N::BlockResponse> {
609609
let mut batch_count = 0;
610610

611611
let mut next_start_block = start;
612612

613613
// must be <= to include the edge case when start == end (i.e. return the single block
614614
// range)
615-
while next_start_block <= end {
615+
loop {
616616
let batch_end_num = next_start_block.saturating_add(max_block_range - 1).min(end);
617617
let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
618618
Ok(block) => block,
619619
Err(e) => {
620620
error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
621621
_ = sender.try_stream(e).await;
622-
return;
622+
return None;
623623
}
624624
};
625625

626626
if !sender.try_stream(next_start_block..=batch_end_num).await {
627-
break;
627+
return Some(batch_end);
628628
}
629629

630630
batch_count += 1;
@@ -637,21 +637,24 @@ impl<N: Network> Service<N> {
637637
Err(e) => {
638638
error!(error = %e, "Failed to perform reorg check");
639639
_ = sender.try_stream(e).await;
640-
return;
640+
return None;
641641
}
642642
};
643643

644644
next_start_block = if let Some(common_ancestor) = reorged_opt {
645645
if !sender.try_stream(ScannerStatus::ReorgDetected).await {
646-
return;
646+
return None;
647647
}
648648
common_ancestor.header().number() + 1
649649
} else {
650650
batch_end_num.saturating_add(1)
651651
};
652-
}
653652

654-
info!(batch_count = batch_count, "Historical sync completed");
653+
if next_start_block > end {
654+
info!(batch_count = batch_count, "Historical sync completed");
655+
return Some(batch_end);
656+
}
657+
}
655658
}
656659

657660
async fn stream_live_blocks(
@@ -685,7 +688,7 @@ impl<N: Network> Service<N> {
685688

686689
let confirmed = incoming_block_num.saturating_sub(block_confirmations);
687690

688-
Self::stream_historical_blocks(
691+
let mut previous_batch_end = Self::stream_historical_blocks(
689692
stream_start,
690693
confirmed,
691694
max_block_range,
@@ -695,15 +698,12 @@ impl<N: Network> Service<N> {
695698
)
696699
.await;
697700

701+
if previous_batch_end.is_none() {
702+
// the sender channel is closed
703+
return;
704+
}
705+
698706
let mut batch_start = stream_start;
699-
let mut previous_batch_end = match provider.get_block_by_number(confirmed.into()).await {
700-
Ok(block) => Some(block),
701-
Err(e) => {
702-
error!(batch_start = batch_start, batch_end = confirmed, error = %e, "Failed to get initial batch end block");
703-
_ = sender.try_stream(e).await;
704-
return;
705-
}
706-
};
707707

708708
while let Some(incoming_block) = stream.next().await {
709709
let incoming_block_num = incoming_block.number();
@@ -747,35 +747,25 @@ impl<N: Network> Service<N> {
747747
}
748748
}
749749

750-
let confirmed = incoming_block_num.saturating_sub(block_confirmations);
751-
if confirmed >= batch_start {
752-
loop {
753-
// NOTE: Edge case when difference between range end and range start >= max
754-
// reads
755-
let batch_end_num =
756-
confirmed.min(batch_start.saturating_add(max_block_range - 1));
757-
previous_batch_end = match provider
758-
.get_block_by_number(batch_end_num.into())
759-
.await
760-
{
761-
Ok(block) => Some(block),
762-
Err(e) => {
763-
error!(batch_start = batch_start, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
764-
_ = sender.try_stream(e).await;
765-
return;
766-
}
767-
};
768-
if !sender.try_stream(batch_start..=batch_end_num).await {
769-
return;
770-
}
771-
772-
// SAFETY: Overflow cannot realistically happen
773-
batch_start = batch_end_num + 1;
750+
let batch_end_num = incoming_block_num.saturating_sub(block_confirmations);
751+
if batch_end_num >= batch_start {
752+
previous_batch_end = Self::stream_historical_blocks(
753+
batch_start,
754+
batch_end_num,
755+
max_block_range,
756+
&sender,
757+
provider,
758+
reorg_handler,
759+
)
760+
.await;
774761

775-
if batch_end_num == confirmed {
776-
break;
777-
}
762+
if previous_batch_end.is_none() {
763+
// the sender channel is closed
764+
return;
778765
}
766+
767+
// SAFETY: Overflow cannot realistically happen
768+
batch_start = batch_end_num + 1;
779769
}
780770
}
781771
}

0 commit comments

Comments
 (0)