Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0bbe07f
init
0xNeshi Sep 15, 2025
6bc1c22
make compile
0xNeshi Sep 15, 2025
a3c75f1
add reorg as in Go
0xNeshi Sep 15, 2025
dd1456e
add docs
0xNeshi Sep 15, 2025
a4e70d8
fmt
0xNeshi Sep 15, 2025
8a18762
fix process_buffered_messages
0xNeshi Sep 15, 2025
1936624
add test
0xNeshi Sep 15, 2025
1a8aef8
fix test
0xNeshi Sep 16, 2025
6b5c6ed
remove rand
0xNeshi Sep 16, 2025
0a03d4e
spin service in bg thread on BlockScanner run
0xNeshi Sep 16, 2025
129fb55
clippy
0xNeshi Sep 16, 2025
267db76
add back builder pattern
0xNeshi Sep 16, 2025
aa65164
rename
0xNeshi Sep 16, 2025
9c85ec6
expose provider util
0xNeshi Sep 16, 2025
8071cf8
fmt
0xNeshi Sep 16, 2025
263da65
clippy
0xNeshi Sep 16, 2025
7e96cdf
fmt
0xNeshi Sep 16, 2025
9405de6
clippy
0xNeshi Sep 16, 2025
79d5b12
fix test
0xNeshi Sep 16, 2025
2ff0d6b
fmt
0xNeshi Sep 16, 2025
080f154
support ipc
0xNeshi Sep 16, 2025
5faf4f9
fmt
0xNeshi Sep 16, 2025
8b9b004
remove redundant code from test
0xNeshi Sep 16, 2025
313680b
pass provider by ref to connect_websocket
0xNeshi Sep 16, 2025
82b4b08
fix doc code
0xNeshi Sep 16, 2025
fb3f88a
fmt
0xNeshi Sep 16, 2025
c69a703
Update src/block_scanner_new.rs
0xNeshi Sep 16, 2025
17aece1
merge
0xNeshi Sep 17, 2025
9532d4c
rename SubscriptionError->BlockScannerError
0xNeshi Sep 17, 2025
7ccc629
remove buffer_size param from subscribe
0xNeshi Sep 17, 2025
f94bf48
update event scanner to new block scanner
0xNeshi Sep 17, 2025
eda25f2
rename connect_websocket->get_block_subscription
0xNeshi Sep 17, 2025
c9cf5f8
ignore local vscode settings
0xNeshi Sep 17, 2025
9993810
fmt
0xNeshi Sep 17, 2025
ef9a977
store provider instead of connection
0xNeshi Sep 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target
/examples/**/target
.DS_Store
.vscode
6 changes: 1 addition & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
[workspace]
members = [
".",
"examples/historical_scanning",
"examples/simple_counter",
]
members = [".", "examples/historical_scanning", "examples/simple_counter"]
resolver = "2"

[lints.clippy]
Expand Down
54 changes: 27 additions & 27 deletions src/block_scanner.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#![allow(unused)]

use std::{future, marker::PhantomData, ops::Range, time::Duration};
use std::{marker::PhantomData, ops::Range, time::Duration};

use tokio::sync::mpsc::{self, Receiver, Sender, error::SendError};
use tokio::sync::mpsc::{self, error::SendError};
use tokio_stream::wrappers::ReceiverStream;

use alloy::{
consensus::BlockHeader,
eips::{BlockId, BlockNumberOrTag, RpcBlockHash},
eips::BlockNumberOrTag,
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::{BlockHash, BlockNumber},
providers::{Provider, RootProvider},
Expand All @@ -16,14 +16,17 @@ use alloy::{
client::{ClientBuilder, RpcClient},
types::Header,
},
transports::{RpcError, TransportError, TransportErrorKind, ipc::IpcConnect, ws::WsConnect},
transports::{RpcError, TransportErrorKind, ipc::IpcConnect, ws::WsConnect},
};

// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12);
const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

const CHANNEL_BUFFER_SIZE: usize = 10000;
const MAX_BUFFERED_MESSAGES: usize = 50000;

// TODO: determine check exact default value
const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;

Expand Down Expand Up @@ -199,7 +202,7 @@ impl<N: Network> BlockScannerBuilder<N> {
self.connect_client(client).await
}

#[must_use]
#[allow(clippy::missing_errors_doc)]
pub async fn connect_client(
self,
client: RpcClient,
Expand All @@ -208,6 +211,9 @@ impl<N: Network> BlockScannerBuilder<N> {
self.connect_provider(provider).await
}

#[allow(clippy::single_match_else)]
#[allow(clippy::missing_errors_doc)]
#[allow(clippy::missing_panics_doc)]
pub async fn connect_provider<P>(
self,
provider: P,
Expand All @@ -219,10 +225,7 @@ impl<N: Network> BlockScannerBuilder<N> {
match (self.start_height, end_height) {
(_, BlockNumberOrTag::Latest) => (),
(_, BlockNumberOrTag::Number(end))
if end == provider.get_block_number().await? =>
{
()
}
if end == provider.get_block_number().await? => {}
(_, BlockNumberOrTag::Number(end)) if end > provider.get_block_number().await? => {
return Err(BlockScannerError::NonExistentEndHeader(end_height));
}
Expand All @@ -241,7 +244,7 @@ impl<N: Network> BlockScannerBuilder<N> {
}
// TODO: handle other cases
_ => {}
};
}
}

let (start_block, end_height) = match (self.start_height, self.end_height) {
Expand Down Expand Up @@ -286,8 +289,8 @@ struct BlockHashAndNumber {
}

impl BlockHashAndNumber {
fn from_header<N: Network>(block: &N::HeaderResponse) -> Self {
Self { hash: block.hash(), number: block.number() }
fn from_header<N: Network>(header: &N::HeaderResponse) -> Self {
Self { hash: header.hash(), number: header.number() }
}
}

Expand Down Expand Up @@ -316,19 +319,20 @@ where
&self.provider
}

pub async fn start(&self) -> ReceiverStream<Result<Range<u64>, BlockScannerError>> {
let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch);
#[allow(clippy::missing_errors_doc)]
pub async fn start(
&mut self,
) -> Result<ReceiverStream<Result<Range<u64>, BlockScannerError>>, StartError> {
let (sender, receiver) =
mpsc::channel::<Result<Range<u64>, BlockScannerError>>(self.blocks_read_per_epoch);

let receiver_stream = ReceiverStream::new(receiver);

match (self.start_height, self.end_height) {
(_, Some(end_height)) => {
self.ensure_current_not_reorged().await?;
if let Some(end_height) = self.end_height {
self.ensure_current_not_reorged().await?;

sender.send(Ok(self.start_height..end_height)).await?;
sender.send(Err(BlockScannerError::ErrEOF {})).await?;
}
_ => {}
sender.send(Ok(self.start_height..end_height)).await?;
sender.send(Err(BlockScannerError::ErrEOF {})).await?;
}

tokio::spawn(
Expand All @@ -348,11 +352,7 @@ where
}

async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockScannerError> {
let mut new_current_height = if self.current.number <= self.reorg_rewind_depth {
0
} else {
self.current.number - self.reorg_rewind_depth
};
let mut new_current_height = self.current.number.saturating_sub(self.reorg_rewind_depth);

let head = self.provider.get_block_number().await?;
if head < new_current_height {
Expand All @@ -363,7 +363,7 @@ where
.provider
.get_block_by_number(new_current_height.into())
.await?
.map(|block| BlockHashAndNumber::from_header::<N>(&block.header()))
.map(|block| BlockHashAndNumber::from_header::<N>(block.header()))
.expect("block should exist");

println!(
Expand Down
Loading