Skip to content

Commit e838723

Browse files
0xNeshiLeoPatOZ
andauthored
Block + Event Channel Design Proposal (#12)
Co-authored-by: Leo <[email protected]>
1 parent 737d0ff commit e838723

File tree

6 files changed

+430
-12
lines changed

6 files changed

+430
-12
lines changed

.github/workflows/check.yml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,7 @@ jobs:
6969
components: clippy
7070

7171
- name: Clippy Check
72-
uses: giraffate/clippy-action@v1
73-
with:
74-
reporter: "github-pr-check"
75-
github_token: ${{ secrets.GITHUB_TOKEN }}
76-
clippy_flags: --all-targets --all-features -- -D warnings -D clippy::pedantic
72+
run: cargo clippy --all-targets --all-features -- -D warnings -D clippy::pedantic
7773

7874
typos-cli:
7975
name: typos

src/block_scanner.rs

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
#![allow(unused)]
2+
3+
use std::{future, marker::PhantomData, time::Duration};
4+
5+
use tokio::sync::mpsc::{self, Receiver, Sender};
6+
use tokio_stream::wrappers::ReceiverStream;
7+
8+
use alloy::{
9+
eips::BlockNumberOrTag,
10+
network::Network,
11+
providers::{Provider, RootProvider},
12+
rpc::{
13+
client::{ClientBuilder, RpcClient},
14+
types::Header,
15+
},
16+
transports::TransportError,
17+
};
18+
19+
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
20+
const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
21+
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12);
22+
const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
23+
const BACK_OFF_MAX_RETRIES: u64 = 5;
24+
25+
// TODO: determine check exact default value
26+
const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
27+
28+
// State sync aware retry settings
29+
const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30);
30+
const STATE_SYNC_MAX_RETRIES: u64 = 12;
31+
32+
#[derive(Debug)]
33+
pub enum BlockScannerError {
34+
ErrEOF,
35+
ErrContinue,
36+
TerminalError(u64),
37+
}
38+
39+
impl std::fmt::Display for BlockScannerError {
40+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41+
match self {
42+
BlockScannerError::ErrEOF => write!(f, "end of block batch iterator"),
43+
BlockScannerError::ErrContinue => write!(f, "continue"),
44+
BlockScannerError::TerminalError(height) => {
45+
write!(f, "terminal error at block height {height}")
46+
}
47+
}
48+
}
49+
}
50+
51+
type EndIterFunc = fn();
52+
type UpdateCurrentFunc = fn(Header);
53+
pub type OnBlocksFunc<N> =
54+
fn(<N as Network>::BlockResponse, UpdateCurrentFunc, EndIterFunc) -> anyhow::Result<()>;
55+
56+
pub struct BlockScannerBuilder<N: Network> {
57+
blocks_read_per_epoch: usize,
58+
start_height: BlockNumberOrTag,
59+
end_height: BlockNumberOrTag,
60+
on_blocks: OnBlocksFunc<N>,
61+
reorg_rewind_depth: u64,
62+
retry_interval: Duration,
63+
block_confirmations: u64,
64+
}
65+
66+
impl<N: Network> Default for BlockScannerBuilder<N> {
67+
fn default() -> Self {
68+
Self::new()
69+
}
70+
}
71+
72+
impl<N: Network> BlockScannerBuilder<N> {
73+
#[must_use]
74+
pub fn new() -> Self {
75+
Self {
76+
blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH,
77+
start_height: BlockNumberOrTag::Earliest,
78+
end_height: BlockNumberOrTag::Latest,
79+
on_blocks: |_, _, _| Ok(()),
80+
reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH,
81+
retry_interval: DEFAULT_RETRY_INTERVAL,
82+
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
83+
}
84+
}
85+
86+
#[must_use]
87+
pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self {
88+
self.blocks_read_per_epoch = blocks_read_per_epoch;
89+
self
90+
}
91+
92+
#[must_use]
93+
pub fn with_start_height(&mut self, start_height: BlockNumberOrTag) -> &mut Self {
94+
self.start_height = start_height;
95+
self
96+
}
97+
98+
#[must_use]
99+
pub fn with_end_height(&mut self, end_height: BlockNumberOrTag) -> &mut Self {
100+
self.end_height = end_height;
101+
self
102+
}
103+
104+
#[must_use]
105+
pub fn with_on_blocks(&mut self, on_blocks: OnBlocksFunc<N>) -> &mut Self {
106+
self.on_blocks = on_blocks;
107+
self
108+
}
109+
110+
#[must_use]
111+
pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self {
112+
self.reorg_rewind_depth = reorg_rewind_depth;
113+
self
114+
}
115+
116+
#[must_use]
117+
pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self {
118+
self.retry_interval = retry_interval;
119+
self
120+
}
121+
122+
#[must_use]
123+
pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self {
124+
self.block_confirmations = block_confirmations;
125+
self
126+
}
127+
128+
/// Connects to the provider via WebSocket
129+
///
130+
/// # Errors
131+
///
132+
/// Returns an error if the connection fails
133+
pub async fn connect_ws(
134+
self,
135+
connect: alloy::transports::ws::WsConnect,
136+
) -> Result<BlockScanner<RootProvider<N>, N>, TransportError> {
137+
let client = ClientBuilder::default().ws(connect).await?;
138+
Ok(self.connect_client(client))
139+
}
140+
141+
/// Connects to the provider via IPC
142+
///
143+
/// # Errors
144+
///
145+
/// Returns an error if the connection fails
146+
pub async fn connect_ipc<T>(
147+
self,
148+
connect: alloy::transports::ipc::IpcConnect<T>,
149+
) -> Result<BlockScanner<RootProvider<N>, N>, TransportError>
150+
where
151+
alloy::transports::ipc::IpcConnect<T>: alloy::pubsub::PubSubConnect,
152+
{
153+
let client = ClientBuilder::default().ipc(connect).await?;
154+
Ok(self.connect_client(client))
155+
}
156+
157+
#[must_use]
158+
pub fn connect_client(self, client: RpcClient) -> BlockScanner<RootProvider<N>, N> {
159+
let provider = RootProvider::new(client);
160+
self.connect_provider(provider)
161+
}
162+
163+
pub fn connect_provider<P>(self, provider: P) -> BlockScanner<P, N>
164+
where
165+
P: Provider<N>,
166+
{
167+
let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch);
168+
169+
BlockScanner {
170+
provider,
171+
sender,
172+
receiver,
173+
current: Header::default(),
174+
is_end: false,
175+
blocks_read_per_epoch: self.blocks_read_per_epoch,
176+
start_height: self.start_height,
177+
end_height: self.end_height,
178+
on_blocks: self.on_blocks,
179+
reorg_rewind_depth: self.reorg_rewind_depth,
180+
retry_interval: self.retry_interval,
181+
block_confirmations: self.block_confirmations,
182+
network: PhantomData,
183+
}
184+
}
185+
}
186+
187+
// BlockScanner iterates the blocks in batches between the given start and end heights,
188+
// with the awareness of reorganization.
189+
pub struct BlockScanner<P: Provider<N>, N: Network> {
190+
provider: P,
191+
sender: Sender<Result<N::BlockResponse, BlockScannerError>>,
192+
receiver: Receiver<Result<N::BlockResponse, BlockScannerError>>,
193+
blocks_read_per_epoch: usize,
194+
start_height: BlockNumberOrTag,
195+
end_height: BlockNumberOrTag,
196+
current: Header,
197+
on_blocks: OnBlocksFunc<N>,
198+
is_end: bool,
199+
reorg_rewind_depth: u64,
200+
retry_interval: Duration,
201+
block_confirmations: u64,
202+
network: PhantomData<fn() -> N>,
203+
}
204+
205+
impl<P, N> BlockScanner<P, N>
206+
where
207+
P: Provider<N>,
208+
N: Network,
209+
{
210+
pub async fn start(self) -> ReceiverStream<Result<N::BlockResponse, BlockScannerError>> {
211+
let receiver_stream = ReceiverStream::new(self.receiver);
212+
213+
future::ready(()).await;
214+
215+
tokio::spawn(async move {
216+
if self.sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {}
217+
});
218+
219+
receiver_stream
220+
}
221+
}

src/builder.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,47 @@ impl ScannerBuilder {
2424
}
2525
}
2626

27+
#[must_use]
2728
pub fn start_block(mut self, start_block: u64) -> Self {
2829
self.start_block = Some(start_block);
2930
self
3031
}
3132

33+
#[must_use]
3234
pub fn end_block(mut self, end_block: u64) -> Self {
3335
self.end_block = Some(end_block);
3436
self
3537
}
3638

39+
#[must_use]
3740
pub fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self {
3841
self.max_blocks_per_filter = max_blocks;
3942
self
4043
}
4144

45+
#[must_use]
4246
pub fn add_event_filter(mut self, filter: EventFilter) -> Self {
4347
self.tracked_events.push(filter);
4448
self
4549
}
4650

51+
#[must_use]
4752
pub fn add_event_filters(mut self, filters: Vec<EventFilter>) -> Self {
4853
self.tracked_events.extend(filters);
4954
self
5055
}
5156

57+
#[must_use]
5258
pub fn callback_config(mut self, cfg: CallbackConfig) -> Self {
5359
self.callback_config = cfg;
5460
self
5561
}
5662

63+
/// Builds the scanner
64+
///
65+
/// # Errors
66+
///
67+
/// Returns an error if the scanner fails to build
5768
pub async fn build(self) -> anyhow::Result<Scanner> {
5869
Scanner::new(
5970
self.rpc_url,

0 commit comments

Comments
 (0)