diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index e1ae14b2..e4bd630f 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -69,11 +69,7 @@ jobs: components: clippy - name: Clippy Check - uses: giraffate/clippy-action@v1 - with: - reporter: "github-pr-check" - github_token: ${{ secrets.GITHUB_TOKEN }} - clippy_flags: --all-targets --all-features -- -D warnings -D clippy::pedantic + run: cargo clippy --all-targets --all-features -- -D warnings -D clippy::pedantic typos-cli: name: typos diff --git a/src/block_scanner.rs b/src/block_scanner.rs new file mode 100644 index 00000000..bba72319 --- /dev/null +++ b/src/block_scanner.rs @@ -0,0 +1,221 @@ +#![allow(unused)] + +use std::{future, marker::PhantomData, time::Duration}; + +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio_stream::wrappers::ReceiverStream; + +use alloy::{ + eips::BlockNumberOrTag, + network::Network, + providers::{Provider, RootProvider}, + rpc::{ + client::{ClientBuilder, RpcClient}, + types::Header, + }, + transports::TransportError, +}; + +// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19 +const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000; +const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12); +const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; +const BACK_OFF_MAX_RETRIES: u64 = 5; + +// TODO: determine check exact default value +const DEFAULT_REORG_REWIND_DEPTH: u64 = 0; + +// State sync aware retry settings +const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30); +const STATE_SYNC_MAX_RETRIES: u64 = 12; + +#[derive(Debug)] +pub enum BlockScannerError { + ErrEOF, + ErrContinue, + TerminalError(u64), +} + +impl std::fmt::Display for BlockScannerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BlockScannerError::ErrEOF => write!(f, "end of block batch iterator"), + BlockScannerError::ErrContinue => write!(f, "continue"), + BlockScannerError::TerminalError(height) => { + write!(f, "terminal error at block height {height}") + } + } + } +} + +type EndIterFunc = fn(); +type UpdateCurrentFunc = fn(Header); +pub type OnBlocksFunc = + fn(::BlockResponse, UpdateCurrentFunc, EndIterFunc) -> anyhow::Result<()>; + +pub struct BlockScannerBuilder { + blocks_read_per_epoch: usize, + start_height: BlockNumberOrTag, + end_height: BlockNumberOrTag, + on_blocks: OnBlocksFunc, + reorg_rewind_depth: u64, + retry_interval: Duration, + block_confirmations: u64, +} + +impl Default for BlockScannerBuilder { + fn default() -> Self { + Self::new() + } +} + +impl BlockScannerBuilder { + #[must_use] + pub fn new() -> Self { + Self { + blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH, + start_height: BlockNumberOrTag::Earliest, + end_height: BlockNumberOrTag::Latest, + on_blocks: |_, _, _| Ok(()), + reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH, + retry_interval: DEFAULT_RETRY_INTERVAL, + block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, + } + } + + #[must_use] + pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self { + self.blocks_read_per_epoch = blocks_read_per_epoch; + self + } + + #[must_use] + pub fn with_start_height(&mut self, start_height: BlockNumberOrTag) -> &mut Self { + self.start_height = start_height; + self + } + + #[must_use] + pub fn with_end_height(&mut self, end_height: BlockNumberOrTag) -> &mut Self { + self.end_height = end_height; + self + } + + #[must_use] + pub fn with_on_blocks(&mut self, on_blocks: OnBlocksFunc) -> &mut Self { + self.on_blocks = on_blocks; + self + } + + #[must_use] + pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self { + self.reorg_rewind_depth = reorg_rewind_depth; + self + } + + #[must_use] + pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self { + self.retry_interval = retry_interval; + self + } + + #[must_use] + pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self { + self.block_confirmations = block_confirmations; + self + } + + /// Connects to the provider via WebSocket + /// + /// # Errors + /// + /// Returns an error if the connection fails + pub async fn connect_ws( + self, + connect: alloy::transports::ws::WsConnect, + ) -> Result, N>, TransportError> { + let client = ClientBuilder::default().ws(connect).await?; + Ok(self.connect_client(client)) + } + + /// Connects to the provider via IPC + /// + /// # Errors + /// + /// Returns an error if the connection fails + pub async fn connect_ipc( + self, + connect: alloy::transports::ipc::IpcConnect, + ) -> Result, N>, TransportError> + where + alloy::transports::ipc::IpcConnect: alloy::pubsub::PubSubConnect, + { + let client = ClientBuilder::default().ipc(connect).await?; + Ok(self.connect_client(client)) + } + + #[must_use] + pub fn connect_client(self, client: RpcClient) -> BlockScanner, N> { + let provider = RootProvider::new(client); + self.connect_provider(provider) + } + + pub fn connect_provider

(self, provider: P) -> BlockScanner + where + P: Provider, + { + let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch); + + BlockScanner { + provider, + sender, + receiver, + current: Header::default(), + is_end: false, + blocks_read_per_epoch: self.blocks_read_per_epoch, + start_height: self.start_height, + end_height: self.end_height, + on_blocks: self.on_blocks, + reorg_rewind_depth: self.reorg_rewind_depth, + retry_interval: self.retry_interval, + block_confirmations: self.block_confirmations, + network: PhantomData, + } + } +} + +// BlockScanner iterates the blocks in batches between the given start and end heights, +// with the awareness of reorganization. +pub struct BlockScanner, N: Network> { + provider: P, + sender: Sender>, + receiver: Receiver>, + blocks_read_per_epoch: usize, + start_height: BlockNumberOrTag, + end_height: BlockNumberOrTag, + current: Header, + on_blocks: OnBlocksFunc, + is_end: bool, + reorg_rewind_depth: u64, + retry_interval: Duration, + block_confirmations: u64, + network: PhantomData N>, +} + +impl BlockScanner +where + P: Provider, + N: Network, +{ + pub async fn start(self) -> ReceiverStream> { + let receiver_stream = ReceiverStream::new(self.receiver); + + future::ready(()).await; + + tokio::spawn(async move { + if self.sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {} + }); + + receiver_stream + } +} diff --git a/src/builder.rs b/src/builder.rs index 973cbd61..11542f3e 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -24,36 +24,47 @@ impl ScannerBuilder { } } + #[must_use] pub fn start_block(mut self, start_block: u64) -> Self { self.start_block = Some(start_block); self } + #[must_use] pub fn end_block(mut self, end_block: u64) -> Self { self.end_block = Some(end_block); self } + #[must_use] pub fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self { self.max_blocks_per_filter = max_blocks; self } + #[must_use] pub fn add_event_filter(mut self, filter: EventFilter) -> Self { self.tracked_events.push(filter); self } + #[must_use] pub fn add_event_filters(mut self, filters: Vec) -> Self { self.tracked_events.extend(filters); self } + #[must_use] pub fn callback_config(mut self, cfg: CallbackConfig) -> Self { self.callback_config = cfg; self } + /// Builds the scanner + /// + /// # Errors + /// + /// Returns an error if the scanner fails to build pub async fn build(self) -> anyhow::Result { Scanner::new( self.rpc_url, diff --git a/src/event_scanner.rs b/src/event_scanner.rs new file mode 100644 index 00000000..0c2d69ed --- /dev/null +++ b/src/event_scanner.rs @@ -0,0 +1,174 @@ +#![allow(unused)] + +use std::{future, time::Duration}; + +use crate::{ + block_scanner::{BlockScanner, BlockScannerBuilder, OnBlocksFunc}, + types::{CallbackConfig, EventFilter}, +}; +use alloy::{ + eips::BlockNumberOrTag, + network::Network, + providers::{Provider, RootProvider}, + rpc::client::RpcClient, + transports::TransportError, +}; + +pub struct EventScannerBuilder { + block_scanner: BlockScannerBuilder, + tracked_events: Vec, + callback_config: CallbackConfig, +} + +impl Default for EventScannerBuilder { + fn default() -> Self { + Self::new() + } +} + +impl EventScannerBuilder { + #[must_use] + pub fn new() -> Self { + Self { + block_scanner: BlockScannerBuilder::new(), + tracked_events: Vec::new(), + callback_config: CallbackConfig::default(), + } + } + + #[must_use] + pub fn with_event_filter(mut self, filter: EventFilter) -> Self { + self.tracked_events.push(filter); + self + } + + #[must_use] + pub fn with_event_filters(mut self, filters: Vec) -> Self { + self.tracked_events.extend(filters); + self + } + + #[must_use] + pub fn with_callback_config(mut self, cfg: CallbackConfig) -> Self { + self.callback_config = cfg; + self + } + + #[must_use] + #[must_use] + pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self { + self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch); + self + } + + #[must_use] + pub fn with_start_height(&mut self, start_height: BlockNumberOrTag) -> &mut Self { + self.block_scanner.with_start_height(start_height); + self + } + + #[must_use] + pub fn with_end_height(&mut self, end_height: BlockNumberOrTag) -> &mut Self { + self.block_scanner.with_end_height(end_height); + self + } + + #[must_use] + pub fn with_on_blocks(&mut self, on_blocks: OnBlocksFunc) -> &mut Self { + self.block_scanner.with_on_blocks(on_blocks); + self + } + + #[must_use] + pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self { + self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth); + self + } + + #[must_use] + pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self { + self.block_scanner.with_retry_interval(retry_interval); + self + } + + #[must_use] + pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self { + self.block_scanner.with_block_confirmations(block_confirmations); + self + } + + /// Connects to the provider via WebSocket + /// + /// # Errors + /// + /// Returns an error if the connection fails + pub async fn connect_ws( + self, + connect: alloy::transports::ws::WsConnect, + ) -> Result, N>, TransportError> { + let block_scanner = self.block_scanner.connect_ws(connect).await?; + Ok(EventScanner { + block_scanner, + tracked_events: self.tracked_events, + callback_config: self.callback_config, + }) + } + + /// Connects to the provider via IPC + /// + /// # Errors + /// + /// Returns an error if the connection fails + pub async fn connect_ipc( + self, + connect: alloy::transports::ipc::IpcConnect, + ) -> Result, N>, TransportError> + where + alloy::transports::ipc::IpcConnect: alloy::pubsub::PubSubConnect, + { + let block_scanner = self.block_scanner.connect_ipc(connect).await?; + Ok(EventScanner { + block_scanner, + tracked_events: self.tracked_events, + callback_config: self.callback_config, + }) + } + + #[must_use] + pub fn connect_client(self, client: RpcClient) -> EventScanner, N> { + let block_scanner = self.block_scanner.connect_client(client); + EventScanner { + block_scanner, + tracked_events: self.tracked_events, + callback_config: self.callback_config, + } + } + + #[must_use] + pub fn connect_provider(self, provider: RootProvider) -> EventScanner, N> { + let block_scanner = self.block_scanner.connect_provider(provider); + EventScanner { + block_scanner, + tracked_events: self.tracked_events, + callback_config: self.callback_config, + } + } +} + +pub struct EventScanner, N: Network> { + block_scanner: BlockScanner, + tracked_events: Vec, + callback_config: CallbackConfig, +} + +impl, N: Network> EventScanner { + /// Starts the scanner + /// + /// # Errors + /// + /// Returns an error if the scanner fails to start + pub async fn start(&mut self) -> anyhow::Result<()> { + future::ready(()).await; + todo!() + } +} diff --git a/src/lib.rs b/src/lib.rs index 1edd8c13..261bd043 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ +pub mod block_scanner; pub mod builder; pub mod callback; +pub mod event_scanner; pub mod scanner; pub mod types; diff --git a/src/scanner.rs b/src/scanner.rs index a4d8b313..4860a23f 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -29,6 +29,11 @@ pub struct Scanner { } impl Scanner { + /// Creates a new scanner + /// + /// # Errors + /// + /// Returns an error if the scanner fails to start pub async fn new( rpc_url: String, start_block: Option, @@ -70,7 +75,11 @@ impl Scanner { fn detect_provider_type(url: &str) -> anyhow::Result { if url.starts_with("ws://") || url.starts_with("wss://") { Ok(ProviderType::WebSocket) - } else if url.ends_with(".ipc") || url.contains("ipc") { + } else if std::path::Path::new(url) + .extension() + .is_some_and(|ext| ext.eq_ignore_ascii_case("ipc")) || + url.contains("ipc") + { Ok(ProviderType::Ipc) } else { Err(anyhow::anyhow!( @@ -80,29 +89,35 @@ impl Scanner { } } + /// Starts the scanner + /// + /// # Errors + /// + /// Returns an error if the scanner fails to start pub async fn start(&mut self) -> anyhow::Result<()> { match (self.start_block, self.end_block) { (None, Some(end)) => { info!("Scanning from genesis block 0 to {}", end); - self.process_historical_blocks(0, Some(end)).await? + self.process_historical_blocks(0, Some(end)).await?; } (Some(start), Some(end)) => { info!("Scanning from block {} to {}", start, end); - self.process_historical_blocks(start, Some(end)).await? + self.process_historical_blocks(start, Some(end)).await?; } (Some(start), None) => { info!("Scanning from block {} to latest, then switching to live mode", start); self.process_historical_blocks(start, None).await?; - self.subscribe_to_new_blocks().await? + self.subscribe_to_new_blocks().await?; } (None, None) => { info!("Starting in live mode only"); - self.subscribe_to_new_blocks().await? + self.subscribe_to_new_blocks().await?; } } Ok(()) } + #[must_use] pub fn current_head(&self) -> Option { self.current_head } @@ -233,7 +248,7 @@ impl Scanner { let mut last_err: Option = None; for attempt in 1..=attempts { match callback.on_event(log).await { - Ok(_) => return Ok(()), + Ok(()) => return Ok(()), Err(e) => { last_err = Some(e); if attempt < attempts { @@ -243,7 +258,6 @@ impl Scanner { "callback failed; retrying after fixed delay" ); sleep(Duration::from_millis(config.delay_ms)).await; - continue; } } }