diff --git a/crates/networking/p2p/metrics.rs b/crates/networking/p2p/metrics.rs index 593a9041240..b59a3be0cf8 100644 --- a/crates/networking/p2p/metrics.rs +++ b/crates/networking/p2p/metrics.rs @@ -3,7 +3,7 @@ use std::{ collections::{BTreeMap, VecDeque}, sync::{ Arc, LazyLock, - atomic::{AtomicU8, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering}, }, time::{Duration, SystemTime}, }; @@ -64,6 +64,7 @@ pub struct Metrics { pub downloaded_headers: IntCounter, pub time_to_retrieve_sync_head_block: Arc>>, pub headers_download_start_time: Arc>>, + pub headers_downloading: AtomicBool, // Account tries pub downloaded_account_tries: AtomicU64, @@ -715,6 +716,7 @@ impl Default for Metrics { downloaded_headers, time_to_retrieve_sync_head_block: Arc::new(Mutex::new(None)), headers_download_start_time: Arc::new(Mutex::new(None)), + headers_downloading: AtomicBool::new(false), // Account tries downloaded_account_tries: AtomicU64::new(0), diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 2f1d8261f04..e3f0134cf88 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -212,6 +212,8 @@ pub async fn periodically_show_peer_stats_during_syncing( let mut previous_step = CurrentStepValue::None; let mut phase_start_time = std::time::Instant::now(); let mut sync_started_logged = false; + let mut headers_complete_logged = false; + let mut headers_were_downloading = false; // Track metrics at phase start for phase summaries let mut phase_start = PhaseCounters::default(); @@ -349,6 +351,27 @@ pub async fn periodically_show_peer_stats_during_syncing( previous_step = current_step; } + // Track background header download completion + let headers_downloading = METRICS.headers_downloading.load(Ordering::Relaxed); + if headers_downloading { + headers_were_downloading = true; + } + if headers_were_downloading && !headers_downloading && !headers_complete_logged { + let headers_downloaded = METRICS.downloaded_headers.get(); + let elapsed = METRICS + .headers_download_start_time + .lock() + .await + .map(|t| format_duration(t.elapsed().unwrap_or_default())) + .unwrap_or_else(|| "??:??:??".to_string()); + info!( + " [Headers] Complete: {} headers in {}", + format_thousands(headers_downloaded), + elapsed + ); + headers_complete_logged = true; + } + // Log phase-specific progress update let phase_elapsed = phase_start_time.elapsed(); let total_elapsed = format_duration(start.elapsed()); @@ -369,26 +392,26 @@ pub async fn periodically_show_peer_stats_during_syncing( } } -/// Returns (phase_number, phase_name) for the current step -fn phase_info(step: CurrentStepValue) -> (u8, &'static str) { +/// Returns the phase name for the current step +fn phase_name(step: CurrentStepValue) -> &'static str { match step { - CurrentStepValue::DownloadingHeaders => (1, "BLOCK HEADERS"), - CurrentStepValue::RequestingAccountRanges => (2, "ACCOUNT RANGES"), + CurrentStepValue::DownloadingHeaders => "BLOCK HEADERS", + CurrentStepValue::RequestingAccountRanges => "ACCOUNT RANGES", CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => { - (3, "ACCOUNT INSERTION") + "ACCOUNT INSERTION" } - CurrentStepValue::RequestingStorageRanges => (4, "STORAGE RANGES"), - CurrentStepValue::InsertingStorageRanges => (5, "STORAGE INSERTION"), - CurrentStepValue::HealingState => (6, "STATE HEALING"), - CurrentStepValue::HealingStorage => (7, "STORAGE HEALING"), - CurrentStepValue::RequestingBytecodes => (8, "BYTECODES"), - CurrentStepValue::None => (0, "UNKNOWN"), + CurrentStepValue::RequestingStorageRanges => "STORAGE RANGES", + CurrentStepValue::InsertingStorageRanges => "STORAGE INSERTION", + CurrentStepValue::HealingState => "STATE HEALING", + CurrentStepValue::HealingStorage => "STORAGE HEALING", + CurrentStepValue::RequestingBytecodes => "BYTECODES", + CurrentStepValue::None => "UNKNOWN", } } fn log_phase_separator(step: CurrentStepValue) { - let (phase_num, phase_name) = phase_info(step); - let header = format!("── PHASE {}/8: {} ", phase_num, phase_name); + let name = phase_name(step); + let header = format!("── {} ", name); let header_width = header.chars().count(); let padding_width = 80usize.saturating_sub(header_width); let padding = "─".repeat(padding_width); @@ -397,8 +420,7 @@ fn log_phase_separator(step: CurrentStepValue) { } fn log_phase_completion(step: CurrentStepValue, elapsed: String, summary: &str) { - let (_, phase_name) = phase_info(step); - info!("✓ {} complete: {} in {}", phase_name, summary, elapsed); + info!("✓ {} complete: {} in {}", phase_name(step), summary, elapsed); } async fn phase_metrics(step: CurrentStepValue, phase_start: &PhaseCounters) -> String { @@ -466,6 +488,34 @@ async fn phase_metrics(step: CurrentStepValue, phase_start: &PhaseCounters) -> S /// Interval in seconds between progress updates const PROGRESS_INTERVAL_SECS: u64 = 30; +/// Shows a compact one-liner for the background header download if active +fn log_background_headers_status(prev_interval: &PhaseCounters) { + let headers_downloading = METRICS.headers_downloading.load(Ordering::Relaxed); + if !headers_downloading { + return; + } + + let headers_to_download = METRICS.sync_head_block.load(Ordering::Relaxed); + let headers_downloaded = u64::min(METRICS.downloaded_headers.get(), headers_to_download); + let interval_downloaded = headers_downloaded.saturating_sub(prev_interval.headers); + let percentage = if headers_to_download == 0 { + 0.0 + } else { + (headers_downloaded as f64 / headers_to_download as f64) * 100.0 + }; + let rate = interval_downloaded / PROGRESS_INTERVAL_SECS; + + let bar = progress_bar(percentage, 20); + info!( + " [Headers] {} {:>5.1}% {} / {} @ {}/s", + bar, + percentage, + format_thousands(headers_downloaded), + format_thousands(headers_to_download), + format_thousands(rate) + ); +} + async fn log_phase_progress( step: CurrentStepValue, phase_elapsed: Duration, @@ -478,31 +528,13 @@ async fn log_phase_progress( // Use consistent column widths: left column 40 chars, then │, then right column let col1_width = 40; + // Always show background header progress first (compact one-liner) + log_background_headers_status(prev_interval); + match step { CurrentStepValue::DownloadingHeaders => { - let headers_to_download = METRICS.sync_head_block.load(Ordering::Relaxed); - let headers_downloaded = - u64::min(METRICS.downloaded_headers.get(), headers_to_download); - let interval_downloaded = headers_downloaded.saturating_sub(prev_interval.headers); - let percentage = if headers_to_download == 0 { - 0.0 - } else { - (headers_downloaded as f64 / headers_to_download as f64) * 100.0 - }; - let rate = interval_downloaded / PROGRESS_INTERVAL_SECS; - - let progress = progress_bar(percentage, 40); - info!(" {} {:>5.1}%", progress, percentage); - info!(""); - let col1 = format!( - "Headers: {} / {}", - format_thousands(headers_downloaded), - format_thousands(headers_to_download) - ); - info!(" {: { let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed); diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index ec2af80136b..eb5985b8ed8 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -619,6 +619,53 @@ impl PeerHandler { Ok(None) } + + /// Requests a single block header by hash from a specific peer. + pub async fn get_block_header_by_hash( + &mut self, + peer_id: H256, + connection: &mut PeerConnection, + block_hash: H256, + ) -> Result, PeerHandlerError> { + let request_id = rand::random(); + let request = RLPxMessage::GetBlockHeaders(GetBlockHeaders { + id: request_id, + startblock: HashOrNumber::Hash(block_hash), + limit: 1, + skip: 0, + reverse: false, + }); + debug!("get_block_header_by_hash: requesting header with hash {block_hash:?}"); + match PeerHandler::make_request( + &mut self.peer_table, + peer_id, + connection, + request, + PEER_REPLY_TIMEOUT, + ) + .await + { + Ok(RLPxMessage::BlockHeaders(BlockHeaders { + id: _, + block_headers, + })) => { + if let Some(header) = block_headers.into_iter().next() { + return Ok(Some(header)); + } + } + Ok(_other_msgs) => { + debug!("Received unexpected message from peer"); + } + Err(PeerConnectionError::Timeout) => { + debug!("Timeout while waiting for header by hash from peer"); + } + Err(_) => { + warn!("The RLPxConnection closed the backend channel"); + } + } + + Ok(None) + } } /// Validates the block headers received from a peer by checking that the parent hash of each header diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 0757f73a434..ab520995151 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -35,7 +35,7 @@ use ethrex_trie::{Node, verify_range}; use std::{ collections::{BTreeMap, HashMap, VecDeque}, path::Path, - sync::atomic::Ordering, + sync::{atomic::Ordering, Arc}, time::{Duration, SystemTime}, }; use tracing::{debug, error, info, trace, warn}; @@ -96,7 +96,7 @@ pub async fn request_account_range( limit: H256, account_state_snapshots_dir: &Path, pivot_header: &mut BlockHeader, - block_sync_state: &mut SnapBlockSyncState, + block_sync_state: &Arc>, ) -> Result<(), SnapError> { METRICS .current_step diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 5d47db58f89..e071ba4cb88 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -97,7 +97,10 @@ impl SnapBlockSyncState { } } -/// Performs snap sync cycle - fetches state via snap protocol while downloading blocks in parallel +/// Performs snap sync cycle - fetches state via snap protocol while downloading blocks in parallel. +/// +/// Headers are downloaded in a background task concurrently with state sync. +/// This removes headers from the critical path (saves 15-20 min on mainnet). pub async fn sync_cycle_snap( peers: &mut PeerHandler, blockchain: Arc, @@ -106,15 +109,9 @@ pub async fn sync_cycle_snap( store: Store, datadir: &Path, ) -> Result<(), SyncError> { - // Request all block headers between the current head and the sync head - // We will begin from the current head so that we download the earliest state first - // This step is not parallelized - let mut block_sync_state = SnapBlockSyncState::new(store.clone()); - // Check if we have some blocks downloaded from a previous sync attempt - // This applies only to snap sync—full sync always starts fetching headers - // from the canonical block, which updates as new block headers are fetched. - let mut current_head = block_sync_state.get_current_head().await?; - let mut current_head_number = store + let block_sync_state = SnapBlockSyncState::new(store.clone()); + let current_head = block_sync_state.get_current_head().await?; + let current_head_number = store .get_block_number(current_head) .await? .ok_or(SyncError::BlockNumber(current_head))?; @@ -122,36 +119,175 @@ pub async fn sync_cycle_snap( "Syncing from current head {:?} to sync_head {:?}", current_head, sync_head ); - let pending_block = match store.get_pending_block(sync_head).await { - Ok(res) => res, - Err(e) => return Err(e.into()), + + // Fetch the pivot (sync_head) header directly — no need to download the full chain first + let pivot_header = request_pivot_header(peers, sync_head).await?; + info!( + "Obtained pivot header #{} (hash: {:?})", + pivot_header.number, + pivot_header.hash() + ); + + // Check if we should fall back to full sync + let already_has_blocks = store.get_latest_block_number().await? > 0; + if already_has_blocks || pivot_header.number < MIN_FULL_BLOCKS { + info!("Sync head is found, switching to FullSync"); + snap_enabled.store(false, Ordering::Relaxed); + return super::full::sync_cycle_full( + peers, + blockchain, + tokio_util::sync::CancellationToken::new(), + sync_head, + store.clone(), + ) + .await; + } + + // Store the pivot header and add it to block_sync_state + store + .add_block_headers(vec![pivot_header.clone()]) + .await?; + let mut block_sync_state = block_sync_state; + block_sync_state.block_hashes.push(pivot_header.hash()); + + // Wrap in Arc> for sharing between background download and state sync + let block_sync_state = Arc::new(tokio::sync::Mutex::new(block_sync_state)); + + // Spawn background header download — runs concurrently with state sync + let header_handle = { + let bss = block_sync_state.clone(); + let mut bg_peers = peers.clone(); + let bg_store = store.clone(); + let pending_block = store.get_pending_block(sync_head).await.ok().flatten(); + tokio::spawn(async move { + download_headers_background( + &mut bg_peers, + &bg_store, + &bss, + current_head_number, + sync_head, + pending_block.map(|b| b.header), + ) + .await + }) }; - let mut attempts = 0; + info!("Background header download started — proceeding with state sync"); + snap_sync(peers, &store, &block_sync_state, datadir, header_handle).await?; + + store.clear_snap_state().await?; + snap_enabled.store(false, Ordering::Relaxed); + + Ok(()) +} +/// Fetch the pivot block header by hash, retrying across peers. +async fn request_pivot_header( + peers: &mut PeerHandler, + pivot_hash: H256, +) -> Result { + let mut attempts = 0; loop { - debug!("Requesting Block Headers from {current_head}"); + let Some((peer_id, mut connection)) = peers + .peer_table + .get_best_peer(&SUPPORTED_ETH_CAPABILITIES) + .await? + else { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + + if let Some(header) = peers + .get_block_header_by_hash(peer_id, &mut connection, pivot_hash) + .await + .map_err(SyncError::PeerHandler)? + { + peers.peer_table.record_success(&peer_id).await?; + return Ok(header); + } + + peers.peer_table.record_failure(&peer_id).await?; + attempts += 1; + if attempts > MAX_HEADER_FETCH_ATTEMPTS { + return Err(SyncError::NoBlockHeaders); + } + tokio::time::sleep(Duration::from_millis( + 1.1_f64.powf(attempts as f64) as u64, + )) + .await; + } +} + +/// Download all block headers from current_head to sync_head in the background. +/// This runs concurrently with state sync. +async fn download_headers_background( + peers: &mut PeerHandler, + store: &Store, + block_sync_state: &Arc>, + current_head_number: u64, + sync_head: H256, + pending_block_header: Option, +) -> Result<(), SyncError> { + METRICS + .headers_downloading + .store(true, Ordering::Relaxed); + *METRICS.headers_download_start_time.lock().await = Some(SystemTime::now()); + + let result = download_headers_background_inner( + peers, + store, + block_sync_state, + current_head_number, + sync_head, + pending_block_header, + ) + .await; + + METRICS + .headers_downloading + .store(false, Ordering::Relaxed); + result +} + +async fn download_headers_background_inner( + peers: &mut PeerHandler, + store: &Store, + block_sync_state: &Arc>, + mut current_head_number: u64, + sync_head: H256, + pending_block_header: Option, +) -> Result<(), SyncError> { + let mut current_head = { + let bss = block_sync_state.lock().await; + bss.store + .get_latest_canonical_block_hash() + .await? + .unwrap_or(H256::zero()) + }; + if let Some(head) = store.get_header_download_checkpoint().await? { + current_head = head; + } + + let mut attempts = 0; + loop { let Some(mut block_headers) = peers .request_block_headers(current_head_number, sync_head) .await? else { if attempts > MAX_HEADER_FETCH_ATTEMPTS { - warn!("Sync failed to find target block header, aborting"); - return Ok(()); + error!("Background header download: failed to find target after {attempts} attempts"); + return Err(SyncError::NoBlockHeaders); } attempts += 1; - tokio::time::sleep(Duration::from_millis(1.1_f64.powf(attempts as f64) as u64)).await; + tokio::time::sleep(Duration::from_millis( + 1.1_f64.powf(attempts as f64) as u64, + )) + .await; continue; }; - debug!("Sync Log 1: In snap sync"); - debug!( - "Sync Log 2: State block hashes len {}", - block_sync_state.block_hashes.len() - ); - - let (first_block_hash, first_block_number, first_block_parent_hash) = + let (first_block_hash, _first_block_number, first_block_parent_hash) = match block_headers.first() { Some(header) => (header.hash(), header.number, header.parent_hash), None => continue, @@ -160,34 +296,23 @@ pub async fn sync_cycle_snap( Some(header) => (header.hash(), header.number), None => continue, }; - // TODO(#2126): This is just a temporary solution to avoid a bug where the sync would get stuck - // on a loop when the target head is not found, i.e. on a reorg with a side-chain. + if first_block_hash == last_block_hash && first_block_hash == current_head && current_head != sync_head { - // There is no path to the sync head this goes back until it find a common ancerstor - warn!("Sync failed to find target block header, going back to the previous parent"); + warn!("Background header download: going back to previous parent"); current_head = first_block_parent_hash; continue; } - debug!( - "Received {} block headers| First Number: {} Last Number: {}", - block_headers.len(), - first_block_number, - last_block_number - ); - - // If we have a pending block from new_payload request - // attach it to the end if it matches the parent_hash of the latest received header - if let Some(ref block) = pending_block - && block.header.parent_hash == last_block_hash - { - block_headers.push(block.header.clone()); + // Attach pending block if it connects + if let Some(ref pending_header) = pending_block_header { + if pending_header.parent_hash == last_block_hash { + block_headers.push(pending_header.clone()); + } } - // Filter out everything after the sync_head let mut sync_head_found = false; if let Some(index) = block_headers .iter() @@ -197,68 +322,48 @@ pub async fn sync_cycle_snap( block_headers.drain(index + 1..); } - // Update current fetch head current_head = last_block_hash; current_head_number = last_block_number; - // If the sync head is not 0 we search to fullsync - let head_found = sync_head_found && store.get_latest_block_number().await? > 0; - // Or the head is very close to 0 - let head_close_to_0 = last_block_number < MIN_FULL_BLOCKS; - - if head_found || head_close_to_0 { - // Too few blocks for a snap sync, switching to full sync - info!("Sync head is found, switching to FullSync"); - snap_enabled.store(false, Ordering::Relaxed); - return super::full::sync_cycle_full( - peers, - blockchain, - tokio_util::sync::CancellationToken::new(), - sync_head, - store.clone(), - ) - .await; - } - - // Discard the first header as we already have it if block_headers.len() > 1 { let block_headers_iter = block_headers.into_iter().skip(1); - block_sync_state + .lock() + .await .process_incoming_headers(block_headers_iter) .await?; } if sync_head_found { + info!("Background header download complete"); break; - }; + } } - snap_sync(peers, &store, &mut block_sync_state, datadir).await?; - - store.clear_snap_state().await?; - snap_enabled.store(false, Ordering::Relaxed); - Ok(()) } -/// Main snap sync logic - downloads state via snap protocol +/// Main snap sync logic - downloads state via snap protocol. +/// +/// `header_download_handle` is the background header download task spawned by +/// `sync_cycle_snap`. It must complete before finalization (store_block_bodies +/// and forkchoice_update) which need the full header chain. pub async fn snap_sync( peers: &mut PeerHandler, store: &Store, - block_sync_state: &mut SnapBlockSyncState, + block_sync_state: &Arc>, datadir: &Path, + header_download_handle: tokio::task::JoinHandle>, ) -> Result<(), SyncError> { - // snap-sync: launch tasks to fetch blocks and state in parallel - // - Fetch each block's body and its receipt via eth p2p requests - // - Fetch the pivot block's state via snap p2p requests - // - Execute blocks after the pivot (like in full-sync) let pivot_hash = block_sync_state + .lock() + .await .block_hashes .last() + .copied() .ok_or(SyncError::NoBlockHeaders)?; let mut pivot_header = store - .get_block_header_by_hash(*pivot_hash)? + .get_block_header_by_hash(pivot_hash)? .ok_or(SyncError::CorruptDB)?; while block_is_stale(&pivot_header) { @@ -560,6 +665,12 @@ pub async fn snap_sync( debug_assert!(validate_bytecodes(store.clone(), pivot_header.state_root)); + // Wait for background header download to complete before store_block_bodies + // and forkchoice_update, which need the full header chain. + info!("Waiting for background header download to complete..."); + header_download_handle.await??; + info!("Background header download complete"); + store_block_bodies(vec![pivot_header.clone()], peers.clone(), store.clone()).await?; let block = store @@ -569,7 +680,8 @@ pub async fn snap_sync( store.add_block(block).await?; - let numbers_and_hashes = block_sync_state + let bss = block_sync_state.lock().await; + let numbers_and_hashes = bss .block_hashes .iter() .rev() @@ -622,7 +734,7 @@ pub async fn update_pivot( block_number: u64, block_timestamp: u64, peers: &mut PeerHandler, - block_sync_state: &mut SnapBlockSyncState, + block_sync_state: &Arc>, ) -> Result { // We multiply the estimation by 0.9 in order to account for missing slots (~9% in tesnets) let new_pivot_block_number = block_number @@ -672,6 +784,8 @@ pub async fn update_pivot( .await? .ok_or(SyncError::NoBlockHeaders)?; block_sync_state + .lock() + .await .process_incoming_headers(block_headers.into_iter()) .await?; *METRICS.sync_head_hash.lock().await = pivot.hash();