Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/networking/p2p/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
collections::{BTreeMap, VecDeque},
sync::{
Arc, LazyLock,
atomic::{AtomicU8, AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
Expand Down Expand Up @@ -64,6 +64,7 @@ pub struct Metrics {
pub downloaded_headers: IntCounter,
pub time_to_retrieve_sync_head_block: Arc<Mutex<Option<Duration>>>,
pub headers_download_start_time: Arc<Mutex<Option<SystemTime>>>,
pub headers_downloading: AtomicBool,

// Account tries
pub downloaded_account_tries: AtomicU64,
Expand Down Expand Up @@ -715,6 +716,7 @@ impl Default for Metrics {
downloaded_headers,
time_to_retrieve_sync_head_block: Arc::new(Mutex::new(None)),
headers_download_start_time: Arc::new(Mutex::new(None)),
headers_downloading: AtomicBool::new(false),

// Account tries
downloaded_account_tries: AtomicU64::new(0),
Expand Down
108 changes: 70 additions & 38 deletions crates/networking/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ pub async fn periodically_show_peer_stats_during_syncing(
let mut previous_step = CurrentStepValue::None;
let mut phase_start_time = std::time::Instant::now();
let mut sync_started_logged = false;
let mut headers_complete_logged = false;
let mut headers_were_downloading = false;

// Track metrics at phase start for phase summaries
let mut phase_start = PhaseCounters::default();
Expand Down Expand Up @@ -349,6 +351,27 @@ pub async fn periodically_show_peer_stats_during_syncing(
previous_step = current_step;
}

// Track background header download completion
let headers_downloading = METRICS.headers_downloading.load(Ordering::Relaxed);
if headers_downloading {
headers_were_downloading = true;
}
if headers_were_downloading && !headers_downloading && !headers_complete_logged {
let headers_downloaded = METRICS.downloaded_headers.get();
let elapsed = METRICS
.headers_download_start_time
.lock()
.await
.map(|t| format_duration(t.elapsed().unwrap_or_default()))
.unwrap_or_else(|| "??:??:??".to_string());
info!(
" [Headers] Complete: {} headers in {}",
format_thousands(headers_downloaded),
elapsed
);
headers_complete_logged = true;
}

// Log phase-specific progress update
let phase_elapsed = phase_start_time.elapsed();
let total_elapsed = format_duration(start.elapsed());
Expand All @@ -369,26 +392,26 @@ pub async fn periodically_show_peer_stats_during_syncing(
}
}

/// Returns (phase_number, phase_name) for the current step
fn phase_info(step: CurrentStepValue) -> (u8, &'static str) {
/// Returns the phase name for the current step
fn phase_name(step: CurrentStepValue) -> &'static str {
match step {
CurrentStepValue::DownloadingHeaders => (1, "BLOCK HEADERS"),
CurrentStepValue::RequestingAccountRanges => (2, "ACCOUNT RANGES"),
CurrentStepValue::DownloadingHeaders => "BLOCK HEADERS",
CurrentStepValue::RequestingAccountRanges => "ACCOUNT RANGES",
CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => {
(3, "ACCOUNT INSERTION")
"ACCOUNT INSERTION"
}
CurrentStepValue::RequestingStorageRanges => (4, "STORAGE RANGES"),
CurrentStepValue::InsertingStorageRanges => (5, "STORAGE INSERTION"),
CurrentStepValue::HealingState => (6, "STATE HEALING"),
CurrentStepValue::HealingStorage => (7, "STORAGE HEALING"),
CurrentStepValue::RequestingBytecodes => (8, "BYTECODES"),
CurrentStepValue::None => (0, "UNKNOWN"),
CurrentStepValue::RequestingStorageRanges => "STORAGE RANGES",
CurrentStepValue::InsertingStorageRanges => "STORAGE INSERTION",
CurrentStepValue::HealingState => "STATE HEALING",
CurrentStepValue::HealingStorage => "STORAGE HEALING",
CurrentStepValue::RequestingBytecodes => "BYTECODES",
CurrentStepValue::None => "UNKNOWN",
}
}

fn log_phase_separator(step: CurrentStepValue) {
let (phase_num, phase_name) = phase_info(step);
let header = format!("── PHASE {}/8: {} ", phase_num, phase_name);
let name = phase_name(step);
let header = format!("── {} ", name);
let header_width = header.chars().count();
let padding_width = 80usize.saturating_sub(header_width);
let padding = "─".repeat(padding_width);
Expand All @@ -397,8 +420,7 @@ fn log_phase_separator(step: CurrentStepValue) {
}

fn log_phase_completion(step: CurrentStepValue, elapsed: String, summary: &str) {
let (_, phase_name) = phase_info(step);
info!("✓ {} complete: {} in {}", phase_name, summary, elapsed);
info!("✓ {} complete: {} in {}", phase_name(step), summary, elapsed);
}

async fn phase_metrics(step: CurrentStepValue, phase_start: &PhaseCounters) -> String {
Expand Down Expand Up @@ -466,6 +488,34 @@ async fn phase_metrics(step: CurrentStepValue, phase_start: &PhaseCounters) -> S
/// Interval in seconds between progress updates
const PROGRESS_INTERVAL_SECS: u64 = 30;

/// Shows a compact one-liner for the background header download if active
fn log_background_headers_status(prev_interval: &PhaseCounters) {
let headers_downloading = METRICS.headers_downloading.load(Ordering::Relaxed);
if !headers_downloading {
return;
}

let headers_to_download = METRICS.sync_head_block.load(Ordering::Relaxed);
let headers_downloaded = u64::min(METRICS.downloaded_headers.get(), headers_to_download);
let interval_downloaded = headers_downloaded.saturating_sub(prev_interval.headers);
let percentage = if headers_to_download == 0 {
0.0
} else {
(headers_downloaded as f64 / headers_to_download as f64) * 100.0
};
let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;

let bar = progress_bar(percentage, 20);
info!(
" [Headers] {} {:>5.1}% {} / {} @ {}/s",
bar,
percentage,
format_thousands(headers_downloaded),
format_thousands(headers_to_download),
format_thousands(rate)
);
}

async fn log_phase_progress(
step: CurrentStepValue,
phase_elapsed: Duration,
Expand All @@ -478,31 +528,13 @@ async fn log_phase_progress(
// Use consistent column widths: left column 40 chars, then │, then right column
let col1_width = 40;

// Always show background header progress first (compact one-liner)
log_background_headers_status(prev_interval);

match step {
CurrentStepValue::DownloadingHeaders => {
let headers_to_download = METRICS.sync_head_block.load(Ordering::Relaxed);
let headers_downloaded =
u64::min(METRICS.downloaded_headers.get(), headers_to_download);
let interval_downloaded = headers_downloaded.saturating_sub(prev_interval.headers);
let percentage = if headers_to_download == 0 {
0.0
} else {
(headers_downloaded as f64 / headers_to_download as f64) * 100.0
};
let rate = interval_downloaded / PROGRESS_INTERVAL_SECS;

let progress = progress_bar(percentage, 40);
info!(" {} {:>5.1}%", progress, percentage);
info!("");
let col1 = format!(
"Headers: {} / {}",
format_thousands(headers_downloaded),
format_thousands(headers_to_download)
);
info!(" {:<col1_width$} │ Elapsed: {}", col1, phase_elapsed_str);
let col1 = format!("Rate: {} headers/s", format_thousands(rate));
info!(" {:<col1_width$} │ Peers: {}", col1, peer_count);
info!(" Total time: {}", total_elapsed);
// Headers are now a background task — progress shown by
// log_background_headers_status above. Nothing else to display.
}
CurrentStepValue::RequestingAccountRanges => {
let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed);
Expand Down
47 changes: 47 additions & 0 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,53 @@ impl PeerHandler {

Ok(None)
}

/// Requests a single block header by hash from a specific peer.
pub async fn get_block_header_by_hash(
&mut self,
peer_id: H256,
connection: &mut PeerConnection,
block_hash: H256,
) -> Result<Option<BlockHeader>, PeerHandlerError> {
let request_id = rand::random();
let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders {
id: request_id,
startblock: HashOrNumber::Hash(block_hash),
limit: 1,
skip: 0,
reverse: false,
});
debug!("get_block_header_by_hash: requesting header with hash {block_hash:?}");
match PeerHandler::make_request(
&mut self.peer_table,
peer_id,
connection,
request,
PEER_REPLY_TIMEOUT,
)
.await
{
Ok(RLPxMessage::BlockHeaders(BlockHeaders {
id: _,
block_headers,
})) => {
if let Some(header) = block_headers.into_iter().next() {
return Ok(Some(header));
Comment on lines +652 to +653
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_block_header_by_hash returns the first header the peer sends without verifying it matches block_hash. A malicious/buggy peer could return a different header, and snap sync would accept it as the pivot. Validate header.hash() == block_hash (and ideally that exactly one header was returned) before returning Some(header), otherwise treat it as a failure.

Suggested change
if let Some(header) = block_headers.into_iter().next() {
return Ok(Some(header));
if block_headers.len() == 1 {
let header = block_headers.into_iter().next().expect("len checked above");
if header.hash() == block_hash {
return Ok(Some(header));
} else {
debug!(
"Received block header with mismatching hash from peer {peer_id:?}: \
expected {block_hash:?}, got {:?}",
header.hash()
);
}
} else {
debug!(
"Expected exactly one block header from peer {peer_id:?} for hash \
{block_hash:?}, but received {} headers",
block_headers.len()
);

Copilot uses AI. Check for mistakes.
}
}
Ok(_other_msgs) => {
debug!("Received unexpected message from peer");
}
Err(PeerConnectionError::Timeout) => {
debug!("Timeout while waiting for header by hash from peer");
}
Err(_) => {
warn!("The RLPxConnection closed the backend channel");
}
}

Ok(None)
}
}

/// Validates the block headers received from a peer by checking that the parent hash of each header
Expand Down
4 changes: 2 additions & 2 deletions crates/networking/p2p/snap/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use ethrex_trie::{Node, verify_range};
use std::{
collections::{BTreeMap, HashMap, VecDeque},
path::Path,
sync::atomic::Ordering,
sync::{atomic::Ordering, Arc},
time::{Duration, SystemTime},
};
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -96,7 +96,7 @@ pub async fn request_account_range(
limit: H256,
account_state_snapshots_dir: &Path,
pivot_header: &mut BlockHeader,
block_sync_state: &mut SnapBlockSyncState,
block_sync_state: &Arc<tokio::sync::Mutex<SnapBlockSyncState>>,
) -> Result<(), SnapError> {
METRICS
.current_step
Expand Down
Loading
Loading