diff --git a/dash-spv-ffi/dash_spv_ffi.h b/dash-spv-ffi/dash_spv_ffi.h index 644ce1094..b8169ea5c 100644 --- a/dash-spv-ffi/dash_spv_ffi.h +++ b/dash-spv-ffi/dash_spv_ffi.h @@ -28,8 +28,11 @@ typedef enum FFISyncStage { Downloading = 2, Validating = 3, Storing = 4, - Complete = 5, - Failed = 6, + DownloadingFilterHeaders = 5, + DownloadingFilters = 6, + DownloadingBlocks = 7, + Complete = 8, + Failed = 9, } FFISyncStage; typedef enum DashSpvValidationMode { @@ -70,32 +73,28 @@ typedef struct FFIString { uintptr_t length; } FFIString; +typedef struct FFISyncProgress { + uint32_t header_height; + uint32_t filter_header_height; + uint32_t masternode_height; + uint32_t peer_count; + bool filter_sync_available; + uint32_t filters_downloaded; + uint32_t last_synced_filter_height; +} FFISyncProgress; + typedef struct FFIDetailedSyncProgress { - uint32_t current_height; uint32_t total_height; double percentage; double headers_per_second; int64_t estimated_seconds_remaining; enum FFISyncStage stage; struct FFIString stage_message; - uint32_t connected_peers; + struct FFISyncProgress overview; uint64_t total_headers; int64_t sync_start_timestamp; } FFIDetailedSyncProgress; -typedef struct FFISyncProgress { - uint32_t header_height; - uint32_t filter_header_height; - uint32_t masternode_height; - uint32_t peer_count; - bool headers_synced; - bool filter_headers_synced; - bool masternodes_synced; - bool filter_sync_available; - uint32_t filters_downloaded; - uint32_t last_synced_filter_height; -} FFISyncProgress; - typedef struct FFISpvStats { uint32_t connected_peers; uint32_t total_peers; @@ -150,11 +149,6 @@ typedef void (*WalletTransactionCallback)(const char *wallet_id, bool is_ours, void *user_data); -typedef void (*FilterHeadersProgressCallback)(uint32_t filter_height, - uint32_t header_height, - double percentage, - void *user_data); - typedef struct FFIEventCallbacks { BlockCallback on_block; TransactionCallback on_transaction; @@ -164,7 +158,6 @@ typedef struct FFIEventCallbacks { MempoolRemovedCallback on_mempool_transaction_removed; CompactFilterMatchedCallback on_compact_filter_matched; WalletTransactionCallback on_wallet_transaction; - FilterHeadersProgressCallback on_filter_headers_progress; void *user_data; } FFIEventCallbacks; diff --git a/dash-spv-ffi/include/dash_spv_ffi.h b/dash-spv-ffi/include/dash_spv_ffi.h index 644ce1094..b8169ea5c 100644 --- a/dash-spv-ffi/include/dash_spv_ffi.h +++ b/dash-spv-ffi/include/dash_spv_ffi.h @@ -28,8 +28,11 @@ typedef enum FFISyncStage { Downloading = 2, Validating = 3, Storing = 4, - Complete = 5, - Failed = 6, + DownloadingFilterHeaders = 5, + DownloadingFilters = 6, + DownloadingBlocks = 7, + Complete = 8, + Failed = 9, } FFISyncStage; typedef enum DashSpvValidationMode { @@ -70,32 +73,28 @@ typedef struct FFIString { uintptr_t length; } FFIString; +typedef struct FFISyncProgress { + uint32_t header_height; + uint32_t filter_header_height; + uint32_t masternode_height; + uint32_t peer_count; + bool filter_sync_available; + uint32_t filters_downloaded; + uint32_t last_synced_filter_height; +} FFISyncProgress; + typedef struct FFIDetailedSyncProgress { - uint32_t current_height; uint32_t total_height; double percentage; double headers_per_second; int64_t estimated_seconds_remaining; enum FFISyncStage stage; struct FFIString stage_message; - uint32_t connected_peers; + struct FFISyncProgress overview; uint64_t total_headers; int64_t sync_start_timestamp; } FFIDetailedSyncProgress; -typedef struct FFISyncProgress { - uint32_t header_height; - uint32_t filter_header_height; - uint32_t masternode_height; - uint32_t peer_count; - bool headers_synced; - bool filter_headers_synced; - bool masternodes_synced; - bool filter_sync_available; - uint32_t filters_downloaded; - uint32_t last_synced_filter_height; -} FFISyncProgress; - typedef struct FFISpvStats { uint32_t connected_peers; uint32_t total_peers; @@ -150,11 +149,6 @@ typedef void (*WalletTransactionCallback)(const char *wallet_id, bool is_ours, void *user_data); -typedef void (*FilterHeadersProgressCallback)(uint32_t filter_height, - uint32_t header_height, - double percentage, - void *user_data); - typedef struct FFIEventCallbacks { BlockCallback on_block; TransactionCallback on_transaction; @@ -164,7 +158,6 @@ typedef struct FFIEventCallbacks { MempoolRemovedCallback on_mempool_transaction_removed; CompactFilterMatchedCallback on_compact_filter_matched; WalletTransactionCallback on_wallet_transaction; - FilterHeadersProgressCallback on_filter_headers_progress; void *user_data; } FFIEventCallbacks; diff --git a/dash-spv-ffi/src/bin/ffi_cli.rs b/dash-spv-ffi/src/bin/ffi_cli.rs index 6b0406317..38611856b 100644 --- a/dash-spv-ffi/src/bin/ffi_cli.rs +++ b/dash-spv-ffi/src/bin/ffi_cli.rs @@ -1,6 +1,7 @@ use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_void}; use std::ptr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::Duration; @@ -16,6 +17,8 @@ enum NetworkOpt { Regtest, } +static SYNC_COMPLETED: AtomicBool = AtomicBool::new(false); + fn ffi_string_to_rust(s: *const c_char) -> String { if s.is_null() { return String::new(); @@ -23,10 +26,6 @@ fn ffi_string_to_rust(s: *const c_char) -> String { unsafe { CStr::from_ptr(s) }.to_str().unwrap_or_default().to_owned() } -extern "C" fn on_filter_headers_progress(filter: u32, headers: u32, pct: f64, _ud: *mut c_void) { - println!("filters: {} headers: {} progress: {:.2}%", filter, headers, pct * 100.0); -} - extern "C" fn on_detailed_progress(progress: *const FFIDetailedSyncProgress, _ud: *mut c_void) { if progress.is_null() { return; @@ -35,10 +34,10 @@ extern "C" fn on_detailed_progress(progress: *const FFIDetailedSyncProgress, _ud let p = &*progress; println!( "height {}/{} {:.2}% peers {} hps {:.1}", - p.current_height, + p.overview.header_height, p.total_height, p.percentage * 100.0, - p.connected_peers, + p.overview.peer_count, p.headers_per_second ); } @@ -48,6 +47,7 @@ extern "C" fn on_completion(success: bool, msg: *const c_char, _ud: *mut c_void) let m = ffi_string_to_rust(msg); if success { println!("Completed: {}", m); + SYNC_COMPLETED.store(true, Ordering::SeqCst); } else { eprintln!("Failed: {}", m); } @@ -171,7 +171,7 @@ fn main() { std::process::exit(1); } - // Set minimal event callbacks (progress via filter headers) + // Set minimal event callbacks let callbacks = FFIEventCallbacks { on_block: None, on_transaction: None, @@ -181,7 +181,6 @@ fn main() { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, - on_filter_headers_progress: Some(on_filter_headers_progress), user_data: ptr::null_mut(), }; let _ = dash_spv_ffi_client_set_event_callbacks(client, callbacks); @@ -193,6 +192,9 @@ fn main() { std::process::exit(1); } + // Ensure completion flag is reset before starting sync + SYNC_COMPLETED.store(false, Ordering::SeqCst); + // Run sync on this thread; detailed progress will print via callback let rc = dash_spv_ffi_client_sync_to_tip_with_progress( client, @@ -211,10 +213,14 @@ fn main() { let prog_ptr = dash_spv_ffi_client_get_sync_progress(client); if !prog_ptr.is_null() { let prog = &*prog_ptr; - let filters_complete = prog.filter_headers_synced - || !prog.filter_sync_available - || disable_filter_sync; - if prog.headers_synced && filters_complete { + let headers_done = SYNC_COMPLETED.load(Ordering::SeqCst); + let filters_complete = if disable_filter_sync || !prog.filter_sync_available { + false + } else { + prog.filter_header_height >= prog.header_height + && prog.last_synced_filter_height >= prog.filter_header_height + }; + if headers_done && (filters_complete || disable_filter_sync) { dash_spv_ffi_sync_progress_destroy(prog_ptr); break; } diff --git a/dash-spv-ffi/src/callbacks.rs b/dash-spv-ffi/src/callbacks.rs index 6e0bc6791..5506b3ddf 100644 --- a/dash-spv-ffi/src/callbacks.rs +++ b/dash-spv-ffi/src/callbacks.rs @@ -135,10 +135,6 @@ pub type WalletTransactionCallback = Option< ), >; -pub type FilterHeadersProgressCallback = Option< - extern "C" fn(filter_height: u32, header_height: u32, percentage: f64, user_data: *mut c_void), ->; - #[repr(C)] pub struct FFIEventCallbacks { pub on_block: BlockCallback, @@ -149,7 +145,6 @@ pub struct FFIEventCallbacks { pub on_mempool_transaction_removed: MempoolRemovedCallback, pub on_compact_filter_matched: CompactFilterMatchedCallback, pub on_wallet_transaction: WalletTransactionCallback, - pub on_filter_headers_progress: FilterHeadersProgressCallback, pub user_data: *mut c_void, } @@ -178,7 +173,6 @@ impl Default for FFIEventCallbacks { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data: std::ptr::null_mut(), } } @@ -389,23 +383,3 @@ impl FFIEventCallbacks { } } } - -impl FFIEventCallbacks { - pub fn call_filter_headers_progress( - &self, - filter_height: u32, - header_height: u32, - percentage: f64, - ) { - if let Some(callback) = self.on_filter_headers_progress { - tracing::info!( - "๐Ÿ“Š Calling filter headers progress callback: filter_height={}, header_height={}, pct={:.2}", - filter_height, header_height, percentage - ); - callback(filter_height, header_height, percentage, self.user_data); - tracing::info!("โœ… Filter headers progress callback completed"); - } else { - tracing::debug!("Filter headers progress callback not set"); - } - } -} diff --git a/dash-spv-ffi/src/client.rs b/dash-spv-ffi/src/client.rs index 581a7f4d8..84b8a3979 100644 --- a/dash-spv-ffi/src/client.rs +++ b/dash-spv-ffi/src/client.rs @@ -249,17 +249,6 @@ impl FFIDashSpvClient { } => { callbacks.call_balance_update(confirmed, unconfirmed); } - dash_spv::types::SpvEvent::FilterHeadersProgress { - filter_header_height, - header_height, - percentage, - } => { - callbacks.call_filter_headers_progress( - filter_header_height, - header_height, - percentage, - ); - } dash_spv::types::SpvEvent::TransactionDetected { ref txid, confirmed, @@ -804,7 +793,10 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress( match maybe_progress { Some(progress) => { // Handle callback in a thread-safe way - let should_stop = matches!(progress.sync_stage, SyncStage::Complete); + let should_stop = matches!( + progress.sync_stage, + SyncStage::Complete | SyncStage::Failed(_) + ); // Create FFI progress let ffi_progress = Box::new(FFIDetailedSyncProgress::from(progress)); @@ -945,7 +937,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress( FFIErrorCode::Success as i32 } -// Note: filter headers progress is forwarded via FFIEventCallbacks.on_filter_headers_progress +// Filter header progress updates are included in the detailed sync progress callback. /// Cancels the sync operation. /// @@ -1278,10 +1270,6 @@ pub unsafe extern "C" fn dash_spv_ffi_client_set_event_callbacks( tracing::debug!(" Block callback: {}", callbacks.on_block.is_some()); tracing::debug!(" Transaction callback: {}", callbacks.on_transaction.is_some()); tracing::debug!(" Balance update callback: {}", callbacks.on_balance_update.is_some()); - tracing::debug!( - " Filter headers progress callback: {}", - callbacks.on_filter_headers_progress.is_some() - ); let mut event_callbacks = client.event_callbacks.lock().unwrap(); *event_callbacks = callbacks; diff --git a/dash-spv-ffi/src/types.rs b/dash-spv-ffi/src/types.rs index d419b9a59..d703e376a 100644 --- a/dash-spv-ffi/src/types.rs +++ b/dash-spv-ffi/src/types.rs @@ -38,9 +38,6 @@ pub struct FFISyncProgress { pub filter_header_height: u32, pub masternode_height: u32, pub peer_count: u32, - pub headers_synced: bool, - pub filter_headers_synced: bool, - pub masternodes_synced: bool, pub filter_sync_available: bool, pub filters_downloaded: u32, pub last_synced_filter_height: u32, @@ -53,9 +50,6 @@ impl From for FFISyncProgress { filter_header_height: progress.filter_header_height, masternode_height: progress.masternode_height, peer_count: progress.peer_count, - headers_synced: progress.headers_synced, - filter_headers_synced: progress.filter_headers_synced, - masternodes_synced: progress.masternodes_synced, filter_sync_available: progress.filter_sync_available, filters_downloaded: progress.filters_downloaded as u32, last_synced_filter_height: progress.last_synced_filter_height.unwrap_or(0), @@ -71,8 +65,11 @@ pub enum FFISyncStage { Downloading = 2, Validating = 3, Storing = 4, - Complete = 5, - Failed = 6, + DownloadingFilterHeaders = 5, + DownloadingFilters = 6, + DownloadingBlocks = 7, + Complete = 8, + Failed = 9, } impl From for FFISyncStage { @@ -89,6 +86,15 @@ impl From for FFISyncStage { SyncStage::StoringHeaders { .. } => FFISyncStage::Storing, + SyncStage::DownloadingFilterHeaders { + .. + } => FFISyncStage::DownloadingFilterHeaders, + SyncStage::DownloadingFilters { + .. + } => FFISyncStage::DownloadingFilters, + SyncStage::DownloadingBlocks { + .. + } => FFISyncStage::DownloadingBlocks, SyncStage::Complete => FFISyncStage::Complete, SyncStage::Failed(_) => FFISyncStage::Failed, } @@ -97,14 +103,13 @@ impl From for FFISyncStage { #[repr(C)] pub struct FFIDetailedSyncProgress { - pub current_height: u32, pub total_height: u32, pub percentage: f64, pub headers_per_second: f64, pub estimated_seconds_remaining: i64, // -1 if unknown pub stage: FFISyncStage, pub stage_message: FFIString, - pub connected_peers: u32, + pub overview: FFISyncProgress, pub total_headers: u64, pub sync_start_timestamp: i64, } @@ -126,12 +131,24 @@ impl From for FFIDetailedSyncProgress { SyncStage::StoringHeaders { batch_size, } => format!("Storing {} headers", batch_size), + SyncStage::DownloadingFilterHeaders { + current, + target, + } => format!("Downloading filter headers {} / {}", current, target), + SyncStage::DownloadingFilters { + completed, + total, + } => format!("Downloading filters {} / {}", completed, total), + SyncStage::DownloadingBlocks { + pending, + } => format!("Downloading blocks ({} pending)", pending), SyncStage::Complete => "Synchronization complete".to_string(), SyncStage::Failed(err) => err.clone(), }; + let overview = FFISyncProgress::from(progress.sync_progress.clone()); + FFIDetailedSyncProgress { - current_height: progress.current_height, total_height: progress.peer_best_height, percentage: progress.percentage, headers_per_second: progress.headers_per_second, @@ -141,7 +158,7 @@ impl From for FFIDetailedSyncProgress { .unwrap_or(-1), stage: progress.sync_stage.into(), stage_message: FFIString::new(&stage_message), - connected_peers: progress.connected_peers as u32, + overview, total_headers: progress.total_headers_processed, sync_start_timestamp: progress .sync_start_time diff --git a/dash-spv-ffi/tests/integration/test_full_workflow.rs b/dash-spv-ffi/tests/integration/test_full_workflow.rs index 9fea9492f..ccf0039f5 100644 --- a/dash-spv-ffi/tests/integration/test_full_workflow.rs +++ b/dash-spv-ffi/tests/integration/test_full_workflow.rs @@ -196,7 +196,6 @@ mod tests { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data: &ctx as *const _ as *mut c_void, }; diff --git a/dash-spv-ffi/tests/performance/test_benchmarks.rs b/dash-spv-ffi/tests/performance/test_benchmarks.rs index 423a71899..4096b9def 100644 --- a/dash-spv-ffi/tests/performance/test_benchmarks.rs +++ b/dash-spv-ffi/tests/performance/test_benchmarks.rs @@ -382,9 +382,7 @@ mod tests { filter_header_height: 12340, masternode_height: 12300, peer_count: 8, - headers_synced: true, - filter_headers_synced: true, - masternodes_synced: false, + filter_sync_available: true, filters_downloaded: 1000, last_synced_filter_height: Some(12000), sync_start: std::time::SystemTime::now(), @@ -448,4 +446,4 @@ mod tests { } } } -} \ No newline at end of file +} diff --git a/dash-spv-ffi/tests/test_event_callbacks.rs b/dash-spv-ffi/tests/test_event_callbacks.rs index 0fcdddefd..71dfc38d3 100644 --- a/dash-spv-ffi/tests/test_event_callbacks.rs +++ b/dash-spv-ffi/tests/test_event_callbacks.rs @@ -174,7 +174,6 @@ fn test_event_callbacks_setup() { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data, }; @@ -268,7 +267,6 @@ fn test_enhanced_event_callbacks() { on_mempool_transaction_removed: None, on_compact_filter_matched: Some(test_compact_filter_matched_callback), on_wallet_transaction: Some(test_wallet_transaction_callback), - on_filter_headers_progress: None, user_data: Arc::as_ptr(&event_data) as *mut c_void, }; @@ -325,7 +323,6 @@ fn test_drain_events_integration() { on_mempool_transaction_confirmed: None, on_mempool_transaction_removed: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data, }; dash_spv_ffi_client_set_event_callbacks(client, callbacks); @@ -392,7 +389,6 @@ fn test_drain_events_concurrent_with_callbacks() { on_mempool_transaction_confirmed: None, on_mempool_transaction_removed: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data, }; dash_spv_ffi_client_set_event_callbacks(client, callbacks); @@ -472,7 +468,6 @@ fn test_drain_events_callback_lifecycle() { on_mempool_transaction_confirmed: None, on_mempool_transaction_removed: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data, }; dash_spv_ffi_client_set_event_callbacks(client, callbacks); @@ -491,7 +486,6 @@ fn test_drain_events_callback_lifecycle() { on_mempool_transaction_confirmed: None, on_mempool_transaction_removed: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data: std::ptr::null_mut(), }; dash_spv_ffi_client_set_event_callbacks(client, callbacks); @@ -510,7 +504,6 @@ fn test_drain_events_callback_lifecycle() { on_mempool_transaction_confirmed: None, on_mempool_transaction_removed: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data, }; dash_spv_ffi_client_set_event_callbacks(client, callbacks); diff --git a/dash-spv-ffi/tests/test_types.rs b/dash-spv-ffi/tests/test_types.rs index 23af9b212..da43b6295 100644 --- a/dash-spv-ffi/tests/test_types.rs +++ b/dash-spv-ffi/tests/test_types.rs @@ -84,11 +84,8 @@ mod tests { filter_header_height: 90, masternode_height: 80, peer_count: 5, - headers_synced: true, - filter_headers_synced: false, - masternodes_synced: false, - filters_downloaded: 50, filter_sync_available: true, + filters_downloaded: 50, last_synced_filter_height: Some(45), sync_start: std::time::SystemTime::now(), last_update: std::time::SystemTime::now(), @@ -100,9 +97,6 @@ mod tests { assert_eq!(ffi_progress.filter_header_height, 90); assert_eq!(ffi_progress.masternode_height, 80); assert_eq!(ffi_progress.peer_count, 5); - assert!(ffi_progress.headers_synced); - assert!(!ffi_progress.filter_headers_synced); - assert!(!ffi_progress.masternodes_synced); assert_eq!(ffi_progress.filters_downloaded, 50); assert_eq!(ffi_progress.last_synced_filter_height, 45); } diff --git a/dash-spv-ffi/tests/unit/test_async_operations.rs b/dash-spv-ffi/tests/unit/test_async_operations.rs index b093974db..aafc90856 100644 --- a/dash-spv-ffi/tests/unit/test_async_operations.rs +++ b/dash-spv-ffi/tests/unit/test_async_operations.rs @@ -599,7 +599,6 @@ mod tests { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, - on_filter_headers_progress: None, user_data: &event_data as *const _ as *mut c_void, }; diff --git a/dash-spv-ffi/tests/unit/test_type_conversions.rs b/dash-spv-ffi/tests/unit/test_type_conversions.rs index 3aa61e4a9..ee6586caa 100644 --- a/dash-spv-ffi/tests/unit/test_type_conversions.rs +++ b/dash-spv-ffi/tests/unit/test_type_conversions.rs @@ -144,9 +144,6 @@ mod tests { filter_header_height: u32::MAX, masternode_height: u32::MAX, peer_count: u32::MAX, - headers_synced: true, - filter_headers_synced: true, - masternodes_synced: true, filter_sync_available: true, filters_downloaded: u64::MAX, last_synced_filter_height: Some(u32::MAX), diff --git a/dash-spv/src/client/mod.rs b/dash-spv/src/client/mod.rs index 824c36ec6..ada0831d0 100644 --- a/dash-spv/src/client/mod.rs +++ b/dash-spv/src/client/mod.rs @@ -19,10 +19,11 @@ use crate::mempool_filter::MempoolFilter; use crate::network::NetworkManager; use crate::storage::StorageManager; use crate::sync::filters::FilterNotificationSender; +use crate::sync::sequential::phases::SyncPhase; use crate::sync::sequential::SequentialSyncManager; use crate::types::{ AddressBalance, ChainState, DetailedSyncProgress, MempoolState, SpvEvent, SpvStats, - SyncProgress, + SyncProgress, SyncStage, }; use crate::validation::ValidationManager; use dashcore::network::constants::NetworkExt; @@ -148,6 +149,61 @@ impl< let _ = self.event_tx.send(event); } + fn map_phase_to_stage( + phase: &SyncPhase, + sync_progress: &SyncProgress, + peer_best_height: u32, + ) -> SyncStage { + match phase { + SyncPhase::Idle => { + if sync_progress.peer_count == 0 { + SyncStage::Connecting + } else { + SyncStage::QueryingPeerHeight + } + } + SyncPhase::DownloadingHeaders { + start_height, + target_height, + .. + } => SyncStage::DownloadingHeaders { + start: *start_height, + end: target_height.unwrap_or(peer_best_height), + }, + SyncPhase::DownloadingMnList { + diffs_processed, + .. + } => SyncStage::ValidatingHeaders { + batch_size: *diffs_processed as usize, + }, + SyncPhase::DownloadingCFHeaders { + current_height, + target_height, + .. + } => SyncStage::DownloadingFilterHeaders { + current: *current_height, + target: *target_height, + }, + SyncPhase::DownloadingFilters { + completed_heights, + total_filters, + .. + } => SyncStage::DownloadingFilters { + completed: completed_heights.len() as u32, + total: *total_filters, + }, + SyncPhase::DownloadingBlocks { + pending_blocks, + .. + } => SyncStage::DownloadingBlocks { + pending: pending_blocks.len(), + }, + SyncPhase::FullySynced { + .. + } => SyncStage::Complete, + } + } + /// Helper to create a StatusDisplay instance. async fn create_status_display(&self) -> StatusDisplay<'_, S> { StatusDisplay::new( @@ -674,8 +730,6 @@ impl< let storage = self.storage.lock().await; storage.get_filter_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0) }, - headers_synced: false, // Will be synced by monitoring loop - filter_headers_synced: false, ..SyncProgress::default() }; @@ -745,6 +799,7 @@ impl< // Last emitted heights for filter headers progress to avoid duplicate events let mut last_emitted_header_height: u32 = 0; let mut last_emitted_filter_header_height: u32 = 0; + let mut last_emitted_filters_downloaded: u64 = 0; loop { // Check if we should stop @@ -901,23 +956,29 @@ impl< } let headers_per_second = headers_this_second as f64; - - // Determine sync stage - let sync_stage = if self.network.peer_count() == 0 { - crate::types::SyncStage::Connecting - } else if current_height == 0 { - crate::types::SyncStage::QueryingPeerHeight - } else if current_height < peer_best { - crate::types::SyncStage::DownloadingHeaders { - start: current_height, - end: peer_best, + let peer_count = self.network.peer_count() as u32; + let phase_snapshot = self.sync_manager.current_phase().clone(); + + let status_display = self.create_status_display().await; + let mut sync_progress = match status_display.sync_progress().await { + Ok(p) => p, + Err(e) => { + tracing::warn!("Failed to compute sync progress snapshot: {}", e); + SyncProgress::default() } - } else { - crate::types::SyncStage::Complete }; + // Update peer count with the latest network information. + sync_progress.peer_count = peer_count; + sync_progress.header_height = current_height; + sync_progress.filter_sync_available = self.config.enable_filters; + + let sync_stage = + Self::map_phase_to_stage(&phase_snapshot, &sync_progress, peer_best); + let filters_downloaded = sync_progress.filters_downloaded; + let progress = DetailedSyncProgress { - current_height, + sync_progress, peer_best_height: peer_best, percentage: if peer_best > 0 { (current_height as f64 / peer_best as f64 * 100.0).min(100.0) @@ -935,13 +996,13 @@ impl< None }, sync_stage, - connected_peers: self.network.peer_count(), total_headers_processed: current_height as u64, total_bytes_downloaded, sync_start_time, last_update_time: SystemTime::now(), }; + last_emitted_filters_downloaded = filters_downloaded; self.emit_progress(progress); headers_this_second = 0; @@ -956,24 +1017,67 @@ impl< storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0); (storage_tip, filter_tip) }; - if abs_header_height != last_emitted_header_height - || filter_header_height != last_emitted_filter_header_height + { - if abs_header_height > 0 { - let pct = if filter_header_height <= abs_header_height { - (filter_header_height as f64 / abs_header_height as f64 * 100.0) - .min(100.0) - } else { - 0.0 + // Build and emit a fresh DetailedSyncProgress snapshot reflecting current filter progress + let peer_best = self + .network + .get_peer_best_height() + .await + .ok() + .flatten() + .unwrap_or(abs_header_height); + + let phase_snapshot = self.sync_manager.current_phase().clone(); + let status_display = self.create_status_display().await; + let mut sync_progress = match status_display.sync_progress().await { + Ok(p) => p, + Err(e) => { + tracing::warn!( + "Failed to compute sync progress snapshot (filter): {}", + e + ); + SyncProgress::default() + } + }; + // Ensure we include up-to-date header height and peer count + let peer_count = self.network.peer_count() as u32; + sync_progress.peer_count = peer_count; + sync_progress.header_height = abs_header_height; + sync_progress.filter_sync_available = self.config.enable_filters; + + let filters_downloaded = sync_progress.filters_downloaded; + + if abs_header_height != last_emitted_header_height + || filter_header_height != last_emitted_filter_header_height + || filters_downloaded != last_emitted_filters_downloaded + { + let sync_stage = + Self::map_phase_to_stage(&phase_snapshot, &sync_progress, peer_best); + + let progress = DetailedSyncProgress { + sync_progress, + peer_best_height: peer_best, + percentage: if peer_best > 0 { + (abs_header_height as f64 / peer_best as f64 * 100.0).min(100.0) + } else { + 0.0 + }, + headers_per_second: 0.0, + bytes_per_second: 0, + estimated_time_remaining: None, + sync_stage, + total_headers_processed: abs_header_height as u64, + total_bytes_downloaded, + sync_start_time, + last_update_time: SystemTime::now(), }; - self.emit_event(SpvEvent::FilterHeadersProgress { - filter_header_height, - header_height: abs_header_height, - percentage: pct, - }); + last_emitted_header_height = abs_header_height; + last_emitted_filter_header_height = filter_header_height; + last_emitted_filters_downloaded = filters_downloaded; + + self.emit_progress(progress); } - last_emitted_header_height = abs_header_height; - last_emitted_filter_header_height = filter_header_height; } last_status_update = Instant::now(); @@ -1923,18 +2027,11 @@ impl< tracing::debug!("Sequential sync manager will resume from stored state"); // Determine phase based on sync progress - if saved_state.sync_progress.headers_synced { - if saved_state.sync_progress.filter_headers_synced { - // Headers and filter headers done, we're in filter download phase - tracing::info!("Resuming sequential sync in filter download phase"); - } else { - // Headers done, need filter headers - tracing::info!("Resuming sequential sync in filter header download phase"); - } - } else { - // Still downloading headers - tracing::info!("Resuming sequential sync in header download phase"); - } + tracing::info!( + "Resuming sequential sync; saved header height {} filter header height {}", + saved_state.sync_progress.header_height, + saved_state.sync_progress.filter_header_height + ); // Reset any in-flight requests self.sync_manager.reset_pending_requests(); diff --git a/dash-spv/src/client/status_display.rs b/dash-spv/src/client/status_display.rs index 841f7b5c8..a558921a6 100644 --- a/dash-spv/src/client/status_display.rs +++ b/dash-spv/src/client/status_display.rs @@ -99,9 +99,6 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> { filter_header_height, masternode_height: state.last_masternode_diff_height.unwrap_or(0), peer_count: 1, // TODO: Get from network manager - headers_synced: false, // TODO: Implement - filter_headers_synced: false, // TODO: Implement - masternodes_synced: false, // TODO: Implement filter_sync_available: false, // TODO: Get from network manager filters_downloaded: filters_received, last_synced_filter_height, diff --git a/dash-spv/src/storage/sync_state.rs b/dash-spv/src/storage/sync_state.rs index 379a0b55d..27318a44b 100644 --- a/dash-spv/src/storage/sync_state.rs +++ b/dash-spv/src/storage/sync_state.rs @@ -172,12 +172,8 @@ impl PersistentSyncState { sync_progress: sync_progress.clone(), checkpoints: Self::create_checkpoints(chain_state), masternode_sync: MasternodeSyncState { - last_synced_height: if sync_progress.masternodes_synced { - Some(sync_progress.masternode_height) - } else { - None - }, - is_synced: sync_progress.masternodes_synced, + last_synced_height: None, + is_synced: false, masternode_count: chain_state .masternode_engine .as_ref() diff --git a/dash-spv/src/sync/filters.rs b/dash-spv/src/sync/filters.rs index d97e6f9f0..c8509028f 100644 --- a/dash-spv/src/sync/filters.rs +++ b/dash-spv/src/sync/filters.rs @@ -2828,6 +2828,9 @@ impl Option { @@ -138,7 +137,8 @@ impl DetailedSyncProgress { return None; } - let remaining = self.peer_best_height.saturating_sub(self.current_height); + let current_height = self.sync_progress.header_height; + let remaining = self.peer_best_height.saturating_sub(current_height); if remaining == 0 { return Some(Duration::from_secs(0)); } @@ -788,23 +788,6 @@ pub enum SpvEvent { percentage: f64, }, - /// Filter headers progress update. - /// - /// Carries absolute blockchain heights for both the current filter header tip - /// and the current block header tip, along with a convenience percentage - /// (filter_header_height / header_height * 100), clamped to [0, 100]. - /// - /// Consumers who sync from a checkpoint may prefer to recompute a - /// checkpoint-relative percentage using their base height. - FilterHeadersProgress { - /// Current absolute height of synchronized filter headers. - filter_header_height: u32, - /// Current absolute height of synchronized block headers. - header_height: u32, - /// Convenience percentage in [0, 100]. - percentage: f64, - }, - /// ChainLock received and validated. ChainLockReceived { /// Block height of the ChainLock. diff --git a/dash-spv/tests/header_sync_test.rs b/dash-spv/tests/header_sync_test.rs index 032cf80f1..f254181bc 100644 --- a/dash-spv/tests/header_sync_test.rs +++ b/dash-spv/tests/header_sync_test.rs @@ -319,7 +319,6 @@ async fn test_header_sync_with_client_integration() { let stats = stats.unwrap(); assert_eq!(stats.header_height, 0); - assert!(!stats.headers_synced); info!("Header sync client integration test completed"); } diff --git a/dash-spv/tests/integration_real_node_test.rs b/dash-spv/tests/integration_real_node_test.rs index 163bd8ccd..596012f40 100644 --- a/dash-spv/tests/integration_real_node_test.rs +++ b/dash-spv/tests/integration_real_node_test.rs @@ -132,8 +132,8 @@ async fn test_real_header_sync_genesis_to_1000() { client.sync_progress().await.expect("Failed to get initial sync progress"); info!( - "Initial sync state: height={}, synced={}", - initial_progress.header_height, initial_progress.headers_synced + "Initial sync state: header_height={} filter_header_height={}", + initial_progress.header_height, initial_progress.filter_header_height ); // Perform header sync @@ -241,7 +241,7 @@ async fn test_real_header_sync_up_to_10k() { } // Check if we've reached our target or sync is complete - if progress.header_height >= MAX_TEST_HEADERS || progress.headers_synced { + if progress.header_height >= MAX_TEST_HEADERS { return Ok::<_, dash_spv::error::SpvError>(progress); } diff --git a/swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h b/swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h index 5ae81d3c3..b8169ea5c 100644 --- a/swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h +++ b/swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h @@ -28,8 +28,11 @@ typedef enum FFISyncStage { Downloading = 2, Validating = 3, Storing = 4, - Complete = 5, - Failed = 6, + DownloadingFilterHeaders = 5, + DownloadingFilters = 6, + DownloadingBlocks = 7, + Complete = 8, + Failed = 9, } FFISyncStage; typedef enum DashSpvValidationMode { @@ -61,6 +64,7 @@ typedef struct FFIArray { typedef struct FFIClientConfig { void *inner; + uint32_t worker_threads; } FFIClientConfig; @@ -69,32 +73,28 @@ typedef struct FFIString { uintptr_t length; } FFIString; +typedef struct FFISyncProgress { + uint32_t header_height; + uint32_t filter_header_height; + uint32_t masternode_height; + uint32_t peer_count; + bool filter_sync_available; + uint32_t filters_downloaded; + uint32_t last_synced_filter_height; +} FFISyncProgress; + typedef struct FFIDetailedSyncProgress { - uint32_t current_height; uint32_t total_height; double percentage; double headers_per_second; int64_t estimated_seconds_remaining; enum FFISyncStage stage; struct FFIString stage_message; - uint32_t connected_peers; + struct FFISyncProgress overview; uint64_t total_headers; int64_t sync_start_timestamp; } FFIDetailedSyncProgress; -typedef struct FFISyncProgress { - uint32_t header_height; - uint32_t filter_header_height; - uint32_t masternode_height; - uint32_t peer_count; - bool headers_synced; - bool filter_headers_synced; - bool masternodes_synced; - bool filter_sync_available; - uint32_t filters_downloaded; - uint32_t last_synced_filter_height; -} FFISyncProgress; - typedef struct FFISpvStats { uint32_t connected_peers; uint32_t total_peers; @@ -149,11 +149,6 @@ typedef void (*WalletTransactionCallback)(const char *wallet_id, bool is_ours, void *user_data); -typedef void (*FilterHeadersProgressCallback)(uint32_t filter_height, - uint32_t header_height, - double percentage, - void *user_data); - typedef struct FFIEventCallbacks { BlockCallback on_block; TransactionCallback on_transaction; @@ -163,7 +158,6 @@ typedef struct FFIEventCallbacks { MempoolRemovedCallback on_mempool_transaction_removed; CompactFilterMatchedCallback on_compact_filter_matched; WalletTransactionCallback on_wallet_transaction; - FilterHeadersProgressCallback on_filter_headers_progress; void *user_data; } FFIEventCallbacks; @@ -281,6 +275,14 @@ struct FFIArray dash_spv_ffi_checkpoints_between_heights(FFINetwork network, */ struct FFIDashSpvClient *dash_spv_ffi_client_new(const struct FFIClientConfig *config) ; +/** + * Drain pending events and invoke configured callbacks (non-blocking). + * + * # Safety + * - `client` must be a valid, non-null pointer. + */ + int32_t dash_spv_ffi_client_drain_events(struct FFIDashSpvClient *client) ; + /** * Update the running client's configuration. * @@ -392,10 +394,8 @@ int32_t dash_spv_ffi_client_sync_to_tip_with_progress(struct FFIDashSpvClient *c /** * Cancels the sync operation. * - * **Note**: This function currently only stops the SPV client and clears sync callbacks, - * but does not fully abort the ongoing sync process. The sync operation may continue - * running in the background until it completes naturally. Full sync cancellation with - * proper task abortion is not yet implemented. + * This stops the SPV client, clears callbacks, and joins active threads so the sync + * operation halts immediately. * * # Safety * The client pointer must be valid and non-null. @@ -704,6 +704,14 @@ int32_t dash_spv_ffi_config_set_masternode_sync_enabled(struct FFIClientConfig * void dash_spv_ffi_config_destroy(struct FFIClientConfig *config) ; +/** + * Sets the number of Tokio worker threads for the FFI runtime (0 = auto) + * + * # Safety + * - `config` must be a valid pointer to an FFIClientConfig + */ + int32_t dash_spv_ffi_config_set_worker_threads(struct FFIClientConfig *config, uint32_t threads) ; + /** * Enables or disables mempool tracking * diff --git a/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Core/SPVClient.swift b/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Core/SPVClient.swift index 187db798f..110e6b55f 100644 --- a/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Core/SPVClient.swift +++ b/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Core/SPVClient.swift @@ -8,17 +8,19 @@ import Network /// Detailed sync progress information with real-time statistics public struct DetailedSyncProgress: Sendable, Equatable { - public let currentHeight: UInt32 + public let overview: SyncProgress public let totalHeight: UInt32 public let percentage: Double public let headersPerSecond: Double public let estimatedSecondsRemaining: Int64 public let stage: SyncStage public let stageMessage: String - public let connectedPeers: UInt32 public let totalHeadersProcessed: UInt64 public let syncStartTimestamp: Date + public var currentHeight: UInt32 { overview.currentHeight } + public var connectedPeers: UInt32 { overview.peerCount } + /// Calculated properties public var blocksRemaining: UInt32 { guard totalHeight > currentHeight else { return 0 } @@ -65,39 +67,36 @@ public struct DetailedSyncProgress: Sendable, Equatable { /// Public initializer for creating DetailedSyncProgress public init( - currentHeight: UInt32, + overview: SyncProgress, totalHeight: UInt32, percentage: Double, headersPerSecond: Double, estimatedSecondsRemaining: Int64, stage: SyncStage, stageMessage: String, - connectedPeers: UInt32, totalHeadersProcessed: UInt64, syncStartTimestamp: Date ) { - self.currentHeight = currentHeight + self.overview = overview self.totalHeight = totalHeight self.percentage = percentage self.headersPerSecond = headersPerSecond self.estimatedSecondsRemaining = estimatedSecondsRemaining self.stage = stage self.stageMessage = stageMessage - self.connectedPeers = connectedPeers self.totalHeadersProcessed = totalHeadersProcessed self.syncStartTimestamp = syncStartTimestamp } /// Initialize from FFI type internal init(ffiProgress: FFIDetailedSyncProgress) { - self.currentHeight = ffiProgress.current_height + self.overview = SyncProgress(ffiProgress: ffiProgress.overview) self.totalHeight = ffiProgress.total_height self.percentage = ffiProgress.percentage self.headersPerSecond = ffiProgress.headers_per_second self.estimatedSecondsRemaining = ffiProgress.estimated_seconds_remaining self.stage = SyncStage(ffiStage: ffiProgress.stage) self.stageMessage = String(cString: ffiProgress.stage_message.ptr) - self.connectedPeers = ffiProgress.connected_peers self.totalHeadersProcessed = ffiProgress.total_headers self.syncStartTimestamp = Date(timeIntervalSince1970: TimeInterval(ffiProgress.sync_start_timestamp)) } @@ -108,11 +107,14 @@ public enum SyncStage: Equatable, Sendable { case connecting case queryingHeight case downloading + case downloadingFilterHeaders + case downloadingFilters + case downloadingBlocks case validating case storing case complete case failed - + /// Initialize from FFI enum value internal init(ffiStage: FFISyncStage) { switch ffiStage.rawValue { @@ -126,9 +128,15 @@ public enum SyncStage: Equatable, Sendable { self = .validating case 4: // Storing self = .storing - case 5: // Complete + case 5: // Downloading filter headers + self = .downloadingFilterHeaders + case 6: // Downloading filters + self = .downloadingFilters + case 7: // Downloading blocks + self = .downloadingBlocks + case 8: // Complete self = .complete - case 6: // Failed + case 9: // Failed self = .failed default: self = .failed @@ -143,6 +151,12 @@ public enum SyncStage: Equatable, Sendable { return "Querying blockchain height" case .downloading: return "Downloading headers" + case .downloadingFilterHeaders: + return "Downloading filter headers" + case .downloadingFilters: + return "Downloading filters" + case .downloadingBlocks: + return "Downloading blocks" case .validating: return "Validating headers" case .storing: @@ -171,6 +185,12 @@ public enum SyncStage: Equatable, Sendable { return "๐Ÿ”" case .downloading: return "โฌ‡๏ธ" + case .downloadingFilterHeaders: + return "๐Ÿงพ" + case .downloadingFilters: + return "๐Ÿช„" + case .downloadingBlocks: + return "๐Ÿ“ฆ" case .validating: return "โœ…" case .storing: @@ -308,6 +328,9 @@ extension DetailedSyncProgress { "Time Remaining": formattedTimeRemaining, "Connected Peers": "\(connectedPeers)", "Headers Processed": "\(totalHeadersProcessed)", + "Filter Header Height": "\(overview.filterHeaderHeight)", + "Filters Downloaded": "\(overview.filtersDownloaded)", + "Peer Count": "\(overview.peerCount)", "Duration": formattedSyncDuration ] } @@ -1240,4 +1263,4 @@ extension SPVClient { public func syncProgressStream() -> SyncProgressStream { return SyncProgressStream(client: self) } -} \ No newline at end of file +} diff --git a/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Models/SyncProgress.swift b/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Models/SyncProgress.swift index ba70221f9..e4795c1da 100644 --- a/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Models/SyncProgress.swift +++ b/swift-dash-core-sdk/Sources/SwiftDashCoreSDK/Models/SyncProgress.swift @@ -11,6 +11,11 @@ public struct SyncProgress: Sendable, Equatable { public let estimatedTimeRemaining: TimeInterval? public let message: String? public let filterSyncAvailable: Bool + public let filterHeaderHeight: UInt32 + public let masternodeHeight: UInt32 + public let peerCount: UInt32 + public let filtersDownloaded: UInt32 + public let lastSyncedFilterHeight: UInt32 public init( currentHeight: UInt32, @@ -19,7 +24,12 @@ public struct SyncProgress: Sendable, Equatable { status: SyncStatus, estimatedTimeRemaining: TimeInterval? = nil, message: String? = nil, - filterSyncAvailable: Bool = false + filterSyncAvailable: Bool = false, + filterHeaderHeight: UInt32 = 0, + masternodeHeight: UInt32 = 0, + peerCount: UInt32 = 0, + filtersDownloaded: UInt32 = 0, + lastSyncedFilterHeight: UInt32 = 0 ) { self.currentHeight = currentHeight self.totalHeight = totalHeight @@ -28,16 +38,26 @@ public struct SyncProgress: Sendable, Equatable { self.estimatedTimeRemaining = estimatedTimeRemaining self.message = message self.filterSyncAvailable = filterSyncAvailable + self.filterHeaderHeight = filterHeaderHeight + self.masternodeHeight = masternodeHeight + self.peerCount = peerCount + self.filtersDownloaded = filtersDownloaded + self.lastSyncedFilterHeight = lastSyncedFilterHeight } internal init(ffiProgress: FFISyncProgress) { self.currentHeight = ffiProgress.header_height self.totalHeight = 0 // FFISyncProgress doesn't provide total height - self.progress = ffiProgress.headers_synced ? 1.0 : 0.0 - self.status = ffiProgress.headers_synced ? .synced : .downloadingHeaders + self.progress = 0.0 + self.status = .downloadingHeaders self.estimatedTimeRemaining = nil self.message = nil self.filterSyncAvailable = ffiProgress.filter_sync_available + self.filterHeaderHeight = ffiProgress.filter_header_height + self.masternodeHeight = ffiProgress.masternode_height + self.peerCount = ffiProgress.peer_count + self.filtersDownloaded = ffiProgress.filters_downloaded + self.lastSyncedFilterHeight = ffiProgress.last_synced_filter_height } public var blocksRemaining: UInt32 { @@ -120,4 +140,4 @@ public enum SyncStatus: String, Codable, Sendable { return true } } -} \ No newline at end of file +}