diff --git a/dash-spv-ffi/tests/unit/test_type_conversions.rs b/dash-spv-ffi/tests/unit/test_type_conversions.rs
index 4d38d12c5..3aa61e4a9 100644
--- a/dash-spv-ffi/tests/unit/test_type_conversions.rs
+++ b/dash-spv-ffi/tests/unit/test_type_conversions.rs
@@ -209,7 +209,7 @@ mod tests {
             filters_received: u64::MAX,
             filter_sync_start_time: None,
             last_filter_received_time: None,
-            received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
+            received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
                 std::collections::HashSet::new(),
             )),
             active_filter_requests: 0,
diff --git a/dash-spv/src/client/status_display.rs b/dash-spv/src/client/status_display.rs
index e8b9b4759..b8537274d 100644
--- a/dash-spv/src/client/status_display.rs
+++ b/dash-spv/src/client/status_display.rs
@@ -73,13 +73,16 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
     /// Get current sync progress.
     pub async fn sync_progress(&self) -> Result<SyncProgress> {
         let state = self.state.read().await;
-        let stats = self.stats.read().await;
+        // Clone the inner heights handle and copy needed counters without awaiting while holding the RwLock
+        let (filters_received, received_heights) = {
+            let stats = self.stats.read().await;
+            (stats.filters_received, std::sync::Arc::clone(&stats.received_filter_heights))
+        };

-        // Calculate last synced filter height from received filter heights
-        let last_synced_filter_height = if let Ok(heights) = stats.received_filter_heights.lock() {
+        // Calculate last synced filter height from received filter heights without holding the RwLock guard
+        let last_synced_filter_height = {
+            let heights = received_heights.lock().await;
             heights.iter().max().copied()
-        } else {
-            None
         };

         // Calculate the actual header height considering checkpoint sync
@@ -100,7 +103,7 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
             filter_headers_synced: false, // TODO: Implement
             masternodes_synced: false, // TODO: Implement
             filter_sync_available: false, // TODO: Get from network manager
-            filters_downloaded: stats.filters_received,
+            filters_downloaded: filters_received,
             last_synced_filter_height,
             sync_start: std::time::SystemTime::now(), // TODO: Track properly
             last_update: std::time::SystemTime::now(),
diff --git a/dash-spv/src/sync/filters.rs b/dash-spv/src/sync/filters.rs
index 0608bc44a..3ff4ae24b 100644
--- a/dash-spv/src/sync/filters.rs
+++ b/dash-spv/src/sync/filters.rs
@@ -16,7 +16,7 @@ use crate::client::ClientConfig;
 use crate::error::{SyncError, SyncResult};
 use crate::network::NetworkManager;
 use crate::storage::StorageManager;
-use crate::types::SyncProgress;
+use crate::types::{SharedFilterHeights, SyncProgress};

 // Constants for filter synchronization
 // Stay under Dash Core's 2000 limit (for CFHeaders). Using 1999 helps reduce accidental overlaps.
@@ -74,11 +74,11 @@ pub struct FilterSyncManager {
     /// Blocks currently being downloaded (map for quick lookup)
    downloading_blocks: HashMap,
     /// Blocks requested by the filter processing thread
-    pub processing_thread_requests: std::sync::Arc<std::sync::Mutex<HashSet<BlockHash>>>,
+    pub processing_thread_requests: std::sync::Arc<tokio::sync::Mutex<HashSet<BlockHash>>>,
     /// Track requested filter ranges: (start_height, end_height) -> request_time
     requested_filter_ranges: HashMap<(u32, u32), std::time::Instant>,
     /// Track individual filter heights that have been received (shared with stats)
-    received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
+    received_filter_heights: SharedFilterHeights,
     /// Maximum retries for a filter range
     max_filter_retries: u32,
     /// Retry attempts per range
@@ -102,6 +102,64 @@ pub struct FilterSyncManager {

 impl FilterSyncManager {
+    /// Scan backward from `abs_height` down to `min_abs_height` (inclusive)
+    /// to find the nearest available block header stored in `storage`.
+    /// Returns the found `(BlockHash, height)` or `None` if none available.
+    async fn find_available_header_at_or_before(
+        &self,
+        abs_height: u32,
+        min_abs_height: u32,
+        storage: &S,
+    ) -> Option<(BlockHash, u32)> {
+        if abs_height < min_abs_height {
+            return None;
+        }
+
+        let mut scan_height = abs_height;
+        loop {
+            let Some(scan_storage_height) = self.header_abs_to_storage_index(scan_height) else {
+                tracing::debug!(
+                    "Storage index not available for blockchain height {} while scanning (min={})",
+                    scan_height,
+                    min_abs_height
+                );
+                break;
+            };
+
+            match storage.get_header(scan_storage_height).await {
+                Ok(Some(header)) => {
+                    tracing::info!(
+                        "Found available header at blockchain height {} / storage height {}",
+                        scan_height,
+                        scan_storage_height
+                    );
+                    return Some((header.block_hash(), scan_height));
+                }
+                Ok(None) => {
+                    tracing::debug!(
+                        "Header missing at blockchain height {} / storage height {}, scanning back",
+                        scan_height,
+                        scan_storage_height
+                    );
+                }
+                Err(e) => {
+                    tracing::warn!(
+                        "Error reading header at blockchain height {} / storage height {}: {}",
+                        scan_height,
+                        scan_storage_height,
+                        e
+                    );
+                }
+            }
+
+            if scan_height == min_abs_height {
+                break;
+            }
+            scan_height = scan_height.saturating_sub(1);
+        }
+
+        None
+    }
     /// Calculate the start height of a CFHeaders batch.
     fn calculate_batch_start_height(cf_headers: &CFHeaders, stop_height: u32) -> u32 {
         stop_height.saturating_sub(cf_headers.filter_hashes.len() as u32 - 1)
@@ -176,10 +234,7 @@ impl FilterSyncManager {
-    pub fn new(
-        config: &ClientConfig,
-        received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
-    ) -> Self {
+    pub fn new(config: &ClientConfig, received_filter_heights: SharedFilterHeights) -> Self {
         Self {
             _config: config.clone(),
             syncing_filter_headers: false,
@@ -191,7 +246,7 @@ impl FilterSyncManager {
-                        while scan_height >= min_height && found_header_info.is_none() {
-                            let Some(scan_storage_height) =
-                                self.header_abs_to_storage_index(scan_height)
-                            else {
-                                break;
-                            };
-                            match storage.get_header(scan_storage_height).await {
-                                Ok(Some(header)) => {
-                                    tracing::info!(
-                                        "Found available header at blockchain height {} / storage height {} (originally tried {})",
-                                        scan_height,
-                                        scan_storage_height,
-                                        next_batch_end_height
-                                    );
-                                    found_header_info =
-                                        Some((header.block_hash(), scan_height));
-                                    break;
-                                }
-                                Ok(None) => {
-                                    tracing::debug!(
-                                        "Header not found at blockchain height {} / storage height {}, trying {}",
-                                        scan_height,
-                                        scan_storage_height,
-                                        scan_height.saturating_sub(1)
-                                    );
-                                    if scan_height == 0 {
-                                        break;
-                                    }
-                                    scan_height = scan_height.saturating_sub(1);
-                                }
-                                Err(e) => {
-                                    tracing::error!(
-                                        "Error checking header at height {}: {}",
-                                        scan_height,
-                                        e
-                                    );
-                                    if scan_height == 0 {
-                                        break;
-                                    }
-                                    scan_height = scan_height.saturating_sub(1);
-                                }
-                            }
-                        }
-
-                        match found_header_info {
+                        match self
+                            .find_available_header_at_or_before(
+                                next_batch_end_height.saturating_sub(1),
+                                min_height,
+                                storage,
+                            )
+                            .await
+                        {
                             Some((hash, height)) => {
-                                // Check if we found a header at a height less than our current sync height
                                 if height < self.current_sync_height {
                                     tracing::warn!(
                                         "Found header at height {} which is less than current sync height {}. This means we already have filter headers up to {}. Marking sync as complete.",
@@ -574,8 +587,6 @@ impl FilterSyncManager {
-                    while scan_height >= min_height && found_recovery_info.is_none() {
-                        let scan_storage_height =
-                            match self.header_abs_to_storage_index(scan_height) {
-                                Some(v) => v,
-                                None => break,
-                            };
-                        if let Ok(Some(header)) = storage.get_header(scan_storage_height).await
-                        {
-                            tracing::info!(
-                                "Found recovery header at blockchain height {} / storage height {} (originally tried {})",
-                                scan_height,
-                                scan_storage_height,
-                                recovery_batch_end_height
-                            );
-                            found_recovery_info = Some((header.block_hash(), scan_height));
-                            break;
-                        } else {
-                            if scan_height == 0 {
-                                break;
-                            }
-                            scan_height = scan_height.saturating_sub(1);
-                        }
-                    }
-
-                    match found_recovery_info {
+                    match self
+                        .find_available_header_at_or_before(
+                            recovery_batch_end_height.saturating_sub(1),
+                            min_height,
+                            storage,
+                        )
+                        .await
+                    {
                         Some((hash, height)) => {
-                            // Check if we found a header at a height less than our current sync height
                             if height < self.current_sync_height {
                                 tracing::warn!(
                                     "Recovery: Found header at height {} which is less than current sync height {}. This indicates we already have filter headers up to {}. Marking sync as complete.",
@@ -773,8 +764,6 @@ impl FilterSyncManager {
-                while scan_height >= min_height && found_header.is_none() {
-                    let Some(scan_storage_height) =
-                        self.header_abs_to_storage_index(scan_height)
-                    else {
-                        break;
-                    };
-                    match storage.get_header(scan_storage_height).await {
-                        Ok(Some(header)) => {
-                            tracing::info!(
-                                "Found available header at blockchain height {} / storage height {} (originally tried {})",
-                                scan_height,
-                                scan_storage_height,
-                                batch_end_height
-                            );
-                            found_header = Some(header.block_hash());
-                            break;
-                        }
-                        Ok(None) => {
-                            if scan_height == min_height {
-                                break;
-                            }
-                            scan_height = scan_height.saturating_sub(1);
-                        }
-                        Err(e) => {
-                            tracing::warn!(
-                                "Error getting header at height {}: {}",
-                                scan_height,
-                                e
-                            );
-                            if scan_height == min_height {
-                                break;
-                            }
-                            scan_height = scan_height.saturating_sub(1);
-                        }
-                    }
-                }
-
-                match found_header {
-                    Some(hash) => hash,
+                match self
+                    .find_available_header_at_or_before(
+                        batch_end_height,
+                        self.current_sync_height,
+                        storage,
+                    )
+                    .await
+                {
+                    Some((hash, _height)) => hash,
                     None => {
                         // If we can't find any headers in the batch range, something is wrong
                         // Don't fall back to tip as that would create an oversized request
@@ -1745,7 +1699,8 @@ impl FilterSyncManager {
     ) -> SyncResult<bool> {
-        if let Ok(received_heights) = self.received_filter_heights.lock() {
-            for height in start..=end {
-                if !received_heights.contains(&height) {
-                    return Ok(false);
-                }
+        let received_heights = self.received_filter_heights.lock().await;
+        for height in start..=end {
+            if !received_heights.contains(&height) {
+                return Ok(false);
             }
-            Ok(true)
-        } else {
-            Err(SyncError::Storage("Failed to lock received filter heights".to_string()))
         }
+        Ok(true)
     }

     /// Record that a filter was received at a specific height.
@@ -1795,14 +1747,13 @@ impl FilterSyncManager {
     ) -> u32 {
-        if let Ok(heights) = self.received_filter_heights.lock() {
-            heights.len() as u32
-        } else {
-            0
+        match self.received_filter_heights.try_lock() {
+            Ok(heights) => heights.len() as u32,
+            Err(_) => 0,
         }
     }

@@ -2668,7 +2616,7 @@ impl FilterSyncManager {
         _processing_thread_requests: std::sync::Arc<
-            std::sync::Mutex<HashSet<BlockHash>>,
+            tokio::sync::Mutex<HashSet<BlockHash>>,
         >,
         stats: std::sync::Arc<tokio::sync::RwLock<SpvStats>>,
     ) -> FilterNotificationSender {
@@ -2706,7 +2654,7 @@ impl FilterSyncManager {
         processing_thread_requests: &std::sync::Arc<
-            std::sync::Mutex<HashSet<BlockHash>>,
+            tokio::sync::Mutex<HashSet<BlockHash>>,
         >,
         stats: &std::sync::Arc<tokio::sync::RwLock<SpvStats>>,
     ) -> SyncResult<()> {
@@ -2757,19 +2705,12 @@ impl FilterSyncManager {
-            match processing_thread_requests.lock() {
-                Ok(mut requests) => {
-                    requests.insert(cfilter.block_hash);
-                    tracing::debug!(
-                        "Registered block {} in processing thread requests",
-                        cfilter.block_hash
-                    );
-                }
-                Err(e) => {
-                    tracing::error!("Failed to lock processing thread requests: {}", e);
-                    return Ok(());
-                }
-            }
+            let mut requests = processing_thread_requests.lock().await;
+            requests.insert(cfilter.block_hash);
+            tracing::debug!(
+                "Registered block {} in processing thread requests",
+                cfilter.block_hash
+            );
         }

         // Request the full block download
@@ -2779,7 +2720,8 @@ impl FilterSyncManager {
     ) -> Vec<(u32, u32)> {
         let mut missing_ranges = Vec::new();
-        let heights = match self.received_filter_heights.lock() {
+        let heights = match self.received_filter_heights.try_lock() {
             Ok(heights) => heights.clone(),
-            Err(_) => return missing_ranges, // Return empty if lock fails
+            Err(_) => return missing_ranges,
         };

         // For each requested range
@@ -3124,9 +3066,9 @@ impl FilterSyncManager {
-        let heights = match self.received_filter_heights.lock() {
+        let heights = match self.received_filter_heights.try_lock() {
             Ok(heights) => heights.clone(),
-            Err(_) => return timed_out, // Return empty if lock fails
+            Err(_) => return timed_out,
         };

         for ((start, end), request_time) in &self.requested_filter_ranges {
@@ -3151,9 +3093,9 @@ impl FilterSyncManager {
     ) -> bool {
-        let heights = match self.received_filter_heights.lock() {
+        let heights = match self.received_filter_heights.try_lock() {
             Ok(heights) => heights,
-            Err(_) => return false, // Return false if lock fails
+            Err(_) => return false,
         };

         for height in start_height..=end_height {
@@ -3500,7 +3442,7 @@ impl FilterSyncManager {
-        received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
+        received_filter_heights: SharedFilterHeights,
         wallet: std::sync::Arc>,
     ) -> SyncResult<Self> {
         // Create reorg config with sensible defaults
diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs
index 7fad00d7a..d621cda94 100644
--- a/dash-spv/src/types.rs
+++ b/dash-spv/src/types.rs
@@ -9,6 +9,9 @@ use dashcore::{
 };
 use serde::{Deserialize, Serialize};

+/// Shared, mutex-protected set of filter heights used across components.
+pub type SharedFilterHeights = std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<u32>>>;
+
 /// Unique identifier for a peer connection.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct PeerId(pub u64);
@@ -550,7 +553,7 @@ pub struct SpvStats {
     /// Received filter heights for gap tracking (shared with FilterSyncManager).
     #[serde(skip)]
-    pub received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
+    pub received_filter_heights: SharedFilterHeights,

     /// Number of filter requests currently active.
     pub active_filter_requests: u32,
@@ -587,7 +590,7 @@ impl Default for SpvStats {
             filters_received: 0,
             filter_sync_start_time: None,
             last_filter_received_time: None,
-            received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
+            received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
                 std::collections::HashSet::new(),
             )),
             active_filter_requests: 0,
diff --git a/dash-spv/tests/block_download_test.rs b/dash-spv/tests/block_download_test.rs
index ab908b271..9d272ff2b 100644
--- a/dash-spv/tests/block_download_test.rs
+++ b/dash-spv/tests/block_download_test.rs
@@ -8,7 +8,8 @@
 //! Tests for block downloading on filter match functionality.

 use std::collections::HashSet;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio::sync::Mutex;
 use tokio::sync::RwLock;

 use dashcore::{
diff --git a/dash-spv/tests/cfheader_gap_test.rs b/dash-spv/tests/cfheader_gap_test.rs
index a24677401..e64b21c40 100644
--- a/dash-spv/tests/cfheader_gap_test.rs
+++ b/dash-spv/tests/cfheader_gap_test.rs
@@ -6,7 +6,8 @@
 //! Tests for CFHeader gap detection and auto-restart functionality.

 use std::collections::HashSet;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio::sync::Mutex;

 use dash_spv::{
     client::ClientConfig,
diff --git a/dash-spv/tests/edge_case_filter_sync_test.rs b/dash-spv/tests/edge_case_filter_sync_test.rs
index 83686867d..67f665158 100644
--- a/dash-spv/tests/edge_case_filter_sync_test.rs
+++ b/dash-spv/tests/edge_case_filter_sync_test.rs
@@ -8,7 +8,8 @@
 //! Tests for edge case handling in filter header sync, particularly at the tip.
 use std::collections::HashSet;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio::sync::Mutex;

 use dash_spv::{
     client::ClientConfig,
@@ -52,8 +53,8 @@ impl MockNetworkManager {
         }
     }

-    fn get_sent_messages(&self) -> Vec<NetworkMessage> {
-        self.sent_messages.lock().unwrap().clone()
+    async fn get_sent_messages(&self) -> Vec<NetworkMessage> {
+        self.sent_messages.lock().await.clone()
     }
 }

@@ -72,7 +73,7 @@ impl NetworkManager for MockNetworkManager {
     }

     async fn send_message(&mut self, message: NetworkMessage) -> NetworkResult<()> {
-        self.sent_messages.lock().unwrap().push(message);
+        self.sent_messages.lock().await.push(message);
         Ok(())
     }

@@ -181,7 +182,7 @@ async fn test_filter_sync_at_tip_edge_case() {
     assert!(!result.unwrap(), "Should not start sync when already at tip");

     // Verify no messages were sent
-    let sent_messages = network.get_sent_messages();
+    let sent_messages = network.get_sent_messages().await;
     assert_eq!(sent_messages.len(), 0, "Should not send any messages when at tip");
 }

@@ -285,7 +286,7 @@ async fn test_no_invalid_getcfheaders_at_tip() {
     assert!(result.unwrap(), "Should start sync when behind by 1 block");

     // Check the sent message
-    let sent_messages = network.get_sent_messages();
+    let sent_messages = network.get_sent_messages().await;
     assert_eq!(sent_messages.len(), 1, "Should send exactly one message");

     match &sent_messages[0] {
diff --git a/dash-spv/tests/filter_header_verification_test.rs b/dash-spv/tests/filter_header_verification_test.rs
index 138f6691a..0ad8eaf0d 100644
--- a/dash-spv/tests/filter_header_verification_test.rs
+++ b/dash-spv/tests/filter_header_verification_test.rs
@@ -32,7 +32,8 @@ use dashcore::{
 };
 use dashcore_hashes::{sha256d, Hash};
 use std::collections::HashSet;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio::sync::Mutex;

 /// Mock network manager for testing filter sync
 #[derive(Debug)]
diff --git a/dash-spv/tests/simple_gap_test.rs b/dash-spv/tests/simple_gap_test.rs
index 126e34b6f..04f19aa73 100644
--- a/dash-spv/tests/simple_gap_test.rs
+++ b/dash-spv/tests/simple_gap_test.rs
@@ -1,7 +1,8 @@
 //! Basic test for CFHeader gap detection functionality.

 use std::collections::HashSet;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
+use tokio::sync::Mutex;

 use dash_spv::{
     client::ClientConfig,
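
Reviewer note: the standalone sketch below is not part of the patch; only the SharedFilterHeights alias name comes from it, and the functions and values are illustrative. It shows the locking pattern the patch adopts for the shared filter-height set: tokio::sync::Mutex with `.lock().await` on async paths and `try_lock()` with a fallback on synchronous accessors.

    use std::collections::HashSet;
    use std::sync::Arc;

    /// Mirrors the alias added in dash-spv/src/types.rs.
    type SharedFilterHeights = Arc<tokio::sync::Mutex<HashSet<u32>>>;

    /// Async path: await the lock instead of blocking the runtime thread.
    async fn record_height(heights: &SharedFilterHeights, height: u32) {
        heights.lock().await.insert(height);
    }

    /// Sync path: fall back to a default when the lock is currently held,
    /// matching the try_lock() fallbacks used in the patch.
    fn received_count(heights: &SharedFilterHeights) -> u32 {
        match heights.try_lock() {
            Ok(set) => set.len() as u32,
            Err(_) => 0,
        }
    }

    #[tokio::main]
    async fn main() {
        let heights: SharedFilterHeights = Arc::new(tokio::sync::Mutex::new(HashSet::new()));
        record_height(&heights, 1_000).await;
        assert_eq!(received_count(&heights), 1);
    }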