Merged
Commits
25 commits
e1d96dd
feat: clean up tracing in robust provider
LeoPatOZ Dec 15, 2025
dbdae73
fix: fmt
LeoPatOZ Dec 15, 2025
8e7865d
ref: add back err -> change to warn
LeoPatOZ Dec 15, 2025
2a17bb0
ref: warn to trace
LeoPatOZ Dec 15, 2025
952d034
Merge branch 'main' into improve-logging-robust
LeoPatOZ Dec 16, 2025
59cc546
ref: add back trace
LeoPatOZ Dec 16, 2025
2a99eeb
feat: add trace instrument to try_fallback_providers
LeoPatOZ Dec 16, 2025
61cc2d9
ref: fix trace instrument
LeoPatOZ Dec 16, 2025
2cb228f
Merge branch 'main' into improve-logging-robust
LeoPatOZ Dec 16, 2025
a8a57ef
ref: remove instrument debug on rpc methods
LeoPatOZ Dec 17, 2025
a3892fd
feat: add trace instrument to robust
LeoPatOZ Dec 17, 2025
f6fff0c
Merge branch 'main' into improve-logging-robust
LeoPatOZ Dec 18, 2025
46e8c09
Merge branch 'main' into improve-logging-robust
LeoPatOZ Dec 18, 2025
28493a5
Merge branch 'main' into improve-logging-robust
LeoPatOZ Dec 18, 2025
c38b181
fix: trace
LeoPatOZ Dec 18, 2025
09a3d02
feat: use default traces where it makes sense
LeoPatOZ Dec 18, 2025
262b36a
Merge branch 'main' into improve-logging-robust
LeoPatOZ Dec 18, 2025
24833e1
ref: remove some default traces + cfg feature on tracing for instrument
LeoPatOZ Dec 19, 2025
329cf66
Merge branch 'main' into improve-logging-robust
LeoPatOZ Dec 20, 2025
7f09509
Merge branch 'main' into improve-logging-robust
0xNeshi Dec 22, 2025
6e331b7
ref: inline instrument import
0xNeshi Dec 22, 2025
22346b8
ref: use try_fallback_providers_from instead of try_fallback_provider…
0xNeshi Dec 22, 2025
7a4be06
trace last_error in try_fallback_providers_from
0xNeshi Dec 22, 2025
603fd87
inline instrument import in subscription.rs
0xNeshi Dec 22, 2025
ae2db33
instrument switch_to_fallback
0xNeshi Dec 22, 2025
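
The recurring change across these commits is span instrumentation gated behind a `tracing` Cargo feature. A minimal sketch of that attribute pattern, with the struct and function names purely illustrative and not taken from this PR:

```rust
struct FallbackSet {
    endpoints: Vec<String>,
}

impl FallbackSet {
    // With the hypothetical crate's `tracing` feature enabled, each call opens a
    // TRACE-level span named after the function and records `start_index` as a
    // span field; `self` is skipped so the struct itself is not captured. With
    // the feature disabled, the attribute vanishes and the function compiles
    // exactly as before.
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
    fn pick_from(&self, start_index: usize) -> Option<&str> {
        self.endpoints.get(start_index).map(String::as_str)
    }
}
```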
117 changes: 35 additions & 82 deletions src/robust_provider/provider.rs
@@ -118,16 +118,12 @@ impl<N: Network> RobustProvider<N> {
&self,
number: BlockNumberOrTag,
) -> Result<N::BlockResponse, Error> {
info!("eth_getBlockByNumber called");
let result = self
.try_operation_with_failover(
move |provider| async move { provider.get_block_by_number(number).await },
false,
)
.await;
if let Err(e) = &result {
error!(error = %e, "eth_getByBlockNumber failed");
}

result?.ok_or_else(|| Error::BlockNotFound(number.into()))
}
@@ -144,16 +140,12 @@ impl<N: Network> RobustProvider<N> {
/// `call_timeout`).
/// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain.
pub async fn get_block(&self, id: BlockId) -> Result<N::BlockResponse, Error> {
info!("eth_getBlock called");
let result = self
.try_operation_with_failover(
|provider| async move { provider.get_block(id).await },
false,
)
.await;
if let Err(e) = &result {
error!(error = %e, "eth_getByBlockNumber failed");
}
result?.ok_or_else(|| Error::BlockNotFound(id))
}

@@ -168,18 +160,12 @@ impl<N: Network> RobustProvider<N> {
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
/// `call_timeout`).
pub async fn get_block_number(&self) -> Result<BlockNumber, Error> {
info!("eth_getBlockNumber called");
let result = self
.try_operation_with_failover(
move |provider| async move { provider.get_block_number().await },
false,
)
.await
.map_err(Error::from);
if let Err(e) = &result {
error!(error = %e, "eth_getBlockNumber failed");
}
result
self.try_operation_with_failover(
move |provider| async move { provider.get_block_number().await },
false,
)
.await
.map_err(Error::from)
}

/// Get the block number for a given block identifier.
@@ -198,16 +184,12 @@ impl<N: Network> RobustProvider<N> {
/// `call_timeout`).
/// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain.
pub async fn get_block_number_by_id(&self, block_id: BlockId) -> Result<BlockNumber, Error> {
info!("get_block_number_by_id called");
let result = self
.try_operation_with_failover(
move |provider| async move { provider.get_block_number_by_id(block_id).await },
false,
)
.await;
if let Err(e) = &result {
error!(error = %e, "get_block_number_by_id failed");
}
result?.ok_or_else(|| Error::BlockNotFound(block_id))
}

@@ -228,7 +210,6 @@ impl<N: Network> RobustProvider<N> {
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
/// `call_timeout`).
pub async fn get_latest_confirmed(&self, confirmations: u64) -> Result<u64, Error> {
info!(configurations = confirmations, "get_latest_confirmed called");
let latest_block = self.get_block_number().await?;
let confirmed_block = latest_block.saturating_sub(confirmations);
Ok(confirmed_block)
@@ -246,16 +227,12 @@ impl<N: Network> RobustProvider<N> {
/// `call_timeout`).
/// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain.
pub async fn get_block_by_hash(&self, hash: BlockHash) -> Result<N::BlockResponse, Error> {
info!("eth_getBlockByHash called");
let result = self
.try_operation_with_failover(
move |provider| async move { provider.get_block_by_hash(hash).await },
false,
)
.await;
if let Err(e) = &result {
error!(error = %e, "eth_getBlockByHash failed");
}

result?.ok_or_else(|| Error::BlockNotFound(hash.into()))
}
@@ -271,18 +248,12 @@ impl<N: Network> RobustProvider<N> {
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
/// `call_timeout`).
pub async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, Error> {
info!("eth_getLogs called");
let result = self
.try_operation_with_failover(
move |provider| async move { provider.get_logs(filter).await },
false,
)
.await
.map_err(Error::from);
if let Err(e) = &result {
error!(error = %e, "eth_getLogs failed");
}
result
self.try_operation_with_failover(
move |provider| async move { provider.get_logs(filter).await },
false,
)
.await
.map_err(Error::from)
}

/// Subscribe to new block headers with automatic failover and reconnection.
@@ -301,7 +272,6 @@ impl<N: Network> RobustProvider<N> {
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
/// `call_timeout`).
pub async fn subscribe_blocks(&self) -> Result<RobustSubscription<N>, Error> {
info!("eth_subscribe called");
let subscription = self
.try_operation_with_failover(
move |provider| async move {
Expand All @@ -312,15 +282,9 @@ impl<N: Network> RobustProvider<N> {
},
true,
)
.await;
.await?;

match subscription {
Ok(sub) => Ok(RobustSubscription::new(sub, self.clone())),
Err(e) => {
error!(error = %e, "eth_subscribe failed");
Err(e.into())
}
}
Ok(RobustSubscription::new(subscription, self.clone()))
}

/// Execute `operation` with exponential backoff and a total timeout.
@@ -352,26 +316,13 @@ impl<N: Network> RobustProvider<N> {
let primary = self.primary();
self.try_provider_with_timeout(primary, &operation)
.or_else(|last_error| {
self.try_fallback_providers(&operation, require_pubsub, last_error)
self.try_fallback_providers_from(&operation, require_pubsub, last_error, 0)
.map_ok(|(value, _)| value)
})
.await
}

pub(crate) async fn try_fallback_providers<T: Debug, F, Fut>(
&self,
operation: F,
require_pubsub: bool,
last_error: CoreError,
) -> Result<T, CoreError>
where
F: Fn(RootProvider<N>) -> Fut,
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
{
self.try_fallback_providers_from(operation, require_pubsub, last_error, 0)
.await
.map(|(value, _idx)| value)
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self, operation)))]
pub(crate) async fn try_fallback_providers_from<T: Debug, F, Fut>(
&self,
operation: F,
@@ -384,34 +335,41 @@ impl<N: Network> RobustProvider<N> {
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
{
let num_fallbacks = self.fallback_providers.len();
if num_fallbacks > 0 && start_index == 0 {
info!("Primary provider failed, trying fallback provider(s)");
}

let fallback_providers = self.fallback_providers.iter().enumerate().skip(start_index);
for (fallback_idx, provider) in fallback_providers {
if require_pubsub && !Self::supports_pubsub(provider) {
info!(
fallback_index = fallback_idx + 1,
trace!(
provider_num = fallback_idx + 1,
"Fallback provider doesn't support pubsub, skipping"
);
continue;
}
info!(fallback_index = fallback_idx + 1, "Attempting fallback provider");

trace!(
fallback_provider_index = fallback_idx + 1,
total_num_fallbacks = num_fallbacks,
"Attempting fallback provider"
);

match self.try_provider_with_timeout(provider, &operation).await {
Ok(value) => {
info!(provider_num = fallback_idx + 1, "Fallback provider succeeded");
info!(
provider_num = fallback_idx + 1,
total_fallbacks = num_fallbacks,
"Switched to fallback provider"
);
return Ok((value, fallback_idx));
}
Err(e) => {
error!(provider_num = fallback_idx + 1, err = %e, "Fallback provider failed");
tracing::warn!(provider_num = fallback_idx + 1, err = %e, "Fallback provider failed");
last_error = e;
}
}
}
// All fallbacks failed / skipped, return the last error
error!("All providers failed or timed out - returning the last providers attempt's error");

tracing::error!("All providers failed");

Err(last_error)
}

@@ -431,12 +389,7 @@ impl<N: Network> RobustProvider<N> {

timeout(
self.call_timeout,
(|| operation(provider.clone()))
.retry(retry_strategy)
.notify(|err: &RpcError<TransportErrorKind>, dur: Duration| {
info!(error = %err, duration_ms = dur.as_millis(), "RPC error retrying");
})
.sleep(tokio::time::sleep),
(|| operation(provider.clone())).retry(retry_strategy).sleep(tokio::time::sleep),
)
.await
.map_err(CoreError::from)?
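The demoted `trace!` events and the new `instrument` spans in this file are only visible when the consuming binary installs a subscriber at TRACE level. A minimal sketch, assuming the application depends on `tracing-subscriber` with its default `fmt` support:

```rust
fn main() {
    // Print everything down to TRACE so the per-fallback spans and the
    // "Attempting fallback provider" / "Fallback provider failed" events show up.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    // ... build the RobustProvider and issue calls as usual ...
}
```
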
48 changes: 14 additions & 34 deletions src/robust_provider/subscription.rs
@@ -43,14 +43,8 @@ impl From<CoreError> for Error {
impl From<RecvError> for Error {
fn from(err: RecvError) -> Self {
match err {
RecvError::Closed => {
error!("Provider closed the subscription channel");
Error::Closed
}
RecvError::Lagged(count) => {
error!(skipped = count, "Receiver lagged");
Error::Lagged(count)
}
RecvError::Closed => Error::Closed,
RecvError::Lagged(count) => Error::Lagged(count),
}
}
}
@@ -122,17 +116,7 @@ impl<N: Network> RobustSubscription<N> {
}
return Ok(header);
}
Err(recv_error) => {
match recv_error {
RecvError::Closed => {
error!("Provider closed the subscription channel");
}
RecvError::Lagged(count) => {
error!(skipped = count, "Receiver lagged");
}
}
return Err(recv_error.into());
}
Err(recv_error) => return Err(recv_error.into()),
},
Err(elapsed_err) => {
warn!(
Expand All @@ -148,6 +132,7 @@ impl<N: Network> RobustSubscription<N> {

/// Try to reconnect to the primary provider if enough time has elapsed.
/// Returns true if reconnection was successful, false if it's not time yet or if it failed.
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
async fn try_reconnect_to_primary(&mut self, force: bool) -> bool {
// Check if we should attempt reconnection
let should_reconnect = force ||
@@ -162,31 +147,26 @@ impl<N: Network> RobustSubscription<N> {
return false;
}

info!("Attempting to reconnect to primary provider");

let operation =
move |provider: RootProvider<N>| async move { provider.subscribe_blocks().await };

let primary = self.robust_provider.primary();
let subscription =
self.robust_provider.try_provider_with_timeout(primary, &operation).await;

match subscription {
Ok(sub) => {
info!("Successfully reconnected to primary provider");
self.subscription = sub;
self.current_fallback_index = None;
self.last_reconnect_attempt = None;
true
}
Err(e) => {
self.last_reconnect_attempt = Some(Instant::now());
warn!(error = %e, "Failed to reconnect to primary provider");
false
}
if let Ok(sub) = subscription {
info!("Reconnected to primary provider");
self.subscription = sub;
self.current_fallback_index = None;
self.last_reconnect_attempt = None;
true
} else {
self.last_reconnect_attempt = Some(Instant::now());
false
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
async fn switch_to_fallback(&mut self, last_error: CoreError) -> Result<(), Error> {
// If we're on a fallback, try primary first before moving to next fallback
if self.is_on_fallback() && self.try_reconnect_to_primary(true).await {