Skip to content

Commit 4ba97ee

Browse files
committed
ref: rebase onto main
1 parent 8304024 commit 4ba97ee

File tree

5 files changed

+20
-248
lines changed

5 files changed

+20
-248
lines changed

src/block_range_scanner.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ use tokio::{
6767
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
6868

6969
use crate::{
70-
IntoRobustProvider, RobustProvider, RobustSubscription, ScannerMessage,
70+
IntoRobustProvider, RobustProvider, ScannerMessage,
7171
error::ScannerError,
7272
robust_provider::Error as RobustProviderError,
7373
types::{ScannerStatus, TryStream},
@@ -77,6 +77,7 @@ use alloy::{
7777
eips::BlockNumberOrTag,
7878
network::{BlockResponse, Network, primitives::HeaderResponse},
7979
primitives::{B256, BlockNumber},
80+
pubsub::Subscription,
8081
transports::{RpcError, TransportErrorKind},
8182
};
8283
use tracing::{debug, error, info, warn};
@@ -610,29 +611,16 @@ impl<N: Network> Service<N> {
610611

611612
async fn stream_live_blocks(
612613
mut range_start: BlockNumber,
613-
subscription: RobustSubscription<N>,
614+
subscription: Subscription<N::HeaderResponse>,
614615
sender: mpsc::Sender<Message>,
615616
block_confirmations: u64,
616617
max_block_range: u64,
617618
) {
618619
// ensure we start streaming only after the expected_next_block cutoff
619620
let cutoff = range_start;
620-
let mut stream = subscription.into_stream().skip_while(|result| match result {
621-
Ok(header) => header.number() < cutoff,
622-
Err(_) => false,
623-
});
624-
625-
while let Some(result) = stream.next().await {
626-
let incoming_block = match result {
627-
Ok(block) => block,
628-
Err(e) => {
629-
error!(error = %e, "Error receiving block from stream");
630-
// Error from subscription, exit the stream
631-
_ = sender.try_stream(e).await;
632-
return;
633-
}
634-
};
621+
let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff);
635622

623+
while let Some(incoming_block) = stream.next().await {
636624
let incoming_block_num = incoming_block.number();
637625
info!(block_number = incoming_block_num, "Received block header");
638626

src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,4 @@ pub use event_scanner::{
1616
SyncFromBlock, SyncFromLatestEvents,
1717
};
1818

19-
pub use robust_provider::{
20-
provider::RobustProvider, provider_conversion::IntoRobustProvider,
21-
subscription::RobustSubscription,
22-
};
19+
pub use robust_provider::{provider::RobustProvider, provider_conversion::IntoRobustProvider};

src/robust_provider/builder.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,40 +10,36 @@ use crate::{
1010
// RPC retry and timeout settings
1111
/// Default timeout used by `RobustProvider`
1212
pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60);
13-
/// Default timeout for subscriptions (longer to accommodate slow block times)
14-
pub const DEFAULT_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(120);
1513
/// Default maximum number of retry attempts.
1614
pub const DEFAULT_MAX_RETRIES: usize = 3;
1715
/// Default base delay between retries.
1816
pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1);
1917

2018
#[derive(Clone)]
2119
pub struct RobustProviderBuilder<N: Network, P: IntoProvider<N>> {
22-
pub(crate) providers: Vec<P>,
20+
providers: Vec<P>,
2321
max_timeout: Duration,
24-
subscription_timeout: Duration,
2522
max_retries: usize,
2623
min_delay: Duration,
2724
_network: PhantomData<N>,
2825
}
2926

3027
impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
31-
/// Create a new [`RobustProvider`] with default settings.
28+
/// Create a new `RobustProvider` with default settings.
3229
///
3330
/// The provided provider is treated as the primary provider.
3431
#[must_use]
3532
pub fn new(provider: P) -> Self {
3633
Self {
3734
providers: vec![provider],
3835
max_timeout: DEFAULT_MAX_TIMEOUT,
39-
subscription_timeout: DEFAULT_SUBSCRIPTION_TIMEOUT,
4036
max_retries: DEFAULT_MAX_RETRIES,
4137
min_delay: DEFAULT_MIN_DELAY,
4238
_network: PhantomData,
4339
}
4440
}
4541

46-
/// Create a new [`RobustProvider`] with no retry attempts and only timeout set.
42+
/// Create a new `RobustProvider` with no retry attempts and only timeout set.
4743
///
4844
/// The provided provider is treated as the primary provider.
4945
#[must_use]
@@ -67,16 +63,6 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
6763
self
6864
}
6965

70-
/// Set the timeout for subscription operations.
71-
///
72-
/// This should be set higher than `max_timeout` to accommodate chains with slow block times.
73-
/// Default is 2 minutes.
74-
#[must_use]
75-
pub fn subscription_timeout(mut self, timeout: Duration) -> Self {
76-
self.subscription_timeout = timeout;
77-
self
78-
}
79-
8066
/// Set the maximum number of retry attempts.
8167
#[must_use]
8268
pub fn max_retries(mut self, max_retries: usize) -> Self {
@@ -106,7 +92,6 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
10692
Ok(RobustProvider {
10793
providers,
10894
max_timeout: self.max_timeout,
109-
subscription_timeout: self.subscription_timeout,
11095
max_retries: self.max_retries,
11196
min_delay: self.min_delay,
11297
})

src/robust_provider/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,5 @@ pub mod builder;
22
pub mod error;
33
pub mod provider;
44
pub mod provider_conversion;
5-
pub mod subscription;
65

76
pub use error::Error;

src/robust_provider/provider.rs

Lines changed: 11 additions & 208 deletions
Original file line numberDiff line numberDiff line change
@@ -1,216 +1,18 @@
1-
use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc, time::Duration};
1+
use std::{fmt::Debug, time::Duration};
22

33
use alloy::{
4-
eips::{BlockId, BlockNumberOrTag},
4+
eips::BlockNumberOrTag,
55
network::{Ethereum, Network},
6-
providers::{
7-
DynProvider, Provider, RootProvider,
8-
fillers::{FillProvider, TxFiller},
9-
layers::{CacheProvider, CallBatchProvider},
10-
},
6+
providers::{Provider, RootProvider},
117
pubsub::Subscription,
128
rpc::types::{Filter, Log},
13-
transports::{RpcError, TransportErrorKind, http::reqwest::Url},
9+
transports::{RpcError, TransportErrorKind},
1410
};
1511
use backon::{ExponentialBuilder, Retryable};
16-
use thiserror::Error;
17-
use tokio::time::{error as TokioError, timeout};
12+
use tokio::time::timeout;
1813
use tracing::{error, info};
1914

20-
#[derive(Error, Debug, Clone)]
21-
pub enum Error {
22-
#[error("Operation timed out")]
23-
Timeout,
24-
#[error("RPC call failed after exhausting all retry attempts: {0}")]
25-
RpcError(Arc<RpcError<TransportErrorKind>>),
26-
#[error("Block not found, Block Id: {0}")]
27-
BlockNotFound(BlockId),
28-
}
29-
30-
impl From<RpcError<TransportErrorKind>> for Error {
31-
fn from(err: RpcError<TransportErrorKind>) -> Self {
32-
Error::RpcError(Arc::new(err))
33-
}
34-
}
35-
36-
impl From<TokioError::Elapsed> for Error {
37-
fn from(_: TokioError::Elapsed) -> Self {
38-
Error::Timeout
39-
}
40-
}
41-
42-
pub trait IntoProvider<N: Network = Ethereum> {
43-
fn into_provider(
44-
self,
45-
) -> impl std::future::Future<Output = Result<impl Provider<N>, Error>> + Send;
46-
}
47-
48-
impl<N: Network> IntoProvider<N> for RobustProvider<N> {
49-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
50-
Ok(self.primary().to_owned())
51-
}
52-
}
53-
54-
impl<N: Network> IntoProvider<N> for RootProvider<N> {
55-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
56-
Ok(self)
57-
}
58-
}
59-
60-
impl<N: Network> IntoProvider<N> for &str {
61-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
62-
Ok(RootProvider::connect(self).await?)
63-
}
64-
}
65-
66-
impl<N: Network> IntoProvider<N> for Url {
67-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
68-
Ok(RootProvider::connect(self.as_str()).await?)
69-
}
70-
}
71-
72-
impl<F, P, N> IntoProvider<N> for FillProvider<F, P, N>
73-
where
74-
F: TxFiller<N>,
75-
P: Provider<N>,
76-
N: Network,
77-
{
78-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
79-
Ok(self)
80-
}
81-
}
82-
83-
impl<P, N> IntoProvider<N> for CacheProvider<P, N>
84-
where
85-
P: Provider<N>,
86-
N: Network,
87-
{
88-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
89-
Ok(self)
90-
}
91-
}
92-
93-
impl<N> IntoProvider<N> for DynProvider<N>
94-
where
95-
N: Network,
96-
{
97-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
98-
Ok(self)
99-
}
100-
}
101-
102-
impl<P, N> IntoProvider<N> for CallBatchProvider<P, N>
103-
where
104-
P: Provider<N> + 'static,
105-
N: Network,
106-
{
107-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
108-
Ok(self)
109-
}
110-
}
111-
112-
pub trait IntoRobustProvider<N: Network = Ethereum> {
113-
fn into_robust_provider(
114-
self,
115-
) -> impl std::future::Future<Output = Result<RobustProvider<N>, Error>> + Send;
116-
}
117-
118-
impl<N: Network, P: IntoProvider<N> + Send> IntoRobustProvider<N> for P {
119-
async fn into_robust_provider(self) -> Result<RobustProvider<N>, Error> {
120-
RobustProviderBuilder::new(self).build().await
121-
}
122-
}
123-
124-
// RPC retry and timeout settings
125-
/// Default timeout used by `RobustProvider`
126-
pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60);
127-
/// Default maximum number of retry attempts.
128-
pub const DEFAULT_MAX_RETRIES: usize = 3;
129-
/// Default base delay between retries.
130-
pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1);
131-
132-
#[derive(Clone)]
133-
pub struct RobustProviderBuilder<N: Network, P: IntoProvider<N>> {
134-
providers: Vec<P>,
135-
max_timeout: Duration,
136-
max_retries: usize,
137-
min_delay: Duration,
138-
_network: PhantomData<N>,
139-
}
140-
141-
impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
142-
/// Create a new `RobustProvider` with default settings.
143-
///
144-
/// The provided provider is treated as the primary provider.
145-
#[must_use]
146-
pub fn new(provider: P) -> Self {
147-
Self {
148-
providers: vec![provider],
149-
max_timeout: DEFAULT_MAX_TIMEOUT,
150-
max_retries: DEFAULT_MAX_RETRIES,
151-
min_delay: DEFAULT_MIN_DELAY,
152-
_network: PhantomData,
153-
}
154-
}
155-
156-
/// Create a new `RobustProvider` with no retry attempts and only timeout set.
157-
///
158-
/// The provided provider is treated as the primary provider.
159-
#[must_use]
160-
pub fn fragile(provider: P) -> Self {
161-
Self::new(provider).max_retries(0).min_delay(Duration::ZERO)
162-
}
163-
164-
/// Add a fallback provider to the list.
165-
///
166-
/// Fallback providers are used when the primary provider times out or fails.
167-
#[must_use]
168-
pub fn fallback(mut self, provider: P) -> Self {
169-
self.providers.push(provider);
170-
self
171-
}
172-
173-
/// Set the maximum timeout for RPC operations.
174-
#[must_use]
175-
pub fn max_timeout(mut self, timeout: Duration) -> Self {
176-
self.max_timeout = timeout;
177-
self
178-
}
179-
180-
/// Set the maximum number of retry attempts.
181-
#[must_use]
182-
pub fn max_retries(mut self, max_retries: usize) -> Self {
183-
self.max_retries = max_retries;
184-
self
185-
}
186-
187-
/// Set the base delay for exponential backoff retries.
188-
#[must_use]
189-
pub fn min_delay(mut self, min_delay: Duration) -> Self {
190-
self.min_delay = min_delay;
191-
self
192-
}
193-
194-
/// Build the `RobustProvider`.
195-
///
196-
/// Final builder method: consumes the builder and returns the built [`RobustProvider`].
197-
///
198-
/// # Errors
199-
///
200-
/// Returns an error if any of the providers fail to connect.
201-
pub async fn build(self) -> Result<RobustProvider<N>, Error> {
202-
let mut providers = vec![];
203-
for p in self.providers {
204-
providers.push(p.into_provider().await?.root().to_owned());
205-
}
206-
Ok(RobustProvider {
207-
providers,
208-
max_timeout: self.max_timeout,
209-
max_retries: self.max_retries,
210-
min_delay: self.min_delay,
211-
})
212-
}
213-
}
15+
use crate::robust_provider::Error;
21416

21517
/// Provider wrapper with built-in retry and timeout mechanisms.
21618
///
@@ -219,10 +21,10 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
21921
/// The first provider in the vector is treated as the primary provider.
22022
#[derive(Clone)]
22123
pub struct RobustProvider<N: Network = Ethereum> {
222-
providers: Vec<RootProvider<N>>,
223-
max_timeout: Duration,
224-
max_retries: usize,
225-
min_delay: Duration,
24+
pub(crate) providers: Vec<RootProvider<N>>,
25+
pub(crate) max_timeout: Duration,
26+
pub(crate) max_retries: usize,
27+
pub(crate) min_delay: Duration,
22628
}
22729

22830
impl<N: Network> RobustProvider<N> {
@@ -458,6 +260,7 @@ impl<N: Network> RobustProvider<N> {
458260
#[cfg(test)]
459261
mod tests {
460262
use super::*;
263+
use crate::robust_provider::builder::RobustProviderBuilder;
461264
use alloy::{
462265
consensus::BlockHeader,
463266
providers::{ProviderBuilder, WsConnect, ext::AnvilApi},

0 commit comments

Comments
 (0)