Skip to content

Commit 41a891d

Browse files
committed
fix: rebase past changes
1 parent 9c3c648 commit 41a891d

File tree

6 files changed

+373
-42
lines changed

6 files changed

+373
-42
lines changed

src/block_range_scanner.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ use tokio::{
6767
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
6868

6969
use crate::{
70-
IntoRobustProvider, RobustProvider, ScannerMessage,
70+
IntoRobustProvider, RobustProvider, RobustSubscription, ScannerMessage,
7171
error::ScannerError,
7272
robust_provider::Error as RobustProviderError,
7373
types::{ScannerStatus, TryStream},
@@ -77,7 +77,6 @@ use alloy::{
7777
eips::BlockNumberOrTag,
7878
network::{BlockResponse, Network, primitives::HeaderResponse},
7979
primitives::{B256, BlockNumber},
80-
pubsub::Subscription,
8180
transports::{RpcError, TransportErrorKind},
8281
};
8382
use tracing::{debug, error, info, warn};
@@ -611,16 +610,29 @@ impl<N: Network> Service<N> {
611610

612611
async fn stream_live_blocks(
613612
mut range_start: BlockNumber,
614-
subscription: Subscription<N::HeaderResponse>,
613+
subscription: RobustSubscription<N>,
615614
sender: mpsc::Sender<Message>,
616615
block_confirmations: u64,
617616
max_block_range: u64,
618617
) {
619618
// ensure we start streaming only after the expected_next_block cutoff
620619
let cutoff = range_start;
621-
let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff);
620+
let mut stream = subscription.into_stream().skip_while(|result| match result {
621+
Ok(header) => header.number() < cutoff,
622+
Err(_) => false,
623+
});
624+
625+
while let Some(result) = stream.next().await {
626+
let incoming_block = match result {
627+
Ok(block) => block,
628+
Err(e) => {
629+
error!(error = %e, "Error receiving block from stream");
630+
// Error from subscription, exit the stream
631+
_ = sender.try_stream(e).await;
632+
return;
633+
}
634+
};
622635

623-
while let Some(incoming_block) = stream.next().await {
624636
let incoming_block_num = incoming_block.number();
625637
info!(block_number = incoming_block_num, "Received block header");
626638

src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,7 @@ pub use event_scanner::{
1616
SyncFromBlock, SyncFromLatestEvents,
1717
};
1818

19-
pub use robust_provider::{provider::RobustProvider, provider_conversion::IntoRobustProvider};
19+
pub use robust_provider::{
20+
provider::RobustProvider, provider_conversion::IntoRobustProvider,
21+
subscription::RobustSubscription,
22+
};

src/robust_provider/builder.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use crate::{
1010
// RPC retry and timeout settings
1111
/// Default timeout used by `RobustProvider`
1212
pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60);
13+
/// Default timeout for subscriptions (longer to accommodate slow block times)
14+
pub const DEFAULT_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(120);
1315
/// Default maximum number of retry attempts.
1416
pub const DEFAULT_MAX_RETRIES: usize = 3;
1517
/// Default base delay between retries.
@@ -19,27 +21,29 @@ pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1);
1921
pub struct RobustProviderBuilder<N: Network, P: IntoProvider<N>> {
2022
providers: Vec<P>,
2123
max_timeout: Duration,
24+
subscription_timeout: Duration,
2225
max_retries: usize,
2326
min_delay: Duration,
2427
_network: PhantomData<N>,
2528
}
2629

2730
impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
28-
/// Create a new `RobustProvider` with default settings.
31+
/// Create a new [`RobustProvider`] with default settings.
2932
///
3033
/// The provided provider is treated as the primary provider.
3134
#[must_use]
3235
pub fn new(provider: P) -> Self {
3336
Self {
3437
providers: vec![provider],
3538
max_timeout: DEFAULT_MAX_TIMEOUT,
39+
subscription_timeout: DEFAULT_SUBSCRIPTION_TIMEOUT,
3640
max_retries: DEFAULT_MAX_RETRIES,
3741
min_delay: DEFAULT_MIN_DELAY,
3842
_network: PhantomData,
3943
}
4044
}
4145

42-
/// Create a new `RobustProvider` with no retry attempts and only timeout set.
46+
/// Create a new [`RobustProvider`] with no retry attempts and only timeout set.
4347
///
4448
/// The provided provider is treated as the primary provider.
4549
#[must_use]
@@ -63,6 +67,16 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
6367
self
6468
}
6569

70+
/// Set the timeout for subscription operations.
71+
///
72+
/// This should be set higher than [`max_timeout`](Self::max_timeout) to accommodate chains with
73+
/// slow block times. Default is [`DEFAULT_SUBSCRIPTION_TIMEOUT`].
74+
#[must_use]
75+
pub fn subscription_timeout(mut self, timeout: Duration) -> Self {
76+
self.subscription_timeout = timeout;
77+
self
78+
}
79+
6680
/// Set the maximum number of retry attempts.
6781
#[must_use]
6882
pub fn max_retries(mut self, max_retries: usize) -> Self {
@@ -92,6 +106,7 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
92106
Ok(RobustProvider {
93107
providers,
94108
max_timeout: self.max_timeout,
109+
subscription_timeout: self.subscription_timeout,
95110
max_retries: self.max_retries,
96111
min_delay: self.min_delay,
97112
})

src/robust_provider/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ pub mod builder;
22
pub mod error;
33
pub mod provider;
44
pub mod provider_conversion;
5+
pub mod subscription;
56

67
pub use error::Error;

0 commit comments

Comments
 (0)