Skip to content

Commit 8763c45

Browse files
committed
remove subsc. lagged error variant from scannererror
1 parent b7506bb commit 8763c45

File tree

4 files changed

+75
-49
lines changed

4 files changed

+75
-49
lines changed

src/block_range_scanner.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
//! }
6060
//! ```
6161
62-
use crate::robust_provider::subscription::RobustSubscription;
62+
use crate::robust_provider::subscription::{self, RobustSubscription};
6363
use std::{cmp::Ordering, ops::RangeInclusive};
6464
use tokio::{
6565
sync::{mpsc, oneshot},
@@ -600,9 +600,25 @@ impl<N: Network> Service<N> {
600600
Ok(block) => block,
601601
Err(e) => {
602602
error!(error = %e, "Error receiving block from stream");
603-
// Error from subscription, exit the stream
604-
_ = sender.try_stream(e).await;
605-
return;
603+
match e {
604+
subscription::Error::Lagged(_) => {
605+
// scanner already accounts for skipped block numbers
606+
// next block will be the actual incoming block
607+
continue;
608+
}
609+
subscription::Error::Timeout => {
610+
_ = sender.try_stream(ScannerError::Timeout).await;
611+
return;
612+
}
613+
subscription::Error::RpcError(rpc_err) => {
614+
_ = sender.try_stream(ScannerError::RpcError(rpc_err)).await;
615+
return;
616+
}
617+
subscription::Error::Closed => {
618+
_ = sender.try_stream(ScannerError::SubscriptionClosed).await;
619+
return;
620+
}
621+
}
606622
}
607623
};
608624

src/error.rs

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,7 @@ use alloy::{
66
};
77
use thiserror::Error;
88

9-
use crate::{
10-
robust_provider::{
11-
provider::Error as RobustProviderError, subscription::Error as RobustSubscriptionError,
12-
},
13-
types::ScannerResult,
14-
};
9+
use crate::{robust_provider::provider::Error as RobustProviderError, types::ScannerResult};
1510

1611
#[derive(Error, Debug, Clone)]
1712
pub enum ScannerError {
@@ -38,9 +33,6 @@ pub enum ScannerError {
3833

3934
#[error("Subscription closed")]
4035
SubscriptionClosed,
41-
42-
#[error("Subscription Lagged by {0}")]
43-
SubscriptionLagged(u64),
4436
}
4537

4638
impl From<RobustProviderError> for ScannerError {
@@ -53,22 +45,12 @@ impl From<RobustProviderError> for ScannerError {
5345
}
5446
}
5547

56-
impl From<RobustSubscriptionError> for ScannerError {
57-
fn from(error: RobustSubscriptionError) -> ScannerError {
58-
match error {
59-
RobustSubscriptionError::Timeout => ScannerError::Timeout,
60-
RobustSubscriptionError::RpcError(err) => ScannerError::RpcError(err),
61-
RobustSubscriptionError::Closed => ScannerError::SubscriptionClosed,
62-
RobustSubscriptionError::Lagged(count) => ScannerError::SubscriptionLagged(count),
63-
}
64-
}
65-
}
66-
6748
impl From<RpcError<TransportErrorKind>> for ScannerError {
6849
fn from(error: RpcError<TransportErrorKind>) -> Self {
6950
ScannerError::RpcError(Arc::new(error))
7051
}
7152
}
53+
7254
impl<T: Clone> PartialEq<ScannerError> for ScannerResult<T> {
7355
fn eq(&self, other: &ScannerError) -> bool {
7456
match self {

src/robust_provider/provider.rs

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,36 @@ pub enum Error {
2626
BlockNotFound(BlockId),
2727
}
2828

29+
/// Errors that can occur when using [`RobustProvider`].
30+
#[derive(Error, Debug, Clone)]
31+
pub(crate) enum CommonError {
32+
#[error("Operation timed out")]
33+
Timeout,
34+
#[error("RPC call failed after exhausting all retry attempts: {0}")]
35+
RpcError(Arc<RpcError<TransportErrorKind>>),
36+
}
37+
38+
impl From<RpcError<TransportErrorKind>> for CommonError {
39+
fn from(err: RpcError<TransportErrorKind>) -> Self {
40+
CommonError::RpcError(Arc::new(err))
41+
}
42+
}
43+
44+
impl From<CommonError> for Error {
45+
fn from(err: CommonError) -> Self {
46+
match err {
47+
CommonError::Timeout => Error::Timeout,
48+
CommonError::RpcError(e) => Error::RpcError(e),
49+
}
50+
}
51+
}
52+
53+
impl From<TokioError::Elapsed> for CommonError {
54+
fn from(_: TokioError::Elapsed) -> Self {
55+
CommonError::Timeout
56+
}
57+
}
58+
2959
impl From<RpcError<TransportErrorKind>> for Error {
3060
fn from(err: RpcError<TransportErrorKind>) -> Self {
3161
Error::RpcError(Arc::new(err))
@@ -134,7 +164,8 @@ impl<N: Network> RobustProvider<N> {
134164
move |provider| async move { provider.get_block_number().await },
135165
false,
136166
)
137-
.await;
167+
.await
168+
.map_err(Error::from);
138169
if let Err(e) = &result {
139170
error!(error = %e, "eth_getBlockNumber failed");
140171
}
@@ -222,7 +253,8 @@ impl<N: Network> RobustProvider<N> {
222253
move |provider| async move { provider.get_logs(filter).await },
223254
false,
224255
)
225-
.await;
256+
.await
257+
.map_err(Error::from);
226258
if let Err(e) = &result {
227259
error!(error = %e, "eth_getLogs failed");
228260
}
@@ -256,7 +288,7 @@ impl<N: Network> RobustProvider<N> {
256288
Ok(sub) => Ok(RobustSubscription::new(sub, self.clone())),
257289
Err(e) => {
258290
error!(error = %e, "eth_subscribe failed");
259-
Err(e)
291+
Err(e.into())
260292
}
261293
}
262294
}
@@ -285,7 +317,7 @@ impl<N: Network> RobustProvider<N> {
285317
&self,
286318
operation: F,
287319
require_pubsub: bool,
288-
) -> Result<T, Error>
320+
) -> Result<T, CommonError>
289321
where
290322
F: Fn(RootProvider<N>) -> Fut,
291323
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -306,8 +338,8 @@ impl<N: Network> RobustProvider<N> {
306338
&self,
307339
operation: F,
308340
require_pubsub: bool,
309-
last_error: Error,
310-
) -> Result<T, Error>
341+
last_error: CommonError,
342+
) -> Result<T, CommonError>
311343
where
312344
F: Fn(RootProvider<N>) -> Fut,
313345
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -321,9 +353,9 @@ impl<N: Network> RobustProvider<N> {
321353
&self,
322354
operation: F,
323355
require_pubsub: bool,
324-
mut last_error: Error,
356+
mut last_error: CommonError,
325357
start_index: usize,
326-
) -> Result<(T, usize), Error>
358+
) -> Result<(T, usize), CommonError>
327359
where
328360
F: Fn(RootProvider<N>) -> Fut,
329361
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -362,7 +394,7 @@ impl<N: Network> RobustProvider<N> {
362394
&self,
363395
provider: &RootProvider<N>,
364396
operation: F,
365-
) -> Result<T, Error>
397+
) -> Result<T, CommonError>
366398
where
367399
F: Fn(RootProvider<N>) -> Fut,
368400
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
@@ -381,8 +413,8 @@ impl<N: Network> RobustProvider<N> {
381413
.sleep(tokio::time::sleep),
382414
)
383415
.await
384-
.map_err(Error::from)?
385-
.map_err(Error::from)
416+
.map_err(CommonError::from)?
417+
.map_err(CommonError::from)
386418
}
387419

388420
/// Check if a provider supports pubsub
@@ -484,7 +516,7 @@ mod tests {
484516

485517
let call_count = AtomicUsize::new(0);
486518

487-
let result: Result<(), Error> = provider
519+
let result: Result<(), CommonError> = provider
488520
.try_operation_with_failover(
489521
|_| async {
490522
call_count.fetch_add(1, Ordering::SeqCst);
@@ -494,7 +526,7 @@ mod tests {
494526
)
495527
.await;
496528

497-
assert!(matches!(result, Err(Error::RpcError(_))));
529+
assert!(matches!(result, Err(CommonError::RpcError(_))));
498530
assert_eq!(call_count.load(Ordering::SeqCst), 3);
499531
}
500532

@@ -513,7 +545,7 @@ mod tests {
513545
)
514546
.await;
515547

516-
assert!(matches!(result, Err(Error::Timeout)));
548+
assert!(matches!(result, Err(CommonError::Timeout)));
517549
}
518550

519551
#[tokio::test]

src/robust_provider/subscription.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tokio_stream::Stream;
1717
use tokio_util::sync::ReusableBoxFuture;
1818
use tracing::{error, info, warn};
1919

20-
use crate::robust_provider::{RobustProvider, provider::Error as ProviderError};
20+
use crate::robust_provider::{RobustProvider, provider::CommonError};
2121

2222
/// Errors that can occur when using [`RobustSubscription`].
2323
#[derive(Error, Debug, Clone)]
@@ -32,15 +32,11 @@ pub enum Error {
3232
Lagged(u64),
3333
}
3434

35-
impl From<ProviderError> for Error {
36-
fn from(err: ProviderError) -> Self {
35+
impl From<CommonError> for Error {
36+
fn from(err: CommonError) -> Self {
3737
match err {
38-
ProviderError::Timeout => Error::Timeout,
39-
ProviderError::RpcError(e) => Error::RpcError(e),
40-
ProviderError::BlockNotFound(_) => {
41-
// This shouldn't happen in subscription context, but we need to handle it
42-
Error::RpcError(Arc::new(RpcError::NullResp))
43-
}
38+
CommonError::Timeout => Error::Timeout,
39+
CommonError::RpcError(e) => Error::RpcError(e),
4440
}
4541
}
4642
}
@@ -184,7 +180,7 @@ impl<N: Network> RobustSubscription<N> {
184180
}
185181
}
186182

187-
async fn switch_to_fallback(&mut self, last_error: Error) -> Result<(), Error> {
183+
async fn switch_to_fallback(&mut self, last_error: CommonError) -> Result<(), Error> {
188184
// If we're on a fallback, try primary first before moving to next fallback
189185
if self.is_on_fallback() && self.try_reconnect_to_primary(true).await {
190186
return Ok(());
@@ -202,7 +198,7 @@ impl<N: Network> RobustSubscription<N> {
202198

203199
let (sub, fallback_idx) = self
204200
.robust_provider
205-
.try_fallback_providers_from(&operation, true, last_error.into(), start_index)
201+
.try_fallback_providers_from(&operation, true, last_error, start_index)
206202
.await?;
207203

208204
self.subscription = sub;

0 commit comments

Comments
 (0)