Skip to content

Commit 499b683

Browse files
committed
Merge branch 'block-scanner' into migrate-event-scanner-tests
2 parents cafdfb2 + 8cd6f30 commit 499b683

File tree

6 files changed

+1035
-95
lines changed

6 files changed

+1035
-95
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/target
22
/examples/**/target
33
.DS_Store
4+
.vscode

Cargo.toml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
[workspace]
2-
members = [
3-
".",
4-
"examples/historical_scanning",
5-
"examples/simple_counter",
6-
]
2+
members = [".", "examples/historical_scanning", "examples/simple_counter"]
73
resolver = "2"
84

95
[lints.clippy]

src/block_scanner.rs

Lines changed: 187 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,32 @@
11
#![allow(unused)]
22

3-
use std::{future, marker::PhantomData, ops::Range, time::Duration};
3+
use std::{marker::PhantomData, ops::Range, time::Duration};
44

5-
use tokio::sync::mpsc::{self, Receiver, Sender};
5+
use tokio::sync::mpsc::{self, error::SendError};
66
use tokio_stream::wrappers::ReceiverStream;
77

88
use alloy::{
9+
consensus::BlockHeader,
910
eips::BlockNumberOrTag,
10-
network::Network,
11+
network::{BlockResponse, Network, primitives::HeaderResponse},
12+
primitives::{BlockHash, BlockNumber},
1113
providers::{Provider, RootProvider},
1214
pubsub::PubSubConnect,
1315
rpc::{
1416
client::{ClientBuilder, RpcClient},
1517
types::Header,
1618
},
17-
transports::{TransportError, ipc::IpcConnect, ws::WsConnect},
19+
transports::{RpcError, TransportErrorKind, ipc::IpcConnect, ws::WsConnect},
1820
};
1921

2022
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
2123
const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
2224
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12);
2325
const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
2426

27+
const CHANNEL_BUFFER_SIZE: usize = 10000;
28+
const MAX_BUFFERED_MESSAGES: usize = 50000;
29+
2530
// TODO: determine check exact default value
2631
const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
2732

@@ -30,6 +35,16 @@ pub enum BlockScannerError {
3035
ErrEOF,
3136
ErrContinue,
3237
TerminalError(u64),
38+
EndHeightSmallerThanStartHeight(BlockNumberOrTag, BlockNumberOrTag),
39+
NonExistentStartHeader(BlockNumberOrTag),
40+
NonExistentEndHeader(BlockNumberOrTag),
41+
Rpc(RpcError<TransportErrorKind>),
42+
}
43+
44+
impl From<RpcError<TransportErrorKind>> for BlockScannerError {
45+
fn from(value: RpcError<TransportErrorKind>) -> Self {
46+
BlockScannerError::Rpc(value)
47+
}
3348
}
3449

3550
impl std::fmt::Display for BlockScannerError {
@@ -40,6 +55,43 @@ impl std::fmt::Display for BlockScannerError {
4055
BlockScannerError::TerminalError(height) => {
4156
write!(f, "terminal error at block height {height}")
4257
}
58+
BlockScannerError::EndHeightSmallerThanStartHeight(start, end) => {
59+
write!(f, "start height ({start}) > end height ({end})")
60+
}
61+
BlockScannerError::NonExistentStartHeader(height) => {
62+
write!(f, "failed to get start header, height: {height}")
63+
}
64+
BlockScannerError::NonExistentEndHeader(height) => {
65+
write!(f, "failed to get end header, height: {height}")
66+
}
67+
BlockScannerError::Rpc(err) => err.fmt(f),
68+
}
69+
}
70+
}
71+
72+
#[derive(Debug)]
73+
pub enum StartError {
74+
BlockScannerError(BlockScannerError),
75+
SendError(SendError<Result<Range<BlockNumber>, BlockScannerError>>),
76+
}
77+
78+
impl From<SendError<Result<Range<BlockNumber>, BlockScannerError>>> for StartError {
79+
fn from(value: SendError<Result<Range<BlockNumber>, BlockScannerError>>) -> Self {
80+
StartError::SendError(value)
81+
}
82+
}
83+
84+
impl From<BlockScannerError> for StartError {
85+
fn from(value: BlockScannerError) -> Self {
86+
StartError::BlockScannerError(value)
87+
}
88+
}
89+
90+
impl std::fmt::Display for StartError {
91+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92+
match self {
93+
StartError::BlockScannerError(err) => err.fmt(f),
94+
StartError::SendError(err) => err.fmt(f),
4395
}
4496
}
4597
}
@@ -129,9 +181,9 @@ impl<N: Network> BlockScannerBuilder<N> {
129181
pub async fn connect_ws(
130182
self,
131183
connect: WsConnect,
132-
) -> Result<BlockScanner<RootProvider<N>, N>, TransportError> {
184+
) -> Result<BlockScanner<RootProvider<N>, N>, BlockScannerError> {
133185
let client = ClientBuilder::default().ws(connect).await?;
134-
Ok(self.connect_client(client))
186+
self.connect_client(client).await
135187
}
136188

137189
/// Connects to the provider via IPC
@@ -142,37 +194,103 @@ impl<N: Network> BlockScannerBuilder<N> {
142194
pub async fn connect_ipc<T>(
143195
self,
144196
connect: IpcConnect<T>,
145-
) -> Result<BlockScanner<RootProvider<N>, N>, TransportError>
197+
) -> Result<BlockScanner<RootProvider<N>, N>, BlockScannerError>
146198
where
147199
IpcConnect<T>: PubSubConnect,
148200
{
149201
let client = ClientBuilder::default().ipc(connect).await?;
150-
Ok(self.connect_client(client))
202+
self.connect_client(client).await
151203
}
152204

153-
#[must_use]
154-
pub fn connect_client(self, client: RpcClient) -> BlockScanner<RootProvider<N>, N> {
205+
#[allow(clippy::missing_errors_doc)]
206+
pub async fn connect_client(
207+
self,
208+
client: RpcClient,
209+
) -> Result<BlockScanner<RootProvider<N>, N>, BlockScannerError> {
155210
let provider = RootProvider::new(client);
156-
self.connect_provider(provider)
211+
self.connect_provider(provider).await
157212
}
158213

159-
pub fn connect_provider<P>(self, provider: P) -> BlockScanner<P, N>
214+
#[allow(clippy::single_match_else)]
215+
#[allow(clippy::missing_errors_doc)]
216+
#[allow(clippy::missing_panics_doc)]
217+
pub async fn connect_provider<P>(
218+
self,
219+
provider: P,
220+
) -> Result<BlockScanner<P, N>, BlockScannerError>
160221
where
161222
P: Provider<N>,
162223
{
163-
BlockScanner {
224+
if let Some(end_height) = self.end_height {
225+
match (self.start_height, end_height) {
226+
(_, BlockNumberOrTag::Latest) => (),
227+
(_, BlockNumberOrTag::Number(end))
228+
if end == provider.get_block_number().await? => {}
229+
(_, BlockNumberOrTag::Number(end)) if end > provider.get_block_number().await? => {
230+
return Err(BlockScannerError::NonExistentEndHeader(end_height));
231+
}
232+
(BlockNumberOrTag::Number(start), BlockNumberOrTag::Number(end)) => {
233+
if start > end {
234+
return Err(BlockScannerError::EndHeightSmallerThanStartHeight(
235+
self.start_height,
236+
end_height,
237+
));
238+
}
239+
let start_block_number =
240+
provider.get_block_number_by_id(self.start_height.into()).await?;
241+
if start_block_number.is_none() {
242+
return Err(BlockScannerError::NonExistentStartHeader(self.start_height));
243+
}
244+
}
245+
// TODO: handle other cases
246+
_ => {}
247+
}
248+
}
249+
250+
let (start_block, end_height) = match (self.start_height, self.end_height) {
251+
(_, Some(end_height)) => {
252+
let start_block = provider
253+
.get_block_by_number(self.start_height)
254+
.await?
255+
.expect("already checked");
256+
let end_height_number = provider.get_block_number_by_id(end_height.into()).await?;
257+
(start_block, end_height_number)
258+
}
259+
(_, None) => {
260+
let start_block = provider
261+
.get_block_by_number(self.start_height)
262+
.await?
263+
.expect("already checked");
264+
(start_block, None)
265+
}
266+
};
267+
268+
let start_header = start_block.header();
269+
270+
Ok(BlockScanner {
164271
provider,
165-
current: Header::default(),
272+
current: BlockHashAndNumber::from_header::<N>(start_header),
166273
is_end: false,
167274
blocks_read_per_epoch: self.blocks_read_per_epoch,
168-
start_height: self.start_height,
169-
end_height: self.end_height,
275+
start_height: start_header.number(),
276+
end_height,
170277
on_blocks: self.on_blocks,
171278
reorg_rewind_depth: self.reorg_rewind_depth,
172279
retry_interval: self.retry_interval,
173280
block_confirmations: self.block_confirmations,
174281
network: PhantomData,
175-
}
282+
})
283+
}
284+
}
285+
286+
struct BlockHashAndNumber {
287+
hash: BlockHash,
288+
number: BlockNumber,
289+
}
290+
291+
impl BlockHashAndNumber {
292+
fn from_header<N: Network>(header: &N::HeaderResponse) -> Self {
293+
Self { hash: header.hash(), number: header.number() }
176294
}
177295
}
178296

@@ -181,9 +299,9 @@ impl<N: Network> BlockScannerBuilder<N> {
181299
pub struct BlockScanner<P: Provider<N>, N: Network> {
182300
provider: P,
183301
blocks_read_per_epoch: usize,
184-
start_height: BlockNumberOrTag,
185-
end_height: Option<BlockNumberOrTag>,
186-
current: Header,
302+
start_height: BlockNumber,
303+
end_height: Option<BlockNumber>,
304+
current: BlockHashAndNumber,
187305
on_blocks: OnBlocksFunc<N>,
188306
is_end: bool,
189307
reorg_rewind_depth: u64,
@@ -201,18 +319,61 @@ where
201319
&self.provider
202320
}
203321

204-
pub async fn start(&self) -> ReceiverStream<Result<Range<u64>, BlockScannerError>> {
205-
let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch);
322+
#[allow(clippy::missing_errors_doc)]
323+
pub async fn start(
324+
&mut self,
325+
) -> Result<ReceiverStream<Result<Range<u64>, BlockScannerError>>, StartError> {
326+
let (sender, receiver) =
327+
mpsc::channel::<Result<Range<u64>, BlockScannerError>>(self.blocks_read_per_epoch);
206328

207329
let receiver_stream = ReceiverStream::new(receiver);
208330

209-
future::ready(()).await;
331+
if let Some(end_height) = self.end_height {
332+
self.ensure_current_not_reorged().await?;
333+
334+
sender.send(Ok(self.start_height..end_height)).await?;
335+
sender.send(Err(BlockScannerError::ErrEOF {})).await?;
336+
}
210337

211338
tokio::spawn(
212339
async move { if sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {} },
213340
);
214341

215-
receiver_stream
342+
Ok(receiver_stream)
343+
}
344+
345+
async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockScannerError> {
346+
let current_block = self.provider.get_block_by_hash(self.current.hash).await?;
347+
if current_block.is_some() {
348+
return Ok(());
349+
}
350+
351+
self.rewind_on_reorg_detected().await
352+
}
353+
354+
async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockScannerError> {
355+
let mut new_current_height = self.current.number.saturating_sub(self.reorg_rewind_depth);
356+
357+
let head = self.provider.get_block_number().await?;
358+
if head < new_current_height {
359+
new_current_height = head;
360+
}
361+
362+
let current = self
363+
.provider
364+
.get_block_by_number(new_current_height.into())
365+
.await?
366+
.map(|block| BlockHashAndNumber::from_header::<N>(block.header()))
367+
.expect("block should exist");
368+
369+
println!(
370+
"Rewind on reorg detected\noldCurrent: {}, newCurrent: {}",
371+
self.current.number, current.number
372+
);
373+
374+
self.current = current;
375+
376+
Ok(())
216377
}
217378
}
218379

@@ -279,9 +440,9 @@ mod tests {
279440
let ws = WsConnect::new(anvil.ws_endpoint_url());
280441

281442
let builder = BlockScannerBuilder::<Ethereum>::new();
282-
let scanner = builder.connect_ws(ws).await.expect("failed to connect ws");
443+
let mut scanner = builder.connect_ws(ws).await.expect("failed to connect ws");
283444

284-
let mut stream = scanner.start().await;
445+
let mut stream = scanner.start().await.unwrap();
285446
let first = stream.next().await;
286447
match first {
287448
Some(Err(BlockScannerError::ErrEOF)) => {}

0 commit comments

Comments
 (0)