Skip to content

Commit 8dc8990

Browse files
committed
tracing fixes proposal
1 parent 8cfad38 commit 8dc8990

File tree

10 files changed

+165
-158
lines changed

10 files changed

+165
-158
lines changed

src/block_range_scanner.rs

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,10 @@ use crate::{
7373
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
7474
};
7575

76-
#[allow(unused_imports)]
7776
use alloy::{
7877
consensus::BlockHeader,
7978
eips::{BlockId, BlockNumberOrTag},
80-
network::{BlockResponse, Network, primitives::HeaderResponse},
79+
network::{BlockResponse, Network},
8180
primitives::BlockNumber,
8281
};
8382

@@ -288,47 +287,46 @@ impl<N: Network> Service<N> {
288287
}
289288

290289
pub async fn run(mut self) {
291-
opt_info!("Starting subscription service");
290+
info!("Starting subscription service");
292291

293292
while !self.shutdown {
294293
tokio::select! {
295294
cmd = self.command_receiver.recv() => {
296295
if let Some(command) = cmd {
297-
#[allow(clippy::used_underscore_binding)]
298-
if let Err(_e) = self.handle_command(command).await {
299-
opt_error!("Command handling error: {}", _e);
296+
if let Err(e) = self.handle_command(command).await {
297+
error!(error = %e, "Command handling error");
300298
self.error_count += 1;
301299
}
302300
} else {
303-
opt_warn!("Command channel closed, shutting down");
301+
warn!("Command channel closed, shutting down");
304302
break;
305303
}
306304
}
307305
}
308306
}
309307

310-
opt_info!("Subscription service stopped");
308+
info!("Subscription service stopped");
311309
}
312310

313311
async fn handle_command(&mut self, command: Command) -> Result<(), ScannerError> {
314312
match command {
315313
Command::StreamLive { sender, block_confirmations, response } => {
316-
opt_info!("Starting live stream");
314+
info!("Starting live stream");
317315
let result = self.handle_live(block_confirmations, sender).await;
318316
let _ = response.send(result);
319317
}
320318
Command::StreamHistorical { sender, start_id, end_id, response } => {
321-
opt_info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream");
319+
info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream");
322320
let result = self.handle_historical(start_id, end_id, sender).await;
323321
let _ = response.send(result);
324322
}
325323
Command::StreamFrom { sender, start_id, block_confirmations, response } => {
326-
opt_info!(start_id = ?start_id, "Starting streaming from");
324+
info!(start_id = ?start_id, "Starting streaming from");
327325
let result = self.handle_sync(start_id, block_confirmations, sender).await;
328326
let _ = response.send(result);
329327
}
330328
Command::Rewind { sender, start_id, end_id, response } => {
331-
opt_info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind");
329+
info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind");
332330
let result = self.handle_rewind(start_id, end_id, sender).await;
333331
let _ = response.send(result);
334332
}
@@ -353,7 +351,7 @@ impl<N: Network> Service<N> {
353351

354352
let subscription = self.provider.subscribe_blocks().await?;
355353

356-
opt_info!("WebSocket connected for live blocks");
354+
info!("WebSocket connected for live blocks");
357355

358356
tokio::spawn(async move {
359357
let mut reorg_handler =
@@ -396,7 +394,7 @@ impl<N: Network> Service<N> {
396394
_ => (start_block_num, end_block_num),
397395
};
398396

399-
opt_info!(
397+
info!(
400398
start_block = start_block_num,
401399
end_block = end_block_num,
402400
"Normalized the block range"
@@ -486,7 +484,7 @@ impl<N: Network> Service<N> {
486484
{
487485
Ok(block) => block,
488486
Err(e) => {
489-
opt_error!(error = %e, "Failed to get finalized block");
487+
error!(error = %e, "Failed to get finalized block");
490488
_ = sender.try_stream(e).await;
491489
return;
492490
}
@@ -508,7 +506,7 @@ impl<N: Network> Service<N> {
508506
let reorg = match reorg_handler.check(&tip).await {
509507
Ok(opt) => opt,
510508
Err(e) => {
511-
opt_error!(error = %e, "Terminal RPC call error, shutting down");
509+
error!(error = %e, "Terminal RPC call error, shutting down");
512510
_ = sender.try_stream(e).await;
513511
return;
514512
}
@@ -529,7 +527,7 @@ impl<N: Network> Service<N> {
529527
}
530528
}
531529

532-
opt_info!(batch_count = iter.batch_count(), "Rewind completed");
530+
info!(batch_count = iter.batch_count(), "Rewind completed");
533531
}
534532

535533
/// Handles re-scanning of reorged blocks.
@@ -544,9 +542,9 @@ impl<N: Network> Service<N> {
544542
) -> bool {
545543
let tip_number = tip.header().number();
546544
let common_ancestor = common_ancestor.header().number();
547-
opt_info!(
545+
info!(
548546
block_number = %tip_number,
549-
hash = %tip.header().hash(),
547+
hash = %alloy::network::primitives::HeaderResponse::hash(tip.header()),
550548
common_ancestor = %common_ancestor,
551549
"Reorg detected"
552550
);
@@ -559,13 +557,10 @@ impl<N: Network> Service<N> {
559557
*tip = match provider.get_block_by_number(tip_number.into()).await {
560558
Ok(block) => block,
561559
Err(e) => {
562-
#[allow(clippy::if_same_then_else)]
563560
if matches!(e, crate::robust_provider::Error::BlockNotFound(_)) {
564-
opt_error!(
565-
"Unexpected error: pre-reorg chain tip should exist on a reorged chain"
566-
);
561+
error!("Unexpected error: pre-reorg chain tip should exist on a reorged chain");
567562
} else {
568-
opt_error!(error = %e, "Terminal RPC call error, shutting down");
563+
error!(error = %e, "Terminal RPC call error, shutting down");
569564
}
570565
_ = sender.try_stream(e).await;
571566
return false;

src/block_range_scanner/common.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub(crate) async fn stream_live_blocks<N: Network>(
6666
)
6767
.await;
6868

69-
opt_warn!("Live block subscription ended");
69+
warn!("Live block subscription ended");
7070
}
7171

7272
async fn get_first_block<
@@ -84,9 +84,7 @@ async fn get_first_block<
8484
subscription::Error::Lagged(_) => {
8585
// scanner already accounts for skipped block numbers
8686
// next block will be the actual incoming block
87-
opt_info!(
88-
"Skipping Error::Lagged, next block should be the first live block"
89-
);
87+
info!("Skipping Error::Lagged, next block should be the first live block");
9088
}
9189
subscription::Error::Timeout => {
9290
_ = sender.try_stream(ScannerError::Timeout).await;
@@ -133,7 +131,7 @@ async fn initialize_live_streaming_state<N: Network>(
133131
reorg_handler: &mut ReorgHandler<N>,
134132
) -> Option<LiveStreamingState<N>> {
135133
let incoming_block_num = first_block.number();
136-
opt_info!(block_number = incoming_block_num, "Received first block header");
134+
info!(block_number = incoming_block_num, "Received first block header");
137135

138136
let confirmed = incoming_block_num.saturating_sub(block_confirmations);
139137

@@ -177,7 +175,7 @@ async fn stream_blocks_continuously<
177175
let incoming_block = match incoming_block {
178176
Ok(block) => block,
179177
Err(e) => {
180-
opt_error!(error = %e, "Error receiving block from stream");
178+
error!(error = %e, "Error receiving block from stream");
181179
match e {
182180
subscription::Error::Lagged(_) => {
183181
// scanner already accounts for skipped block numbers
@@ -201,7 +199,7 @@ async fn stream_blocks_continuously<
201199
};
202200

203201
let incoming_block_num = incoming_block.number();
204-
opt_info!(block_number = incoming_block_num, "Received block header");
202+
info!(block_number = incoming_block_num, "Received block header");
205203

206204
let Some(previous_batch_end) = state.previous_batch_end.as_ref() else {
207205
// previously detected reorg wasn't fully handled
@@ -211,7 +209,7 @@ async fn stream_blocks_continuously<
211209
let common_ancestor = match reorg_handler.check(previous_batch_end).await {
212210
Ok(reorg_opt) => reorg_opt,
213211
Err(e) => {
214-
opt_error!(error = %e, "Failed to perform reorg check");
212+
error!(error = %e, "Failed to perform reorg check");
215213
_ = sender.try_stream(e).await;
216214
return;
217215
}
@@ -261,7 +259,7 @@ async fn handle_reorg_detected<N: Network>(
261259
// Reset streaming position based on common ancestor
262260
if ancestor_num < stream_start {
263261
// Reorg went before our starting point - restart from stream_start
264-
opt_info!(
262+
info!(
265263
ancestor_block = ancestor_num,
266264
stream_start = stream_start,
267265
"Reorg detected before stream start, resetting to stream start"
@@ -270,7 +268,7 @@ async fn handle_reorg_detected<N: Network>(
270268
state.previous_batch_end = None;
271269
} else {
272270
// Resume from after the common ancestor
273-
opt_info!(ancestor_block = ancestor_num, "Reorg detected, resuming from common ancestor");
271+
info!(ancestor_block = ancestor_num, "Reorg detected, resuming from common ancestor");
274272
state.batch_start = ancestor_num + 1;
275273
state.previous_batch_end = Some(common_ancestor);
276274
}
@@ -336,12 +334,12 @@ pub(crate) async fn stream_historical_range<N: Network>(
336334
provider: &RobustProvider<N>,
337335
reorg_handler: &mut ReorgHandler<N>,
338336
) -> Option<()> {
339-
opt_info!("Getting finalized block number");
337+
info!("Getting finalized block number");
340338
let finalized = match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await
341339
{
342340
Ok(block) => block,
343341
Err(e) => {
344-
opt_error!(error = %e, "Failed to get finalized block");
342+
error!(error = %e, "Failed to get finalized block");
345343
_ = sender.try_stream(e).await;
346344
return None;
347345
}
@@ -407,7 +405,7 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
407405
let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
408406
Ok(block) => block,
409407
Err(e) => {
410-
opt_error!(batch_start = batch.start(), batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
408+
error!(batch_start = batch.start(), batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
411409
_ = sender.try_stream(e).await;
412410
return None;
413411
}
@@ -420,7 +418,7 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
420418
let reorged_opt = match reorg_handler.check(&batch_end).await {
421419
Ok(opt) => opt,
422420
Err(e) => {
423-
opt_error!(error = %e, "Failed to perform reorg check");
421+
error!(error = %e, "Failed to perform reorg check");
424422
_ = sender.try_stream(e).await;
425423
return None;
426424
}
@@ -437,6 +435,6 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
437435
last_batch_end = Some(batch_end);
438436
}
439437

440-
opt_error!(batch_count = iter.batch_count(), "Historical sync completed");
438+
error!(batch_count = iter.batch_count(), "Historical sync completed");
441439
last_batch_end
442440
}

src/block_range_scanner/reorg_handler.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,22 @@ impl<N: Network> ReorgHandler<N> {
6262
block: &N::BlockResponse,
6363
) -> Result<Option<N::BlockResponse>, ScannerError> {
6464
let block = block.header();
65-
opt_info!(block_hash = %block.hash(), block_number = block.number(), "Checking if block was reorged");
65+
info!(block_hash = %block.hash(), block_number = block.number(), "Checking if block was reorged");
6666

6767
if !self.reorg_detected(block).await? {
6868
let block_hash = block.hash();
69-
opt_info!(block_hash = %block_hash, block_number = block.number(), "No reorg detected");
69+
info!(block_hash = %block_hash, block_number = block.number(), "No reorg detected");
7070
// store the incoming block's hash for future reference
7171
if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) {
7272
self.buffer.push(block_hash);
7373
}
7474
return Ok(None);
7575
}
7676

77-
opt_info!("Reorg detected, searching for common ancestor");
77+
info!("Reorg detected, searching for common ancestor");
7878

7979
while let Some(&block_hash) = self.buffer.back() {
80-
opt_info!(block_hash = %block_hash, "Checking if block exists on-chain");
80+
info!(block_hash = %block_hash, "Checking if block exists on-chain");
8181
match self.provider.get_block_by_hash(block_hash).await {
8282
Ok(common_ancestor) => return self.return_common_ancestor(common_ancestor).await,
8383
Err(robust_provider::Error::BlockNotFound(_)) => {
@@ -93,13 +93,12 @@ impl<N: Network> ReorgHandler<N> {
9393
// no need to store finalized block's hash in the buffer, as it is returned by default only
9494
// if not buffered hashes exist on-chain
9595

96-
opt_warn!("Possible deep reorg detected, setting finalized block as common ancestor");
96+
warn!("Possible deep reorg detected, setting finalized block as common ancestor");
9797

9898
let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
9999

100-
#[allow(clippy::used_underscore_binding)]
101-
let _header = finalized.header();
102-
opt_info!(finalized_hash = %_header.hash(), block_number = _header.number(), "Finalized block set as common ancestor");
100+
let header = finalized.header();
101+
info!(finalized_hash = %header.hash(), block_number = header.number(), "Finalized block set as common ancestor");
103102

104103
Ok(Some(finalized))
105104
}
@@ -120,10 +119,10 @@ impl<N: Network> ReorgHandler<N> {
120119
let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
121120
let finalized_header = finalized.header();
122121
let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() {
123-
opt_info!(common_ancestor = %common_ancestor_header.hash(), block_number = common_ancestor_header.number(), "Common ancestor found");
122+
info!(common_ancestor = %common_ancestor_header.hash(), block_number = common_ancestor_header.number(), "Common ancestor found");
124123
common_ancestor
125124
} else {
126-
opt_warn!(
125+
warn!(
127126
finalized_hash = %finalized_header.hash(), block_number = finalized_header.number(), "Possible deep reorg detected, using finalized block as common ancestor"
128127
);
129128
// all buffered blocks are finalized, so no more need to track them

src/block_range_scanner/sync_handler.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,14 @@ impl<N: Network> SyncHandler<N> {
4545

4646
match sync_state {
4747
SyncState::AlreadyLive { start_block } => {
48-
opt_info!(
48+
info!(
4949
start_block = start_block,
5050
"Start block is beyond confirmed tip, waiting until starting block is confirmed before starting live stream"
5151
);
5252
self.spawn_live_only(start_block).await?;
5353
}
5454
SyncState::NeedsCatchup { start_block, confirmed_tip } => {
55-
opt_info!(
55+
info!(
5656
start_block = start_block,
5757
confirmed_tip = confirmed_tip,
5858
"Start block is behind confirmed tip, catching up then transitioning to live"
@@ -131,7 +131,7 @@ impl<N: Network> SyncHandler<N> {
131131
return; // channel closed
132132
}
133133
Err(e) => {
134-
opt_error!(error = %e, "Error during historical catchup, shutting down");
134+
error!(error = %e, "Error during historical catchup, shutting down");
135135
_ = sender.try_stream(e).await;
136136
return;
137137
}
@@ -182,7 +182,7 @@ impl<N: Network> SyncHandler<N> {
182182
confirmed_tip = latest.saturating_sub(block_confirmations);
183183
}
184184

185-
opt_info!("Historical catchup complete, ready to transition to live");
185+
info!("Historical catchup complete, ready to transition to live");
186186

187187
Ok(Some(start_block))
188188
}
@@ -199,7 +199,7 @@ impl<N: Network> SyncHandler<N> {
199199
let subscription = match provider.subscribe_blocks().await {
200200
Ok(sub) => sub,
201201
Err(e) => {
202-
opt_error!(error = %e, "Error subscribing to live blocks, shutting down");
202+
error!(error = %e, "Error subscribing to live blocks, shutting down");
203203
_ = sender.try_stream(e).await;
204204
return;
205205
}
@@ -209,7 +209,7 @@ impl<N: Network> SyncHandler<N> {
209209
return;
210210
}
211211

212-
opt_info!("Successfully transitioned from historical to live streaming");
212+
info!("Successfully transitioned from historical to live streaming");
213213

214214
common::stream_live_blocks(
215215
start_block,

0 commit comments

Comments
 (0)