1 change: 1 addition & 0 deletions Cargo.toml
@@ -74,3 +74,4 @@ workspace = true

[features]
test-utils = []
tracing = []
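
The new `tracing` feature gates all of the scanner's log output: throughout the diff below, direct `tracing` macro calls are replaced with `opt_info!`/`opt_debug!`/`opt_warn!`/`opt_error!` wrappers. The wrapper definitions are not part of this diff; a minimal sketch of how they might look, assuming each simply forwards to the matching `tracing` macro when the feature is enabled and expands to an empty block otherwise:

```rust
// Hypothetical definition (not shown in this diff). With the `tracing`
// feature enabled, the call forwards to `tracing::info!`; with it
// disabled, the expansion is an empty block, so the arguments are never
// evaluated at all.
macro_rules! opt_info {
    ($($arg:tt)*) => {{
        #[cfg(feature = "tracing")]
        tracing::info!($($arg)*);
    }};
}
// opt_debug!, opt_warn!, and opt_error! would follow the same shape.
```

Forwarding raw token trees (`$($arg:tt)*`) lets call sites keep tracing's structured-field syntax (e.g. `start_id = ?start_id`) unchanged. Note also that the feature is declared with an empty list, so it only gates call sites; making the `tracing` dependency itself optional would additionally require a `dep:tracing` entry.
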
42 changes: 23 additions & 19 deletions src/block_range_scanner.rs
@@ -73,13 +73,13 @@ use crate::{
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
};

#[allow(unused_imports)]
use alloy::{
consensus::BlockHeader,
eips::{BlockId, BlockNumberOrTag},
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::BlockNumber,
};
use tracing::{debug, error, info, warn};

mod common;
mod reorg_handler;
@@ -263,46 +263,47 @@ impl<N: Network> Service<N> {
}

pub async fn run(mut self) {
info!("Starting subscription service");
opt_info!("Starting subscription service");

while !self.shutdown {
tokio::select! {
cmd = self.command_receiver.recv() => {
if let Some(command) = cmd {
if let Err(e) = self.handle_command(command).await {
error!("Command handling error: {}", e);
#[allow(clippy::used_underscore_binding)]
if let Err(_e) = self.handle_command(command).await {
opt_error!("Command handling error: {}", _e);
self.error_count += 1;
}
} else {
warn!("Command channel closed, shutting down");
opt_warn!("Command channel closed, shutting down");
break;
}
}
}
}

info!("Subscription service stopped");
opt_info!("Subscription service stopped");
}

async fn handle_command(&mut self, command: Command) -> Result<(), ScannerError> {
match command {
Command::StreamLive { sender, block_confirmations, response } => {
info!("Starting live stream");
opt_info!("Starting live stream");
let result = self.handle_live(block_confirmations, sender).await;
let _ = response.send(result);
}
Command::StreamHistorical { sender, start_id, end_id, response } => {
info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream");
opt_info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream");
let result = self.handle_historical(start_id, end_id, sender).await;
let _ = response.send(result);
}
Command::StreamFrom { sender, start_id, block_confirmations, response } => {
info!(start_id = ?start_id, "Starting streaming from");
opt_info!(start_id = ?start_id, "Starting streaming from");
let result = self.handle_sync(start_id, block_confirmations, sender).await;
let _ = response.send(result);
}
Command::Rewind { sender, start_id, end_id, response } => {
info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind");
opt_info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind");
let result = self.handle_rewind(start_id, end_id, sender).await;
let _ = response.send(result);
}
@@ -327,7 +328,7 @@ impl<N: Network> Service<N> {

let subscription = self.provider.subscribe_blocks().await?;

info!("WebSocket connected for live blocks");
opt_info!("WebSocket connected for live blocks");

tokio::spawn(async move {
let mut reorg_handler =
@@ -370,7 +371,7 @@ impl<N: Network> Service<N> {
_ => (start_block_num, end_block_num),
};

info!(
opt_info!(
start_block = start_block_num,
end_block = end_block_num,
"Normalized the block range"
@@ -462,7 +463,7 @@ impl<N: Network> Service<N> {
{
Ok(block) => block,
Err(e) => {
error!(error = %e, "Failed to get finalized block");
opt_error!(error = %e, "Failed to get finalized block");
_ = sender.try_stream(e).await;
return;
}
@@ -485,7 +486,7 @@ impl<N: Network> Service<N> {

batch_count += 1;
if batch_count % 10 == 0 {
debug!(batch_count = batch_count, "Processed rewind batches");
opt_debug!(batch_count = batch_count, "Processed rewind batches");
}

// check early if end of stream achieved to avoid subtraction overflow when `to
@@ -498,7 +499,7 @@ impl<N: Network> Service<N> {
let reorg = match reorg_handler.check(&tip).await {
Ok(opt) => opt,
Err(e) => {
error!(error = %e, "Terminal RPC call error, shutting down");
opt_error!(error = %e, "Terminal RPC call error, shutting down");
_ = sender.try_stream(e).await;
return;
}
@@ -521,7 +522,7 @@ impl<N: Network> Service<N> {
batch_from = batch_to - 1;
}

info!(batch_count = batch_count, "Rewind completed");
opt_info!(batch_count = batch_count, "Rewind completed");
}

/// Handles re-scanning of reorged blocks.
@@ -536,7 +537,7 @@
) -> bool {
let tip_number = tip.header().number();
let common_ancestor = common_ancestor.header().number();
info!(
opt_info!(
block_number = %tip_number,
hash = %tip.header().hash(),
common_ancestor = %common_ancestor,
@@ -551,10 +552,13 @@
*tip = match provider.get_block_by_number(tip_number.into()).await {
Ok(block) => block,
Err(e) => {
#[allow(clippy::if_same_then_else)]
if matches!(e, crate::robust_provider::Error::BlockNotFound(_)) {
error!("Unexpected error: pre-reorg chain tip should exist on a reorged chain");
opt_error!(
"Unexpected error: pre-reorg chain tip should exist on a reorged chain"
);
} else {
error!(error = %e, "Terminal RPC call error, shutting down");
opt_error!(error = %e, "Terminal RPC call error, shutting down");
}
_ = sender.try_stream(e).await;
return false;
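
A pattern that recurs in this file: because the disabled expansion never evaluates its arguments, a binding that exists only to be logged (such as the error in `handle_command`) would trip `unused_variables` when the feature is off. The diff therefore renames the binding with a leading underscore and adds `#[allow(clippy::used_underscore_binding)]` for the feature-enabled build, where the underscore-prefixed binding actually is used. A sketch of the resulting call-site shape, under the macro semantics assumed above (`fallible_operation` is illustrative):

```rust
// Call-site sketch; `fallible_operation` is illustrative.
async fn example() {
    // With `--features tracing` the error is logged; without it, the
    // macro expands to an empty block and `_e` is intentionally unused
    // (hence the underscore). The allow covers the enabled build, where
    // clippy would otherwise flag the use of an underscore-prefixed name.
    #[allow(clippy::used_underscore_binding)]
    if let Err(_e) = fallible_operation().await {
        opt_error!("operation failed: {}", _e);
    }
}
```
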
31 changes: 16 additions & 15 deletions src/block_range_scanner/common.rs
@@ -13,7 +13,6 @@ use alloy::{
network::{BlockResponse, Network},
primitives::BlockNumber,
};
use tracing::{debug, error, info, warn};

#[allow(clippy::too_many_arguments)]
pub(crate) async fn stream_live_blocks<N: Network>(
@@ -67,7 +66,7 @@ pub(crate) async fn stream_live_blocks<N: Network>(
)
.await;

warn!("Live block subscription ended");
opt_warn!("Live block subscription ended");
}

async fn get_first_block<
@@ -85,7 +84,9 @@ async fn get_first_block<
subscription::Error::Lagged(_) => {
// scanner already accounts for skipped block numbers
// next block will be the actual incoming block
info!("Skipping Error::Lagged, next block should be the first live block");
opt_info!(
"Skipping Error::Lagged, next block should be the first live block"
);
}
subscription::Error::Timeout => {
_ = sender.try_stream(ScannerError::Timeout).await;
@@ -132,7 +133,7 @@ async fn initialize_live_streaming_state<N: Network>(
reorg_handler: &mut ReorgHandler<N>,
) -> Option<LiveStreamingState<N>> {
let incoming_block_num = first_block.number();
info!(block_number = incoming_block_num, "Received first block header");
opt_info!(block_number = incoming_block_num, "Received first block header");

let confirmed = incoming_block_num.saturating_sub(block_confirmations);

@@ -176,7 +177,7 @@ async fn stream_blocks_continuously<
let incoming_block = match incoming_block {
Ok(block) => block,
Err(e) => {
error!(error = %e, "Error receiving block from stream");
opt_error!(error = %e, "Error receiving block from stream");
match e {
subscription::Error::Lagged(_) => {
// scanner already accounts for skipped block numbers
@@ -200,7 +201,7 @@
};

let incoming_block_num = incoming_block.number();
info!(block_number = incoming_block_num, "Received block header");
opt_info!(block_number = incoming_block_num, "Received block header");

let Some(previous_batch_end) = state.previous_batch_end.as_ref() else {
// previously detected reorg wasn't fully handled
@@ -210,7 +211,7 @@
let common_ancestor = match reorg_handler.check(previous_batch_end).await {
Ok(reorg_opt) => reorg_opt,
Err(e) => {
error!(error = %e, "Failed to perform reorg check");
opt_error!(error = %e, "Failed to perform reorg check");
_ = sender.try_stream(e).await;
return;
}
@@ -260,7 +261,7 @@ async fn handle_reorg_detected<N: Network>(
// Reset streaming position based on common ancestor
if ancestor_num < stream_start {
// Reorg went before our starting point - restart from stream_start
info!(
opt_info!(
ancestor_block = ancestor_num,
stream_start = stream_start,
"Reorg detected before stream start, resetting to stream start"
@@ -269,7 +270,7 @@
state.previous_batch_end = None;
} else {
// Resume from after the common ancestor
info!(ancestor_block = ancestor_num, "Reorg detected, resuming from common ancestor");
opt_info!(ancestor_block = ancestor_num, "Reorg detected, resuming from common ancestor");
state.batch_start = ancestor_num + 1;
state.previous_batch_end = Some(common_ancestor);
}
@@ -335,12 +336,12 @@ pub(crate) async fn stream_historical_range<N: Network>(
provider: &RobustProvider<N>,
reorg_handler: &mut ReorgHandler<N>,
) -> Option<()> {
info!("Getting finalized block number");
opt_info!("Getting finalized block number");
let finalized = match provider.get_block_number_by_id(BlockNumberOrTag::Finalized.into()).await
{
Ok(block) => block,
Err(e) => {
error!(error = %e, "Failed to get finalized block");
opt_error!(error = %e, "Failed to get finalized block");
_ = sender.try_stream(e).await;
return None;
}
@@ -406,7 +407,7 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
Ok(block) => block,
Err(e) => {
error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
opt_error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
_ = sender.try_stream(e).await;
return None;
}
@@ -418,13 +419,13 @@

batch_count += 1;
if batch_count % 10 == 0 {
debug!(batch_count = batch_count, "Processed historical batches");
opt_debug!(batch_count = batch_count, "Processed historical batches");
}

let reorged_opt = match reorg_handler.check(&batch_end).await {
Ok(opt) => opt,
Err(e) => {
error!(error = %e, "Failed to perform reorg check");
opt_error!(error = %e, "Failed to perform reorg check");
_ = sender.try_stream(e).await;
return None;
}
@@ -441,7 +442,7 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
};

if next_start_block > end {
info!(batch_count = batch_count, "Historical sync completed");
opt_info!(batch_count = batch_count, "Historical sync completed");
return Some(batch_end);
}
}
20 changes: 10 additions & 10 deletions src/block_range_scanner/reorg_handler.rs
@@ -4,7 +4,6 @@ use alloy::{
network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse},
primitives::BlockHash,
};
use tracing::{info, warn};

use crate::{
ScannerError,
@@ -63,22 +62,22 @@ impl<N: Network> ReorgHandler<N> {
block: &N::BlockResponse,
) -> Result<Option<N::BlockResponse>, ScannerError> {
let block = block.header();
info!(block_hash = %block.hash(), block_number = block.number(), "Checking if block was reorged");
opt_info!(block_hash = %block.hash(), block_number = block.number(), "Checking if block was reorged");

if !self.reorg_detected(block).await? {
let block_hash = block.hash();
info!(block_hash = %block_hash, block_number = block.number(), "No reorg detected");
opt_info!(block_hash = %block_hash, block_number = block.number(), "No reorg detected");
// store the incoming block's hash for future reference
if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) {
self.buffer.push(block_hash);
}
return Ok(None);
}

info!("Reorg detected, searching for common ancestor");
opt_info!("Reorg detected, searching for common ancestor");

while let Some(&block_hash) = self.buffer.back() {
info!(block_hash = %block_hash, "Checking if block exists on-chain");
opt_info!(block_hash = %block_hash, "Checking if block exists on-chain");
match self.provider.get_block_by_hash(block_hash).await {
Ok(common_ancestor) => return self.return_common_ancestor(common_ancestor).await,
Err(robust_provider::Error::BlockNotFound(_)) => {
@@ -94,12 +93,13 @@ impl<N: Network> ReorgHandler<N> {
// no need to store finalized block's hash in the buffer, as it is returned by default only
// if no buffered hashes exist on-chain

warn!("Possible deep reorg detected, setting finalized block as common ancestor");
opt_warn!("Possible deep reorg detected, setting finalized block as common ancestor");

let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;

let header = finalized.header();
info!(finalized_hash = %header.hash(), block_number = header.number(), "Finalized block set as common ancestor");
#[allow(clippy::used_underscore_binding)]
let _header = finalized.header();
opt_info!(finalized_hash = %_header.hash(), block_number = _header.number(), "Finalized block set as common ancestor");

Ok(Some(finalized))
}
@@ -120,10 +120,10 @@ impl<N: Network> ReorgHandler<N> {
let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
let finalized_header = finalized.header();
let common_ancestor = if finalized_header.number() <= common_ancestor_header.number() {
info!(common_ancestor = %common_ancestor_header.hash(), block_number = common_ancestor_header.number(), "Common ancestor found");
opt_info!(common_ancestor = %common_ancestor_header.hash(), block_number = common_ancestor_header.number(), "Common ancestor found");
common_ancestor
} else {
warn!(
opt_warn!(
finalized_hash = %finalized_header.hash(), block_number = finalized_header.number(), "Possible deep reorg detected, using finalized block as common ancestor"
);
// all buffered blocks are finalized, so no more need to track them
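
One wiring detail implied but not shown: `common.rs` and `reorg_handler.rs` both drop their `use tracing::{...}` imports yet keep calling the `opt_*` macros, so the macros must be visible across the crate. A hedged sketch of one way that could be arranged (the actual module layout is an assumption):

```rust
// Hypothetical crate-level module, e.g. src/macros.rs.
macro_rules! opt_warn {
    ($($arg:tt)*) => {{
        #[cfg(feature = "tracing")]
        tracing::warn!($($arg)*);
    }};
}
// Textual-scope re-export: sibling modules can bring the macro in with
// `use crate::macros::opt_warn;` instead of importing `tracing` directly.
pub(crate) use opt_warn;
```
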