Skip to content

Commit 281d874

Browse files
LeoPatOZ0xNeshi
andauthored
ref: extract BRS rewind logic into RewindHandler (#276)
Co-authored-by: 0xNeshi <[email protected]>
1 parent 7a7aee6 commit 281d874

File tree

8 files changed

+248
-187
lines changed

8 files changed

+248
-187
lines changed

src/block_range_scanner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod builder;
22
mod common;
33
mod range_iterator;
44
mod reorg_handler;
5+
mod rewind_handler;
56
mod ring_buffer;
67
mod scanner;
78
mod sync_handler;
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
use std::cmp::Ordering;
2+
3+
use alloy::{
4+
consensus::BlockHeader,
5+
eips::{BlockId, BlockNumberOrTag},
6+
network::{BlockResponse, Network, primitives::HeaderResponse},
7+
};
8+
use tokio::{sync::mpsc, try_join};
9+
10+
use crate::{
11+
Notification, ScannerError,
12+
block_range_scanner::{
13+
common::BlockScannerResult, range_iterator::RangeIterator, reorg_handler::ReorgHandler,
14+
ring_buffer::RingBufferCapacity,
15+
},
16+
robust_provider::RobustProvider,
17+
types::TryStream,
18+
};
19+
20+
pub(crate) struct RewindHandler<N: Network> {
21+
provider: RobustProvider<N>,
22+
max_block_range: u64,
23+
start_id: BlockId,
24+
end_id: BlockId,
25+
sender: mpsc::Sender<BlockScannerResult>,
26+
reorg_handler: ReorgHandler<N>,
27+
}
28+
29+
impl<N: Network> RewindHandler<N> {
30+
pub fn new(
31+
provider: RobustProvider<N>,
32+
max_block_range: u64,
33+
start_id: BlockId,
34+
end_id: BlockId,
35+
past_blocks_storage_capacity: RingBufferCapacity,
36+
sender: mpsc::Sender<BlockScannerResult>,
37+
) -> Self {
38+
let reorg_handler = ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
39+
Self { provider, max_block_range, start_id, end_id, sender, reorg_handler }
40+
}
41+
42+
pub async fn run(self) -> Result<(), ScannerError> {
43+
let RewindHandler {
44+
provider,
45+
max_block_range,
46+
start_id,
47+
end_id,
48+
sender,
49+
mut reorg_handler,
50+
} = self;
51+
52+
let (start_block, end_block) =
53+
try_join!(provider.get_block(start_id), provider.get_block(end_id))?;
54+
55+
// normalize block range: from (higher) -> to (lower)
56+
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
57+
Ordering::Greater => (start_block, end_block),
58+
_ => (end_block, start_block),
59+
};
60+
61+
tokio::spawn(async move {
62+
Self::handle_stream_rewind(
63+
from,
64+
to,
65+
max_block_range,
66+
&sender,
67+
&provider,
68+
&mut reorg_handler,
69+
)
70+
.await;
71+
});
72+
73+
Ok(())
74+
}
75+
76+
/// Streams blocks in reverse order from `from` to `to`.
77+
async fn handle_stream_rewind(
78+
from: N::BlockResponse,
79+
to: N::BlockResponse,
80+
max_block_range: u64,
81+
sender: &mpsc::Sender<BlockScannerResult>,
82+
provider: &RobustProvider<N>,
83+
reorg_handler: &mut ReorgHandler<N>,
84+
) {
85+
// for checking whether reorg occurred
86+
let mut tip = from;
87+
88+
let from = tip.header().number();
89+
let to = to.header().number();
90+
91+
let finalized_block = match provider.get_block_by_number(BlockNumberOrTag::Finalized).await
92+
{
93+
Ok(block) => block,
94+
Err(e) => {
95+
error!(error = %e, "Failed to get finalized block");
96+
_ = sender.try_stream(e).await;
97+
return;
98+
}
99+
};
100+
101+
let finalized_number = finalized_block.header().number();
102+
103+
// only check reorg if our tip is after the finalized block
104+
let check_reorg = tip.header().number() > finalized_number;
105+
106+
let mut iter = RangeIterator::reverse(from, to, max_block_range);
107+
for range in &mut iter {
108+
// stream the range regularly, i.e. from smaller block number to greater
109+
if !sender.try_stream(range).await {
110+
break;
111+
}
112+
113+
if check_reorg {
114+
let reorg = match reorg_handler.check(&tip).await {
115+
Ok(opt) => opt,
116+
Err(e) => {
117+
error!(error = %e, "Terminal RPC call error, shutting down");
118+
_ = sender.try_stream(e).await;
119+
return;
120+
}
121+
};
122+
123+
if let Some(common_ancestor) = reorg &&
124+
!Self::handle_reorg_rescan(
125+
&mut tip,
126+
common_ancestor,
127+
max_block_range,
128+
sender,
129+
provider,
130+
)
131+
.await
132+
{
133+
return;
134+
}
135+
}
136+
}
137+
138+
info!(batch_count = iter.batch_count(), "Rewind completed");
139+
}
140+
141+
/// Handles re-scanning of reorged blocks.
142+
///
143+
/// Returns `true` on success, `false` if stream closed or terminal error occurred.
144+
async fn handle_reorg_rescan(
145+
tip: &mut N::BlockResponse,
146+
common_ancestor: N::BlockResponse,
147+
max_block_range: u64,
148+
sender: &mpsc::Sender<BlockScannerResult>,
149+
provider: &RobustProvider<N>,
150+
) -> bool {
151+
let tip_number = tip.header().number();
152+
let common_ancestor = common_ancestor.header().number();
153+
info!(
154+
block_number = %tip_number,
155+
hash = %HeaderResponse::hash(tip.header()),
156+
common_ancestor = %common_ancestor,
157+
"Reorg detected"
158+
);
159+
160+
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
161+
return false;
162+
}
163+
164+
// Get the new tip block (same height as original tip, but new hash)
165+
*tip = match provider.get_block_by_number(tip_number.into()).await {
166+
Ok(block) => block,
167+
Err(e) => {
168+
if matches!(e, crate::robust_provider::Error::BlockNotFound(_)) {
169+
error!("Unexpected error: pre-reorg chain tip should exist on a reorged chain");
170+
} else {
171+
error!(error = %e, "Terminal RPC call error, shutting down");
172+
}
173+
_ = sender.try_stream(e).await;
174+
return false;
175+
}
176+
};
177+
178+
// Re-scan only the affected range (from common_ancestor + 1 up to tip)
179+
let rescan_from = common_ancestor + 1;
180+
181+
for batch in RangeIterator::forward(rescan_from, tip_number, max_block_range) {
182+
if !sender.try_stream(batch).await {
183+
return false;
184+
}
185+
}
186+
187+
true
188+
}
189+
}

0 commit comments

Comments
 (0)