Skip to content

Commit 2498939

Browse files
committed
feat: shared error enum accross both scanners
1 parent ddaa921 commit 2498939

File tree

10 files changed

+174
-213
lines changed

10 files changed

+174
-213
lines changed

src/block_range_scanner.rs

Lines changed: 76 additions & 178 deletions
Large diffs are not rendered by default.

src/error.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use std::{ops::RangeInclusive, sync::Arc};
2+
3+
use alloy::{
4+
eips::BlockNumberOrTag,
5+
primitives::BlockNumber,
6+
transports::{RpcError, TransportErrorKind, http::reqwest},
7+
};
8+
use thiserror::Error;
9+
10+
use crate::block_range_scanner::Message;
11+
12+
#[derive(Error, Debug, Clone)]
13+
pub enum ScannerError {
14+
#[error("HTTP request failed: {0}")]
15+
HttpError(Arc<reqwest::Error>),
16+
17+
// #[error("WebSocket error: {0}")]
18+
// WebSocketError(#[from] tokio_tungstenite::tungstenite::Error),
19+
#[error("Serialization error: {0}")]
20+
SerializationError(Arc<serde_json::Error>),
21+
22+
#[error("RPC error: {0}")]
23+
RpcError(Arc<RpcError<TransportErrorKind>>),
24+
25+
#[error("Channel send error")]
26+
ChannelError,
27+
28+
#[error("Service is shutting down")]
29+
ServiceShutdown,
30+
31+
#[error("Only one subscriber allowed at a time")]
32+
MultipleSubscribers,
33+
34+
#[error("No subscriber set for streaming")]
35+
NoSubscriber,
36+
37+
#[error("Historical sync failed: {0}")]
38+
HistoricalSyncError(String),
39+
40+
#[error("WebSocket connection failed after {0} attempts")]
41+
WebSocketConnectionFailed(usize),
42+
43+
#[error("Block not found, block number: {0}")]
44+
BlockNotFound(BlockNumberOrTag),
45+
}
46+
47+
impl From<Result<RangeInclusive<BlockNumber>, ScannerError>> for Message {
48+
fn from(logs: Result<RangeInclusive<BlockNumber>, ScannerError>) -> Self {
49+
match logs {
50+
Ok(logs) => Message::Data(logs),
51+
Err(e) => Message::Error(e),
52+
}
53+
}
54+
}
55+
56+
impl From<reqwest::Error> for ScannerError {
57+
fn from(error: reqwest::Error) -> Self {
58+
ScannerError::HttpError(Arc::new(error))
59+
}
60+
}
61+
62+
impl From<serde_json::Error> for ScannerError {
63+
fn from(error: serde_json::Error) -> Self {
64+
ScannerError::SerializationError(Arc::new(error))
65+
}
66+
}
67+
68+
impl From<RpcError<TransportErrorKind>> for ScannerError {
69+
fn from(error: RpcError<TransportErrorKind>) -> Self {
70+
ScannerError::RpcError(Arc::new(error))
71+
}
72+
}
73+
74+
impl From<ScannerError> for Message {
75+
fn from(error: ScannerError) -> Self {
76+
Message::Error(error)
77+
}
78+
}

src/event_scanner/error.rs

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,19 @@
1-
use std::sync::Arc;
2-
31
use alloy::{
42
rpc::types::Log,
53
transports::{RpcError, TransportErrorKind},
64
};
7-
use thiserror::Error;
8-
9-
use crate::{block_range_scanner::BlockRangeScannerError, event_scanner::message::Message};
10-
11-
#[derive(Error, Debug, Clone)]
12-
pub enum EventScannerError {
13-
#[error("Block range scanner error: {0}")]
14-
BlockRangeScanner(#[from] BlockRangeScannerError),
15-
#[error("Provider error: {0}")]
16-
Provider(Arc<RpcError<TransportErrorKind>>),
17-
}
185

19-
impl From<RpcError<TransportErrorKind>> for EventScannerError {
20-
fn from(e: RpcError<TransportErrorKind>) -> Self {
21-
EventScannerError::Provider(Arc::new(e))
22-
}
23-
}
6+
use crate::{Message, ScannerError};
247

258
impl From<RpcError<TransportErrorKind>> for Message {
269
fn from(e: RpcError<TransportErrorKind>) -> Self {
2710
Message::Error(e.into())
2811
}
2912
}
3013

31-
impl From<BlockRangeScannerError> for Message {
32-
fn from(e: BlockRangeScannerError) -> Self {
33-
Message::Error(e.into())
14+
impl From<ScannerError> for Message {
15+
fn from(error: ScannerError) -> Self {
16+
Message::Error(error)
3417
}
3518
}
3619

src/event_scanner/message.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use alloy::{rpc::types::Log, sol_types::SolEvent};
22

3-
use crate::{EventScannerError, ScannerMessage};
3+
use crate::{ScannerError, ScannerMessage};
44

5-
pub type Message = ScannerMessage<Vec<Log>, EventScannerError>;
5+
pub type Message = ScannerMessage<Vec<Log>, ScannerError>;
66

77
impl From<Vec<Log>> for Message {
88
fn from(logs: Vec<Log>) -> Self {

src/event_scanner/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub mod listener;
44
pub mod message;
55
pub mod modes;
66

7-
pub use error::EventScannerError;
7+
// pub use error::EventScannerError;
88
pub use filter::EventFilter;
99
pub use message::Message;
1010
pub use modes::{

src/event_scanner/modes/historic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use tokio::sync::mpsc;
99
use tokio_stream::wrappers::ReceiverStream;
1010

1111
use crate::{
12+
ScannerError,
1213
block_range_scanner::{BlockRangeScanner, ConnectedBlockRangeScanner, MAX_BUFFERED_MESSAGES},
1314
event_scanner::{
14-
EventScannerError,
1515
filter::EventFilter,
1616
listener::EventListener,
1717
message::Message,
@@ -119,7 +119,7 @@ impl<N: Network> HistoricEventScanner<N> {
119119
/// # Errors
120120
///
121121
/// - `EventScannerMessage::ServiceShutdown` if the service is already shutting down.
122-
pub async fn start(self) -> Result<(), EventScannerError> {
122+
pub async fn start(self) -> Result<(), ScannerError> {
123123
let client = self.block_range_scanner.run()?;
124124
let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?;
125125
handle_stream(

src/event_scanner/modes/latest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use tokio::sync::mpsc;
99
use tokio_stream::wrappers::ReceiverStream;
1010

1111
use crate::{
12+
ScannerError,
1213
block_range_scanner::{
1314
BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS,
1415
MAX_BUFFERED_MESSAGES,
1516
},
1617
event_scanner::{
17-
EventScannerError,
1818
filter::EventFilter,
1919
listener::EventListener,
2020
message::Message,
@@ -145,7 +145,7 @@ impl<N: Network> LatestEventScanner<N> {
145145
/// - Returns `EventScannerError` if the scanner fails to start or fetching logs fails.
146146
///
147147
/// [`ScannerStatus::ReorgDetected`]: crate::types::ScannerStatus::ReorgDetected
148-
pub async fn start(self) -> Result<(), EventScannerError> {
148+
pub async fn start(self) -> Result<(), ScannerError> {
149149
let client = self.block_range_scanner.run()?;
150150
let stream = client.rewind(self.config.from_block, self.config.to_block).await?;
151151
handle_stream(

src/event_scanner/modes/live.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use tokio::sync::mpsc;
88
use tokio_stream::wrappers::ReceiverStream;
99

1010
use crate::{
11+
ScannerError,
1112
block_range_scanner::{
1213
BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS,
1314
MAX_BUFFERED_MESSAGES,
1415
},
1516
event_scanner::{
16-
EventScannerError,
1717
filter::EventFilter,
1818
listener::EventListener,
1919
message::Message,
@@ -115,7 +115,7 @@ impl<N: Network> LiveEventScanner<N> {
115115
/// # Errors
116116
///
117117
/// - `EventScannerMessage::ServiceShutdown` if the service is already shutting down.
118-
pub async fn start(self) -> Result<(), EventScannerError> {
118+
pub async fn start(self) -> Result<(), ScannerError> {
119119
let client = self.block_range_scanner.run()?;
120120
let stream = client.stream_live(self.config.block_confirmations).await?;
121121
handle_stream(

src/event_scanner/modes/sync.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use tokio::sync::mpsc;
99
use tokio_stream::wrappers::ReceiverStream;
1010

1111
use crate::{
12+
ScannerError,
1213
block_range_scanner::{
1314
BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS,
1415
MAX_BUFFERED_MESSAGES,
1516
},
1617
event_scanner::{
17-
EventScannerError,
1818
filter::EventFilter,
1919
listener::EventListener,
2020
message::Message,
@@ -124,7 +124,7 @@ impl<N: Network> SyncEventScanner<N> {
124124
/// # Errors
125125
///
126126
/// - `EventScannerMessage::ServiceShutdown` if the service is already shutting down.
127-
pub async fn start(self) -> Result<(), EventScannerError> {
127+
pub async fn start(self) -> Result<(), ScannerError> {
128128
let client = self.block_range_scanner.run()?;
129129
let stream =
130130
client.stream_from(self.config.from_block, self.config.block_confirmations).await?;

src/lib.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
pub mod block_range_scanner;
2+
pub mod error;
23
pub mod event_scanner;
34
#[cfg(any(test, feature = "test-utils"))]
45
pub mod test_utils;
56
pub mod types;
67

8+
pub use error::ScannerError;
79
pub use types::{ScannerMessage, ScannerStatus};
810

911
pub use event_scanner::{
10-
EventFilter, EventScanner, EventScannerError, HistoricEventScanner, HistoricScannerBuilder,
11-
LatestEventScanner, LatestScannerBuilder, LiveEventScanner, LiveScannerBuilder, Message,
12-
SyncEventScanner, SyncScannerBuilder,
12+
EventFilter, EventScanner, HistoricEventScanner, HistoricScannerBuilder, LatestEventScanner,
13+
LatestScannerBuilder, LiveEventScanner, LiveScannerBuilder, Message, SyncEventScanner,
14+
SyncScannerBuilder,
1315
};

0 commit comments

Comments
 (0)