Skip to content

Commit 9404d5d

Browse files
committed
Revert "remove subsc. lagged error variant from scannererror"
This reverts commit 8763c45.
1 parent 8763c45 commit 9404d5d

File tree

4 files changed

+49
-75
lines changed

4 files changed

+49
-75
lines changed

src/block_range_scanner.rs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
//! }
6060
//! ```
6161
62-
use crate::robust_provider::subscription::{self, RobustSubscription};
62+
use crate::robust_provider::subscription::RobustSubscription;
6363
use std::{cmp::Ordering, ops::RangeInclusive};
6464
use tokio::{
6565
sync::{mpsc, oneshot},
@@ -600,25 +600,9 @@ impl<N: Network> Service<N> {
600600
Ok(block) => block,
601601
Err(e) => {
602602
error!(error = %e, "Error receiving block from stream");
603-
match e {
604-
subscription::Error::Lagged(_) => {
605-
// scanner already accounts for skipped block numbers
606-
// next block will be the actual incoming block
607-
continue;
608-
}
609-
subscription::Error::Timeout => {
610-
_ = sender.try_stream(ScannerError::Timeout).await;
611-
return;
612-
}
613-
subscription::Error::RpcError(rpc_err) => {
614-
_ = sender.try_stream(ScannerError::RpcError(rpc_err)).await;
615-
return;
616-
}
617-
subscription::Error::Closed => {
618-
_ = sender.try_stream(ScannerError::SubscriptionClosed).await;
619-
return;
620-
}
621-
}
603+
// Error from subscription, exit the stream
604+
_ = sender.try_stream(e).await;
605+
return;
622606
}
623607
};
624608

src/error.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@ use alloy::{
66
};
77
use thiserror::Error;
88

9-
use crate::{robust_provider::provider::Error as RobustProviderError, types::ScannerResult};
9+
use crate::{
10+
robust_provider::{
11+
provider::Error as RobustProviderError, subscription::Error as RobustSubscriptionError,
12+
},
13+
types::ScannerResult,
14+
};
1015

1116
#[derive(Error, Debug, Clone)]
1217
pub enum ScannerError {
@@ -33,6 +38,9 @@ pub enum ScannerError {
3338

3439
#[error("Subscription closed")]
3540
SubscriptionClosed,
41+
42+
#[error("Subscription Lagged by {0}")]
43+
SubscriptionLagged(u64),
3644
}
3745

3846
impl From<RobustProviderError> for ScannerError {
@@ -45,12 +53,22 @@ impl From<RobustProviderError> for ScannerError {
4553
}
4654
}
4755

56+
impl From<RobustSubscriptionError> for ScannerError {
57+
fn from(error: RobustSubscriptionError) -> ScannerError {
58+
match error {
59+
RobustSubscriptionError::Timeout => ScannerError::Timeout,
60+
RobustSubscriptionError::RpcError(err) => ScannerError::RpcError(err),
61+
RobustSubscriptionError::Closed => ScannerError::SubscriptionClosed,
62+
RobustSubscriptionError::Lagged(count) => ScannerError::SubscriptionLagged(count),
63+
}
64+
}
65+
}
66+
4867
impl From<RpcError<TransportErrorKind>> for ScannerError {
4968
fn from(error: RpcError<TransportErrorKind>) -> Self {
5069
ScannerError::RpcError(Arc::new(error))
5170
}
5271
}
53-
5472
impl<T: Clone> PartialEq<ScannerError> for ScannerResult<T> {
5573
fn eq(&self, other: &ScannerError) -> bool {
5674
match self {

src/robust_provider/provider.rs

Lines changed: 14 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,36 +26,6 @@ pub enum Error {
2626
BlockNotFound(BlockId),
2727
}
2828

29-
/// Errors that can occur when using [`RobustProvider`].
30-
#[derive(Error, Debug, Clone)]
31-
pub(crate) enum CommonError {
32-
#[error("Operation timed out")]
33-
Timeout,
34-
#[error("RPC call failed after exhausting all retry attempts: {0}")]
35-
RpcError(Arc<RpcError<TransportErrorKind>>),
36-
}
37-
38-
impl From<RpcError<TransportErrorKind>> for CommonError {
39-
fn from(err: RpcError<TransportErrorKind>) -> Self {
40-
CommonError::RpcError(Arc::new(err))
41-
}
42-
}
43-
44-
impl From<CommonError> for Error {
45-
fn from(err: CommonError) -> Self {
46-
match err {
47-
CommonError::Timeout => Error::Timeout,
48-
CommonError::RpcError(e) => Error::RpcError(e),
49-
}
50-
}
51-
}
52-
53-
impl From<TokioError::Elapsed> for CommonError {
54-
fn from(_: TokioError::Elapsed) -> Self {
55-
CommonError::Timeout
56-
}
57-
}
58-
5929
impl From<RpcError<TransportErrorKind>> for Error {
6030
fn from(err: RpcError<TransportErrorKind>) -> Self {
6131
Error::RpcError(Arc::new(err))
@@ -164,8 +134,7 @@ impl<N: Network> RobustProvider<N> {
164134
move |provider| async move { provider.get_block_number().await },
165135
false,
166136
)
167-
.await
168-
.map_err(Error::from);
137+
.await;
169138
if let Err(e) = &result {
170139
error!(error = %e, "eth_getBlockNumber failed");
171140
}
@@ -253,8 +222,7 @@ impl<N: Network> RobustProvider<N> {
253222
move |provider| async move { provider.get_logs(filter).await },
254223
false,
255224
)
256-
.await
257-
.map_err(Error::from);
225+
.await;
258226
if let Err(e) = &result {
259227
error!(error = %e, "eth_getLogs failed");
260228
}
@@ -288,7 +256,7 @@ impl<N: Network> RobustProvider<N> {
288256
Ok(sub) => Ok(RobustSubscription::new(sub, self.clone())),
289257
Err(e) => {
290258
error!(error = %e, "eth_subscribe failed");
291-
Err(e.into())
259+
Err(e)
292260
}
293261
}
294262
}
@@ -317,7 +285,7 @@ impl<N: Network> RobustProvider<N> {
317285
&self,
318286
operation: F,
319287
require_pubsub: bool,
320-
) -> Result<T, CommonError>
288+
) -> Result<T, Error>
321289
where
322290
F: Fn(RootProvider<N>) -> Fut,
323291
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -338,8 +306,8 @@ impl<N: Network> RobustProvider<N> {
338306
&self,
339307
operation: F,
340308
require_pubsub: bool,
341-
last_error: CommonError,
342-
) -> Result<T, CommonError>
309+
last_error: Error,
310+
) -> Result<T, Error>
343311
where
344312
F: Fn(RootProvider<N>) -> Fut,
345313
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -353,9 +321,9 @@ impl<N: Network> RobustProvider<N> {
353321
&self,
354322
operation: F,
355323
require_pubsub: bool,
356-
mut last_error: CommonError,
324+
mut last_error: Error,
357325
start_index: usize,
358-
) -> Result<(T, usize), CommonError>
326+
) -> Result<(T, usize), Error>
359327
where
360328
F: Fn(RootProvider<N>) -> Fut,
361329
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -394,7 +362,7 @@ impl<N: Network> RobustProvider<N> {
394362
&self,
395363
provider: &RootProvider<N>,
396364
operation: F,
397-
) -> Result<T, CommonError>
365+
) -> Result<T, Error>
398366
where
399367
F: Fn(RootProvider<N>) -> Fut,
400368
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -413,8 +381,8 @@ impl<N: Network> RobustProvider<N> {
413381
.sleep(tokio::time::sleep),
414382
)
415383
.await
416-
.map_err(CommonError::from)?
417-
.map_err(CommonError::from)
384+
.map_err(Error::from)?
385+
.map_err(Error::from)
418386
}
419387

420388
/// Check if a provider supports pubsub
@@ -516,7 +484,7 @@ mod tests {
516484

517485
let call_count = AtomicUsize::new(0);
518486

519-
let result: Result<(), CommonError> = provider
487+
let result: Result<(), Error> = provider
520488
.try_operation_with_failover(
521489
|_| async {
522490
call_count.fetch_add(1, Ordering::SeqCst);
@@ -526,7 +494,7 @@ mod tests {
526494
)
527495
.await;
528496

529-
assert!(matches!(result, Err(CommonError::RpcError(_))));
497+
assert!(matches!(result, Err(Error::RpcError(_))));
530498
assert_eq!(call_count.load(Ordering::SeqCst), 3);
531499
}
532500

@@ -545,7 +513,7 @@ mod tests {
545513
)
546514
.await;
547515

548-
assert!(matches!(result, Err(CommonError::Timeout)));
516+
assert!(matches!(result, Err(Error::Timeout)));
549517
}
550518

551519
#[tokio::test]

src/robust_provider/subscription.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tokio_stream::Stream;
1717
use tokio_util::sync::ReusableBoxFuture;
1818
use tracing::{error, info, warn};
1919

20-
use crate::robust_provider::{RobustProvider, provider::CommonError};
20+
use crate::robust_provider::{RobustProvider, provider::Error as ProviderError};
2121

2222
/// Errors that can occur when using [`RobustSubscription`].
2323
#[derive(Error, Debug, Clone)]
@@ -32,11 +32,15 @@ pub enum Error {
3232
Lagged(u64),
3333
}
3434

35-
impl From<CommonError> for Error {
36-
fn from(err: CommonError) -> Self {
35+
impl From<ProviderError> for Error {
36+
fn from(err: ProviderError) -> Self {
3737
match err {
38-
CommonError::Timeout => Error::Timeout,
39-
CommonError::RpcError(e) => Error::RpcError(e),
38+
ProviderError::Timeout => Error::Timeout,
39+
ProviderError::RpcError(e) => Error::RpcError(e),
40+
ProviderError::BlockNotFound(_) => {
41+
// This shouldn't happen in subscription context, but we need to handle it
42+
Error::RpcError(Arc::new(RpcError::NullResp))
43+
}
4044
}
4145
}
4246
}
@@ -180,7 +184,7 @@ impl<N: Network> RobustSubscription<N> {
180184
}
181185
}
182186

183-
async fn switch_to_fallback(&mut self, last_error: CommonError) -> Result<(), Error> {
187+
async fn switch_to_fallback(&mut self, last_error: Error) -> Result<(), Error> {
184188
// If we're on a fallback, try primary first before moving to next fallback
185189
if self.is_on_fallback() && self.try_reconnect_to_primary(true).await {
186190
return Ok(());
@@ -198,7 +202,7 @@ impl<N: Network> RobustSubscription<N> {
198202

199203
let (sub, fallback_idx) = self
200204
.robust_provider
201-
.try_fallback_providers_from(&operation, true, last_error, start_index)
205+
.try_fallback_providers_from(&operation, true, last_error.into(), start_index)
202206
.await?;
203207

204208
self.subscription = sub;

0 commit comments

Comments
 (0)