Skip to content

Commit 1076a18

Browse files
authored
Align block scanner type names (#39)
1 parent 9f775c3 commit 1076a18

File tree

2 files changed

+62
-65
lines changed

2 files changed

+62
-65
lines changed

src/block_range_scanner.rs

Lines changed: 58 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
//!
33
//! ```rust,no_run
44
//! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber};
5-
//! use event_scanner::block_range_scanner::BlockScannerError;
65
//! use std::ops::Range;
76
//! use tokio_stream::{StreamExt, wrappers::ReceiverStream};
87
//!
98
//! use alloy::transports::http::reqwest::Url;
10-
//! use event_scanner::block_range_scanner::{BlockRangeScanner, BlockScannerClient};
9+
//! use event_scanner::block_range_scanner::{
10+
//! BlockRangeScanner, BlockRangeScannerClient, Error as BlockRangeScannerError,
11+
//! };
1112
//! use tokio::time::Duration;
1213
//! use tracing::{error, info};
1314
//!
@@ -26,10 +27,10 @@
2627
//! .await?;
2728
//!
2829
//! // Create client to send subscribe command to block scanner
29-
//! let subscription_client: BlockScannerClient = block_range_scanner.run()?;
30+
//! let client: BlockRangeScannerClient = block_range_scanner.run()?;
3031
//!
31-
//! let mut receiver: ReceiverStream<Result<Range<BlockNumber>, BlockScannerError>> =
32-
//! subscription_client
32+
//! let mut receiver: ReceiverStream<Result<Range<BlockNumber>, BlockRangeScannerError>> =
33+
//! client
3334
//! .subscribe(
3435
//! BlockNumberOrTag::Latest,
3536
//! None, // just subscribe to new blocks
@@ -46,8 +47,8 @@
4647
//!
4748
//! // Decide whether to continue or break based on error type
4849
//! match e {
49-
//! BlockScannerError::ServiceShutdown => break,
50-
//! BlockScannerError::WebSocketConnectionFailed(_) => {
50+
//! BlockRangeScannerError::ServiceShutdown => break,
51+
//! BlockRangeScannerError::WebSocketConnectionFailed(_) => {
5152
//! // Maybe implement backoff and retry logic here
5253
//! error!(
5354
//! "WebSocket connection failed, continuing to listen for reconnection"
@@ -106,7 +107,7 @@ const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;
106107
// const STATE_SYNC_MAX_RETRIES: u64 = 12;
107108

108109
#[derive(Error, Debug)]
109-
pub enum BlockScannerError {
110+
pub enum Error {
110111
#[error("HTTP request failed: {0}")]
111112
HttpError(#[from] reqwest::Error),
112113

@@ -140,19 +141,19 @@ pub enum BlockScannerError {
140141
#[derive(Debug)]
141142
pub enum Command {
142143
Subscribe {
143-
sender: mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>,
144+
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
144145
start_height: BlockNumberOrTag,
145146
end_height: Option<BlockNumberOrTag>,
146-
response: oneshot::Sender<Result<(), BlockScannerError>>,
147+
response: oneshot::Sender<Result<(), Error>>,
147148
},
148149
Unsubscribe {
149-
response: oneshot::Sender<Result<(), BlockScannerError>>,
150+
response: oneshot::Sender<Result<(), Error>>,
150151
},
151152
GetStatus {
152153
response: oneshot::Sender<ServiceStatus>,
153154
},
154155
Shutdown {
155-
response: oneshot::Sender<Result<(), BlockScannerError>>,
156+
response: oneshot::Sender<Result<(), Error>>,
156157
},
157158
}
158159

@@ -243,10 +244,10 @@ impl BlockRangeScanner {
243244
pub async fn connect_ws<N: Network>(
244245
self,
245246
ws_url: Url,
246-
) -> TransportResult<ConnectedBlockScanner<N>> {
247+
) -> TransportResult<ConnectedBlockRangeScanner<N>> {
247248
let provider =
248249
RootProvider::<N>::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?);
249-
Ok(ConnectedBlockScanner {
250+
Ok(ConnectedBlockRangeScanner {
250251
provider,
251252
config: Config {
252253
blocks_read_per_epoch: self.blocks_read_per_epoch,
@@ -265,9 +266,9 @@ impl BlockRangeScanner {
265266
pub async fn connect_ipc<N: Network>(
266267
self,
267268
ipc_path: String,
268-
) -> TransportResult<ConnectedBlockScanner<N>> {
269+
) -> TransportResult<ConnectedBlockRangeScanner<N>> {
269270
let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);
270-
Ok(ConnectedBlockScanner {
271+
Ok(ConnectedBlockRangeScanner {
271272
provider,
272273
config: Config {
273274
blocks_read_per_epoch: self.blocks_read_per_epoch,
@@ -279,12 +280,12 @@ impl BlockRangeScanner {
279280
}
280281
}
281282

282-
pub struct ConnectedBlockScanner<N: Network> {
283+
pub struct ConnectedBlockRangeScanner<N: Network> {
283284
provider: RootProvider<N>,
284285
config: Config,
285286
}
286287

287-
impl<N: Network> ConnectedBlockScanner<N> {
288+
impl<N: Network> ConnectedBlockRangeScanner<N> {
288289
/// Returns the underlying Provider.
289290
#[must_use]
290291
pub fn provider(&self) -> &impl Provider<N> {
@@ -297,20 +298,19 @@ impl<N: Network> ConnectedBlockScanner<N> {
297298
/// # Errors
298299
///
299300
/// Returns an error if the subscription service fails to start.
300-
pub fn run(&self) -> anyhow::Result<BlockScannerClient> {
301-
let (service, cmd_tx) =
302-
BlockScannerService::new(self.config.clone(), self.provider.clone());
301+
pub fn run(&self) -> anyhow::Result<BlockRangeScannerClient> {
302+
let (service, cmd_tx) = Service::new(self.config.clone(), self.provider.clone());
303303
tokio::spawn(async move {
304304
service.run().await;
305305
});
306-
Ok(BlockScannerClient::new(cmd_tx))
306+
Ok(BlockRangeScannerClient::new(cmd_tx))
307307
}
308308
}
309309

310-
struct BlockScannerService<N: Network> {
310+
struct Service<N: Network> {
311311
config: Config,
312312
provider: RootProvider<N>,
313-
subscriber: Option<mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>>,
313+
subscriber: Option<mpsc::Sender<Result<Range<BlockNumber>, Error>>>,
314314
current: Option<BlockHashAndNumber>,
315315
websocket_connected: bool,
316316
processed_count: u64,
@@ -319,7 +319,7 @@ struct BlockScannerService<N: Network> {
319319
shutdown: bool,
320320
}
321321

322-
impl<N: Network> BlockScannerService<N> {
322+
impl<N: Network> Service<N> {
323323
pub fn new(config: Config, provider: RootProvider<N>) -> (Self, mpsc::Sender<Command>) {
324324
let (cmd_tx, cmd_rx) = mpsc::channel(100);
325325

@@ -360,7 +360,7 @@ impl<N: Network> BlockScannerService<N> {
360360
info!("Subscription service stopped");
361361
}
362362

363-
async fn handle_command(&mut self, command: Command) -> Result<(), BlockScannerError> {
363+
async fn handle_command(&mut self, command: Command) -> Result<(), Error> {
364364
match command {
365365
Command::Subscribe { sender, start_height, end_height, response } => {
366366
let result = self.handle_subscribe(sender, start_height, end_height).await;
@@ -385,12 +385,12 @@ impl<N: Network> BlockScannerService<N> {
385385

386386
async fn handle_subscribe(
387387
&mut self,
388-
sender: mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>,
388+
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
389389
start_height: BlockNumberOrTag,
390390
end_height: Option<BlockNumberOrTag>,
391-
) -> Result<(), BlockScannerError> {
391+
) -> Result<(), Error> {
392392
if self.subscriber.is_some() {
393-
return Err(BlockScannerError::MultipleSubscribers);
393+
return Err(Error::MultipleSubscribers);
394394
}
395395

396396
// TODO: update local state relate to reorg and validate data
@@ -407,7 +407,7 @@ impl<N: Network> BlockScannerService<N> {
407407
&mut self,
408408
start_height: BlockNumberOrTag,
409409
end_height: Option<BlockNumberOrTag>,
410-
) -> Result<(), BlockScannerError> {
410+
) -> Result<(), Error> {
411411
// Step 1: Establish WebSocket connection
412412
let (buffer_tx, buffer_rx) = mpsc::channel(MAX_BUFFERED_MESSAGES);
413413

@@ -451,15 +451,15 @@ impl<N: Network> BlockScannerService<N> {
451451
if let Err(e) = self.sync_historical_data(start_block, sync_end_block).await {
452452
warn!("aborting ws_task");
453453
ws_task.abort();
454-
return Err(BlockScannerError::HistoricalSyncError(e.to_string()));
454+
return Err(Error::HistoricalSyncError(e.to_string()));
455455
}
456456

457457
// Step 3: Process buffered WebSocket messages
458458
let sender = self.subscriber.clone().expect("subscriber should be set");
459459
tokio::spawn(async move {
460460
if end_height.is_none() {
461461
Self::process_buffered_messages(buffer_rx, sender, cutoff).await;
462-
} else if sender.send(Err(BlockScannerError::Eof)).await.is_err() {
462+
} else if sender.send(Err(Error::Eof)).await.is_err() {
463463
warn!("Subscriber channel closed, cleaning up");
464464
}
465465
});
@@ -477,7 +477,7 @@ impl<N: Network> BlockScannerService<N> {
477477
&mut self,
478478
start: N::BlockResponse,
479479
end: N::BlockResponse,
480-
) -> Result<(), BlockScannerError> {
480+
) -> Result<(), Error> {
481481
let mut batch_count = 0;
482482

483483
self.current = Some(BlockHashAndNumber::from_header::<N>(start.header()));
@@ -514,7 +514,7 @@ impl<N: Network> BlockScannerService<N> {
514514
Ok(())
515515
}
516516

517-
async fn ensure_current_not_reorged(&mut self) -> Result<(), BlockScannerError> {
517+
async fn ensure_current_not_reorged(&mut self) -> Result<(), Error> {
518518
let current_block =
519519
self.provider.get_block_by_hash(self.current.as_ref().unwrap().hash).await?;
520520
if current_block.is_some() {
@@ -524,7 +524,7 @@ impl<N: Network> BlockScannerService<N> {
524524
self.rewind_on_reorg_detected().await
525525
}
526526

527-
async fn rewind_on_reorg_detected(&mut self) -> Result<(), BlockScannerError> {
527+
async fn rewind_on_reorg_detected(&mut self) -> Result<(), Error> {
528528
let mut new_current_height =
529529
if self.current.as_ref().unwrap().number <= self.config.reorg_rewind_depth {
530530
0
@@ -596,7 +596,7 @@ impl<N: Network> BlockScannerService<N> {
596596

597597
async fn process_buffered_messages(
598598
mut buffer_rx: mpsc::Receiver<Range<BlockNumber>>,
599-
sender: mpsc::Sender<Result<Range<BlockNumber>, BlockScannerError>>,
599+
sender: mpsc::Sender<Result<Range<BlockNumber>, Error>>,
600600
cutoff: BlockNumber,
601601
) {
602602
let mut processed = 0;
@@ -631,16 +631,14 @@ impl<N: Network> BlockScannerService<N> {
631631

632632
async fn get_block_subscription(
633633
provider: &impl Provider<N>,
634-
) -> Result<Subscription<N::HeaderResponse>, BlockScannerError> {
635-
let ws_stream = provider
636-
.subscribe_blocks()
637-
.await
638-
.map_err(|_| BlockScannerError::WebSocketConnectionFailed(1))?;
634+
) -> Result<Subscription<N::HeaderResponse>, Error> {
635+
let ws_stream =
636+
provider.subscribe_blocks().await.map_err(|_| Error::WebSocketConnectionFailed(1))?;
639637

640638
Ok(ws_stream)
641639
}
642640

643-
async fn send_to_subscriber(&mut self, result: Result<Range<BlockNumber>, BlockScannerError>) {
641+
async fn send_to_subscriber(&mut self, result: Result<Range<BlockNumber>, Error>) {
644642
if let Some(ref sender) = self.subscriber {
645643
if sender.send(result).await.is_err() {
646644
self.subscriber = None;
@@ -669,11 +667,11 @@ impl<N: Network> BlockScannerService<N> {
669667
}
670668
}
671669

672-
pub struct BlockScannerClient {
670+
pub struct BlockRangeScannerClient {
673671
command_sender: mpsc::Sender<Command>,
674672
}
675673

676-
impl BlockScannerClient {
674+
impl BlockRangeScannerClient {
677675
/// Creates a new subscription client.
678676
///
679677
/// # Arguments
@@ -693,13 +691,12 @@ impl BlockScannerClient {
693691
///
694692
/// # Errors
695693
///
696-
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
694+
/// * `Error::ServiceShutdown` - if the service is already shutting down.
697695
pub async fn subscribe(
698696
&self,
699697
start_height: BlockNumberOrTag,
700698
end_height: Option<BlockNumberOrTag>,
701-
) -> Result<ReceiverStream<Result<Range<BlockNumber>, BlockScannerError>>, BlockScannerError>
702-
{
699+
) -> Result<ReceiverStream<Result<Range<BlockNumber>, Error>>, Error> {
703700
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
704701
let (response_tx, response_rx) = oneshot::channel();
705702

@@ -710,9 +707,9 @@ impl BlockScannerClient {
710707
response: response_tx,
711708
};
712709

713-
self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
710+
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;
714711

715-
response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)??;
712+
response_rx.await.map_err(|_| Error::ServiceShutdown)??;
716713

717714
let stream = ReceiverStream::new(blocks_receiver);
718715

@@ -723,45 +720,45 @@ impl BlockScannerClient {
723720
///
724721
/// # Errors
725722
///
726-
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
727-
pub async fn unsubscribe(&self) -> Result<(), BlockScannerError> {
723+
/// * `Error::ServiceShutdown` - if the service is already shutting down.
724+
pub async fn unsubscribe(&self) -> Result<(), Error> {
728725
let (response_tx, response_rx) = oneshot::channel();
729726

730727
let command = Command::Unsubscribe { response: response_tx };
731728

732-
self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
729+
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;
733730

734-
response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)?
731+
response_rx.await.map_err(|_| Error::ServiceShutdown)?
735732
}
736733

737734
/// Returns the current status of the subscription service.
738735
///
739736
/// # Errors
740737
///
741-
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
742-
pub async fn get_status(&self) -> Result<ServiceStatus, BlockScannerError> {
738+
/// * `Error::ServiceShutdown` - if the service is already shutting down.
739+
pub async fn get_status(&self) -> Result<ServiceStatus, Error> {
743740
let (response_tx, response_rx) = oneshot::channel();
744741

745742
let command = Command::GetStatus { response: response_tx };
746743

747-
self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
744+
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;
748745

749-
response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)
746+
response_rx.await.map_err(|_| Error::ServiceShutdown)
750747
}
751748

752749
/// Shuts down the subscription service and unsubscribes the current subscriber.
753750
///
754751
/// # Errors
755752
///
756-
/// * `BlockScannerError::ServiceShutdown` - if the service is already shutting down.
757-
pub async fn shutdown(&self) -> Result<(), BlockScannerError> {
753+
/// * `Error::ServiceShutdown` - if the service is already shutting down.
754+
pub async fn shutdown(&self) -> Result<(), Error> {
758755
let (response_tx, response_rx) = oneshot::channel();
759756

760757
let command = Command::Shutdown { response: response_tx };
761758

762-
self.command_sender.send(command).await.map_err(|_| BlockScannerError::ServiceShutdown)?;
759+
self.command_sender.send(command).await.map_err(|_| Error::ServiceShutdown)?;
763760

764-
response_rx.await.map_err(|_| BlockScannerError::ServiceShutdown)?
761+
response_rx.await.map_err(|_| Error::ServiceShutdown)?
765762
}
766763
}
767764

src/event_scanner.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use crate::{
4-
block_range_scanner::{BlockRangeScanner, BlockScannerError, ConnectedBlockScanner},
4+
block_range_scanner::{self, BlockRangeScanner, ConnectedBlockRangeScanner},
55
callback::strategy::{CallbackStrategy, StateSyncAwareStrategy},
66
types::EventFilter,
77
};
@@ -100,7 +100,7 @@ impl EventScannerBuilder {
100100
pub async fn connect_ws<N: Network>(
101101
self,
102102
ws_url: Url,
103-
) -> Result<EventScanner<N>, BlockScannerError> {
103+
) -> Result<EventScanner<N>, block_range_scanner::Error> {
104104
let block_range_scanner = self.block_range_scanner.connect_ws(ws_url).await?;
105105
Ok(EventScanner {
106106
block_range_scanner,
@@ -117,7 +117,7 @@ impl EventScannerBuilder {
117117
pub async fn connect_ipc<N: Network>(
118118
self,
119119
ipc_path: impl Into<String>,
120-
) -> Result<EventScanner<N>, BlockScannerError> {
120+
) -> Result<EventScanner<N>, block_range_scanner::Error> {
121121
let block_range_scanner = self.block_range_scanner.connect_ipc(ipc_path.into()).await?;
122122
Ok(EventScanner {
123123
block_range_scanner,
@@ -134,7 +134,7 @@ impl EventScannerBuilder {
134134
}
135135

136136
pub struct EventScanner<N: Network> {
137-
block_range_scanner: ConnectedBlockScanner<N>,
137+
block_range_scanner: ConnectedBlockRangeScanner<N>,
138138
tracked_events: Vec<EventFilter>,
139139
callback_strategy: Arc<dyn CallbackStrategy>,
140140
}

0 commit comments

Comments
 (0)