diff --git a/dash-spv/src/client/message_handler.rs b/dash-spv/src/client/message_handler.rs index 2123cc2bb..75a83fc6d 100644 --- a/dash-spv/src/client/message_handler.rs +++ b/dash-spv/src/client/message_handler.rs @@ -7,6 +7,7 @@ use crate::network::NetworkManager; use crate::storage::StorageManager; use crate::sync::sequential::SequentialSyncManager; use crate::types::{MempoolState, SpvEvent, SpvStats}; +// Removed local ad-hoc compact filter construction in favor of always processing full blocks use key_wallet_manager::wallet_interface::WalletInterface; use std::sync::Arc; use tokio::sync::RwLock; @@ -238,7 +239,23 @@ impl< block.txdata.len() ); - // Process new block (update state, check watched items) + // 1) Ensure header processing and chain tip update for this block + // Route the header through the sequential sync manager as a Headers message + let headers_msg = NetworkMessage::Headers(vec![block.header]); + if let Err(e) = self + .sync_manager + .handle_message(headers_msg, &mut *self.network, &mut *self.storage) + .await + { + tracing::error!( + "❌ Failed to process header for block {} via sync manager: {}", + block_hash, + e + ); + return Err(SpvError::Sync(e)); + } + + // 2) Always process the full block (privacy and correctness) if let Err(e) = self.process_new_block(block).await { tracing::error!("❌ Failed to process new block {}: {}", block_hash, e); return Err(e); @@ -434,31 +451,16 @@ impl< self.network.send_message(getdata).await.map_err(SpvError::Network)?; } - // Process new blocks immediately when detected + // For blocks announced via inventory during tip sync, request full blocks for privacy if !blocks_to_request.is_empty() { tracing::info!( - "🔄 Processing {} new block announcements to stay synchronized", + "📥 Requesting {} new blocks announced via inventory", blocks_to_request.len() ); - // Extract block hashes - let block_hashes: Vec = blocks_to_request - .iter() - .filter_map(|inv| { - if let Inventory::Block(hash) = inv { - Some(*hash) - } else { - None - } - }) - .collect(); - - // Process each new block - for block_hash in block_hashes { - tracing::info!("📥 Requesting header for new block {}", block_hash); - if let Err(e) = self.process_new_block_hash(block_hash).await { - tracing::error!("❌ Failed to process new block {}: {}", block_hash, e); - } + let getdata = NetworkMessage::GetData(blocks_to_request); + if let Err(e) = self.network.send_message(getdata).await { + tracing::error!("Failed to request announced blocks: {}", e); } } diff --git a/dash-spv/src/client/mod.rs b/dash-spv/src/client/mod.rs index 0791b96a8..824c36ec6 100644 --- a/dash-spv/src/client/mod.rs +++ b/dash-spv/src/client/mod.rs @@ -880,14 +880,12 @@ impl< // Emit detailed progress update if last_rate_calc.elapsed() >= Duration::from_secs(1) { - // Storage tip is the headers vector index (0-based). - let current_storage_tip = { + // Storage tip now represents the absolute blockchain height. + let current_tip_height = { let storage = self.storage.lock().await; storage.get_tip_height().await.ok().flatten().unwrap_or(0) }; - // Convert to absolute blockchain height: base + storage_tip - let sync_base_height = { self.state.read().await.sync_base_height }; - let current_height = sync_base_height + current_storage_tip; + let current_height = current_tip_height; let peer_best = self .network .get_peer_best_height() @@ -897,9 +895,9 @@ impl< .unwrap_or(current_height); // Calculate headers downloaded this second - if current_storage_tip > last_height { - headers_this_second = current_storage_tip - last_height; - last_height = current_storage_tip; + if current_tip_height > last_height { + headers_this_second = current_tip_height - last_height; + last_height = current_tip_height; } let headers_per_second = headers_this_second as f64; @@ -956,7 +954,7 @@ impl< let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0); let filter_tip = storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0); - (self.state.read().await.sync_base_height + storage_tip, filter_tip) + (storage_tip, filter_tip) }; if abs_header_height != last_emitted_header_height || filter_header_height != last_emitted_filter_header_height @@ -1770,8 +1768,13 @@ impl< let mut loaded_count = 0u32; let target_height = saved_state.chain_tip.height; - // Start from height 1 (genesis is already in ChainState) - let mut current_height = 1u32; + // Determine first height to load. Skip genesis (already present) unless we started from a checkpoint base. + let mut current_height = + if saved_state.synced_from_checkpoint && saved_state.sync_base_height > 0 { + saved_state.sync_base_height + } else { + 1u32 + }; while current_height <= target_height { let end_height = (current_height + BATCH_SIZE - 1).min(target_height); @@ -1786,12 +1789,12 @@ impl< }; if headers.is_empty() { - tracing::error!( - "Failed to load headers for range {}..{} - storage may be corrupted", + tracing::warn!( + "No headers found for range {}..{} when restoring from state", current_height, end_height + 1 ); - return Ok(false); + break; } // Validate headers before adding to chain state diff --git a/dash-spv/src/client/status_display.rs b/dash-spv/src/client/status_display.rs index b8537274d..841f7b5c8 100644 --- a/dash-spv/src/client/status_display.rs +++ b/dash-spv/src/client/status_display.rs @@ -48,13 +48,13 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> { // For checkpoint sync: height = checkpoint_height + storage_count let storage = self.storage.lock().await; if let Ok(Some(storage_tip)) = storage.get_tip_height().await { - let blockchain_height = state.sync_base_height + storage_tip; + let blockchain_height = storage_tip; if with_logging { tracing::debug!( - "Status display: storage_tip={}, sync_base={}, blockchain_height={}", - storage_tip, + "Status display: reported tip height={}, sync_base={}, raw_storage_tip={}", + blockchain_height, state.sync_base_height, - blockchain_height + storage_tip ); } blockchain_height diff --git a/dash-spv/src/storage/disk.rs b/dash-spv/src/storage/disk.rs index c8cb5e3e1..9551bbca1 100644 --- a/dash-spv/src/storage/disk.rs +++ b/dash-spv/src/storage/disk.rs @@ -1159,25 +1159,36 @@ impl StorageManager for DiskStorageManager { async fn load_headers(&self, range: Range) -> StorageResult> { let mut headers = Vec::new(); - let start_segment = Self::get_segment_id(range.start); - let end_segment = Self::get_segment_id(range.end.saturating_sub(1)); + // Convert blockchain height range to storage index range using sync_base_height + let sync_base_height = *self.sync_base_height.read().await; + let storage_start = if sync_base_height > 0 && range.start >= sync_base_height { + range.start - sync_base_height + } else { + range.start + }; + + let storage_end = if sync_base_height > 0 && range.end > sync_base_height { + range.end - sync_base_height + } else { + range.end + }; + + let start_segment = Self::get_segment_id(storage_start); + let end_segment = Self::get_segment_id(storage_end.saturating_sub(1)); for segment_id in start_segment..=end_segment { self.ensure_segment_loaded(segment_id).await?; let segments = self.active_segments.read().await; if let Some(segment) = segments.get(&segment_id) { - let _segment_start_height = segment_id * HEADERS_PER_SEGMENT; - let _segment_end_height = _segment_start_height + segment.headers.len() as u32; - let start_idx = if segment_id == start_segment { - Self::get_segment_offset(range.start) + Self::get_segment_offset(storage_start) } else { 0 }; let end_idx = if segment_id == end_segment { - Self::get_segment_offset(range.end.saturating_sub(1)) + 1 + Self::get_segment_offset(storage_end.saturating_sub(1)) + 1 } else { segment.headers.len() }; @@ -1198,17 +1209,31 @@ impl StorageManager for DiskStorageManager { } async fn get_header(&self, height: u32) -> StorageResult> { - // TODO: This method currently expects storage-relative heights (0-based from sync_base_height). - // Consider refactoring to accept blockchain heights and handle conversion internally for better UX. + // Accept blockchain (absolute) height and convert to storage index using sync_base_height. + let sync_base_height = *self.sync_base_height.read().await; - // First check if this height is within our known range - let tip_height = self.cached_tip_height.read().await; - if let Some(tip) = *tip_height { - if height > tip { + // Convert absolute height to storage index (base-inclusive mapping) + let storage_index = if sync_base_height > 0 { + if height >= sync_base_height { + height - sync_base_height + } else { + // If caller passes a small value (likely a pre-conversion storage index), use it directly + height + } + } else { + height + }; + + // First check if this storage index is within our known range + let tip_index_opt = *self.cached_tip_height.read().await; + if let Some(tip_index) = tip_index_opt { + if storage_index > tip_index { tracing::trace!( - "Requested header at height {} is beyond tip height {}", + "Requested header at storage index {} is beyond tip index {} (abs height {} base {})", + storage_index, + tip_index, height, - tip + sync_base_height ); return Ok(None); } @@ -1217,8 +1242,8 @@ impl StorageManager for DiskStorageManager { return Ok(None); } - let segment_id = Self::get_segment_id(height); - let offset = Self::get_segment_offset(height); + let segment_id = Self::get_segment_id(storage_index); + let offset = Self::get_segment_offset(storage_index); self.ensure_segment_loaded(segment_id).await?; @@ -1235,10 +1260,12 @@ impl StorageManager for DiskStorageManager { if header.is_none() { tracing::debug!( - "Header not found at height {} (segment: {}, offset: {})", - height, + "Header not found at storage index {} (segment: {}, offset: {}, abs height {}, base {})", + storage_index, segment_id, - offset + offset, + height, + sync_base_height ); } @@ -1246,7 +1273,17 @@ impl StorageManager for DiskStorageManager { } async fn get_tip_height(&self) -> StorageResult> { - Ok(*self.cached_tip_height.read().await) + let tip_index_opt = *self.cached_tip_height.read().await; + if let Some(tip_index) = tip_index_opt { + let base = *self.sync_base_height.read().await; + if base > 0 { + Ok(Some(base + tip_index)) + } else { + Ok(Some(tip_index)) + } + } else { + Ok(None) + } } async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> { @@ -1487,7 +1524,12 @@ impl StorageManager for DiskStorageManager { // Load all headers if let Some(tip_height) = self.get_tip_height().await? { - state.headers = self.load_headers(0..tip_height + 1).await?; + let range_start = if state.synced_from_checkpoint && state.sync_base_height > 0 { + state.sync_base_height + } else { + 0 + }; + state.headers = self.load_headers(range_start..tip_height + 1).await?; } // Load all filter headers @@ -2032,16 +2074,22 @@ mod tests { // Store headers using checkpoint sync method storage.store_headers_from_height(&headers, checkpoint_height).await?; - // Verify headers are stored at correct storage indices - // Header at blockchain height 1,100,000 should be at storage index 0 - let header_at_0 = storage.get_header(0).await?; - assert!(header_at_0.is_some(), "Header at storage index 0 should exist"); - assert_eq!(header_at_0.unwrap(), headers[0]); + // Set sync base height so storage interprets heights as blockchain heights + let mut base_state = ChainState::new(); + base_state.sync_base_height = checkpoint_height; + base_state.synced_from_checkpoint = true; + storage.store_chain_state(&base_state).await?; + + // Verify headers are stored at correct blockchain heights + // Header at blockchain height 1,100,000 should be retrievable by that height + let header_at_base = storage.get_header(checkpoint_height).await?; + assert!(header_at_base.is_some(), "Header at base blockchain height should exist"); + assert_eq!(header_at_base.unwrap(), headers[0]); - // Header at blockchain height 1,100,099 should be at storage index 99 - let header_at_99 = storage.get_header(99).await?; - assert!(header_at_99.is_some(), "Header at storage index 99 should exist"); - assert_eq!(header_at_99.unwrap(), headers[99]); + // Header at blockchain height 1,100,099 should be retrievable by that height + let header_at_ending = storage.get_header(checkpoint_height + 99).await?; + assert!(header_at_ending.is_some(), "Header at ending blockchain height should exist"); + assert_eq!(header_at_ending.unwrap(), headers[99]); // Test the reverse index (hash -> blockchain height) let hash_0 = headers[0].block_hash(); @@ -2081,11 +2129,11 @@ mod tests { "After index rebuild, hash should still map to blockchain height 1,100,000" ); - // Verify headers can still be retrieved by storage index - let header_after_reload = storage2.get_header(0).await?; + // Verify header can still be retrieved by blockchain height after reload + let header_after_reload = storage2.get_header(checkpoint_height).await?; assert!( header_after_reload.is_some(), - "Header at storage index 0 should exist after reload" + "Header at base blockchain height should exist after reload" ); assert_eq!(header_after_reload.unwrap(), headers[0]); diff --git a/dash-spv/src/storage/memory.rs b/dash-spv/src/storage/memory.rs index aad026b80..bf3767034 100644 --- a/dash-spv/src/storage/memory.rs +++ b/dash-spv/src/storage/memory.rs @@ -111,26 +111,73 @@ impl StorageManager for MemoryStorageManager { } async fn load_headers(&self, range: Range) -> StorageResult> { - let start = range.start as usize; - let end = range.end.min(self.headers.len() as u32) as usize; + // Interpret range as blockchain (absolute) heights and map to storage indices + let (base, has_base) = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + (state.sync_base_height, true) + } + _ => (0u32, false), + }; - if start > self.headers.len() { + let start_idx = if has_base { + if range.start < base { + 0usize + } else { + (range.start - base) as usize + } + } else { + range.start as usize + }; + + let end_abs = range.end.min(if has_base { + base + self.headers.len() as u32 + } else { + self.headers.len() as u32 + }); + let end_idx = if has_base { + if end_abs <= base { + 0usize + } else { + (end_abs - base) as usize + } + } else { + end_abs as usize + }; + + if start_idx > self.headers.len() { return Ok(Vec::new()); } - - Ok(self.headers[start..end].to_vec()) + let end_idx = end_idx.min(self.headers.len()); + Ok(self.headers[start_idx..end_idx].to_vec()) } async fn get_header(&self, height: u32) -> StorageResult> { - Ok(self.headers.get(height as usize).copied()) + let sync_base_height = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + state.sync_base_height + } + _ => 0u32, + }; + if sync_base_height > 0 && height < sync_base_height { + return Ok(None); + } + + // Convert absolute height to storage index (base-inclusive mapping) + let idx = height.saturating_sub(sync_base_height) as usize; + Ok(self.headers.get(idx).copied()) } async fn get_tip_height(&self) -> StorageResult> { if self.headers.is_empty() { - Ok(None) - } else { - Ok(Some(self.headers.len() as u32 - 1)) + return Ok(None); } + let base = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + state.sync_base_height + } + _ => 0u32, + }; + Ok(Some(base + self.headers.len() as u32 - 1)) } async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> { @@ -349,11 +396,26 @@ impl StorageManager for MemoryStorageManager { return Ok(Vec::new()); } - let mut results = Vec::with_capacity((end_height - start_height + 1) as usize); + // Map absolute heights to storage indices + let base = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + state.sync_base_height + } + _ => 0u32, + }; - for height in start_height..=end_height { - if let Some(header) = self.headers.get(height as usize) { - results.push((height, *header)); + let mut results = Vec::with_capacity((end_height - start_height + 1) as usize); + for abs_h in start_height..=end_height { + let idx = if base > 0 { + if abs_h < base { + continue; + } + (abs_h - base) as usize + } else { + abs_h as usize + }; + if let Some(header) = self.headers.get(idx) { + results.push((abs_h, *header)); } } diff --git a/dash-spv/src/storage/mod.rs b/dash-spv/src/storage/mod.rs index 27d5f31f1..e4f144312 100644 --- a/dash-spv/src/storage/mod.rs +++ b/dash-spv/src/storage/mod.rs @@ -109,27 +109,22 @@ pub trait StorageManager: Send + Sync { /// Load block headers in the given range. async fn load_headers(&self, range: Range) -> StorageResult>; - /// Get a specific header by height. - /// - /// TODO: Consider changing this API to accept blockchain heights instead of storage-relative heights. - /// Currently expects storage index (0-based from sync_base_height), but this creates confusion - /// since most blockchain operations work with absolute blockchain heights. A future refactor - /// could make this more intuitive by handling the height conversion internally. + /// Get a specific header by blockchain height. async fn get_header(&self, height: u32) -> StorageResult>; - /// Get the current tip height. + /// Get the current tip blockchain height. async fn get_tip_height(&self) -> StorageResult>; /// Store filter headers. async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()>; - /// Load filter headers in the given range. + /// Load filter headers in the given blockchain height range. async fn load_filter_headers(&self, range: Range) -> StorageResult>; - /// Get a specific filter header by height. + /// Get a specific filter header by blockchain height. async fn get_filter_header(&self, height: u32) -> StorageResult>; - /// Get the current filter tip height. + /// Get the current filter tip blockchain height. async fn get_filter_tip_height(&self) -> StorageResult>; /// Store masternode state. @@ -144,10 +139,10 @@ pub trait StorageManager: Send + Sync { /// Load chain state. async fn load_chain_state(&self) -> StorageResult>; - /// Store a compact filter. + /// Store a compact filter at a blockchain height. async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()>; - /// Load a compact filter. + /// Load a compact filter by blockchain height. async fn load_filter(&self, height: u32) -> StorageResult>>; /// Store metadata. @@ -168,7 +163,7 @@ pub trait StorageManager: Send + Sync { hash: &dashcore::BlockHash, ) -> StorageResult>; - /// Get multiple headers in a single batch operation. + /// Get multiple headers in a single batch operation using blockchain heights. /// Returns headers with their heights. More efficient than calling get_header multiple times. async fn get_headers_batch( &self, diff --git a/dash-spv/src/sync/filters.rs b/dash-spv/src/sync/filters.rs index 4b80ddb69..d97e6f9f0 100644 --- a/dash-spv/src/sync/filters.rs +++ b/dash-spv/src/sync/filters.rs @@ -164,36 +164,21 @@ impl { - tracing::info!( - "Found available header at blockchain height {} / storage height {}", - scan_height, - scan_storage_height - ); + tracing::info!("Found available header at blockchain height {}", scan_height); return Some((header.block_hash(), scan_height)); } Ok(None) => { tracing::debug!( - "Header missing at blockchain height {} / storage height {}, scanning back", - scan_height, - scan_storage_height + "Header missing at blockchain height {}, scanning back", + scan_height ); } Err(e) => { tracing::warn!( - "Error reading header at blockchain height {} / storage height {}: {}", + "Error reading header at blockchain height {}: {}", scan_height, - scan_storage_height, e ); } @@ -218,14 +203,13 @@ impl SyncResult<(u32, u32, u32)> { - let storage_tip_index = storage + let header_tip_height = storage .get_tip_height() .await .map_err(|e| SyncError::Storage(format!("Failed to get header tip height: {}", e)))? - .unwrap_or(0); - - // Convert block header storage index to absolute blockchain height - let header_tip_height = self.header_storage_to_abs_height(storage_tip_index); + .ok_or_else(|| { + SyncError::Storage("No headers available for filter sync".to_string()) + })?; let stop_height = self .find_height_for_block_hash(&cf_headers.stop_hash, storage, 0, header_tip_height) @@ -240,10 +224,8 @@ impl storage.get_header(idx).await.ok().flatten().map(|h| h.block_hash()), - None => None, - }; + let start_hash_opt = + storage.get_header(start_height).await.ok().flatten().map(|h| h.block_hash()); // Always try to resolve the expected/requested start as well (current_sync_height) // We don't have access to current_sync_height here, so we'll log both the batch @@ -329,16 +311,6 @@ impl u32 { - if self.sync_base_height > 0 { - self.sync_base_height + index - } else { - index - } - } - /// Convert absolute blockchain height to filter header storage index. /// Storage indexing is base-inclusive for filter headers as well. fn filter_abs_to_storage_index(&self, height: u32) -> Option { @@ -394,17 +366,16 @@ impl storage.get_header(idx).await.ok().flatten().map(|h| h.block_hash()), - None => None, - }; + let recv_start_hash_opt = + storage.get_header(batch_start_height).await.ok().flatten().map(|h| h.block_hash()); // Resolve expected start hash (what we asked for), for clarity - let expected_start_hash_opt = - match self.header_abs_to_storage_index(self.current_sync_height) { - Some(idx) => storage.get_header(idx).await.ok().flatten().map(|h| h.block_hash()), - None => None, - }; + let expected_start_hash_opt = storage + .get_header(self.current_sync_height) + .await + .ok() + .flatten() + .map(|h| h.block_hash()); let prev_height = batch_start_height.saturating_sub(1); let effective_prev_height = self.current_sync_height.saturating_sub(1); @@ -599,21 +570,11 @@ impl header.block_hash(), Ok(None) => { tracing::warn!( - "Header not found at storage height {} (blockchain height {}), scanning backwards to find actual available height", - storage_height, + "Header not found at blockchain height {}, scanning backwards to find actual available height", next_batch_end_height ); @@ -667,60 +628,30 @@ impl header.block_hash(), Ok(None) if header_tip_height > 0 => { tracing::debug!( - "Tip header not found at storage height {} (blockchain height {}), trying previous header", - tip_storage_height, + "Tip header not found at blockchain height {}, trying previous header", header_tip_height ); // Try previous header when at chain tip - let prev_storage_height = self - .header_abs_to_storage_index(header_tip_height - 1) - .ok_or_else(|| { - SyncError::Validation(format!( - "prev header height {} below checkpoint base {}", - header_tip_height - 1, - self.sync_base_height - )) - })?; - storage - .get_header(prev_storage_height) - .await - .map_err(|e| { - SyncError::Storage(format!( - "Failed to get previous header: {}", - e - )) - })? - .ok_or_else(|| { - SyncError::Storage(format!( - "Neither tip ({}) nor previous header found", - header_tip_height - )) - })? - .block_hash() - } - Ok(None) => { - return Err(SyncError::Validation(format!( - "Tip header not found at height {} (genesis)", - header_tip_height - ))); + match storage.get_header(header_tip_height - 1).await { + Ok(Some(header)) => header.block_hash(), + _ => { + tracing::warn!( + "⚠️ No header found at tip or tip-1 during CFHeaders handling" + ); + return Err(SyncError::Validation( + "No header found at tip or tip-1".to_string(), + )); + } + } } - Err(e) => { - return Err(SyncError::Validation(format!( - "Failed to get tip header: {}", - e - ))); + _ => { + return Err(SyncError::Validation( + "No header found at computed end height".to_string(), + )); } } }; @@ -766,31 +697,24 @@ impl header.block_hash(), Ok(None) => { tracing::warn!( - "Recovery header not found at storage height {} (blockchain height {}), scanning backwards", - storage_height, + "Recovery header not found at blockchain height {}, scanning backwards", recovery_batch_end_height ); @@ -838,18 +762,16 @@ impl header.block_hash(), - Ok(None) if storage_tip_index > 0 => { + Ok(None) if header_tip_height > 0 => { tracing::debug!( - "Tip header not found at storage height {} (blockchain height {}) during recovery, trying previous header", - storage_tip_index, + "Tip header not found at blockchain height {} during recovery, trying previous header", header_tip_height ); // Try previous header when at chain tip storage - .get_header(storage_tip_index - 1) + .get_header(header_tip_height - 1) .await .map_err(|e| { SyncError::Storage(format!( @@ -929,18 +851,16 @@ impl 0 { - // Use storage_tip_height directly since get_header expects storage height - storage - .get_header(storage_tip_index) - .await - .map_err(|e| { - SyncError::Storage(format!( - "Failed to get stop header at storage height {} (blockchain height {}): {}", - storage_tip_index, header_tip_height, e - )) - })? - .ok_or_else(|| { - SyncError::Storage(format!( - "Stop header not found at storage height {} (blockchain height {})", - storage_tip_index, header_tip_height - )) - })? - .block_hash() - } else { - return Err(SyncError::Storage("No headers available for filter sync".to_string())); - }; + let stop_hash = storage + .get_header(header_tip_height) + .await + .map_err(|e| { + SyncError::Storage(format!( + "Failed to get stop header at blockchain height {}: {}", + header_tip_height, e + )) + })? + .ok_or_else(|| { + SyncError::Storage(format!( + "Stop header not found at blockchain height {}", + header_tip_height + )) + })? + .block_hash(); // Initial request for first batch let batch_end_height = @@ -1024,29 +939,18 @@ impl storage height {}", - batch_end_height, - storage_height - ); - match storage.get_header(storage_height).await { + match storage.get_header(batch_end_height).await { Ok(Some(header)) => { tracing::debug!( - "Found header for batch stop at blockchain height {} (storage height {}), hash={}", + "Found header for batch stop at blockchain height {}, hash={}", batch_end_height, - storage_height, header.block_hash() ); header.block_hash() } Ok(None) => { tracing::warn!( - "Initial batch header not found at storage height {} (blockchain height {}), scanning for available header", - storage_height, + "Initial batch header not found at blockchain height {}, scanning for available header", batch_end_height ); @@ -1417,21 +1321,24 @@ impl end { + let base_height = self.sync_base_height; + let clamped_start = start.max(base_height); + + if clamped_start > end { self.syncing_filters = false; return Ok(SyncProgress::default()); } tracing::info!( "🔄 Starting compact filter sync from height {} to {} ({} blocks)", - start, + clamped_start, end, - end - start + 1 + end - clamped_start + 1 ); // Request filters in batches let batch_size = FILTER_REQUEST_BATCH_SIZE; - let mut current_height = start; + let mut current_height = clamped_start; let mut filters_downloaded = 0; while current_height <= end { @@ -1439,18 +1346,12 @@ impl storage - .get_header(idx) - .await - .map_err(|e| SyncError::Storage(format!("Failed to get stop header: {}", e)))? - .ok_or_else(|| SyncError::Storage("Stop header not found".to_string()))? - .block_hash(), - None => { - return Err(SyncError::Storage("batch_end below checkpoint base".to_string())) - } - }; + let stop_hash = storage + .get_header(batch_end) + .await + .map_err(|e| SyncError::Storage(format!("Failed to get stop header: {}", e)))? + .ok_or_else(|| SyncError::Storage("Stop header not found".to_string()))? + .block_hash(); self.request_filters(network, current_height, stop_hash).await?; @@ -1551,7 +1452,10 @@ impl end { + let base_height = self.sync_base_height; + let clamped_start = start.max(base_height); + + if clamped_start > end { tracing::warn!( "⚠️ Filter sync requested from height {} but end height is {} - no filters to sync", start, @@ -1562,40 +1466,31 @@ impl { let stop_hash = header.block_hash(); @@ -2021,13 +1920,13 @@ impl SyncResult<()> { // Find the end height for the stop hash - let header_tip_index = storage + let header_tip_height = storage .get_tip_height() .await .map_err(|e| SyncError::Storage(format!("Failed to get header tip height: {}", e)))? - .unwrap_or(0); - - let header_tip_height = self.header_storage_to_abs_height(header_tip_index); + .ok_or_else(|| { + SyncError::Storage("No headers available for filter sync".to_string()) + })?; let end_height = self .find_height_for_block_hash(&stop_hash, storage, start_height, header_tip_height) @@ -2075,6 +1974,7 @@ impl { let stop_hash = header.block_hash(); @@ -3380,15 +3286,18 @@ impl { let batch_stop_hash = batch_header.block_hash(); @@ -3408,16 +3317,14 @@ impl { tracing::warn!( - "Missing header at storage height {} (batch end height {}) for batch retry, continuing to next batch", - batch_storage_height, + "Missing header at height {} for batch retry, continuing to next batch", batch_end ); current_start = batch_end + 1; } Err(e) => { tracing::error!( - "Error retrieving header at storage height {} (batch end height {}): {:?}, continuing to next batch", - batch_storage_height, + "Error retrieving header at height {}: {:?}, continuing to next batch", batch_end, e ); diff --git a/dash-spv/src/sync/headers_with_reorg.rs b/dash-spv/src/sync/headers_with_reorg.rs index 4e9f1ffc2..36b6b2fb7 100644 --- a/dash-spv/src/sync/headers_with_reorg.rs +++ b/dash-spv/src/sync/headers_with_reorg.rs @@ -152,24 +152,21 @@ impl 0 { - // For checkpoint sync, start from index 0 in storage - // (which represents blockchain height sync_base_height) - 0u32 - } else { - // For normal sync from genesis, start from 1 (genesis already in chain state) - 1u32 - }; + // Determine the first blockchain height we need to load. + // For checkpoint syncs we start at the checkpoint base; otherwise we skip genesis (already present). + let base_height = if self.is_synced_from_checkpoint() && self.get_sync_base_height() > 0 { + self.get_sync_base_height() + } else { + 1 + }; + + let mut current_height = base_height; - while current_storage_index <= tip_height { - let end_storage_index = (current_storage_index + BATCH_SIZE - 1).min(tip_height); + while current_height <= tip_height { + let end_height = (current_height + BATCH_SIZE - 1).min(tip_height); // Load batch from storage - let headers_result = - storage.load_headers(current_storage_index..end_storage_index + 1).await; + let headers_result = storage.load_headers(current_height..end_height + 1).await; match headers_result { Ok(headers) if !headers.is_empty() => { @@ -186,8 +183,8 @@ impl 0 { - self.total_headers_synced = self.get_sync_base_height() + tip_height; - tracing::info!( - "Checkpoint sync initialization: storage_tip={}, sync_base={}, total_headers_synced={}, chain_state.headers.len()={}", - tip_height, - self.get_sync_base_height(), - self.total_headers_synced, - self.chain_state.read().await.headers.len() - ); - } else { - self.total_headers_synced = tip_height; - } + self.total_headers_synced = tip_height; let elapsed = start_time.elapsed(); tracing::info!( diff --git a/dash-spv/src/sync/masternodes.rs b/dash-spv/src/sync/masternodes.rs index 4fe2be388..bf0f174ad 100644 --- a/dash-spv/src/sync/masternodes.rs +++ b/dash-spv/src/sync/masternodes.rs @@ -476,7 +476,6 @@ impl Result<(), String> { tracing::info!( "🔗 Feeding QRInfo to engine and getting additional diffs for quorum validation" @@ -591,9 +587,7 @@ impl, storage: &mut S, network: &mut dyn NetworkManager, - sync_base_height: u32, ) -> Result<(), String> { use dashcore::network::message::NetworkMessage; use dashcore::network::message_sml::GetMnListDiff; @@ -675,56 +668,43 @@ impl= sync_base_height { - validation_height - sync_base_height - } else { - tracing::warn!("⚠️ Validation height {} is before sync base height {}, skipping quorum validation", - validation_height, sync_base_height); - continue; - }; - - let storage_quorum_height = if quorum_height >= sync_base_height { - quorum_height - sync_base_height - } else { - tracing::warn!( - "⚠️ Quorum height {} is before sync base height {}, skipping quorum validation", - quorum_height, - sync_base_height - ); - continue; - }; + // Use blockchain heights directly with storage API + let storage_validation_height = validation_height; + let storage_quorum_height = quorum_height; tracing::debug!("🔄 Height conversion: blockchain validation_height={} -> storage_height={}, blockchain quorum_height={} -> storage_height={}", validation_height, storage_validation_height, quorum_height, storage_quorum_height); - // Get base block hash (storage_validation_height) + // Get base block hash (blockchain height) let base_header = match storage.get_header(storage_validation_height).await { Ok(Some(header)) => header, Ok(None) => { - tracing::warn!("⚠️ Base header not found at storage height {} (blockchain height {}), skipping", + tracing::warn!( + "⚠️ Base header not found at storage height {} (blockchain height {}), skipping", storage_validation_height, validation_height); continue; } Err(e) => { - tracing::warn!("⚠️ Failed to get base header at storage height {} (blockchain height {}): {}, skipping", + tracing::warn!( + "⚠️ Failed to get base header at storage height {} (blockchain height {}): {}, skipping", storage_validation_height, validation_height, e); continue; } }; let base_block_hash = base_header.block_hash(); - // Get target block hash (storage_quorum_height) + // Get target block hash (blockchain height) let target_header = match storage.get_header(storage_quorum_height).await { Ok(Some(header)) => header, Ok(None) => { - tracing::warn!("⚠️ Target header not found at storage height {} (blockchain height {}), skipping", + tracing::warn!( + "⚠️ Target header not found at storage height {} (blockchain height {}), skipping", storage_quorum_height, quorum_height); continue; } Err(e) => { - tracing::warn!("⚠️ Failed to get target header at storage height {} (blockchain height {}): {}, skipping", + tracing::warn!( + "⚠️ Failed to get target header at storage height {} (blockchain height {}): {}, skipping", storage_quorum_height, quorum_height, e); continue; } diff --git a/dash-spv/src/sync/sequential/mod.rs b/dash-spv/src/sync/sequential/mod.rs index cb3a16a5d..93772d283 100644 --- a/dash-spv/src/sync/sequential/mod.rs +++ b/dash-spv/src/sync/sequential/mod.rs @@ -259,7 +259,7 @@ impl< let effective_height = self.header_sync.get_chain_height(); let sync_base_height = self.header_sync.get_sync_base_height(); - // Also get the actual storage tip height to verify + // Also get the actual tip height to verify (blockchain height) let storage_tip = storage .get_tip_height() .await @@ -282,7 +282,7 @@ impl< // Use the minimum of effective height and what's actually in storage let _safe_height = if let Some(tip) = storage_tip { - let storage_based_height = sync_base_height + tip; + let storage_based_height = tip; if storage_based_height < effective_height { tracing::warn!( "Chain state height {} exceeds storage height {}, using storage height", @@ -1251,9 +1251,7 @@ impl< ); // Process QRInfo with full block height feeding and comprehensive processing - self.masternode_sync - .handle_qrinfo_message(qr_info.clone(), storage, network, sync_base_height) - .await; + self.masternode_sync.handle_qrinfo_message(qr_info.clone(), storage, network).await; // Check if QRInfo processing completed successfully if let Some(error) = self.masternode_sync.last_error() {