diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index 269bfaf0..f749449a 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -118,16 +118,12 @@ impl RobustProvider { &self, number: BlockNumberOrTag, ) -> Result { - info!("eth_getBlockByNumber called"); let result = self .try_operation_with_failover( move |provider| async move { provider.get_block_by_number(number).await }, false, ) .await; - if let Err(e) = &result { - error!(error = %e, "eth_getByBlockNumber failed"); - } result?.ok_or_else(|| Error::BlockNotFound(number.into())) } @@ -144,16 +140,12 @@ impl RobustProvider { /// `call_timeout`). /// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain. pub async fn get_block(&self, id: BlockId) -> Result { - info!("eth_getBlock called"); let result = self .try_operation_with_failover( |provider| async move { provider.get_block(id).await }, false, ) .await; - if let Err(e) = &result { - error!(error = %e, "eth_getByBlockNumber failed"); - } result?.ok_or_else(|| Error::BlockNotFound(id)) } @@ -168,18 +160,12 @@ impl RobustProvider { /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds /// `call_timeout`). pub async fn get_block_number(&self) -> Result { - info!("eth_getBlockNumber called"); - let result = self - .try_operation_with_failover( - move |provider| async move { provider.get_block_number().await }, - false, - ) - .await - .map_err(Error::from); - if let Err(e) = &result { - error!(error = %e, "eth_getBlockNumber failed"); - } - result + self.try_operation_with_failover( + move |provider| async move { provider.get_block_number().await }, + false, + ) + .await + .map_err(Error::from) } /// Get the block number for a given block identifier. @@ -198,16 +184,12 @@ impl RobustProvider { /// `call_timeout`). /// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain. pub async fn get_block_number_by_id(&self, block_id: BlockId) -> Result { - info!("get_block_number_by_id called"); let result = self .try_operation_with_failover( move |provider| async move { provider.get_block_number_by_id(block_id).await }, false, ) .await; - if let Err(e) = &result { - error!(error = %e, "get_block_number_by_id failed"); - } result?.ok_or_else(|| Error::BlockNotFound(block_id)) } @@ -228,7 +210,6 @@ impl RobustProvider { /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds /// `call_timeout`). pub async fn get_latest_confirmed(&self, confirmations: u64) -> Result { - info!(configurations = confirmations, "get_latest_confirmed called"); let latest_block = self.get_block_number().await?; let confirmed_block = latest_block.saturating_sub(confirmations); Ok(confirmed_block) @@ -246,16 +227,12 @@ impl RobustProvider { /// `call_timeout`). /// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain. pub async fn get_block_by_hash(&self, hash: BlockHash) -> Result { - info!("eth_getBlockByHash called"); let result = self .try_operation_with_failover( move |provider| async move { provider.get_block_by_hash(hash).await }, false, ) .await; - if let Err(e) = &result { - error!(error = %e, "eth_getBlockByHash failed"); - } result?.ok_or_else(|| Error::BlockNotFound(hash.into())) } @@ -271,18 +248,12 @@ impl RobustProvider { /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds /// `call_timeout`). pub async fn get_logs(&self, filter: &Filter) -> Result, Error> { - info!("eth_getLogs called"); - let result = self - .try_operation_with_failover( - move |provider| async move { provider.get_logs(filter).await }, - false, - ) - .await - .map_err(Error::from); - if let Err(e) = &result { - error!(error = %e, "eth_getLogs failed"); - } - result + self.try_operation_with_failover( + move |provider| async move { provider.get_logs(filter).await }, + false, + ) + .await + .map_err(Error::from) } /// Subscribe to new block headers with automatic failover and reconnection. @@ -301,7 +272,6 @@ impl RobustProvider { /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds /// `call_timeout`). pub async fn subscribe_blocks(&self) -> Result, Error> { - info!("eth_subscribe called"); let subscription = self .try_operation_with_failover( move |provider| async move { @@ -312,15 +282,9 @@ impl RobustProvider { }, true, ) - .await; + .await?; - match subscription { - Ok(sub) => Ok(RobustSubscription::new(sub, self.clone())), - Err(e) => { - error!(error = %e, "eth_subscribe failed"); - Err(e.into()) - } - } + Ok(RobustSubscription::new(subscription, self.clone())) } /// Execute `operation` with exponential backoff and a total timeout. @@ -352,26 +316,13 @@ impl RobustProvider { let primary = self.primary(); self.try_provider_with_timeout(primary, &operation) .or_else(|last_error| { - self.try_fallback_providers(&operation, require_pubsub, last_error) + self.try_fallback_providers_from(&operation, require_pubsub, last_error, 0) + .map_ok(|(value, _)| value) }) .await } - pub(crate) async fn try_fallback_providers( - &self, - operation: F, - require_pubsub: bool, - last_error: CoreError, - ) -> Result - where - F: Fn(RootProvider) -> Fut, - Fut: Future>>, - { - self.try_fallback_providers_from(operation, require_pubsub, last_error, 0) - .await - .map(|(value, _idx)| value) - } - + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self, operation)))] pub(crate) async fn try_fallback_providers_from( &self, operation: F, @@ -384,34 +335,41 @@ impl RobustProvider { Fut: Future>>, { let num_fallbacks = self.fallback_providers.len(); - if num_fallbacks > 0 && start_index == 0 { - info!("Primary provider failed, trying fallback provider(s)"); - } let fallback_providers = self.fallback_providers.iter().enumerate().skip(start_index); for (fallback_idx, provider) in fallback_providers { if require_pubsub && !Self::supports_pubsub(provider) { - info!( - fallback_index = fallback_idx + 1, + trace!( + provider_num = fallback_idx + 1, "Fallback provider doesn't support pubsub, skipping" ); continue; } - info!(fallback_index = fallback_idx + 1, "Attempting fallback provider"); + + trace!( + fallback_provider_index = fallback_idx + 1, + total_num_fallbacks = num_fallbacks, + "Attempting fallback provider" + ); match self.try_provider_with_timeout(provider, &operation).await { Ok(value) => { - info!(provider_num = fallback_idx + 1, "Fallback provider succeeded"); + info!( + provider_num = fallback_idx + 1, + total_fallbacks = num_fallbacks, + "Switched to fallback provider" + ); return Ok((value, fallback_idx)); } Err(e) => { - error!(provider_num = fallback_idx + 1, err = %e, "Fallback provider failed"); + tracing::warn!(provider_num = fallback_idx + 1, err = %e, "Fallback provider failed"); last_error = e; } } } - // All fallbacks failed / skipped, return the last error - error!("All providers failed or timed out - returning the last providers attempt's error"); + + tracing::error!("All providers failed"); + Err(last_error) } @@ -431,12 +389,7 @@ impl RobustProvider { timeout( self.call_timeout, - (|| operation(provider.clone())) - .retry(retry_strategy) - .notify(|err: &RpcError, dur: Duration| { - info!(error = %err, duration_ms = dur.as_millis(), "RPC error retrying"); - }) - .sleep(tokio::time::sleep), + (|| operation(provider.clone())).retry(retry_strategy).sleep(tokio::time::sleep), ) .await .map_err(CoreError::from)? diff --git a/src/robust_provider/subscription.rs b/src/robust_provider/subscription.rs index a624ce6c..11ff4b70 100644 --- a/src/robust_provider/subscription.rs +++ b/src/robust_provider/subscription.rs @@ -43,14 +43,8 @@ impl From for Error { impl From for Error { fn from(err: RecvError) -> Self { match err { - RecvError::Closed => { - error!("Provider closed the subscription channel"); - Error::Closed - } - RecvError::Lagged(count) => { - error!(skipped = count, "Receiver lagged"); - Error::Lagged(count) - } + RecvError::Closed => Error::Closed, + RecvError::Lagged(count) => Error::Lagged(count), } } } @@ -122,17 +116,7 @@ impl RobustSubscription { } return Ok(header); } - Err(recv_error) => { - match recv_error { - RecvError::Closed => { - error!("Provider closed the subscription channel"); - } - RecvError::Lagged(count) => { - error!(skipped = count, "Receiver lagged"); - } - } - return Err(recv_error.into()); - } + Err(recv_error) => return Err(recv_error.into()), }, Err(elapsed_err) => { warn!( @@ -148,6 +132,7 @@ impl RobustSubscription { /// Try to reconnect to the primary provider if enough time has elapsed. /// Returns true if reconnection was successful, false if it's not time yet or if it failed. + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))] async fn try_reconnect_to_primary(&mut self, force: bool) -> bool { // Check if we should attempt reconnection let should_reconnect = force || @@ -162,8 +147,6 @@ impl RobustSubscription { return false; } - info!("Attempting to reconnect to primary provider"); - let operation = move |provider: RootProvider| async move { provider.subscribe_blocks().await }; @@ -171,22 +154,19 @@ impl RobustSubscription { let subscription = self.robust_provider.try_provider_with_timeout(primary, &operation).await; - match subscription { - Ok(sub) => { - info!("Successfully reconnected to primary provider"); - self.subscription = sub; - self.current_fallback_index = None; - self.last_reconnect_attempt = None; - true - } - Err(e) => { - self.last_reconnect_attempt = Some(Instant::now()); - warn!(error = %e, "Failed to reconnect to primary provider"); - false - } + if let Ok(sub) = subscription { + info!("Reconnected to primary provider"); + self.subscription = sub; + self.current_fallback_index = None; + self.last_reconnect_attempt = None; + true + } else { + self.last_reconnect_attempt = Some(Instant::now()); + false } } + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))] async fn switch_to_fallback(&mut self, last_error: CoreError) -> Result<(), Error> { // If we're on a fallback, try primary first before moving to next fallback if self.is_on_fallback() && self.try_reconnect_to_primary(true).await {