Skip to content

Commit 6b1b2e0

Browse files
authored
Refactor robust provider (#173)
1 parent eb8ae9a commit 6b1b2e0

File tree

6 files changed

+240
-209
lines changed

6 files changed

+240
-209
lines changed

src/robust_provider/builder.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::{marker::PhantomData, time::Duration};
2+
3+
use alloy::{network::Network, providers::Provider};
4+
5+
use crate::robust_provider::{Error, IntoProvider, RobustProvider};
6+
7+
// RPC retry and timeout settings
8+
/// Default timeout used by `RobustProvider`
9+
pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60);
10+
/// Default maximum number of retry attempts.
11+
pub const DEFAULT_MAX_RETRIES: usize = 3;
12+
/// Default base delay between retries.
13+
pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1);
14+
15+
#[derive(Clone)]
16+
pub struct RobustProviderBuilder<N: Network, P: IntoProvider<N>> {
17+
providers: Vec<P>,
18+
max_timeout: Duration,
19+
max_retries: usize,
20+
min_delay: Duration,
21+
_network: PhantomData<N>,
22+
}
23+
24+
impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
25+
/// Create a new `RobustProvider` with default settings.
26+
///
27+
/// The provided provider is treated as the primary provider.
28+
#[must_use]
29+
pub fn new(provider: P) -> Self {
30+
Self {
31+
providers: vec![provider],
32+
max_timeout: DEFAULT_MAX_TIMEOUT,
33+
max_retries: DEFAULT_MAX_RETRIES,
34+
min_delay: DEFAULT_MIN_DELAY,
35+
_network: PhantomData,
36+
}
37+
}
38+
39+
/// Create a new `RobustProvider` with no retry attempts and only timeout set.
40+
///
41+
/// The provided provider is treated as the primary provider.
42+
#[must_use]
43+
pub fn fragile(provider: P) -> Self {
44+
Self::new(provider).max_retries(0).min_delay(Duration::ZERO)
45+
}
46+
47+
/// Add a fallback provider to the list.
48+
///
49+
/// Fallback providers are used when the primary provider times out or fails.
50+
#[must_use]
51+
pub fn fallback(mut self, provider: P) -> Self {
52+
self.providers.push(provider);
53+
self
54+
}
55+
56+
/// Set the maximum timeout for RPC operations.
57+
#[must_use]
58+
pub fn max_timeout(mut self, timeout: Duration) -> Self {
59+
self.max_timeout = timeout;
60+
self
61+
}
62+
63+
/// Set the maximum number of retry attempts.
64+
#[must_use]
65+
pub fn max_retries(mut self, max_retries: usize) -> Self {
66+
self.max_retries = max_retries;
67+
self
68+
}
69+
70+
/// Set the base delay for exponential backoff retries.
71+
#[must_use]
72+
pub fn min_delay(mut self, min_delay: Duration) -> Self {
73+
self.min_delay = min_delay;
74+
self
75+
}
76+
77+
/// Build the `RobustProvider`.
78+
///
79+
/// Final builder method: consumes the builder and returns the built [`RobustProvider`].
80+
///
81+
/// # Errors
82+
///
83+
/// Returns an error if any of the providers fail to connect.
84+
pub async fn build(self) -> Result<RobustProvider<N>, Error> {
85+
let mut providers = vec![];
86+
for p in self.providers {
87+
providers.push(p.into_provider().await?.root().to_owned());
88+
}
89+
Ok(RobustProvider {
90+
providers,
91+
max_timeout: self.max_timeout,
92+
max_retries: self.max_retries,
93+
min_delay: self.min_delay,
94+
})
95+
}
96+
}

src/robust_provider/error.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use std::sync::Arc;
2+
3+
use alloy::{
4+
eips::BlockId,
5+
transports::{RpcError, TransportErrorKind},
6+
};
7+
use thiserror::Error;
8+
use tokio::time::error as TokioError;
9+
10+
#[derive(Error, Debug, Clone)]
11+
pub enum Error {
12+
#[error("Operation timed out")]
13+
Timeout,
14+
#[error("RPC call failed after exhausting all retry attempts: {0}")]
15+
RpcError(Arc<RpcError<TransportErrorKind>>),
16+
#[error("Block not found, Block Id: {0}")]
17+
BlockNotFound(BlockId),
18+
}
19+
20+
impl From<RpcError<TransportErrorKind>> for Error {
21+
fn from(err: RpcError<TransportErrorKind>) -> Self {
22+
Error::RpcError(Arc::new(err))
23+
}
24+
}
25+
26+
impl From<TokioError::Elapsed> for Error {
27+
fn from(_: TokioError::Elapsed) -> Self {
28+
Error::Timeout
29+
}
30+
}

src/robust_provider/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
pub mod builder;
2+
pub mod error;
3+
pub mod provider;
4+
pub mod provider_conversion;
5+
6+
pub use builder::*;
7+
pub use error::Error;
8+
pub use provider::RobustProvider;
9+
pub use provider_conversion::{IntoProvider, IntoRobustProvider};
Lines changed: 11 additions & 208 deletions
Original file line numberDiff line numberDiff line change
@@ -1,216 +1,18 @@
1-
use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc, time::Duration};
1+
use std::{fmt::Debug, time::Duration};
22

33
use alloy::{
4-
eips::{BlockId, BlockNumberOrTag},
4+
eips::BlockNumberOrTag,
55
network::{Ethereum, Network},
6-
providers::{
7-
DynProvider, Provider, RootProvider,
8-
fillers::{FillProvider, TxFiller},
9-
layers::{CacheProvider, CallBatchProvider},
10-
},
6+
providers::{Provider, RootProvider},
117
pubsub::Subscription,
128
rpc::types::{Filter, Log},
13-
transports::{RpcError, TransportErrorKind, http::reqwest::Url},
9+
transports::{RpcError, TransportErrorKind},
1410
};
1511
use backon::{ExponentialBuilder, Retryable};
16-
use thiserror::Error;
17-
use tokio::time::{error as TokioError, timeout};
12+
use tokio::time::timeout;
1813
use tracing::{error, info};
1914

20-
#[derive(Error, Debug, Clone)]
21-
pub enum Error {
22-
#[error("Operation timed out")]
23-
Timeout,
24-
#[error("RPC call failed after exhausting all retry attempts: {0}")]
25-
RpcError(Arc<RpcError<TransportErrorKind>>),
26-
#[error("Block not found, Block Id: {0}")]
27-
BlockNotFound(BlockId),
28-
}
29-
30-
impl From<RpcError<TransportErrorKind>> for Error {
31-
fn from(err: RpcError<TransportErrorKind>) -> Self {
32-
Error::RpcError(Arc::new(err))
33-
}
34-
}
35-
36-
impl From<TokioError::Elapsed> for Error {
37-
fn from(_: TokioError::Elapsed) -> Self {
38-
Error::Timeout
39-
}
40-
}
41-
42-
pub trait IntoProvider<N: Network = Ethereum> {
43-
fn into_provider(
44-
self,
45-
) -> impl std::future::Future<Output = Result<impl Provider<N>, Error>> + Send;
46-
}
47-
48-
impl<N: Network> IntoProvider<N> for RobustProvider<N> {
49-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
50-
Ok(self.primary().to_owned())
51-
}
52-
}
53-
54-
impl<N: Network> IntoProvider<N> for RootProvider<N> {
55-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
56-
Ok(self)
57-
}
58-
}
59-
60-
impl<N: Network> IntoProvider<N> for &str {
61-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
62-
Ok(RootProvider::connect(self).await?)
63-
}
64-
}
65-
66-
impl<N: Network> IntoProvider<N> for Url {
67-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
68-
Ok(RootProvider::connect(self.as_str()).await?)
69-
}
70-
}
71-
72-
impl<F, P, N> IntoProvider<N> for FillProvider<F, P, N>
73-
where
74-
F: TxFiller<N>,
75-
P: Provider<N>,
76-
N: Network,
77-
{
78-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
79-
Ok(self)
80-
}
81-
}
82-
83-
impl<P, N> IntoProvider<N> for CacheProvider<P, N>
84-
where
85-
P: Provider<N>,
86-
N: Network,
87-
{
88-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
89-
Ok(self)
90-
}
91-
}
92-
93-
impl<N> IntoProvider<N> for DynProvider<N>
94-
where
95-
N: Network,
96-
{
97-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
98-
Ok(self)
99-
}
100-
}
101-
102-
impl<P, N> IntoProvider<N> for CallBatchProvider<P, N>
103-
where
104-
P: Provider<N> + 'static,
105-
N: Network,
106-
{
107-
async fn into_provider(self) -> Result<impl Provider<N>, Error> {
108-
Ok(self)
109-
}
110-
}
111-
112-
pub trait IntoRobustProvider<N: Network = Ethereum> {
113-
fn into_robust_provider(
114-
self,
115-
) -> impl std::future::Future<Output = Result<RobustProvider<N>, Error>> + Send;
116-
}
117-
118-
impl<N: Network, P: IntoProvider<N> + Send> IntoRobustProvider<N> for P {
119-
async fn into_robust_provider(self) -> Result<RobustProvider<N>, Error> {
120-
RobustProviderBuilder::new(self).build().await
121-
}
122-
}
123-
124-
// RPC retry and timeout settings
125-
/// Default timeout used by `RobustProvider`
126-
pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60);
127-
/// Default maximum number of retry attempts.
128-
pub const DEFAULT_MAX_RETRIES: usize = 3;
129-
/// Default base delay between retries.
130-
pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1);
131-
132-
#[derive(Clone)]
133-
pub struct RobustProviderBuilder<N: Network, P: IntoProvider<N>> {
134-
providers: Vec<P>,
135-
max_timeout: Duration,
136-
max_retries: usize,
137-
min_delay: Duration,
138-
_network: PhantomData<N>,
139-
}
140-
141-
impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
142-
/// Create a new `RobustProvider` with default settings.
143-
///
144-
/// The provided provider is treated as the primary provider.
145-
#[must_use]
146-
pub fn new(provider: P) -> Self {
147-
Self {
148-
providers: vec![provider],
149-
max_timeout: DEFAULT_MAX_TIMEOUT,
150-
max_retries: DEFAULT_MAX_RETRIES,
151-
min_delay: DEFAULT_MIN_DELAY,
152-
_network: PhantomData,
153-
}
154-
}
155-
156-
/// Create a new `RobustProvider` with no retry attempts and only timeout set.
157-
///
158-
/// The provided provider is treated as the primary provider.
159-
#[must_use]
160-
pub fn fragile(provider: P) -> Self {
161-
Self::new(provider).max_retries(0).min_delay(Duration::ZERO)
162-
}
163-
164-
/// Add a fallback provider to the list.
165-
///
166-
/// Fallback providers are used when the primary provider times out or fails.
167-
#[must_use]
168-
pub fn fallback(mut self, provider: P) -> Self {
169-
self.providers.push(provider);
170-
self
171-
}
172-
173-
/// Set the maximum timeout for RPC operations.
174-
#[must_use]
175-
pub fn max_timeout(mut self, timeout: Duration) -> Self {
176-
self.max_timeout = timeout;
177-
self
178-
}
179-
180-
/// Set the maximum number of retry attempts.
181-
#[must_use]
182-
pub fn max_retries(mut self, max_retries: usize) -> Self {
183-
self.max_retries = max_retries;
184-
self
185-
}
186-
187-
/// Set the base delay for exponential backoff retries.
188-
#[must_use]
189-
pub fn min_delay(mut self, min_delay: Duration) -> Self {
190-
self.min_delay = min_delay;
191-
self
192-
}
193-
194-
/// Build the `RobustProvider`.
195-
///
196-
/// Final builder method: consumes the builder and returns the built [`RobustProvider`].
197-
///
198-
/// # Errors
199-
///
200-
/// Returns an error if any of the providers fail to connect.
201-
pub async fn build(self) -> Result<RobustProvider<N>, Error> {
202-
let mut providers = vec![];
203-
for p in self.providers {
204-
providers.push(p.into_provider().await?.root().to_owned());
205-
}
206-
Ok(RobustProvider {
207-
providers,
208-
max_timeout: self.max_timeout,
209-
max_retries: self.max_retries,
210-
min_delay: self.min_delay,
211-
})
212-
}
213-
}
15+
use crate::robust_provider::Error;
21416

21517
/// Provider wrapper with built-in retry and timeout mechanisms.
21618
///
@@ -219,10 +21,10 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
21921
/// The first provider in the vector is treated as the primary provider.
22022
#[derive(Clone)]
22123
pub struct RobustProvider<N: Network = Ethereum> {
222-
providers: Vec<RootProvider<N>>,
223-
max_timeout: Duration,
224-
max_retries: usize,
225-
min_delay: Duration,
24+
pub(crate) providers: Vec<RootProvider<N>>,
25+
pub(crate) max_timeout: Duration,
26+
pub(crate) max_retries: usize,
27+
pub(crate) min_delay: Duration,
22628
}
22729

22830
impl<N: Network> RobustProvider<N> {
@@ -468,6 +270,7 @@ impl<N: Network> RobustProvider<N> {
468270
#[cfg(test)]
469271
mod tests {
470272
use super::*;
273+
use crate::robust_provider::RobustProviderBuilder;
471274
use alloy::{
472275
consensus::BlockHeader,
473276
providers::{ProviderBuilder, WsConnect, ext::AnvilApi},

0 commit comments

Comments
 (0)