diff --git a/.gitignore b/.gitignore index 34f380f9..2ce8acbf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /examples/**/target .DS_Store +.vscode diff --git a/Cargo.toml b/Cargo.toml index 4f3bfb00..a0754536 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,5 @@ [workspace] -members = [ - ".", - "examples/historical_scanning", - "examples/simple_counter", -] +members = [".", "examples/historical_scanning", "examples/simple_counter"] resolver = "2" [lints.clippy] diff --git a/examples/historical_scanning/main.rs b/examples/historical_scanning/main.rs index db9870b4..630fe1dc 100644 --- a/examples/historical_scanning/main.rs +++ b/examples/historical_scanning/main.rs @@ -1,9 +1,12 @@ use std::{sync::Arc, time::Duration}; -use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent}; +use alloy::{ + eips::BlockNumberOrTag, network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol, + sol_types::SolEvent, +}; use alloy_node_bindings::Anvil; use async_trait::async_trait; -use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder}; +use event_scanner::{EventCallback, EventFilter, event_scanner::EventScannerBuilder}; use tokio::time::sleep; use tracing::info; @@ -64,15 +67,14 @@ async fn main() -> anyhow::Result<()> { let _ = counter_contract.increase().send().await?.get_receipt().await?; - let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url()) - .add_event_filter(increase_filter) - .callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 }) - .start_block(0) - .build() - .await?; + let mut builder = EventScannerBuilder::new(); + + builder.with_event_filter(increase_filter); + + let mut scanner = builder.connect_ws::(anvil.ws_endpoint_url()).await?; sleep(Duration::from_secs(10)).await; - scanner.start().await.expect("failed to start scanner"); + scanner.start(BlockNumberOrTag::Number(0), None).await.expect("failed to start scanner"); Ok(()) } diff --git a/examples/simple_counter/main.rs b/examples/simple_counter/main.rs index 19534edc..b0472234 100644 --- a/examples/simple_counter/main.rs +++ b/examples/simple_counter/main.rs @@ -1,9 +1,12 @@ use std::{sync::Arc, time::Duration}; -use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent}; +use alloy::{ + eips::BlockNumberOrTag, network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol, + sol_types::SolEvent, +}; use alloy_node_bindings::Anvil; use async_trait::async_trait; -use event_scanner::{EventCallback, EventFilter, FixedRetryConfig, ScannerBuilder}; +use event_scanner::{EventCallback, EventFilter, event_scanner::EventScannerBuilder}; use tokio::time::sleep; use tracing::info; @@ -62,14 +65,15 @@ async fn main() -> anyhow::Result<()> { callback: Arc::new(CounterCallback), }; - let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url()) - .add_event_filter(increase_filter) - .callback_config(FixedRetryConfig { max_attempts: 3, delay_ms: 200 }) - .build() - .await?; + let mut builder = EventScannerBuilder::new(); + + builder.with_event_filter(increase_filter); + + let scanner = builder.connect_ws::(anvil.ws_endpoint_url()).await?; let task_1 = tokio::spawn(async move { - scanner.start().await.expect("failed to start scanner"); + let mut scanner = scanner; + scanner.start(BlockNumberOrTag::Latest, None).await.expect("failed to start scanner"); }); let task_2 = tokio::spawn(async move { diff --git a/src/block_scanner.rs b/src/block_scanner.rs index b1c9953b..77e606d7 100644 --- a/src/block_scanner.rs +++ 
b/src/block_scanner.rs
@@ -1,78 +1,210 @@
-#![allow(unused)]
-
-use std::{future, marker::PhantomData, ops::Range, time::Duration};
-
-use tokio::sync::mpsc::{self, Receiver, Sender};
+//! Example usage:
+//!
+//! ```rust,no_run
+//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
+//! use event_scanner::block_scanner::BlockScannerError;
+//! use std::ops::Range;
+//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
+//!
+//! use alloy::transports::http::reqwest::Url;
+//! use event_scanner::block_scanner::{BlockScanner, BlockScannerClient};
+//! use tokio::time::Duration;
+//! use tracing::{error, info};
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//!     // Initialize logging
+//!     tracing_subscriber::fmt::init();
+//!
+//!     // Configuration
+//!     let block_scanner = BlockScanner::new()
+//!         .with_blocks_read_per_epoch(1000)
+//!         .with_reorg_rewind_depth(5)
+//!         .with_retry_interval(Duration::from_secs(12))
+//!         .with_block_confirmations(5)
+//!         .connect_ws::<Ethereum>(Url::parse("ws://localhost:8546").unwrap())
+//!         .await?;
+//!
+//!     // Create client to send subscribe command to block scanner
+//!     let subscription_client: BlockScannerClient = block_scanner.run()?;
+//!
+//!     let mut receiver: ReceiverStream<Result<Range<BlockNumber>, BlockScannerError>> =
+//!         subscription_client
+//!             .subscribe(
+//!                 BlockNumberOrTag::Latest,
+//!                 None, // just subscribe to new blocks
+//!             )
+//!             .await?;
+//!
+//!     while let Some(result) = receiver.next().await {
+//!         match result {
+//!             Ok(range) => {
+//!                 // process range
+//!             }
+//!             Err(e) => {
+//!                 error!("Received error from subscription: {e}");
+//!
+//!                 // Decide whether to continue or break based on error type
+//!                 match e {
+//!                     BlockScannerError::ServiceShutdown => break,
+//!                     BlockScannerError::WebSocketConnectionFailed(_) => {
+//!                         // Maybe implement backoff and retry logic here
+//!                         error!(
+//!                             "WebSocket connection failed, continuing to listen for reconnection"
+//!                         );
+//!                     }
+//!                     _ => {
+//!                         // Continue processing for other errors
+//!                         error!("Non-fatal error, continuing: {e}");
+//!                     }
+//!                 }
+//!             }
+//!         }
+//!     }
+//!
+//!     info!("Data processing stopped.");
+//!
+//!     Ok(())
+//! }
+//!
``` + +use std::{ops::Range, time::Duration}; + +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; use alloy::{ - eips::BlockNumberOrTag, - network::Network, + consensus::BlockHeader, + eips::{BlockId, BlockNumberOrTag}, + network::{BlockResponse, Network, primitives::HeaderResponse}, + primitives::{BlockHash, BlockNumber}, providers::{Provider, RootProvider}, - pubsub::PubSubConnect, - rpc::{ - client::{ClientBuilder, RpcClient}, - types::Header, + pubsub::Subscription, + rpc::client::ClientBuilder, + transports::{ + TransportResult, + http::reqwest::{self, Url}, + ws::WsConnect, }, - transports::{TransportError, ipc::IpcConnect, ws::WsConnect}, }; +use thiserror::Error; +use tracing::{debug, error, info, warn}; // copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19 const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000; const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12); const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; +// const BACK_OFF_MAX_RETRIES: u64 = 5; + +const MAX_BUFFERED_MESSAGES: usize = 50000; // TODO: determine check exact default value const DEFAULT_REORG_REWIND_DEPTH: u64 = 0; -#[derive(Debug)] +// // State sync aware retry settings +// const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(30); +// const STATE_SYNC_MAX_RETRIES: u64 = 12; + +#[derive(Error, Debug)] pub enum BlockScannerError { - ErrEOF, - ErrContinue, - TerminalError(u64), + #[error("HTTP request failed: {0}")] + HttpError(#[from] reqwest::Error), + + // #[error("WebSocket error: {0}")] + // WebSocketError(#[from] tokio_tungstenite::tungstenite::Error), + #[error("Serialization error: {0}")] + SerializationError(#[from] serde_json::Error), + + #[error("RPC error: {0}")] + RpcError(#[from] alloy::transports::RpcError), + + #[error("Channel send error")] + ChannelError, + + #[error("Service is shutting down")] + ServiceShutdown, + + #[error("Only one subscriber allowed at a time")] + MultipleSubscribers, + + #[error("Historical sync failed: {0}")] + HistoricalSyncError(String), + + #[error("WebSocket connection failed after {0} attempts")] + WebSocketConnectionFailed(usize), + + #[error("End of block batch")] + Eof, } -impl std::fmt::Display for BlockScannerError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - BlockScannerError::ErrEOF => write!(f, "end of block batch iterator"), - BlockScannerError::ErrContinue => write!(f, "continue"), - BlockScannerError::TerminalError(height) => { - write!(f, "terminal error at block height {height}") - } - } +#[derive(Debug)] +pub enum Command { + Subscribe { + sender: mpsc::Sender, BlockScannerError>>, + start_height: BlockNumberOrTag, + end_height: Option, + response: oneshot::Sender>, + }, + Unsubscribe { + response: oneshot::Sender>, + }, + GetStatus { + response: oneshot::Sender, + }, + Shutdown { + response: oneshot::Sender>, + }, +} + +#[derive(Debug, Clone)] +pub struct ServiceStatus { + pub is_subscribed: bool, + pub last_synced_block: Option, + pub websocket_connected: bool, + pub processed_count: u64, + pub error_count: u64, +} + +#[derive(Debug, Clone)] +pub struct BlockHashAndNumber { + hash: BlockHash, + number: BlockNumber, +} + +impl BlockHashAndNumber { + fn from_header(header: &N::HeaderResponse) -> Self { + Self { hash: header.hash(), number: header.number() } } } -type EndIterFunc = fn(); -type UpdateCurrentFunc = fn(Header); -pub type OnBlocksFunc = - 
fn(::BlockResponse, UpdateCurrentFunc, EndIterFunc) -> anyhow::Result<()>; +#[derive(Clone)] +struct Config { + blocks_read_per_epoch: usize, + reorg_rewind_depth: u64, + #[allow(dead_code, reason = "TODO: will be used in smart retry mechanism")] + retry_interval: Duration, + #[allow(dead_code, reason = "TODO: will be used in reorg mechanism")] + block_confirmations: u64, +} -pub struct BlockScannerBuilder { +pub struct BlockScanner { blocks_read_per_epoch: usize, - start_height: BlockNumberOrTag, - end_height: Option, - on_blocks: OnBlocksFunc, reorg_rewind_depth: u64, retry_interval: Duration, block_confirmations: u64, } -impl Default for BlockScannerBuilder { +impl Default for BlockScanner { fn default() -> Self { Self::new() } } -impl BlockScannerBuilder { +impl BlockScanner { #[must_use] pub fn new() -> Self { Self { blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH, - start_height: BlockNumberOrTag::Latest, - end_height: None, - on_blocks: |_, _, _| Ok(()), reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH, retry_interval: DEFAULT_RETRY_INTERVAL, block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, @@ -85,24 +217,6 @@ impl BlockScannerBuilder { self } - #[must_use] - pub fn with_start_height(&mut self, start_height: BlockNumberOrTag) -> &mut Self { - self.start_height = start_height; - self - } - - #[must_use] - pub fn with_end_height(&mut self, end_height: BlockNumberOrTag) -> &mut Self { - self.end_height = Some(end_height); - self - } - - #[must_use] - pub fn with_on_blocks(&mut self, on_blocks: OnBlocksFunc) -> &mut Self { - self.on_blocks = on_blocks; - self - } - #[must_use] pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self { self.reorg_rewind_depth = reorg_rewind_depth; @@ -126,12 +240,21 @@ impl BlockScannerBuilder { /// # Errors /// /// Returns an error if the connection fails - pub async fn connect_ws( - self, - connect: WsConnect, - ) -> Result, N>, TransportError> { - let client = ClientBuilder::default().ws(connect).await?; - Ok(self.connect_client(client)) + pub async fn connect_ws( + &self, + ws_url: Url, + ) -> TransportResult> { + let provider = + RootProvider::::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?); + Ok(ConnectedBlockScanner { + provider, + config: Config { + blocks_read_per_epoch: self.blocks_read_per_epoch, + reorg_rewind_depth: self.reorg_rewind_depth, + retry_interval: self.retry_interval, + block_confirmations: self.block_confirmations, + }, + }) } /// Connects to the provider via IPC @@ -139,153 +262,556 @@ impl BlockScannerBuilder { /// # Errors /// /// Returns an error if the connection fails - pub async fn connect_ipc( - self, - connect: IpcConnect, - ) -> Result, N>, TransportError> - where - IpcConnect: PubSubConnect, - { - let client = ClientBuilder::default().ipc(connect).await?; - Ok(self.connect_client(client)) + pub async fn connect_ipc( + &self, + ipc_path: String, + ) -> TransportResult> { + let provider = RootProvider::::new(ClientBuilder::default().ipc(ipc_path.into()).await?); + Ok(ConnectedBlockScanner { + provider, + config: Config { + blocks_read_per_epoch: self.blocks_read_per_epoch, + reorg_rewind_depth: self.reorg_rewind_depth, + retry_interval: self.retry_interval, + block_confirmations: self.block_confirmations, + }, + }) } +} +pub struct ConnectedBlockScanner { + provider: RootProvider, + config: Config, +} + +impl ConnectedBlockScanner { + /// Returns the underlying Provider. 
#[must_use] - pub fn connect_client(self, client: RpcClient) -> BlockScanner, N> { - let provider = RootProvider::new(client); - self.connect_provider(provider) + pub fn provider(&self) -> &impl Provider { + &self.provider } - pub fn connect_provider

(self, provider: P) -> BlockScanner - where - P: Provider, - { - BlockScanner { - provider, - current: Header::default(), - is_end: false, - blocks_read_per_epoch: self.blocks_read_per_epoch, - start_height: self.start_height, - end_height: self.end_height, - on_blocks: self.on_blocks, - reorg_rewind_depth: self.reorg_rewind_depth, - retry_interval: self.retry_interval, - block_confirmations: self.block_confirmations, - network: PhantomData, - } + // TODO: use wrapper errors + /// Starts the subscription service and returns a client for sending commands. + /// + /// # Errors + /// + /// Returns an error if the subscription service fails to start. + pub fn run(&self) -> anyhow::Result { + let (service, cmd_tx) = + BlockScannerService::new(self.config.clone(), self.provider.clone()); + tokio::spawn(async move { + service.run().await; + }); + Ok(BlockScannerClient::new(cmd_tx)) } } -// BlockScanner iterates the blocks in batches between the given start and end heights, -// with the awareness of reorganization. -pub struct BlockScanner, N: Network> { - provider: P, - blocks_read_per_epoch: usize, - start_height: BlockNumberOrTag, - end_height: Option, - current: Header, - on_blocks: OnBlocksFunc, - is_end: bool, - reorg_rewind_depth: u64, - retry_interval: Duration, - block_confirmations: u64, - network: PhantomData N>, +struct BlockScannerService { + config: Config, + provider: RootProvider, + subscriber: Option, BlockScannerError>>>, + current: Option, + websocket_connected: bool, + processed_count: u64, + error_count: u64, + command_receiver: mpsc::Receiver, + shutdown: bool, } -impl BlockScanner -where - P: Provider, - N: Network, -{ - pub fn provider(&self) -> &P { - &self.provider +impl BlockScannerService { + pub fn new(config: Config, provider: RootProvider) -> (Self, mpsc::Sender) { + let (cmd_tx, cmd_rx) = mpsc::channel(100); + + let service = Self { + config, + provider, + subscriber: None, + current: None, + websocket_connected: false, + processed_count: 0, + error_count: 0, + command_receiver: cmd_rx, + shutdown: false, + }; + + (service, cmd_tx) } - pub async fn start(&self) -> ReceiverStream, BlockScannerError>> { - let (sender, receiver) = mpsc::channel(self.blocks_read_per_epoch); + pub async fn run(mut self) { + info!("Starting subscription service"); + + while !self.shutdown { + tokio::select! 
{ + cmd = self.command_receiver.recv() => { + if let Some(command) = cmd { + if let Err(e) = self.handle_command(command).await { + error!("Command handling error: {}", e); + self.error_count += 1; + } + } else { + info!("Command channel closed, shutting down"); + break; + } + } + } + } - let receiver_stream = ReceiverStream::new(receiver); + info!("Subscription service stopped"); + } - future::ready(()).await; + async fn handle_command(&mut self, command: Command) -> Result<(), BlockScannerError> { + match command { + Command::Subscribe { sender, start_height, end_height, response } => { + let result = self.handle_subscribe(sender, start_height, end_height).await; + let _ = response.send(result); + } + Command::Unsubscribe { response } => { + self.handle_unsubscribe(); + let _ = response.send(Ok(())); + } + Command::GetStatus { response } => { + let status = self.get_status(); + let _ = response.send(status); + } + Command::Shutdown { response } => { + self.shutdown = true; + self.handle_unsubscribe(); + let _ = response.send(Ok(())); + } + } + Ok(()) + } + + async fn handle_subscribe( + &mut self, + sender: mpsc::Sender, BlockScannerError>>, + start_height: BlockNumberOrTag, + end_height: Option, + ) -> Result<(), BlockScannerError> { + if self.subscriber.is_some() { + return Err(BlockScannerError::MultipleSubscribers); + } + + // TODO: update local state relate to reorg and validate data + + info!("Starting subscription from point: {start_height:?}"); + self.subscriber = Some(sender); + + self.sync_with_transition(start_height, end_height).await?; - tokio::spawn( - async move { if sender.send(Err(BlockScannerError::ErrEOF {})).await.is_err() {} }, + Ok(()) + } + + async fn sync_with_transition( + &mut self, + start_height: BlockNumberOrTag, + end_height: Option, + ) -> Result<(), BlockScannerError> { + // Step 1: Establish WebSocket connection + let (buffer_tx, buffer_rx) = mpsc::channel(MAX_BUFFERED_MESSAGES); + + // Step 2: Perform historical sync + let (start_block, sync_end_block) = if let Some(end_height) = end_height { + let start_block = + self.provider.get_block_by_number(start_height).await?.expect("already checked"); + let end_block = self + .provider + .get_block(end_height.into()) + .await? + .expect("TODO: check if really valid"); + (start_block, end_block) + } else { + let start_block = + self.provider.get_block_by_number(start_height).await?.expect("already checked"); + let end_block = self + .provider + .get_block(BlockId::Number(BlockNumberOrTag::Latest)) + .await? 
+ .expect("TODO: check if really valid"); + (start_block, end_block) + }; + + info!( + "Syncing historical data from {} to {}", + start_block.header().number(), + sync_end_block.header().number() ); - receiver_stream + // start buffering the subscription data + let provider = self.provider.clone(); + let cutoff = sync_end_block.header().number(); + let ws_task = tokio::spawn(async move { + if end_height.is_none() { + Self::websocket_buffer_task(cutoff, provider, buffer_tx).await; + } + }); + + // TODO: invoke with smart retry mechanism with backoff + if let Err(e) = self.sync_historical_data(start_block, sync_end_block).await { + warn!("aborting ws_task"); + ws_task.abort(); + return Err(BlockScannerError::HistoricalSyncError(e.to_string())); + } + + // Step 3: Process buffered WebSocket messages + let sender = self.subscriber.clone().expect("subscriber should be set"); + tokio::spawn(async move { + if end_height.is_none() { + Self::process_buffered_messages(buffer_rx, sender, cutoff).await; + } else if sender.send(Err(BlockScannerError::Eof)).await.is_err() { + warn!("Subscriber channel closed, cleaning up"); + } + }); + + if end_height.is_none() { + info!("Successfully transitioned from historical to live data"); + } else { + info!("Successfully synced historical data"); + } + + Ok(()) } -} -#[cfg(test)] -mod tests { - use super::*; - use alloy::network::{Ethereum, Network}; - use alloy_node_bindings::Anvil; - use tokio_stream::StreamExt; + async fn sync_historical_data( + &mut self, + start: N::BlockResponse, + end: N::BlockResponse, + ) -> Result<(), BlockScannerError> { + let mut batch_count = 0; + + self.current = Some(BlockHashAndNumber::from_header::(start.header())); + + while self.current.as_ref().unwrap().number < end.header().number() { + self.ensure_current_not_reorged().await?; + + let batch_to = if self.current.as_ref().unwrap().number + + self.config.blocks_read_per_epoch as u64 > + end.header().number() + { + end.header().number() + } else { + self.current.as_ref().unwrap().number + self.config.blocks_read_per_epoch as u64 + }; + + let batch_end_block = self + .provider + .get_block_by_number(batch_to.into()) + .await? 
+ .expect("TODO: check if really valid"); + + self.send_to_subscriber(Ok(self.current.as_ref().unwrap().number..batch_to)).await; - #[allow(clippy::unnecessary_wraps)] - fn no_op_on_blocks( - _block: ::BlockResponse, - _update_current: UpdateCurrentFunc, - _end_iter: EndIterFunc, - ) -> anyhow::Result<()> { + self.current = Some(BlockHashAndNumber::from_header::(batch_end_block.header())); + + batch_count += 1; + if batch_count % 10 == 0 { + debug!("Processed {batch_count} historical batches"); + } + } + + info!("Historical sync completed: {batch_count} batches processed"); Ok(()) } - #[test] - fn test_block_scanner_error_display() { - assert_eq!(format!("{}", BlockScannerError::ErrEOF), "end of block batch iterator"); - assert_eq!(format!("{}", BlockScannerError::ErrContinue), "continue"); - assert_eq!( - format!("{}", BlockScannerError::TerminalError(42)), - "terminal error at block height 42" + async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockScannerError> { + let current_block = + self.provider.get_block_by_hash(self.current.as_ref().unwrap().hash).await?; + if current_block.is_some() { + return Ok(()); + } + + self.rewind_on_reorg_detected().await + } + + async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockScannerError> { + let mut new_current_height = + if self.current.as_ref().unwrap().number <= self.config.reorg_rewind_depth { + 0 + } else { + self.current.as_ref().unwrap().number - self.config.reorg_rewind_depth + }; + + let head = self.provider.get_block_number().await?; + if head < new_current_height { + new_current_height = head; + } + + let current = self + .provider + .get_block_by_number(new_current_height.into()) + .await? + .map(|block| BlockHashAndNumber::from_header::(block.header())) + .expect("block should exist"); + + info!( + "Rewind on reorg detected\noldCurrent: {}, newCurrent: {}", + self.current.as_ref().unwrap().number, + current.number ); + + self.current = Some(current); + + Ok(()) + } + + async fn websocket_buffer_task>( + mut current: BlockNumber, + provider: P, + buffer_sender: mpsc::Sender>, + ) { + // TODO: consider passing errors to the caller for handling + + // TODO: use smart retry mechanism + match Self::get_block_subscription(&provider).await { + Ok(mut ws_stream) => { + info!("WebSocket connected for buffering"); + + // TODO: if latest != ws_stream.next(), then return latest.number and empty the + // ws_stream backlog + while let Ok(header_resp) = ws_stream.recv().await { + info!("Received block header: {}", header_resp.number()); + // TODO: handle reorgs + if current == header_resp.number() { + continue; + } + + // we add 1 to include the latest block + #[allow(clippy::range_plus_one)] + if let Err(e) = buffer_sender.send(current..header_resp.number() + 1).await { + error!("Buffer channel closed, stopping buffer task: {e}"); + + return; + } + + // next block will be processed in the next batch + current = header_resp.number() + 1; + } + } + Err(e) => { + error!("Failed to connect WebSocket for buffering: {e}"); + } + } } - #[test] - fn test_builder_defaults() { - let builder = BlockScannerBuilder::::new(); - assert_eq!(builder.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH); - assert!(matches!(builder.start_height, BlockNumberOrTag::Latest)); - assert!(builder.end_height.is_none()); - assert_eq!(builder.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH); - assert_eq!(builder.retry_interval, DEFAULT_RETRY_INTERVAL); - assert_eq!(builder.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); + async fn 
process_buffered_messages( + mut buffer_rx: mpsc::Receiver>, + sender: mpsc::Sender, BlockScannerError>>, + cutoff: BlockNumber, + ) { + let mut processed = 0; + let mut discarded = 0; + + // Process all buffered messages + while let Some(range) = buffer_rx.recv().await { + let (start, end) = (range.start, range.end); + if start >= cutoff { + if sender.send(Ok(range)).await.is_err() { + warn!("Subscriber channel closed, cleaning up"); + return; + } + processed += end - start; + } else if end > cutoff { + // TODO: verify the math + discarded += cutoff - start; + + let start = cutoff; + if sender.send(Ok(start..end)).await.is_err() { + warn!("Subscriber channel closed, cleaning up"); + return; + } + processed += end - start; + } else { + discarded += end - start; + } + } + + info!("Processed buffered messages: {processed} forwarded, {discarded} discarded"); + } + + async fn get_block_subscription( + provider: &impl Provider, + ) -> Result, BlockScannerError> { + let ws_stream = provider + .subscribe_blocks() + .await + .map_err(|_| BlockScannerError::WebSocketConnectionFailed(1))?; + + Ok(ws_stream) + } + + async fn send_to_subscriber(&mut self, result: Result, BlockScannerError>) { + if let Some(ref sender) = self.subscriber { + if sender.send(result).await.is_err() { + self.subscriber = None; + self.websocket_connected = false; + } else { + self.processed_count += 1; + } + } + } + + fn handle_unsubscribe(&mut self) { + if self.subscriber.take().is_some() { + info!("Unsubscribing current subscriber"); + self.websocket_connected = false; + } + } + + fn get_status(&self) -> ServiceStatus { + ServiceStatus { + is_subscribed: self.subscriber.is_some(), + last_synced_block: self.current.clone(), + websocket_connected: self.websocket_connected, + processed_count: self.processed_count, + error_count: self.error_count, + } } +} + +pub struct BlockScannerClient { + command_sender: mpsc::Sender, +} - #[test] - fn test_builder_setters() { - let mut builder = BlockScannerBuilder::::new(); - builder.with_blocks_read_per_epoch(25); - builder.with_start_height(BlockNumberOrTag::Earliest); - builder.with_end_height(BlockNumberOrTag::Latest); - builder.with_on_blocks(no_op_on_blocks::); - builder.with_reorg_rewind_depth(5); - let interval = Duration::from_secs(3); - builder.with_retry_interval(interval); - builder.with_block_confirmations(12); - - assert_eq!(builder.blocks_read_per_epoch, 25); - assert!(matches!(builder.start_height, BlockNumberOrTag::Earliest)); - assert!(matches!(builder.end_height, Some(BlockNumberOrTag::Latest))); - assert_eq!(builder.reorg_rewind_depth, 5); - assert_eq!(builder.retry_interval, interval); - assert_eq!(builder.block_confirmations, 12); +impl BlockScannerClient { + /// Creates a new subscription client. + /// + /// # Arguments + /// + /// * `command_sender` - The sender for sending commands to the subscription service. + #[must_use] + pub fn new(command_sender: mpsc::Sender) -> Self { + Self { command_sender } } + /// Subscribes to new blocks. + /// + /// # Arguments + /// + /// * `start_height` - The block number to start from. + /// * `end_height` - The block number to end at. + /// + /// # Errors + /// + /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. 
+ pub async fn subscribe( + &self, + start_height: BlockNumberOrTag, + end_height: Option, + ) -> Result, BlockScannerError>>, BlockScannerError> + { + let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); + let (response_tx, response_rx) = oneshot::channel(); + + let command = Command::Subscribe { + sender: blocks_sender, + start_height, + end_height, + response: response_tx, + }; + + self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + + response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)??; + + let stream = ReceiverStream::new(blocks_receiver); + + Ok(stream) + } + + /// Unsubscribes the current subscriber. + /// + /// # Errors + /// + /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. + pub async fn unsubscribe(&self) -> Result<(), BlockScannerError> { + let (response_tx, response_rx) = oneshot::channel(); + + let command = Command::Unsubscribe { response: response_tx }; + + self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + + response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)? + } + + /// Returns the current status of the subscription service. + /// + /// # Errors + /// + /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. + pub async fn get_status(&self) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + + let command = Command::GetStatus { response: response_tx }; + + self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + + response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown) + } + + /// Shuts down the subscription service and unsubscribes the current subscriber. + /// + /// # Errors + /// + /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. + pub async fn shutdown(&self) -> Result<(), BlockScannerError> { + let (response_tx, response_rx) = oneshot::channel(); + + let command = Command::Shutdown { response: response_tx }; + + self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + + response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)? + } +} + +#[cfg(test)] +mod tests { + use alloy::network::Ethereum; + use alloy_node_bindings::Anvil; + + use tokio_stream::StreamExt; + + use super::*; + #[tokio::test] - async fn test_connect_ws_and_start_stream_eof() { - let anvil = Anvil::new().try_spawn().expect("failed to spawn anvil"); - let ws = WsConnect::new(anvil.ws_endpoint_url()); - - let builder = BlockScannerBuilder::::new(); - let scanner = builder.connect_ws(ws).await.expect("failed to connect ws"); - - let mut stream = scanner.start().await; - let first = stream.next().await; - match first { - Some(Err(BlockScannerError::ErrEOF)) => {} - other => panic!("expected first stream item to be ErrEOF, got: {other:?}"), + async fn live_mode_processes_all_blocks() -> anyhow::Result<()> { + let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?; + + let sub_client = BlockScanner::new() + .with_blocks_read_per_epoch(3) + .with_reorg_rewind_depth(5) + .with_retry_interval(Duration::from_secs(1)) + .with_block_confirmations(1) + .connect_ws::(anvil.ws_endpoint_url()) + .await? 
+ .run()?; + + let expected_blocks = 10; + + let mut receiver = + sub_client.subscribe(BlockNumberOrTag::Latest, None).await?.take(expected_blocks); + + let mut block_range_start = 0; + + while let Some(result) = receiver.next().await { + match result { + Ok(range) => { + println!("Received block range: {} - {}", range.start, range.end); + if block_range_start == 0 { + block_range_start = range.start; + } + + assert_eq!(block_range_start, range.start); + assert!(range.end >= range.start); + block_range_start = range.end; + } + Err(e) => { + panic!("Received error from subscription: {e}"); + } + } } + + Ok(()) } } diff --git a/src/builder.rs b/src/builder.rs deleted file mode 100644 index 0b9efefa..00000000 --- a/src/builder.rs +++ /dev/null @@ -1,279 +0,0 @@ -use crate::{FixedRetryConfig, scanner::Scanner, types::EventFilter}; - -pub struct ScannerBuilder { - rpc_url: String, - start_block: Option, - end_block: Option, - max_blocks_per_filter: u64, - tracked_events: Vec, - callback_config: FixedRetryConfig, -} - -impl ScannerBuilder { - pub fn new>(rpc_url: S) -> Self { - Self { - rpc_url: rpc_url.into(), - start_block: None, - end_block: None, - max_blocks_per_filter: 1000, - tracked_events: Vec::new(), - callback_config: FixedRetryConfig::default(), - } - } - - #[must_use] - pub fn start_block(mut self, start_block: u64) -> Self { - self.start_block = Some(start_block); - self - } - - #[must_use] - pub fn end_block(mut self, end_block: u64) -> Self { - self.end_block = Some(end_block); - self - } - - #[must_use] - pub fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self { - self.max_blocks_per_filter = max_blocks; - self - } - - #[must_use] - pub fn add_event_filter(mut self, filter: EventFilter) -> Self { - self.tracked_events.push(filter); - self - } - - #[must_use] - pub fn add_event_filters(mut self, filters: Vec) -> Self { - self.tracked_events.extend(filters); - self - } - - #[must_use] - pub fn callback_config(mut self, cfg: FixedRetryConfig) -> Self { - self.callback_config = cfg; - self - } - - /// Builds the scanner - /// - /// # Errors - /// - /// Returns an error if the scanner fails to build - pub async fn build(self) -> anyhow::Result { - Scanner::new( - self.rpc_url, - self.start_block, - self.end_block, - self.max_blocks_per_filter, - self.tracked_events, - self.callback_config, - ) - .await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - FixedRetryConfig, callback::EventCallback, callback_strategy::BACK_OFF_MAX_RETRIES, - }; - use alloy::{primitives::address, rpc::types::Log}; - use async_trait::async_trait; - use std::sync::Arc; - - struct MockCallback; - - #[async_trait] - impl EventCallback for MockCallback { - async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { - Ok(()) - } - } - - const WS_URL: &str = "ws://localhost:8545"; - - #[test] - fn test_builder_new_defaults() { - let builder = ScannerBuilder::new(WS_URL); - assert_eq!(builder.rpc_url, WS_URL); - assert_eq!(builder.start_block, None); - assert_eq!(builder.end_block, None); - assert_eq!(builder.max_blocks_per_filter, 1000); - assert!(builder.tracked_events.is_empty()); - } - - #[test] - fn test_builder_start_block() { - let start_block = 100; - let builder = ScannerBuilder::new(WS_URL).start_block(start_block); - assert_eq!(builder.start_block, Some(start_block)); - } - - #[test] - fn test_builder_end_block() { - let end_block = 500; - let builder = ScannerBuilder::new(WS_URL).end_block(end_block); - assert_eq!(builder.end_block, Some(end_block)); - } - - #[test] - fn 
test_builder_block_range() { - let start_block = 100; - let end_block = 500; - let builder = ScannerBuilder::new(WS_URL).start_block(start_block).end_block(end_block); - assert_eq!(builder.start_block, Some(start_block)); - assert_eq!(builder.end_block, Some(end_block)); - } - - #[test] - fn test_builder_max_blocks_per_filter() { - let max_blocks = 5000; - let builder = ScannerBuilder::new(WS_URL).max_blocks_per_filter(max_blocks); - assert_eq!(builder.max_blocks_per_filter, max_blocks); - } - - #[test] - fn test_builder_callback_config() { - let max_attempts = 5; - let delay_ms = 500; - let config = FixedRetryConfig { max_attempts, delay_ms }; - - let builder = ScannerBuilder::new(WS_URL).callback_config(config); - - assert_eq!(builder.callback_config.max_attempts, max_attempts); - assert_eq!(builder.callback_config.delay_ms, delay_ms); - } - - #[test] - fn test_builder_default_callback_config() { - let builder = ScannerBuilder::new(WS_URL); - - assert_eq!(builder.callback_config.max_attempts, BACK_OFF_MAX_RETRIES); - assert_eq!(builder.callback_config.delay_ms, 200); - } - - #[test] - fn test_builder_add_event_filter() { - let addr = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); - let event = "Transfer(address,address,uint256)".to_string(); - let filter = EventFilter { - contract_address: addr, - event: event.clone(), - callback: Arc::new(MockCallback), - }; - let builder = ScannerBuilder::new(WS_URL).add_event_filter(filter.clone()); - - assert_eq!(builder.tracked_events.len(), 1); - assert_eq!(builder.tracked_events[0].contract_address, addr); - assert_eq!(builder.tracked_events[0].event, event); - } - - #[test] - fn test_builder_add_multiple_event_filters() { - let addr1 = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); - let event1 = "Transfer(address,address,uint256)".to_string(); - let addr2 = address!("70997970C51812dc3A010C7d01b50e0d17dc79C8"); - let event2 = "Approval(address,address,uint256)".to_string(); - - let filter1 = EventFilter { - contract_address: addr1, - event: event1.clone(), - callback: Arc::new(MockCallback), - }; - let filter2 = EventFilter { - contract_address: addr2, - event: event2.clone(), - callback: Arc::new(MockCallback), - }; - - let builder = ScannerBuilder::new(WS_URL) - .add_event_filter(filter1.clone()) - .add_event_filter(filter2.clone()); - - assert_eq!(builder.tracked_events.len(), 2); - for (i, expected_filter) in builder.tracked_events.iter().enumerate() { - assert_eq!( - builder.tracked_events[i].contract_address, - expected_filter.contract_address - ); - assert_eq!(builder.tracked_events[i].event, expected_filter.event); - } - } - - #[test] - fn test_builder_add_event_filters_batch() { - let addr1 = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); - let event1 = "Transfer(address,address,uint256)".to_string(); - let addr2 = address!("70997970C51812dc3A010C7d01b50e0d17dc79C8"); - let event2 = "Approval(address,address,uint256)".to_string(); - let addr3 = address!("3C44CdDdB6a900fa2b585dd299e03d12FA4293BC"); - let event3 = "Mint(address,uint256)".to_string(); - - let filter_1 = EventFilter { - contract_address: addr1, - event: event1.clone(), - callback: Arc::new(MockCallback), - }; - let filter_2 = EventFilter { - contract_address: addr2, - event: event2.clone(), - callback: Arc::new(MockCallback), - }; - let filter_3 = EventFilter { - contract_address: addr3, - event: event3.clone(), - callback: Arc::new(MockCallback), - }; - - let filters = vec![filter_1.clone(), filter_2.clone(), filter_3.clone()]; - let builder = 
ScannerBuilder::new(WS_URL).add_event_filters(filters.clone()); - - assert_eq!(builder.tracked_events.len(), filters.len()); - - for (i, expected_filter) in filters.iter().enumerate() { - assert_eq!( - builder.tracked_events[i].contract_address, - expected_filter.contract_address - ); - assert_eq!(builder.tracked_events[i].event, expected_filter.event); - } - } - - #[test] - fn test_builder_chain_all_methods() { - let start_block = 100; - let end_block = 500; - - let addr = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); - let event = "Transfer(address,address,uint256)".to_string(); - - let filter = EventFilter { - contract_address: addr, - event: event.clone(), - callback: Arc::new(MockCallback), - }; - - let max_attempts = 5; - let delay_ms = 500; - let config = FixedRetryConfig { max_attempts, delay_ms }; - - let max_blocks_per_filter = 2000; - let builder = ScannerBuilder::new(WS_URL) - .start_block(start_block) - .end_block(end_block) - .max_blocks_per_filter(max_blocks_per_filter) - .add_event_filter(filter.clone()) - .callback_config(config); - - assert_eq!(builder.start_block, Some(start_block)); - assert_eq!(builder.end_block, Some(end_block)); - assert_eq!(builder.max_blocks_per_filter, max_blocks_per_filter); - assert_eq!(builder.tracked_events.len(), 1); - assert_eq!(builder.callback_config.max_attempts, max_attempts); - assert_eq!(builder.callback_config.delay_ms, delay_ms); - } -} diff --git a/src/event_scanner.rs b/src/event_scanner.rs index 6ceb06e6..1f9cc164 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -1,42 +1,38 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use crate::{ - block_scanner::{BlockScanner, BlockScannerBuilder, OnBlocksFunc}, + block_scanner::{BlockScanner, BlockScannerError, ConnectedBlockScanner}, callback::strategy::{CallbackStrategy, StateSyncAwareStrategy}, types::EventFilter, }; use alloy::{ eips::BlockNumberOrTag, network::Network, - providers::{IpcConnect, Provider, RootProvider, WsConnect}, - pubsub::PubSubConnect, - rpc::{ - client::RpcClient, - types::{Filter, Log}, - }, - transports::TransportError, + providers::Provider, + rpc::types::{Filter, Log}, + transports::http::reqwest::Url, }; use tokio::sync::mpsc::{self, Receiver}; use tokio_stream::StreamExt; use tracing::{error, info, warn}; -pub struct EventScannerBuilder { - block_scanner: BlockScannerBuilder, +pub struct EventScannerBuilder { + block_scanner: BlockScanner, tracked_events: Vec, callback_strategy: Arc, } -impl Default for EventScannerBuilder { +impl Default for EventScannerBuilder { fn default() -> Self { Self::new() } } -impl EventScannerBuilder { +impl EventScannerBuilder { #[must_use] pub fn new() -> Self { Self { - block_scanner: BlockScannerBuilder::new(), + block_scanner: BlockScanner::new(), tracked_events: Vec::new(), callback_strategy: Self::get_default_callback_strategy(), } @@ -62,21 +58,6 @@ impl EventScannerBuilder { self } - pub fn with_start_height(&mut self, start_height: BlockNumberOrTag) -> &mut Self { - let _ = self.block_scanner.with_start_height(start_height); - self - } - - pub fn with_end_height(&mut self, end_height: BlockNumberOrTag) -> &mut Self { - let _ = self.block_scanner.with_end_height(end_height); - self - } - - pub fn with_on_blocks(&mut self, on_blocks: OnBlocksFunc) -> &mut Self { - let _ = self.block_scanner.with_on_blocks(on_blocks); - self - } - pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self { let _ = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth); self @@ 
-97,11 +78,11 @@ impl EventScannerBuilder { /// # Errors /// /// Returns an error if the connection fails - pub async fn connect_ws( + pub async fn connect_ws( self, - connect: WsConnect, - ) -> Result, N>, TransportError> { - let block_scanner = self.block_scanner.connect_ws(connect).await?; + ws_url: Url, + ) -> Result, BlockScannerError> { + let block_scanner = self.block_scanner.connect_ws(ws_url).await?; Ok(EventScanner { block_scanner, tracked_events: self.tracked_events, @@ -114,14 +95,11 @@ impl EventScannerBuilder { /// # Errors /// /// Returns an error if the connection fails - pub async fn connect_ipc( + pub async fn connect_ipc( self, - connect: IpcConnect, - ) -> Result, N>, TransportError> - where - IpcConnect: PubSubConnect, - { - let block_scanner = self.block_scanner.connect_ipc(connect).await?; + ipc_path: impl Into, + ) -> Result, BlockScannerError> { + let block_scanner = self.block_scanner.connect_ipc(ipc_path.into()).await?; Ok(EventScanner { block_scanner, tracked_events: self.tracked_events, @@ -129,45 +107,29 @@ impl EventScannerBuilder { }) } - #[must_use] - pub fn connect_client(self, client: RpcClient) -> EventScanner, N> { - let block_scanner = self.block_scanner.connect_client(client); - EventScanner { - block_scanner, - tracked_events: self.tracked_events, - callback_strategy: self.callback_strategy, - } - } - - #[must_use] - pub fn connect_provider(self, provider: RootProvider) -> EventScanner, N> { - let block_scanner = self.block_scanner.connect_provider(provider); - EventScanner { - block_scanner, - tracked_events: self.tracked_events, - callback_strategy: self.callback_strategy, - } - } - fn get_default_callback_strategy() -> Arc { let state_sync_aware_strategy = StateSyncAwareStrategy::new(); Arc::new(state_sync_aware_strategy) } } -pub struct EventScanner, N: Network> { - block_scanner: BlockScanner, +pub struct EventScanner { + block_scanner: ConnectedBlockScanner, tracked_events: Vec, callback_strategy: Arc, } -impl, N: Network> EventScanner { +impl EventScanner { /// Starts the scanner /// /// # Errors /// /// Returns an error if the scanner fails to start - pub async fn start(&mut self) -> anyhow::Result<()> { + pub async fn start( + &mut self, + start_height: BlockNumberOrTag, + end_height: Option, + ) -> anyhow::Result<()> { let mut event_channels: HashMap> = HashMap::new(); for filter in &self.tracked_events { @@ -193,7 +155,9 @@ impl, N: Network> EventScanner { event_channels.insert(event_name, sender); } - let mut stream = self.block_scanner.start().await; + let client = self.block_scanner.run()?; + let mut stream = client.subscribe(start_height, end_height).await?; + while let Some(range) = stream.next().await { match range { Ok(range) => { diff --git a/src/lib.rs b/src/lib.rs index 7c4f4845..4b4f4ae7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,9 @@ pub mod block_scanner; -pub mod builder; pub mod callback; pub mod event_scanner; -pub mod scanner; pub mod types; pub use crate::{ - builder::ScannerBuilder, callback::{ EventCallback, strategy as callback_strategy, strategy::{ @@ -14,6 +11,5 @@ pub use crate::{ StateSyncConfig, }, }, - scanner::Scanner, types::EventFilter, }; diff --git a/src/scanner.rs b/src/scanner.rs deleted file mode 100644 index d5eb4ec5..00000000 --- a/src/scanner.rs +++ /dev/null @@ -1,313 +0,0 @@ -use std::sync::Arc; - -use alloy::{ - providers::{IpcConnect, Provider, ProviderBuilder, WsConnect}, - rpc::types::{Filter, Log}, -}; -use tokio::time::{Duration, sleep}; -use tokio_stream::StreamExt; -use 
tracing::{error, info, warn}; - -use crate::{FixedRetryConfig, callback::EventCallback, types::EventFilter}; - -enum ProviderType { - WebSocket, - Ipc, -} - -pub struct Scanner { - provider: Arc, - current_head: Option, - start_block: Option, - end_block: Option, - max_blocks_per_filter: u64, - tracked_events: Vec, - callback_config: FixedRetryConfig, -} - -impl Scanner { - /// Creates a new scanner - /// - /// # Errors - /// - /// Returns an error if the scanner fails to start - pub async fn new( - rpc_url: String, - start_block: Option, - end_block: Option, - max_blocks_per_filter: u64, - tracked_events: Vec, - callback_config: FixedRetryConfig, - ) -> anyhow::Result { - let provider = Self::get_provider(&rpc_url).await?; - - Ok(Self { - provider, - current_head: start_block, - start_block, - end_block, - max_blocks_per_filter, - tracked_events, - callback_config, - }) - } - - async fn get_provider(url: &str) -> anyhow::Result> { - let provider = match Self::detect_provider_type(url) { - Ok(ProviderType::WebSocket) => { - info!("connecting to provider via WebSocket: {}", url); - let ws = WsConnect::new(url); - ProviderBuilder::new().connect_ws(ws).await? - } - Ok(ProviderType::Ipc) => { - info!("connecting to provider via IPC: {}", url); - let ipc = IpcConnect::new(url.to_string()); - ProviderBuilder::new().connect_ipc(ipc).await? - } - Err(e) => return Err(e), - }; - Ok(Arc::new(provider)) - } - - fn detect_provider_type(url: &str) -> anyhow::Result { - if url.starts_with("ws://") || url.starts_with("wss://") { - Ok(ProviderType::WebSocket) - } else if std::path::Path::new(url) - .extension() - .is_some_and(|ext| ext.eq_ignore_ascii_case("ipc")) || - url.contains("ipc") - { - Ok(ProviderType::Ipc) - } else { - Err(anyhow::anyhow!( - "Unknown provider type for URL: {}. 
Expected ws://, wss://, or IPC path", - url - )) - } - } - - /// Starts the scanner - /// - /// # Errors - /// - /// Returns an error if the scanner fails to start - pub async fn start(&mut self) -> anyhow::Result<()> { - match (self.start_block, self.end_block) { - (None, Some(end)) => { - info!("Scanning from genesis block 0 to {}", end); - self.process_historical_blocks(0, Some(end)).await?; - } - (Some(start), Some(end)) => { - info!("Scanning from block {} to {}", start, end); - self.process_historical_blocks(start, Some(end)).await?; - } - (Some(start), None) => { - info!("Scanning from block {} to latest, then switching to live mode", start); - self.process_historical_blocks(start, None).await?; - self.subscribe_to_new_blocks().await?; - } - (None, None) => { - info!("Starting in live mode only"); - self.subscribe_to_new_blocks().await?; - } - } - Ok(()) - } - - #[must_use] - pub fn current_head(&self) -> Option { - self.current_head - } - - pub fn add_event_filter(&mut self, filter: EventFilter) { - self.tracked_events.push(filter); - } - - async fn process_historical_blocks( - &mut self, - start_block: u64, - end_block: Option, - ) -> anyhow::Result<()> { - info!(start_block, end_block = ?end_block, "starting historical block processing"); - - let mut current = start_block; - let max_blocks = self.max_blocks_per_filter; - - let target_end = match end_block { - Some(end) => end, - None => self.provider.get_block_number().await?, - }; - - loop { - if current > target_end { - self.current_head = Some(target_end); - info!(last_block = target_end, "Historical processing completed"); - break; - } - - let to_block = (current + max_blocks - 1).min(target_end); - - info!(from_block = current, to_block, target_end, "processing historical block range"); - - self.process_block_events(current, to_block).await?; - - current = to_block + 1; - } - - Ok(()) - } - - async fn subscribe_to_new_blocks(&mut self) -> anyhow::Result<()> { - info!("starting scanner in live mode"); - let sub = self.provider.subscribe_blocks().await?; - let mut stream = sub.into_stream(); - - while let Some(block) = stream.next().await { - let latest_block_number = block.number; - - // TODO: Handle reorgs - let from_block = match self.current_head { - Some(head) => head, - None => latest_block_number, - }; - let to_block = latest_block_number; - - info!(from_block, to_block, "processing blocks:"); - - self.process_block_events(from_block, to_block).await?; - - self.current_head = Some(latest_block_number + 1); - } - - Ok(()) - } - - async fn process_block_events(&self, from_block: u64, to_block: u64) -> anyhow::Result<()> { - for event_filter in &self.tracked_events { - let filter = Filter::new() - .address(event_filter.contract_address) - .event(event_filter.event.as_str()) - .from_block(from_block) - .to_block(to_block); - - match self.provider.get_logs(&filter).await { - Ok(logs) => { - for log in &logs { - info!( - contract = ?event_filter.contract_address, - event = event_filter.event, - log_count = &logs.len(), - at_block = log.block_number, - from_block, - to_block, - "found logs for event in block range" - ); - if let Err(e) = self.invoke_with_retry(&event_filter.callback, log).await { - error!( - contract = ?event_filter.contract_address, - event = event_filter.event, - error = %e, - "failed to invoke callback after retries" - ); - } - } - } - Err(e) => { - error!( - contract = ?event_filter.contract_address, - event = event_filter.event, - error = %e, - from_block, - to_block, - "failed to get logs for block 
range" - ); - } - } - } - - Ok(()) - } - - async fn invoke_with_retry( - &self, - callback: &Arc, - log: &Log, - ) -> anyhow::Result<()> { - Self::invoke_with_retry_static(callback, log, &self.callback_config).await - } - - async fn invoke_with_retry_static( - callback: &Arc, - log: &Log, - config: &FixedRetryConfig, - ) -> anyhow::Result<()> { - let attempts = config.max_attempts.max(1); - let mut last_err: Option = None; - for attempt in 1..=attempts { - match callback.on_event(log).await { - Ok(()) => return Ok(()), - Err(e) => { - last_err = Some(e); - if attempt < attempts { - warn!( - attempt, - max_attempts = attempts, - "callback failed; retrying after fixed delay" - ); - sleep(Duration::from_millis(config.delay_ms)).await; - } - } - } - } - Err(last_err.unwrap_or_else(|| anyhow::anyhow!("callback failed with unknown error"))) - } -} - -#[cfg(test)] -mod tests { - use alloy_node_bindings::Anvil; - - use crate::ScannerBuilder; - - use super::*; - - #[test] - fn test_detect_provider_type_websocket() { - assert!(matches!( - Scanner::detect_provider_type("ws://localhost:8545"), - Ok(ProviderType::WebSocket) - )); - assert!(matches!( - Scanner::detect_provider_type("wss://mainnet.infura.io/ws"), - Ok(ProviderType::WebSocket) - )); - } - - #[test] - fn test_detect_provider_type_ipc() { - assert!(matches!(Scanner::detect_provider_type("/tmp/geth.ipc"), Ok(ProviderType::Ipc))); - assert!(matches!( - Scanner::detect_provider_type("./path/to/node.ipc"), - Ok(ProviderType::Ipc) - )); - assert!(matches!( - Scanner::detect_provider_type("/var/run/geth/ipc"), - Ok(ProviderType::Ipc) - )); - } - - #[test] - fn test_detect_provider_type_invalid() { - assert!(Scanner::detect_provider_type("http://localhost:8545").is_err()); - assert!(Scanner::detect_provider_type("https://mainnet.infura.io").is_err()); - assert!(Scanner::detect_provider_type("invalid-url").is_err()); - } - - #[tokio::test] - async fn test_builds_ws_provider() { - let anvil = Anvil::new().try_spawn().unwrap(); - let rpc_url = anvil.ws_endpoint_url(); - let scanner = ScannerBuilder::new(rpc_url).build().await; - assert!(scanner.is_ok()); - } -} diff --git a/tests/live_scanning_tests.rs b/tests/live_scanning_tests.rs deleted file mode 100644 index 19586386..00000000 --- a/tests/live_scanning_tests.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::{ - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, - time::Duration, -}; - -mod common; -use alloy::sol_types::SolEvent; -use common::{TestCounter, build_provider, deploy_counter, spawn_anvil}; -use event_scanner::{EventFilter, ScannerBuilder}; -use tokio::time::sleep; - -use crate::common::{EventCounter, SlowProcessor}; - -#[tokio::test] -async fn test_live_scanning_basic() -> anyhow::Result<()> { - let anvil = spawn_anvil(1)?; - let provider = build_provider(&anvil).await?; - let contract = deploy_counter(provider.clone()).await?; - let contract_address = *contract.address(); - - let event_count = Arc::new(AtomicUsize::new(0)); - let callback = Arc::new(EventCounter { count: event_count.clone() }); - - let filter = EventFilter { - contract_address, - event: TestCounter::CountIncreased::SIGNATURE.to_owned(), - callback, - }; - - let mut scanner = - ScannerBuilder::new(anvil.ws_endpoint_url()).add_event_filter(filter).build().await?; - - let scanner_handle = tokio::spawn(async move { scanner.start().await }); - - for _ in 0..5 { - let _ = contract.increase().send().await?.get_receipt().await?; - } - - sleep(Duration::from_millis(200)).await; - - scanner_handle.abort(); - - 
assert_eq!(event_count.load(Ordering::SeqCst), 5); - - Ok(()) -} - -#[tokio::test] -async fn test_live_scanning_multiple_events() -> anyhow::Result<()> { - let anvil = spawn_anvil(1)?; - let provider = build_provider(&anvil).await?; - let contract = deploy_counter(provider).await?; - let contract_address = *contract.address(); - - let increase_count = Arc::new(AtomicUsize::new(0)); - let decrease_count = Arc::new(AtomicUsize::new(0)); - - let increase_callback = Arc::new(EventCounter { count: increase_count.clone() }); - - let decrease_callback = Arc::new(EventCounter { count: decrease_count.clone() }); - - let increase_filter = EventFilter { - contract_address, - event: TestCounter::CountIncreased::SIGNATURE.to_owned(), - callback: increase_callback, - }; - - let decrease_filter = EventFilter { - contract_address, - event: TestCounter::CountDecreased::SIGNATURE.to_owned(), - callback: decrease_callback, - }; - - let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url()) - .add_event_filter(increase_filter) - .add_event_filter(decrease_filter) - .build() - .await?; - - let scanner_handle = tokio::spawn(async move { scanner.start().await }); - - for i in 0..6 { - let _ = contract.increase().send().await?.get_receipt().await?; - if i >= 4 { - let _ = contract.decrease().send().await?.get_receipt().await?; - } - } - - sleep(Duration::from_millis(200)).await; - - scanner_handle.abort(); - - assert_eq!(increase_count.load(Ordering::SeqCst), 6); - assert_eq!(decrease_count.load(Ordering::SeqCst), 2); - - Ok(()) -} - -#[tokio::test] -async fn test_live_scanning_with_slow_processor() -> anyhow::Result<()> { - let anvil = spawn_anvil(1)?; - let provider = build_provider(&anvil).await?; - let contract = deploy_counter(provider.clone()).await?; - let contract_address = *contract.address(); - - let processed = Arc::new(AtomicUsize::new(0)); - let callback = Arc::new(SlowProcessor { delay_ms: 100, processed: processed.clone() }); - - let filter = EventFilter { - contract_address, - event: TestCounter::CountIncreased::SIGNATURE.to_owned(), - callback, - }; - - let mut scanner = - ScannerBuilder::new(anvil.ws_endpoint_url()).add_event_filter(filter).build().await?; - - let scanner_handle = tokio::spawn(async move { scanner.start().await }); - - for _ in 0..3 { - let _ = contract.increase().send().await?.get_receipt().await?; - // Less than processor delay - sleep(Duration::from_millis(50)).await; - } - - sleep(Duration::from_millis(200)).await; - - scanner_handle.abort(); - - assert_eq!(processed.load(Ordering::SeqCst), 3); - - Ok(()) -}
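
Usage sketch for the new `EventScanner` surface introduced above. This is a minimal sketch, not part of the patch, assuming the signatures shown in the diff: `EventScannerBuilder::with_event_filter`, `connect_ws::<Ethereum>` taking a `Url`, and `EventScanner::start(BlockNumberOrTag, Option<BlockNumberOrTag>)`. It mirrors `examples/simple_counter/main.rs` and `examples/historical_scanning/main.rs`; the contract address, the `CountIncreased(uint256)` event signature, the endpoint URL, and the `LogPrinter` callback are placeholders.

```rust
use std::sync::Arc;

use alloy::{
    eips::BlockNumberOrTag, network::Ethereum, primitives::address, rpc::types::Log,
    transports::http::reqwest::Url,
};
use async_trait::async_trait;
use event_scanner::{EventCallback, EventFilter, event_scanner::EventScannerBuilder};

struct LogPrinter;

#[async_trait]
impl EventCallback for LogPrinter {
    async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
        // React to each matching log; returning Err hands control to the callback strategy.
        println!("matched log at block {:?}", log.block_number);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder contract address and event signature.
    let filter = EventFilter {
        contract_address: address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"),
        event: "CountIncreased(uint256)".to_string(),
        callback: Arc::new(LogPrinter),
    };

    let mut builder = EventScannerBuilder::new();
    builder.with_event_filter(filter);

    // connect_ws consumes the builder and yields a connected EventScanner.
    let mut scanner = builder.connect_ws::<Ethereum>(Url::parse("ws://localhost:8546")?).await?;

    // Scan history from block 0, then keep following new blocks (no end height).
    scanner.start(BlockNumberOrTag::Number(0), None).await?;

    Ok(())
}
```

Note that the start and end heights moved from the builder (the removed `start_block`/`end_block` setters) into `start` itself, so range selection now happens at call time rather than at construction.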