Skip to content

Commit f8fc694

Browse files
authored
Optimization: Fetching Historic Block Ranges Before the Finalized Block Should Omit Reorg Check (#221)
1 parent e089e9d commit f8fc694

File tree

3 files changed

+98
-22
lines changed

3 files changed

+98
-22
lines changed

src/block_range_scanner.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,14 +370,17 @@ impl<N: Network> Service<N> {
370370
_ => (start_block_num, end_block_num),
371371
};
372372

373-
info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data");
373+
info!(
374+
start_block = start_block_num,
375+
end_block = end_block_num,
376+
"Normalized the block range"
377+
);
374378

375379
tokio::spawn(async move {
376380
let mut reorg_handler =
377381
ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
378382

379-
common::stream_block_range(
380-
start_block_num,
383+
_ = common::stream_historical_range(
381384
start_block_num,
382385
end_block_num,
383386
max_block_range,

src/block_range_scanner/common.rs

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
};
1010
use alloy::{
1111
consensus::BlockHeader,
12+
eips::BlockNumberOrTag,
1213
network::{BlockResponse, Network},
1314
primitives::BlockNumber,
1415
};
@@ -135,9 +136,12 @@ async fn initialize_live_streaming_state<N: Network>(
135136

136137
let confirmed = incoming_block_num.saturating_sub(block_confirmations);
137138

139+
// The minimum common ancestor is the block before the stream start
140+
let min_common_ancestor = stream_start.saturating_sub(1);
141+
138142
// Catch up on any confirmed blocks between stream_start and the confirmed tip
139-
let previous_batch_end = stream_block_range(
140-
stream_start,
143+
let previous_batch_end = stream_range_with_reorg_handling(
144+
min_common_ancestor,
141145
stream_start,
142146
confirmed,
143147
max_block_range,
@@ -289,8 +293,11 @@ async fn stream_next_batch<N: Network>(
289293
return true;
290294
}
291295

292-
state.previous_batch_end = stream_block_range(
293-
stream_start,
296+
// The minimum common ancestor is the block before the stream start
297+
let min_common_ancestor = stream_start.saturating_sub(1);
298+
299+
state.previous_batch_end = stream_range_with_reorg_handling(
300+
min_common_ancestor,
294301
state.batch_start,
295302
batch_end_num,
296303
max_block_range,
@@ -319,9 +326,72 @@ struct LiveStreamingState<N: Network> {
319326
previous_batch_end: Option<N::BlockResponse>,
320327
}
321328

322-
/// Assumes that `min_block <= next_start_block <= end`.
323-
pub(crate) async fn stream_block_range<N: Network>(
324-
min_block: BlockNumber,
329+
#[must_use]
330+
pub(crate) async fn stream_historical_range<N: Network>(
331+
start: BlockNumber,
332+
end: BlockNumber,
333+
max_block_range: u64,
334+
sender: &mpsc::Sender<BlockScannerResult>,
335+
provider: &RobustProvider<N>,
336+
reorg_handler: &mut ReorgHandler<N>,
337+
) -> Option<()> {
338+
info!("Getting finalized block number");
339+
let finalized = match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await
340+
{
341+
Ok(block) => block,
342+
Err(e) => {
343+
error!(error = %e, "Failed to get finalized block");
344+
_ = sender.try_stream(e).await;
345+
return None;
346+
}
347+
};
348+
349+
// no reorg check for finalized blocks
350+
let mut batch_start = start;
351+
let finalized_batch_end = finalized.min(end);
352+
while batch_start < finalized_batch_end {
353+
let batch_end = batch_start.saturating_add(max_block_range - 1).min(end);
354+
355+
if !sender.try_stream(batch_start..=batch_end).await {
356+
return None; // channel closed
357+
}
358+
359+
batch_start = batch_end + 1;
360+
}
361+
362+
// covers case when `end <= finalized`
363+
if batch_start > end {
364+
return Some(()); // we're done
365+
}
366+
367+
// we have non-finalized block numbers to stream, a reorg can occur
368+
369+
// Possible minimal common ancestors when a reorg occurs:
370+
// * start > finalized -> the common ancestor we care about is the block before `start`, that's
371+
// where the stream should restart -> this is why we used `start - 1`
372+
// * start == finalized -> `start` should never be re-streamed on reorgs; stream should restart
373+
// on `start + 1`
374+
// * start < finalized -> if we got here, then `end > finalized`; on reorg, we should only
375+
// re-stream non-finalized blocks
376+
let min_common_ancestor = (start.saturating_sub(1)).max(finalized);
377+
378+
stream_range_with_reorg_handling(
379+
min_common_ancestor,
380+
batch_start,
381+
end,
382+
max_block_range,
383+
sender,
384+
provider,
385+
reorg_handler,
386+
)
387+
.await?;
388+
389+
Some(())
390+
}
391+
392+
/// Assumes that `min_common_ancestor <= next_start_block <= end`, performs no internal checks.
393+
pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
394+
min_common_ancestor: BlockNumber,
325395
mut next_start_block: BlockNumber,
326396
end: BlockNumber,
327397
max_block_range: u64,
@@ -364,13 +434,10 @@ pub(crate) async fn stream_block_range<N: Network>(
364434
if !sender.try_stream(Notification::ReorgDetected).await {
365435
return None;
366436
}
367-
if common_ancestor.header().number() < min_block {
368-
min_block
369-
} else {
370-
common_ancestor.header().number() + 1
371-
}
437+
438+
min_common_ancestor.max(common_ancestor.header().number()) + 1
372439
} else {
373-
batch_end_num.saturating_add(1)
440+
batch_end_num + 1
374441
};
375442

376443
if next_start_block > end {

src/block_range_scanner/sync_handler.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ impl<N: Network> SyncHandler<N> {
127127
)
128128
.await
129129
{
130-
Ok(start_block) => start_block,
130+
Ok(Some(start_block)) => start_block,
131+
Ok(None) => {
132+
return; // channel closed
133+
}
131134
Err(e) => {
132135
error!(error = %e, "Error during historical catchup, shutting down");
133136
_ = sender.try_stream(e).await;
@@ -158,18 +161,21 @@ impl<N: Network> SyncHandler<N> {
158161
sender: &mpsc::Sender<BlockScannerResult>,
159162
provider: &RobustProvider<N>,
160163
reorg_handler: &mut ReorgHandler<N>,
161-
) -> Result<BlockNumber, ScannerError> {
164+
) -> Result<Option<BlockNumber>, ScannerError> {
162165
while start_block < confirmed_tip {
163-
common::stream_block_range(
164-
start_block,
166+
if common::stream_historical_range(
165167
start_block,
166168
confirmed_tip,
167169
max_block_range,
168170
sender,
169171
provider,
170172
reorg_handler,
171173
)
172-
.await;
174+
.await
175+
.is_none()
176+
{
177+
return Ok(None);
178+
}
173179

174180
let latest = provider.get_block_number().await?;
175181

@@ -179,7 +185,7 @@ impl<N: Network> SyncHandler<N> {
179185

180186
info!("Historical catchup complete, ready to transition to live");
181187

182-
Ok(start_block)
188+
Ok(Some(start_block))
183189
}
184190

185191
/// Subscribes to live blocks and begins streaming

0 commit comments

Comments
 (0)