Skip to content

Commit 7a7aee6

Browse files
LeoPatOZ0xNeshi
andauthored
feat: clean up tracing in robust provider (#258)
Co-authored-by: Nenad <[email protected]>
1 parent 7a3999e commit 7a7aee6

File tree

2 files changed

+49
-116
lines changed

2 files changed

+49
-116
lines changed

src/robust_provider/provider.rs

Lines changed: 35 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -118,16 +118,12 @@ impl<N: Network> RobustProvider<N> {
118118
&self,
119119
number: BlockNumberOrTag,
120120
) -> Result<N::BlockResponse, Error> {
121-
info!("eth_getBlockByNumber called");
122121
let result = self
123122
.try_operation_with_failover(
124123
move |provider| async move { provider.get_block_by_number(number).await },
125124
false,
126125
)
127126
.await;
128-
if let Err(e) = &result {
129-
error!(error = %e, "eth_getByBlockNumber failed");
130-
}
131127

132128
result?.ok_or_else(|| Error::BlockNotFound(number.into()))
133129
}
@@ -144,16 +140,12 @@ impl<N: Network> RobustProvider<N> {
144140
/// `call_timeout`).
145141
/// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain.
146142
pub async fn get_block(&self, id: BlockId) -> Result<N::BlockResponse, Error> {
147-
info!("eth_getBlock called");
148143
let result = self
149144
.try_operation_with_failover(
150145
|provider| async move { provider.get_block(id).await },
151146
false,
152147
)
153148
.await;
154-
if let Err(e) = &result {
155-
error!(error = %e, "eth_getByBlockNumber failed");
156-
}
157149
result?.ok_or_else(|| Error::BlockNotFound(id))
158150
}
159151

@@ -168,18 +160,12 @@ impl<N: Network> RobustProvider<N> {
168160
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
169161
/// `call_timeout`).
170162
pub async fn get_block_number(&self) -> Result<BlockNumber, Error> {
171-
info!("eth_getBlockNumber called");
172-
let result = self
173-
.try_operation_with_failover(
174-
move |provider| async move { provider.get_block_number().await },
175-
false,
176-
)
177-
.await
178-
.map_err(Error::from);
179-
if let Err(e) = &result {
180-
error!(error = %e, "eth_getBlockNumber failed");
181-
}
182-
result
163+
self.try_operation_with_failover(
164+
move |provider| async move { provider.get_block_number().await },
165+
false,
166+
)
167+
.await
168+
.map_err(Error::from)
183169
}
184170

185171
/// Get the block number for a given block identifier.
@@ -198,16 +184,12 @@ impl<N: Network> RobustProvider<N> {
198184
/// `call_timeout`).
199185
/// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain.
200186
pub async fn get_block_number_by_id(&self, block_id: BlockId) -> Result<BlockNumber, Error> {
201-
info!("get_block_number_by_id called");
202187
let result = self
203188
.try_operation_with_failover(
204189
move |provider| async move { provider.get_block_number_by_id(block_id).await },
205190
false,
206191
)
207192
.await;
208-
if let Err(e) = &result {
209-
error!(error = %e, "get_block_number_by_id failed");
210-
}
211193
result?.ok_or_else(|| Error::BlockNotFound(block_id))
212194
}
213195

@@ -228,7 +210,6 @@ impl<N: Network> RobustProvider<N> {
228210
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
229211
/// `call_timeout`).
230212
pub async fn get_latest_confirmed(&self, confirmations: u64) -> Result<u64, Error> {
231-
info!(configurations = confirmations, "get_latest_confirmed called");
232213
let latest_block = self.get_block_number().await?;
233214
let confirmed_block = latest_block.saturating_sub(confirmations);
234215
Ok(confirmed_block)
@@ -246,16 +227,12 @@ impl<N: Network> RobustProvider<N> {
246227
/// `call_timeout`).
247228
/// * [`Error::BlockNotFound`] - if the block with the specified hash was not found on-chain.
248229
pub async fn get_block_by_hash(&self, hash: BlockHash) -> Result<N::BlockResponse, Error> {
249-
info!("eth_getBlockByHash called");
250230
let result = self
251231
.try_operation_with_failover(
252232
move |provider| async move { provider.get_block_by_hash(hash).await },
253233
false,
254234
)
255235
.await;
256-
if let Err(e) = &result {
257-
error!(error = %e, "eth_getBlockByHash failed");
258-
}
259236

260237
result?.ok_or_else(|| Error::BlockNotFound(hash.into()))
261238
}
@@ -271,18 +248,12 @@ impl<N: Network> RobustProvider<N> {
271248
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
272249
/// `call_timeout`).
273250
pub async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, Error> {
274-
info!("eth_getLogs called");
275-
let result = self
276-
.try_operation_with_failover(
277-
move |provider| async move { provider.get_logs(filter).await },
278-
false,
279-
)
280-
.await
281-
.map_err(Error::from);
282-
if let Err(e) = &result {
283-
error!(error = %e, "eth_getLogs failed");
284-
}
285-
result
251+
self.try_operation_with_failover(
252+
move |provider| async move { provider.get_logs(filter).await },
253+
false,
254+
)
255+
.await
256+
.map_err(Error::from)
286257
}
287258

288259
/// Subscribe to new block headers with automatic failover and reconnection.
@@ -301,7 +272,6 @@ impl<N: Network> RobustProvider<N> {
301272
/// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds
302273
/// `call_timeout`).
303274
pub async fn subscribe_blocks(&self) -> Result<RobustSubscription<N>, Error> {
304-
info!("eth_subscribe called");
305275
let subscription = self
306276
.try_operation_with_failover(
307277
move |provider| async move {
@@ -312,15 +282,9 @@ impl<N: Network> RobustProvider<N> {
312282
},
313283
true,
314284
)
315-
.await;
285+
.await?;
316286

317-
match subscription {
318-
Ok(sub) => Ok(RobustSubscription::new(sub, self.clone())),
319-
Err(e) => {
320-
error!(error = %e, "eth_subscribe failed");
321-
Err(e.into())
322-
}
323-
}
287+
Ok(RobustSubscription::new(subscription, self.clone()))
324288
}
325289

326290
/// Execute `operation` with exponential backoff and a total timeout.
@@ -352,26 +316,13 @@ impl<N: Network> RobustProvider<N> {
352316
let primary = self.primary();
353317
self.try_provider_with_timeout(primary, &operation)
354318
.or_else(|last_error| {
355-
self.try_fallback_providers(&operation, require_pubsub, last_error)
319+
self.try_fallback_providers_from(&operation, require_pubsub, last_error, 0)
320+
.map_ok(|(value, _)| value)
356321
})
357322
.await
358323
}
359324

360-
pub(crate) async fn try_fallback_providers<T: Debug, F, Fut>(
361-
&self,
362-
operation: F,
363-
require_pubsub: bool,
364-
last_error: CoreError,
365-
) -> Result<T, CoreError>
366-
where
367-
F: Fn(RootProvider<N>) -> Fut,
368-
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
369-
{
370-
self.try_fallback_providers_from(operation, require_pubsub, last_error, 0)
371-
.await
372-
.map(|(value, _idx)| value)
373-
}
374-
325+
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self, operation)))]
375326
pub(crate) async fn try_fallback_providers_from<T: Debug, F, Fut>(
376327
&self,
377328
operation: F,
@@ -384,34 +335,41 @@ impl<N: Network> RobustProvider<N> {
384335
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
385336
{
386337
let num_fallbacks = self.fallback_providers.len();
387-
if num_fallbacks > 0 && start_index == 0 {
388-
info!("Primary provider failed, trying fallback provider(s)");
389-
}
390338

391339
let fallback_providers = self.fallback_providers.iter().enumerate().skip(start_index);
392340
for (fallback_idx, provider) in fallback_providers {
393341
if require_pubsub && !Self::supports_pubsub(provider) {
394-
info!(
395-
fallback_index = fallback_idx + 1,
342+
trace!(
343+
provider_num = fallback_idx + 1,
396344
"Fallback provider doesn't support pubsub, skipping"
397345
);
398346
continue;
399347
}
400-
info!(fallback_index = fallback_idx + 1, "Attempting fallback provider");
348+
349+
trace!(
350+
fallback_provider_index = fallback_idx + 1,
351+
total_num_fallbacks = num_fallbacks,
352+
"Attempting fallback provider"
353+
);
401354

402355
match self.try_provider_with_timeout(provider, &operation).await {
403356
Ok(value) => {
404-
info!(provider_num = fallback_idx + 1, "Fallback provider succeeded");
357+
info!(
358+
provider_num = fallback_idx + 1,
359+
total_fallbacks = num_fallbacks,
360+
"Switched to fallback provider"
361+
);
405362
return Ok((value, fallback_idx));
406363
}
407364
Err(e) => {
408-
error!(provider_num = fallback_idx + 1, err = %e, "Fallback provider failed");
365+
tracing::warn!(provider_num = fallback_idx + 1, err = %e, "Fallback provider failed");
409366
last_error = e;
410367
}
411368
}
412369
}
413-
// All fallbacks failed / skipped, return the last error
414-
error!("All providers failed or timed out - returning the last providers attempt's error");
370+
371+
tracing::error!("All providers failed");
372+
415373
Err(last_error)
416374
}
417375

@@ -431,12 +389,7 @@ impl<N: Network> RobustProvider<N> {
431389

432390
timeout(
433391
self.call_timeout,
434-
(|| operation(provider.clone()))
435-
.retry(retry_strategy)
436-
.notify(|err: &RpcError<TransportErrorKind>, dur: Duration| {
437-
info!(error = %err, duration_ms = dur.as_millis(), "RPC error retrying");
438-
})
439-
.sleep(tokio::time::sleep),
392+
(|| operation(provider.clone())).retry(retry_strategy).sleep(tokio::time::sleep),
440393
)
441394
.await
442395
.map_err(CoreError::from)?

src/robust_provider/subscription.rs

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,8 @@ impl From<CoreError> for Error {
4343
impl From<RecvError> for Error {
4444
fn from(err: RecvError) -> Self {
4545
match err {
46-
RecvError::Closed => {
47-
error!("Provider closed the subscription channel");
48-
Error::Closed
49-
}
50-
RecvError::Lagged(count) => {
51-
error!(skipped = count, "Receiver lagged");
52-
Error::Lagged(count)
53-
}
46+
RecvError::Closed => Error::Closed,
47+
RecvError::Lagged(count) => Error::Lagged(count),
5448
}
5549
}
5650
}
@@ -122,17 +116,7 @@ impl<N: Network> RobustSubscription<N> {
122116
}
123117
return Ok(header);
124118
}
125-
Err(recv_error) => {
126-
match recv_error {
127-
RecvError::Closed => {
128-
error!("Provider closed the subscription channel");
129-
}
130-
RecvError::Lagged(count) => {
131-
error!(skipped = count, "Receiver lagged");
132-
}
133-
}
134-
return Err(recv_error.into());
135-
}
119+
Err(recv_error) => return Err(recv_error.into()),
136120
},
137121
Err(elapsed_err) => {
138122
warn!(
@@ -148,6 +132,7 @@ impl<N: Network> RobustSubscription<N> {
148132

149133
/// Try to reconnect to the primary provider if enough time has elapsed.
150134
/// Returns true if reconnection was successful, false if it's not time yet or if it failed.
135+
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
151136
async fn try_reconnect_to_primary(&mut self, force: bool) -> bool {
152137
// Check if we should attempt reconnection
153138
let should_reconnect = force ||
@@ -162,31 +147,26 @@ impl<N: Network> RobustSubscription<N> {
162147
return false;
163148
}
164149

165-
info!("Attempting to reconnect to primary provider");
166-
167150
let operation =
168151
move |provider: RootProvider<N>| async move { provider.subscribe_blocks().await };
169152

170153
let primary = self.robust_provider.primary();
171154
let subscription =
172155
self.robust_provider.try_provider_with_timeout(primary, &operation).await;
173156

174-
match subscription {
175-
Ok(sub) => {
176-
info!("Successfully reconnected to primary provider");
177-
self.subscription = sub;
178-
self.current_fallback_index = None;
179-
self.last_reconnect_attempt = None;
180-
true
181-
}
182-
Err(e) => {
183-
self.last_reconnect_attempt = Some(Instant::now());
184-
warn!(error = %e, "Failed to reconnect to primary provider");
185-
false
186-
}
157+
if let Ok(sub) = subscription {
158+
info!("Reconnected to primary provider");
159+
self.subscription = sub;
160+
self.current_fallback_index = None;
161+
self.last_reconnect_attempt = None;
162+
true
163+
} else {
164+
self.last_reconnect_attempt = Some(Instant::now());
165+
false
187166
}
188167
}
189168

169+
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
190170
async fn switch_to_fallback(&mut self, last_error: CoreError) -> Result<(), Error> {
191171
// If we're on a fallback, try primary first before moving to next fallback
192172
if self.is_on_fallback() && self.try_reconnect_to_primary(true).await {

0 commit comments

Comments
 (0)