diff --git a/dash-spv/src/client/mod.rs b/dash-spv/src/client/mod.rs index df1e2238d..2372ae7e8 100644 --- a/dash-spv/src/client/mod.rs +++ b/dash-spv/src/client/mod.rs @@ -190,9 +190,13 @@ impl< // Create sync manager let received_filter_heights = stats.read().await.received_filter_heights.clone(); tracing::info!("Creating sequential sync manager"); - let sync_manager = - SequentialSyncManager::new(&config, received_filter_heights, wallet.clone()) - .map_err(SpvError::Sync)?; + let sync_manager = SequentialSyncManager::new( + &config, + received_filter_heights, + wallet.clone(), + state.clone(), + ) + .map_err(SpvError::Sync)?; // Create validation manager let validation = ValidationManager::new(config.validation_mode); @@ -343,20 +347,6 @@ impl< match loaded_count { Ok(loaded_count) => { tracing::info!("✅ Sync manager loaded {} headers from storage", loaded_count); - - // IMPORTANT: Also load headers into the client's ChainState for normal sync - // This is needed because the status display reads from the client's ChainState - let state = self.state.read().await; - let is_normal_sync = !state.synced_from_checkpoint; - drop(state); // Release the lock before loading headers - - if is_normal_sync && loaded_count > 0 { - tracing::info!("Loading headers into client ChainState for normal sync..."); - if let Err(e) = self.load_headers_into_client_state(tip_height).await { - tracing::error!("Failed to load headers into client ChainState: {}", e); - // This is not critical for normal sync, continue anyway - } - } } Err(e) => { tracing::error!("Failed to load headers into sync manager: {}", e); @@ -1940,79 +1930,6 @@ impl< Ok(true) } - /// Load headers from storage into the client's ChainState. - /// This is used during normal sync to ensure the status display shows correct header count. - async fn load_headers_into_client_state(&mut self, tip_height: u32) -> Result<()> { - if tip_height == 0 { - return Ok(()); - } - - tracing::debug!("Loading {} headers from storage into client ChainState", tip_height); - let start_time = Instant::now(); - - // Load headers in batches to avoid memory spikes - const BATCH_SIZE: u32 = 10_000; - let mut loaded_count = 0u32; - - // Start from height 1 (genesis is already in ChainState) - let mut current_height = 1u32; - - while current_height <= tip_height { - let end_height = (current_height + BATCH_SIZE - 1).min(tip_height); - - // Load batch of headers from storage - let headers = { - let storage = self.storage.lock().await; - storage - .load_headers(current_height..end_height + 1) - .await - .map_err(SpvError::Storage)? - }; - - if headers.is_empty() { - tracing::warn!( - "No headers found for range {}..{} - storage may be incomplete", - current_height, - end_height + 1 - ); - break; - } - - // Add headers to client's chain state - { - let mut state = self.state.write().await; - for header in headers { - state.add_header(header); - loaded_count += 1; - } - } - - // Progress logging for large header counts - if loaded_count % 50_000 == 0 || loaded_count == tip_height { - let elapsed = start_time.elapsed(); - let headers_per_sec = loaded_count as f64 / elapsed.as_secs_f64(); - tracing::debug!( - "Loaded {}/{} headers into client ChainState ({:.0} headers/sec)", - loaded_count, - tip_height, - headers_per_sec - ); - } - - current_height = end_height + 1; - } - - let elapsed = start_time.elapsed(); - tracing::info!( - "✅ Loaded {} headers into client ChainState in {:.2}s ({:.0} headers/sec)", - loaded_count, - elapsed.as_secs_f64(), - loaded_count as f64 / elapsed.as_secs_f64() - ); - - Ok(()) - } - /// Rollback chain state to a specific height. async fn rollback_to_height(&mut self, target_height: u32) -> Result<()> { tracing::info!("Rolling back chain state to height {}", target_height); @@ -2255,9 +2172,9 @@ impl< self.config.network, ); - // Clone the chain state for storage and sync manager + // Clone the chain state for storage let chain_state_for_storage = (*chain_state).clone(); - let checkpoint_chain_state = (*chain_state).clone(); + let headers_len = chain_state_for_storage.headers.len() as u32; drop(chain_state); // Update storage with chain state including sync_base_height @@ -2278,8 +2195,12 @@ impl< checkpoint.height ); - // Update the sync manager's chain state with the checkpoint-initialized state - self.sync_manager.set_chain_state(checkpoint_chain_state); + // Update the sync manager's cached flags from the checkpoint-initialized state + self.sync_manager.update_chain_state_cache( + true, + checkpoint.height, + headers_len, + ); tracing::info!( "Updated sync manager with checkpoint-initialized chain state" ); diff --git a/dash-spv/src/sync/headers_with_reorg.rs b/dash-spv/src/sync/headers_with_reorg.rs index 18670860a..e6d498a9d 100644 --- a/dash-spv/src/sync/headers_with_reorg.rs +++ b/dash-spv/src/sync/headers_with_reorg.rs @@ -19,6 +19,8 @@ use crate::network::NetworkManager; use crate::storage::StorageManager; use crate::sync::headers2_state::Headers2StateManager; use crate::types::ChainState; +use std::sync::Arc; +use tokio::sync::RwLock; /// Configuration for reorg handling pub struct ReorgConfig { @@ -52,21 +54,27 @@ pub struct HeaderSyncManagerWithReorg { tip_manager: ChainTipManager, checkpoint_manager: CheckpointManager, reorg_config: ReorgConfig, - chain_state: ChainState, + chain_state: Arc>, // WalletState removed - wallet functionality is now handled externally headers2_state: Headers2StateManager, total_headers_synced: u32, syncing_headers: bool, last_sync_progress: std::time::Instant, headers2_failed: bool, + // Cached flags for quick access without locking + cached_synced_from_checkpoint: bool, + cached_sync_base_height: u32, } impl HeaderSyncManagerWithReorg { /// Create a new header sync manager with reorg support - pub fn new(config: &ClientConfig, reorg_config: ReorgConfig) -> SyncResult { - let chain_state = ChainState::new_for_network(config.network); + pub fn new( + config: &ClientConfig, + reorg_config: ReorgConfig, + chain_state: Arc>, + ) -> SyncResult { // WalletState removed - wallet functionality is now handled externally // Create checkpoint manager based on network @@ -91,6 +99,8 @@ impl 0 { + if self.is_synced_from_checkpoint() && self.get_sync_base_height() > 0 { tracing::info!("No headers in storage for checkpoint sync - this is expected"); return Ok(0); } return Ok(0); }; - if tip_height == 0 && !self.chain_state.synced_from_checkpoint { + if tip_height == 0 && !self.is_synced_from_checkpoint() { tracing::debug!("Only genesis block in storage"); return Ok(0); } @@ -140,7 +155,7 @@ impl 0 { + if self.is_synced_from_checkpoint() && self.get_sync_base_height() > 0 { // For checkpoint sync, start from index 0 in storage // (which represents blockchain height sync_base_height) 0u32 @@ -159,9 +174,12 @@ impl { // Add headers to chain state - for header in headers { - self.chain_state.add_header(header); - loaded_count += 1; + { + let mut cs = self.chain_state.write().await; + for header in headers { + cs.add_header(header); + loaded_count += 1; + } } } Ok(_) => { @@ -176,10 +194,7 @@ impl { // For checkpoint sync with only 1 header stored, this is expected - if self.chain_state.synced_from_checkpoint - && loaded_count == 0 - && tip_height == 0 - { + if self.is_synced_from_checkpoint() && loaded_count == 0 && tip_height == 0 { tracing::info!( "No additional headers to load for checkpoint sync - this is expected" ); @@ -206,14 +221,14 @@ impl 0 { - self.total_headers_synced = self.chain_state.sync_base_height + tip_height; + if self.is_synced_from_checkpoint() && self.get_sync_base_height() > 0 { + self.total_headers_synced = self.get_sync_base_height() + tip_height; tracing::info!( "Checkpoint sync initialization: storage_tip={}, sync_base={}, total_headers_synced={}, chain_state.headers.len()={}", tip_height, - self.chain_state.sync_base_height, + self.get_sync_base_height(), self.total_headers_synced, - self.chain_state.headers.len() + self.chain_state.read().await.headers.len() ); } else { self.total_headers_synced = tip_height; @@ -244,7 +259,7 @@ impl { // When syncing from a checkpoint, we need to create a proper locator // that helps the peer understand we want headers AFTER this point - if self.chain_state.synced_from_checkpoint && self.chain_state.sync_base_height > 0 - { + if self.is_synced_from_checkpoint() && self.get_sync_base_height() > 0 { // For checkpoint sync, only include the checkpoint hash // Including genesis would allow peers to fall back to sending headers from genesis // if they don't recognize the checkpoint, which is exactly what we want to avoid tracing::debug!( "📍 Using checkpoint-only locator for height {}: [{}]", - self.chain_state.sync_base_height, + self.get_sync_base_height(), hash ); vec![hash] @@ -403,12 +424,14 @@ impl { // Check if we're syncing from a checkpoint - if self.chain_state.synced_from_checkpoint && !self.chain_state.headers.is_empty() { + if self.is_synced_from_checkpoint() + && !self.chain_state.read().await.headers.is_empty() + { // Use the checkpoint hash from chain state - let checkpoint_hash = self.chain_state.headers[0].block_hash(); + let checkpoint_hash = self.chain_state.read().await.headers[0].block_hash(); tracing::info!( "📍 No base_hash provided but syncing from checkpoint at height {}. Using checkpoint hash: {}", - self.chain_state.sync_base_height, + self.get_sync_base_height(), checkpoint_hash ); vec![checkpoint_hash] @@ -443,7 +466,7 @@ impl 0 { + } else if self.chain_state.read().await.tip_height() > 0 { // Get our current tip to use as the base for compression - if let Some(tip_header) = self.chain_state.get_tip_header() { + if let Some(tip_header) = self.chain_state.read().await.get_tip_header() { tracing::info!( "Initializing headers2 compression state for peer {} with tip header at height {}", peer_id, - self.chain_state.tip_height() + self.chain_state.read().await.tip_height() ); self.headers2_state.init_peer_state(peer_id, tip_header); } @@ -574,7 +599,7 @@ impl { // No headers in storage - check if we're syncing from a checkpoint - if self.chain_state.synced_from_checkpoint && !self.chain_state.headers.is_empty() { + if self.is_synced_from_checkpoint() + && !self.chain_state.read().await.headers.is_empty() + { // We're syncing from a checkpoint and have the checkpoint header - let checkpoint_header = &self.chain_state.headers[0]; + let checkpoint_header = &self.chain_state.read().await.headers[0]; let checkpoint_hash = checkpoint_header.block_hash(); tracing::info!( "No headers in storage but syncing from checkpoint at height {}. Using checkpoint hash: {}", - self.chain_state.sync_base_height, + self.get_sync_base_height(), checkpoint_hash ); Some(checkpoint_hash) @@ -674,7 +701,8 @@ impl { // No headers in storage - check if we're syncing from a checkpoint - if self.chain_state.synced_from_checkpoint - && self.chain_state.sync_base_height > 0 - { + if self.is_synced_from_checkpoint() && self.get_sync_base_height() > 0 { // Use the checkpoint hash from chain state - if !self.chain_state.headers.is_empty() { - let checkpoint_hash = self.chain_state.headers[0].block_hash(); + if !self.chain_state.read().await.headers.is_empty() { + let checkpoint_hash = + self.chain_state.read().await.headers[0].block_hash(); tracing::info!( "Using checkpoint hash for recovery: {} (chain state has {} headers, first header time: {})", checkpoint_hash, - self.chain_state.headers.len(), - self.chain_state.headers[0].time + self.chain_state.read().await.headers.len(), + self.chain_state.read().await.headers[0].time ); Some(checkpoint_hash) } else { @@ -926,7 +951,8 @@ impl bool { - self.checkpoint_manager.is_past_last_checkpoint(self.chain_state.get_height()) + // Use total_headers_synced which tracks absolute blockchain height + self.checkpoint_manager.is_past_last_checkpoint(self.total_headers_synced) } /// Pre-populate headers from checkpoints for fast initial sync @@ -1073,39 +1099,33 @@ impl Option { - self.chain_state.tip_hash() + pub async fn get_tip_hash(&self) -> Option { + self.chain_state.read().await.tip_hash() } /// Get the sync base height (used when syncing from checkpoint) pub fn get_sync_base_height(&self) -> u32 { - self.chain_state.sync_base_height + self.cached_sync_base_height } - /// Get the chain state for checkpoint-aware operations - pub fn get_chain_state(&self) -> &ChainState { - &self.chain_state + /// Whether we're syncing from a checkpoint + pub fn is_synced_from_checkpoint(&self) -> bool { + self.cached_synced_from_checkpoint } - /// Update the chain state with an externally initialized state (e.g., from checkpoint) - pub fn set_chain_state(&mut self, chain_state: ChainState) { - tracing::info!( - "Updating HeaderSyncManager chain state: sync_base_height={}, synced_from_checkpoint={}, headers_count={}", - chain_state.sync_base_height, - chain_state.synced_from_checkpoint, - chain_state.headers.len() - ); - - // Update total_headers_synced based on the new chain state - if chain_state.synced_from_checkpoint && chain_state.sync_base_height > 0 { - // For checkpoint sync, total headers includes the base height - self.total_headers_synced = - chain_state.sync_base_height + chain_state.headers.len() as u32; + /// Update cached flags and totals based on an external state snapshot + pub fn update_cached_from_state_snapshot( + &mut self, + synced_from_checkpoint: bool, + sync_base_height: u32, + headers_len: u32, + ) { + self.cached_synced_from_checkpoint = synced_from_checkpoint; + self.cached_sync_base_height = sync_base_height; + if synced_from_checkpoint && sync_base_height > 0 { + self.total_headers_synced = sync_base_height + headers_len; } else { - // For normal sync, it's just the number of headers - self.total_headers_synced = chain_state.headers.len() as u32; + self.total_headers_synced = headers_len; } - - self.chain_state = chain_state; } } diff --git a/dash-spv/src/sync/sequential/mod.rs b/dash-spv/src/sync/sequential/mod.rs index 000bca2db..e7f5f7b9b 100644 --- a/dash-spv/src/sync/sequential/mod.rs +++ b/dash-spv/src/sync/sequential/mod.rs @@ -24,8 +24,11 @@ use crate::storage::StorageManager; use crate::sync::{ FilterSyncManager, HeaderSyncManagerWithReorg, MasternodeSyncManager, ReorgConfig, }; -use crate::types::{ChainState, SharedFilterHeights, SyncProgress}; +use crate::types::ChainState; +use crate::types::{SharedFilterHeights, SyncProgress}; use key_wallet_manager::wallet_interface::WalletInterface; +use std::sync::Arc; +use tokio::sync::RwLock; use phases::{PhaseTransition, SyncPhase}; use request_control::RequestController; @@ -87,6 +90,7 @@ impl< config: &ClientConfig, received_filter_heights: SharedFilterHeights, wallet: std::sync::Arc>, + chain_state: Arc>, ) -> SyncResult { // Create reorg config with sensible defaults let reorg_config = ReorgConfig::default(); @@ -95,9 +99,10 @@ impl< current_phase: SyncPhase::Idle, transition_manager: TransitionManager::new(config), request_controller: RequestController::new(config), - header_sync: HeaderSyncManagerWithReorg::new(config, reorg_config).map_err(|e| { - SyncError::InvalidState(format!("Failed to create header sync manager: {}", e)) - })?, + header_sync: HeaderSyncManagerWithReorg::new(config, reorg_config, chain_state) + .map_err(|e| { + SyncError::InvalidState(format!("Failed to create header sync manager: {}", e)) + })?, filter_sync: FilterSyncManager::new(config, received_filter_heights), masternode_sync: MasternodeSyncManager::new(config), config: config.clone(), @@ -797,8 +802,17 @@ impl< } /// Update the chain state (used for checkpoint sync initialization) - pub fn set_chain_state(&mut self, chain_state: ChainState) { - self.header_sync.set_chain_state(chain_state); + pub fn update_chain_state_cache( + &mut self, + synced_from_checkpoint: bool, + sync_base_height: u32, + headers_len: u32, + ) { + self.header_sync.update_cached_from_state_snapshot( + synced_from_checkpoint, + sync_base_height, + headers_len, + ); } // Private helper methods @@ -1952,8 +1966,9 @@ impl< let target_blockchain_height = storage.get_header_height_by_hash(&diff.block_hash).await.ok().flatten(); - // Get chain state for masternode height conversion (used later) - let chain_state = self.header_sync.get_chain_state(); + // Determine if we're syncing from a checkpoint for height conversion + let is_ckpt = self.header_sync.is_synced_from_checkpoint(); + let sync_base = self.header_sync.get_sync_base_height(); tracing::info!( "📥 Processing post-sync masternode diff for block {} at height {:?} (base: {} at height {:?})", @@ -1970,12 +1985,11 @@ impl< // Log the current masternode state after update if let Ok(Some(mn_state)) = storage.load_masternode_state().await { // Convert masternode storage height to blockchain height - let mn_blockchain_height = - if chain_state.synced_from_checkpoint && chain_state.sync_base_height > 0 { - chain_state.sync_base_height + mn_state.last_height - } else { - mn_state.last_height - }; + let mn_blockchain_height = if is_ckpt && sync_base > 0 { + sync_base + mn_state.last_height + } else { + mn_state.last_height + }; tracing::debug!( "📊 Masternode state after update: last height = {}, can validate ChainLocks up to height {}", @@ -2073,10 +2087,11 @@ impl< .unwrap_or(0); // Check if we're syncing from a checkpoint - let chain_state = self.header_sync.get_chain_state(); - if chain_state.synced_from_checkpoint && chain_state.sync_base_height > 0 { + if self.header_sync.is_synced_from_checkpoint() + && self.header_sync.get_sync_base_height() > 0 + { // For checkpoint sync, blockchain height = sync_base_height + storage_height - Ok(chain_state.sync_base_height + storage_height) + Ok(self.header_sync.get_sync_base_height() + storage_height) } else { // Normal sync: storage height IS the blockchain height Ok(storage_height)