From 63720599a1e1ab308cb53abd898c18c68bf84481 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Thu, 18 Sep 2025 13:14:24 +0200 Subject: [PATCH] Align type names --- src/block_range_scanner.rs | 119 ++++++++++++++++++------------------- src/event_scanner.rs | 8 +-- 2 files changed, 62 insertions(+), 65 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index c538b6ec..e3ac8980 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -2,12 +2,13 @@ //! //! ```rust,no_run //! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber}; -//! use event_scanner::block_range_scanner::BlockScannerError; //! use std::ops::Range; //! use tokio_stream::{StreamExt, wrappers::ReceiverStream}; //! //! use alloy::transports::http::reqwest::Url; -//! use event_scanner::block_range_scanner::{BlockRangeScanner, BlockScannerClient}; +//! use event_scanner::block_range_scanner::{ +//! BlockRangeScanner, BlockRangeScannerClient, Error as BlockRangeScannerError, +//! }; //! use tokio::time::Duration; //! use tracing::{error, info}; //! @@ -26,10 +27,10 @@ //! .await?; //! //! // Create client to send subscribe command to block scanner -//! let subscription_client: BlockScannerClient = block_range_scanner.run()?; +//! let client: BlockRangeScannerClient = block_range_scanner.run()?; //! -//! let mut receiver: ReceiverStream, BlockScannerError>> = -//! subscription_client +//! let mut receiver: ReceiverStream, BlockRangeScannerError>> = +//! client //! .subscribe( //! BlockNumberOrTag::Latest, //! None, // just subscribe to new blocks @@ -46,8 +47,8 @@ //! //! // Decide whether to continue or break based on error type //! match e { -//! BlockScannerError::ServiceShutdown => break, -//! BlockScannerError::WebSocketConnectionFailed(_) => { +//! BlockRangeScannerError::ServiceShutdown => break, +//! BlockRangeScannerError::WebSocketConnectionFailed(_) => { //! // Maybe implement backoff and retry logic here //! error!( //! "WebSocket connection failed, continuing to listen for reconnection" @@ -106,7 +107,7 @@ const DEFAULT_REORG_REWIND_DEPTH: u64 = 0; // const STATE_SYNC_MAX_RETRIES: u64 = 12; #[derive(Error, Debug)] -pub enum BlockScannerError { +pub enum Error { #[error("HTTP request failed: {0}")] HttpError(#[from] reqwest::Error), @@ -140,19 +141,19 @@ pub enum BlockScannerError { #[derive(Debug)] pub enum Command { Subscribe { - sender: mpsc::Sender, BlockScannerError>>, + sender: mpsc::Sender, Error>>, start_height: BlockNumberOrTag, end_height: Option, - response: oneshot::Sender>, + response: oneshot::Sender>, }, Unsubscribe { - response: oneshot::Sender>, + response: oneshot::Sender>, }, GetStatus { response: oneshot::Sender, }, Shutdown { - response: oneshot::Sender>, + response: oneshot::Sender>, }, } @@ -243,10 +244,10 @@ impl BlockRangeScanner { pub async fn connect_ws( self, ws_url: Url, - ) -> TransportResult> { + ) -> TransportResult> { let provider = RootProvider::::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?); - Ok(ConnectedBlockScanner { + Ok(ConnectedBlockRangeScanner { provider, config: Config { blocks_read_per_epoch: self.blocks_read_per_epoch, @@ -265,9 +266,9 @@ impl BlockRangeScanner { pub async fn connect_ipc( self, ipc_path: String, - ) -> TransportResult> { + ) -> TransportResult> { let provider = RootProvider::::new(ClientBuilder::default().ipc(ipc_path.into()).await?); - Ok(ConnectedBlockScanner { + Ok(ConnectedBlockRangeScanner { provider, config: Config { blocks_read_per_epoch: self.blocks_read_per_epoch, @@ -279,12 +280,12 @@ impl BlockRangeScanner { } } -pub struct ConnectedBlockScanner { +pub struct ConnectedBlockRangeScanner { provider: RootProvider, config: Config, } -impl ConnectedBlockScanner { +impl ConnectedBlockRangeScanner { /// Returns the underlying Provider. #[must_use] pub fn provider(&self) -> &impl Provider { @@ -297,20 +298,19 @@ impl ConnectedBlockScanner { /// # Errors /// /// Returns an error if the subscription service fails to start. - pub fn run(&self) -> anyhow::Result { - let (service, cmd_tx) = - BlockScannerService::new(self.config.clone(), self.provider.clone()); + pub fn run(&self) -> anyhow::Result { + let (service, cmd_tx) = Service::new(self.config.clone(), self.provider.clone()); tokio::spawn(async move { service.run().await; }); - Ok(BlockScannerClient::new(cmd_tx)) + Ok(BlockRangeScannerClient::new(cmd_tx)) } } -struct BlockScannerService { +struct Service { config: Config, provider: RootProvider, - subscriber: Option, BlockScannerError>>>, + subscriber: Option, Error>>>, current: Option, websocket_connected: bool, processed_count: u64, @@ -319,7 +319,7 @@ struct BlockScannerService { shutdown: bool, } -impl BlockScannerService { +impl Service { pub fn new(config: Config, provider: RootProvider) -> (Self, mpsc::Sender) { let (cmd_tx, cmd_rx) = mpsc::channel(100); @@ -360,7 +360,7 @@ impl BlockScannerService { info!("Subscription service stopped"); } - async fn handle_command(&mut self, command: Command) -> Result<(), BlockScannerError> { + async fn handle_command(&mut self, command: Command) -> Result<(), Error> { match command { Command::Subscribe { sender, start_height, end_height, response } => { let result = self.handle_subscribe(sender, start_height, end_height).await; @@ -385,12 +385,12 @@ impl BlockScannerService { async fn handle_subscribe( &mut self, - sender: mpsc::Sender, BlockScannerError>>, + sender: mpsc::Sender, Error>>, start_height: BlockNumberOrTag, end_height: Option, - ) -> Result<(), BlockScannerError> { + ) -> Result<(), Error> { if self.subscriber.is_some() { - return Err(BlockScannerError::MultipleSubscribers); + return Err(Error::MultipleSubscribers); } // TODO: update local state relate to reorg and validate data @@ -407,7 +407,7 @@ impl BlockScannerService { &mut self, start_height: BlockNumberOrTag, end_height: Option, - ) -> Result<(), BlockScannerError> { + ) -> Result<(), Error> { // Step 1: Establish WebSocket connection let (buffer_tx, buffer_rx) = mpsc::channel(MAX_BUFFERED_MESSAGES); @@ -451,7 +451,7 @@ impl BlockScannerService { if let Err(e) = self.sync_historical_data(start_block, sync_end_block).await { warn!("aborting ws_task"); ws_task.abort(); - return Err(BlockScannerError::HistoricalSyncError(e.to_string())); + return Err(Error::HistoricalSyncError(e.to_string())); } // Step 3: Process buffered WebSocket messages @@ -459,7 +459,7 @@ impl BlockScannerService { tokio::spawn(async move { if end_height.is_none() { Self::process_buffered_messages(buffer_rx, sender, cutoff).await; - } else if sender.send(Err(BlockScannerError::Eof)).await.is_err() { + } else if sender.send(Err(Error::Eof)).await.is_err() { warn!("Subscriber channel closed, cleaning up"); } }); @@ -477,7 +477,7 @@ impl BlockScannerService { &mut self, start: N::BlockResponse, end: N::BlockResponse, - ) -> Result<(), BlockScannerError> { + ) -> Result<(), Error> { let mut batch_count = 0; self.current = Some(BlockHashAndNumber::from_header::(start.header())); @@ -514,7 +514,7 @@ impl BlockScannerService { Ok(()) } - async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockScannerError> { + async fn ensure_current_not_reorged(&mut self) -> Result<(), Error> { let current_block = self.provider.get_block_by_hash(self.current.as_ref().unwrap().hash).await?; if current_block.is_some() { @@ -524,7 +524,7 @@ impl BlockScannerService { self.rewind_on_reorg_detected().await } - async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockScannerError> { + async fn rewind_on_reorg_detected(&mut self) -> Result<(), Error> { let mut new_current_height = if self.current.as_ref().unwrap().number <= self.config.reorg_rewind_depth { 0 @@ -596,7 +596,7 @@ impl BlockScannerService { async fn process_buffered_messages( mut buffer_rx: mpsc::Receiver>, - sender: mpsc::Sender, BlockScannerError>>, + sender: mpsc::Sender, Error>>, cutoff: BlockNumber, ) { let mut processed = 0; @@ -631,16 +631,14 @@ impl BlockScannerService { async fn get_block_subscription( provider: &impl Provider, - ) -> Result, BlockScannerError> { - let ws_stream = provider - .subscribe_blocks() - .await - .map_err(|_| BlockScannerError::WebSocketConnectionFailed(1))?; + ) -> Result, Error> { + let ws_stream = + provider.subscribe_blocks().await.map_err(|_| Error::WebSocketConnectionFailed(1))?; Ok(ws_stream) } - async fn send_to_subscriber(&mut self, result: Result, BlockScannerError>) { + async fn send_to_subscriber(&mut self, result: Result, Error>) { if let Some(ref sender) = self.subscriber { if sender.send(result).await.is_err() { self.subscriber = None; @@ -669,11 +667,11 @@ impl BlockScannerService { } } -pub struct BlockScannerClient { +pub struct BlockRangeScannerClient { command_sender: mpsc::Sender, } -impl BlockScannerClient { +impl BlockRangeScannerClient { /// Creates a new subscription client. /// /// # Arguments @@ -693,13 +691,12 @@ impl BlockScannerClient { /// /// # Errors /// - /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. + /// * `Error::ServiceShutdown` - if the service is already shutting down. pub async fn subscribe( &self, start_height: BlockNumberOrTag, end_height: Option, - ) -> Result, BlockScannerError>>, BlockScannerError> - { + ) -> Result, Error>>, Error> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -710,9 +707,9 @@ impl BlockScannerClient { response: response_tx, }; - self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?; - response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)??; + response_rx.await.map_err(|_| Error::ServiceShutdown)??; let stream = ReceiverStream::new(blocks_receiver); @@ -723,45 +720,45 @@ impl BlockScannerClient { /// /// # Errors /// - /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. - pub async fn unsubscribe(&self) -> Result<(), BlockScannerError> { + /// * `Error::ServiceShutdown` - if the service is already shutting down. + pub async fn unsubscribe(&self) -> Result<(), Error> { let (response_tx, response_rx) = oneshot::channel(); let command = Command::Unsubscribe { response: response_tx }; - self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?; - response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)? + response_rx.await.map_err(|_| Error::ServiceShutdown)? } /// Returns the current status of the subscription service. /// /// # Errors /// - /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. - pub async fn get_status(&self) -> Result { + /// * `Error::ServiceShutdown` - if the service is already shutting down. + pub async fn get_status(&self) -> Result { let (response_tx, response_rx) = oneshot::channel(); let command = Command::GetStatus { response: response_tx }; - self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?; - response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown) + response_rx.await.map_err(|_| Error::ServiceShutdown) } /// Shuts down the subscription service and unsubscribes the current subscriber. /// /// # Errors /// - /// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down. - pub async fn shutdown(&self) -> Result<(), BlockScannerError> { + /// * `Error::ServiceShutdown` - if the service is already shutting down. + pub async fn shutdown(&self) -> Result<(), Error> { let (response_tx, response_rx) = oneshot::channel(); let command = Command::Shutdown { response: response_tx }; - self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?; + self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?; - response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)? + response_rx.await.map_err(|_| Error::ServiceShutdown)? } } diff --git a/src/event_scanner.rs b/src/event_scanner.rs index e73d8e2f..3dca86ec 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use crate::{ - block_range_scanner::{BlockRangeScanner, BlockScannerError, ConnectedBlockScanner}, + block_range_scanner::{self, BlockRangeScanner, ConnectedBlockRangeScanner}, callback::strategy::{CallbackStrategy, StateSyncAwareStrategy}, types::EventFilter, }; @@ -100,7 +100,7 @@ impl EventScannerBuilder { pub async fn connect_ws( self, ws_url: Url, - ) -> Result, BlockScannerError> { + ) -> Result, block_range_scanner::Error> { let block_range_scanner = self.block_range_scanner.connect_ws(ws_url).await?; Ok(EventScanner { block_range_scanner, @@ -117,7 +117,7 @@ impl EventScannerBuilder { pub async fn connect_ipc( self, ipc_path: impl Into, - ) -> Result, BlockScannerError> { + ) -> Result, block_range_scanner::Error> { let block_range_scanner = self.block_range_scanner.connect_ipc(ipc_path.into()).await?; Ok(EventScanner { block_range_scanner, @@ -134,7 +134,7 @@ impl EventScannerBuilder { } pub struct EventScanner { - block_range_scanner: ConnectedBlockScanner, + block_range_scanner: ConnectedBlockRangeScanner, tracked_events: Vec, callback_strategy: Arc, }