diff --git a/dash-spv-ffi/include/dash_spv_ffi.h b/dash-spv-ffi/include/dash_spv_ffi.h index bf2c0073e..cfe225e20 100644 --- a/dash-spv-ffi/include/dash_spv_ffi.h +++ b/dash-spv-ffi/include/dash_spv_ffi.h @@ -149,6 +149,11 @@ typedef void (*WalletTransactionCallback)(const char *wallet_id, bool is_ours, void *user_data); +typedef void (*FilterHeadersProgressCallback)(uint32_t filter_height, + uint32_t header_height, + double percentage, + void *user_data); + typedef struct FFIEventCallbacks { BlockCallback on_block; TransactionCallback on_transaction; @@ -158,6 +163,7 @@ typedef struct FFIEventCallbacks { MempoolRemovedCallback on_mempool_transaction_removed; CompactFilterMatchedCallback on_compact_filter_matched; WalletTransactionCallback on_wallet_transaction; + FilterHeadersProgressCallback on_filter_headers_progress; void *user_data; } FFIEventCallbacks; diff --git a/dash-spv-ffi/src/callbacks.rs b/dash-spv-ffi/src/callbacks.rs index 5506b3ddf..6e0bc6791 100644 --- a/dash-spv-ffi/src/callbacks.rs +++ b/dash-spv-ffi/src/callbacks.rs @@ -135,6 +135,10 @@ pub type WalletTransactionCallback = Option< ), >; +pub type FilterHeadersProgressCallback = Option< + extern "C" fn(filter_height: u32, header_height: u32, percentage: f64, user_data: *mut c_void), +>; + #[repr(C)] pub struct FFIEventCallbacks { pub on_block: BlockCallback, @@ -145,6 +149,7 @@ pub struct FFIEventCallbacks { pub on_mempool_transaction_removed: MempoolRemovedCallback, pub on_compact_filter_matched: CompactFilterMatchedCallback, pub on_wallet_transaction: WalletTransactionCallback, + pub on_filter_headers_progress: FilterHeadersProgressCallback, pub user_data: *mut c_void, } @@ -173,6 +178,7 @@ impl Default for FFIEventCallbacks { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, + on_filter_headers_progress: None, user_data: std::ptr::null_mut(), } } @@ -383,3 +389,23 @@ impl FFIEventCallbacks { } } } + +impl FFIEventCallbacks { + pub fn call_filter_headers_progress( + &self, + filter_height: u32, + header_height: u32, + percentage: f64, + ) { + if let Some(callback) = self.on_filter_headers_progress { + tracing::info!( + "📊 Calling filter headers progress callback: filter_height={}, header_height={}, pct={:.2}", + filter_height, header_height, percentage + ); + callback(filter_height, header_height, percentage, self.user_data); + tracing::info!("✅ Filter headers progress callback completed"); + } else { + tracing::debug!("Filter headers progress callback not set"); + } + } +} diff --git a/dash-spv-ffi/src/client.rs b/dash-spv-ffi/src/client.rs index cade88082..dbd2288a0 100644 --- a/dash-spv-ffi/src/client.rs +++ b/dash-spv-ffi/src/client.rs @@ -230,6 +230,16 @@ impl FFIDashSpvClient { confirmed, unconfirmed, total); callbacks.call_balance_update(confirmed, unconfirmed); } + dash_spv::types::SpvEvent::FilterHeadersProgress { filter_header_height, header_height, percentage } => { + tracing::info!("📊 Filter headers progress event: filter={}, header={}, pct={:.2}", + filter_header_height, header_height, percentage); + callbacks + .call_filter_headers_progress( + filter_header_height, + header_height, + percentage, + ); + } dash_spv::types::SpvEvent::TransactionDetected { ref txid, confirmed, ref addresses, amount, block_height, .. } => { tracing::info!("💸 Transaction detected: txid={}, confirmed={}, amount={}, addresses={:?}, height={:?}", txid, confirmed, amount, addresses, block_height); @@ -717,6 +727,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress( let inner = client.inner.clone(); let runtime = client.runtime.clone(); let sync_callbacks = client.sync_callbacks.clone(); + // Shared flag to coordinate internal threads during sync + let sync_running = Arc::new(AtomicBool::new(true)); // Take progress receiver from client let progress_receiver = { @@ -772,6 +784,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress( // Spawn sync task in a separate thread with safe callback access let runtime_handle = runtime.handle().clone(); let sync_callbacks_clone = sync_callbacks.clone(); + let sync_running_for_join = sync_running.clone(); let sync_handle = std::thread::spawn(move || { // Run monitoring loop let monitor_result = runtime_handle.block_on(async move { @@ -792,6 +805,9 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress( res }); + // Signal background handlers to stop + sync_running_for_join.store(false, Ordering::Relaxed); + // Send completion callback and cleanup { let mut cb_guard = sync_callbacks_clone.lock().unwrap(); @@ -843,6 +859,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress( FFIErrorCode::Success as i32 } +// Note: filter headers progress is forwarded via FFIEventCallbacks.on_filter_headers_progress + /// Cancels the sync operation. /// /// **Note**: This function currently only stops the SPV client and clears sync callbacks, @@ -1021,6 +1039,10 @@ pub unsafe extern "C" fn dash_spv_ffi_client_set_event_callbacks( tracing::info!(" Block callback: {}", callbacks.on_block.is_some()); tracing::info!(" Transaction callback: {}", callbacks.on_transaction.is_some()); tracing::info!(" Balance update callback: {}", callbacks.on_balance_update.is_some()); + tracing::info!( + " Filter headers progress callback: {}", + callbacks.on_filter_headers_progress.is_some() + ); let mut event_callbacks = client.event_callbacks.lock().unwrap(); *event_callbacks = callbacks; diff --git a/dash-spv-ffi/tests/integration/test_full_workflow.rs b/dash-spv-ffi/tests/integration/test_full_workflow.rs index a4c95fcf1..9fea9492f 100644 --- a/dash-spv-ffi/tests/integration/test_full_workflow.rs +++ b/dash-spv-ffi/tests/integration/test_full_workflow.rs @@ -191,6 +191,12 @@ mod tests { on_block: Some(on_block), on_transaction: Some(on_transaction), on_balance_update: Some(on_balance), + on_mempool_transaction_added: None, + on_mempool_transaction_confirmed: None, + on_mempool_transaction_removed: None, + on_compact_filter_matched: None, + on_wallet_transaction: None, + on_filter_headers_progress: None, user_data: &ctx as *const _ as *mut c_void, }; @@ -536,4 +542,4 @@ mod tests { ctx.cleanup(); } } -} \ No newline at end of file +} diff --git a/dash-spv-ffi/tests/test_event_callbacks.rs b/dash-spv-ffi/tests/test_event_callbacks.rs index 7c6e552a4..50cbded24 100644 --- a/dash-spv-ffi/tests/test_event_callbacks.rs +++ b/dash-spv-ffi/tests/test_event_callbacks.rs @@ -173,6 +173,7 @@ fn test_event_callbacks_setup() { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, + on_filter_headers_progress: None, user_data, }; @@ -266,6 +267,7 @@ fn test_enhanced_event_callbacks() { on_mempool_transaction_removed: None, on_compact_filter_matched: Some(test_compact_filter_matched_callback), on_wallet_transaction: Some(test_wallet_transaction_callback), + on_filter_headers_progress: None, user_data: Arc::as_ptr(&event_data) as *mut c_void, }; diff --git a/dash-spv-ffi/tests/unit/test_async_operations.rs b/dash-spv-ffi/tests/unit/test_async_operations.rs index aafc90856..b093974db 100644 --- a/dash-spv-ffi/tests/unit/test_async_operations.rs +++ b/dash-spv-ffi/tests/unit/test_async_operations.rs @@ -599,6 +599,7 @@ mod tests { on_mempool_transaction_removed: None, on_compact_filter_matched: None, on_wallet_transaction: None, + on_filter_headers_progress: None, user_data: &event_data as *const _ as *mut c_void, }; diff --git a/dash-spv/src/client/message_handler.rs b/dash-spv/src/client/message_handler.rs index 9dcc37661..2123cc2bb 100644 --- a/dash-spv/src/client/message_handler.rs +++ b/dash-spv/src/client/message_handler.rs @@ -164,6 +164,25 @@ impl< { tracing::error!("Sequential sync manager error handling message: {}", e); } + + // Additionally forward compact filters to the block processor so it can + // perform wallet matching and emit CompactFilterMatched events. + if let NetworkMessage::CFilter(ref cfilter_msg) = message { + let (response_tx, _response_rx) = tokio::sync::oneshot::channel(); + let task = crate::client::BlockProcessingTask::ProcessCompactFilter { + filter: dashcore::bip158::BlockFilter { + content: cfilter_msg.filter.clone(), + }, + block_hash: cfilter_msg.block_hash, + response_tx, + }; + if let Err(e) = self.block_processor_tx.send(task) { + tracing::warn!( + "Failed to forward CFilter to block processor for event emission: {}", + e + ); + } + } } NetworkMessage::Block(_) => { // Blocks can be large - avoid cloning unless necessary diff --git a/dash-spv/src/client/mod.rs b/dash-spv/src/client/mod.rs index ac6ca5ecb..df1e2238d 100644 --- a/dash-spv/src/client/mod.rs +++ b/dash-spv/src/client/mod.rs @@ -728,6 +728,10 @@ impl< // Track masternode sync completion for ChainLock validation let mut masternode_engine_updated = false; + // Last emitted heights for filter headers progress to avoid duplicate events + let mut last_emitted_header_height: u32 = 0; + let mut last_emitted_filter_header_height: u32 = 0; + loop { // Check if we should stop let running = self.running.read().await; @@ -862,10 +866,14 @@ impl< // Emit detailed progress update if last_rate_calc.elapsed() >= Duration::from_secs(1) { - let current_height = { + // Storage tip is the headers vector index (0-based). + let current_storage_tip = { let storage = self.storage.lock().await; storage.get_tip_height().await.ok().flatten().unwrap_or(0) }; + // Convert to absolute blockchain height: base + storage_tip + let sync_base_height = { self.state.read().await.sync_base_height }; + let current_height = sync_base_height + current_storage_tip; let peer_best = self .network .get_peer_best_height() @@ -875,9 +883,9 @@ impl< .unwrap_or(current_height); // Calculate headers downloaded this second - if current_height > last_height { - headers_this_second = current_height - last_height; - last_height = current_height; + if current_storage_tip > last_height { + headers_this_second = current_storage_tip - last_height; + last_height = current_storage_tip; } let headers_per_second = headers_this_second as f64; @@ -928,6 +936,34 @@ impl< last_rate_calc = Instant::now(); } + // Emit filter headers progress only when heights change + let (abs_header_height, filter_header_height) = { + let storage = self.storage.lock().await; + let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0); + let filter_tip = + storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0); + (self.state.read().await.sync_base_height + storage_tip, filter_tip) + }; + if abs_header_height != last_emitted_header_height + || filter_header_height != last_emitted_filter_header_height + { + if abs_header_height > 0 { + let pct = if filter_header_height <= abs_header_height { + (filter_header_height as f64 / abs_header_height as f64 * 100.0) + .min(100.0) + } else { + 0.0 + }; + self.emit_event(SpvEvent::FilterHeadersProgress { + filter_header_height, + header_height: abs_header_height, + percentage: pct, + }); + } + last_emitted_header_height = abs_header_height; + last_emitted_filter_header_height = filter_header_height; + } + last_status_update = Instant::now(); } @@ -2374,6 +2410,13 @@ impl< } } + tracing::debug!( + "get_stats: header_height={}, filter_height={}, peers={}", + stats.header_height, + stats.filter_height, + stats.connected_peers + ); + Ok(stats) } diff --git a/dash-spv/src/storage/memory.rs b/dash-spv/src/storage/memory.rs index f5e05cd1d..aad026b80 100644 --- a/dash-spv/src/storage/memory.rs +++ b/dash-spv/src/storage/memory.rs @@ -56,16 +56,32 @@ impl StorageManager for MemoryStorageManager { initial_count ); + // Determine absolute height offset (for checkpoint-based sync) once per batch + // If syncing from a checkpoint, storage index 0 corresponds to absolute height + // sync_base_height (base-inclusive). Otherwise, absolute height equals storage index. + let (sync_base_height, synced_from_checkpoint) = match self.load_sync_state().await { + Ok(Some(state)) => (state.sync_base_height, state.synced_from_checkpoint), + _ => (0u32, false), + }; + let abs_offset: u32 = if synced_from_checkpoint && sync_base_height > 0 { + sync_base_height + } else { + 0 + }; + for header in headers { - let height = self.headers.len() as u32; + let storage_index = self.headers.len() as u32; let block_hash = header.block_hash(); // Check if we already have this header if self.header_hash_index.contains_key(&block_hash) { + let existing_index = self.header_hash_index.get(&block_hash).copied(); + let existing_abs = existing_index.map(|i| i.saturating_add(abs_offset)); tracing::warn!( - "MemoryStorage: header {} already exists at height {:?}, skipping", + "MemoryStorage: header {} already exists at storage_index {:?} (abs height {:?}), skipping", block_hash, - self.header_hash_index.get(&block_hash) + existing_index, + existing_abs ); continue; } @@ -74,9 +90,15 @@ impl StorageManager for MemoryStorageManager { self.headers.push(*header); // Update the reverse index - self.header_hash_index.insert(block_hash, height); - - tracing::debug!("MemoryStorage: stored header {} at height {}", block_hash, height); + self.header_hash_index.insert(block_hash, storage_index); + + let abs_height = storage_index.saturating_add(abs_offset); + tracing::debug!( + "MemoryStorage: stored header {} at storage_index {} (abs height {})", + block_hash, + storage_index, + abs_height + ); } let final_count = self.headers.len(); @@ -119,27 +141,73 @@ impl StorageManager for MemoryStorageManager { } async fn load_filter_headers(&self, range: Range) -> StorageResult> { - let start = range.start as usize; - let end = range.end.min(self.filter_headers.len() as u32) as usize; + // Interpret range as blockchain (absolute) heights and map to storage indices + let (base, has_base) = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + (state.sync_base_height, true) + } + _ => (0u32, false), + }; - if start > self.filter_headers.len() { + let start_idx = if has_base { + if range.start < base { + 0usize + } else { + (range.start - base) as usize + } + } else { + range.start as usize + }; + + let end_abs = range.end.min(if has_base { + base + self.filter_headers.len() as u32 + } else { + self.filter_headers.len() as u32 + }); + + let end_idx = if has_base { + if end_abs <= base { + 0usize + } else { + (end_abs - base) as usize + } + } else { + end_abs as usize + }; + + if start_idx > self.filter_headers.len() { return Ok(Vec::new()); } - Ok(self.filter_headers[start..end].to_vec()) + let end_idx = end_idx.min(self.filter_headers.len()); + Ok(self.filter_headers[start_idx..end_idx].to_vec()) } async fn get_filter_header(&self, height: u32) -> StorageResult> { - // Filter headers are stored starting from height 0 in the vector - Ok(self.filter_headers.get(height as usize).copied()) + // Map blockchain (absolute) height to storage index relative to checkpoint base + let base = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + state.sync_base_height + } + _ => 0u32, + }; + + let idx = height.saturating_sub(base) as usize; + Ok(self.filter_headers.get(idx).copied()) } async fn get_filter_tip_height(&self) -> StorageResult> { if self.filter_headers.is_empty() { Ok(None) } else { - // Filter headers are stored starting from height 0, so length-1 gives us the highest height - Ok(Some(self.filter_headers.len() as u32 - 1)) + // Return blockchain (absolute) height for the tip, accounting for checkpoint base + let base = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + state.sync_base_height + } + _ => 0u32, + }; + Ok(Some(base + self.filter_headers.len() as u32 - 1)) } } @@ -255,7 +323,21 @@ impl StorageManager for MemoryStorageManager { } async fn get_header_height_by_hash(&self, hash: &BlockHash) -> StorageResult> { - Ok(self.header_hash_index.get(hash).copied()) + // Return ABSOLUTE blockchain height for consistency with DiskStorage. + // memory.header_hash_index stores storage index; convert to absolute height using base. + let storage_index = match self.header_hash_index.get(hash).copied() { + Some(idx) => idx, + None => return Ok(None), + }; + + let base = match self.load_sync_state().await { + Ok(Some(state)) if state.synced_from_checkpoint && state.sync_base_height > 0 => { + state.sync_base_height + } + _ => 0u32, + }; + + Ok(Some(base + storage_index)) } async fn get_headers_batch( diff --git a/dash-spv/src/sync/filters.rs b/dash-spv/src/sync/filters.rs index c5dd21262..0608bc44a 100644 --- a/dash-spv/src/sync/filters.rs +++ b/dash-spv/src/sync/filters.rs @@ -19,7 +19,8 @@ use crate::storage::StorageManager; use crate::types::SyncProgress; // Constants for filter synchronization -const FILTER_BATCH_SIZE: u32 = 1999; // Stay under Dash Core's 2000 limit (for CFHeaders) +// Stay under Dash Core's 2000 limit (for CFHeaders). Using 1999 helps reduce accidental overlaps. +const FILTER_BATCH_SIZE: u32 = 1999; const SYNC_TIMEOUT_SECONDS: u64 = 5; const DEFAULT_FILTER_SYNC_RANGE: u32 = 100; const FILTER_REQUEST_BATCH_SIZE: u32 = 100; // For compact filter requests (CFilters) @@ -112,14 +113,14 @@ impl SyncResult<(u32, u32, u32)> { - let storage_tip_height = storage + let storage_tip_index = storage .get_tip_height() .await .map_err(|e| SyncError::Storage(format!("Failed to get header tip height: {}", e)))? .unwrap_or(0); - // Convert storage height to blockchain height - let header_tip_height = self.storage_to_blockchain_height(storage_tip_height); + // Convert block header storage index to absolute blockchain height + let header_tip_height = self.header_storage_to_abs_height(storage_tip_index); let stop_height = self .find_height_for_block_hash(&cf_headers.stop_hash, storage, 0, header_tip_height) @@ -132,14 +133,45 @@ impl storage.get_header(idx).await.ok().flatten().map(|h| h.block_hash()), + None => None, + }; + + // Always try to resolve the expected/requested start as well (current_sync_height) + // We don't have access to current_sync_height here, so we'll log both the batch + // start and a best-effort expected start in the caller. For this analysis log, + // avoid placeholder labels and prefer concrete values when known. + let prev_height = start_height.saturating_sub(1); + match start_hash_opt { + Some(h) => { + tracing::debug!( + "CFHeaders batch analysis: batch_start_hash={}, msg_prev_filter_header={}, msg_prev_height={}, stop_hash={}, stop_height={}, start_height={}, count={}, header_tip_height={}", + h, + cf_headers.previous_filter_header, + prev_height, + cf_headers.stop_hash, + stop_height, + start_height, + cf_headers.filter_hashes.len(), + header_tip_height + ); + } + None => { + tracing::debug!( + "CFHeaders batch analysis: batch_start_hash=, msg_prev_filter_header={}, msg_prev_height={}, stop_hash={}, stop_height={}, start_height={}, count={}, header_tip_height={}", + cf_headers.previous_filter_header, + prev_height, + cf_headers.stop_hash, + stop_height, + start_height, + cf_headers.filter_hashes.len(), + header_tip_height + ); + } + } Ok((start_height, stop_height, header_tip_height)) } @@ -185,24 +217,38 @@ impl u32 { + /// Convert absolute blockchain height to block header storage index. + /// Storage indexing is base-inclusive: at checkpoint base B, storage index 0 == absolute height B. + fn header_abs_to_storage_index(&self, height: u32) -> Option { + if self.sync_base_height > 0 { + height.checked_sub(self.sync_base_height) + } else { + Some(height) + } + } + + /// Convert block header storage index to absolute blockchain height. + /// Storage indexing is base-inclusive: at checkpoint base B, absolute height == B + index. + fn header_storage_to_abs_height(&self, index: u32) -> u32 { if self.sync_base_height > 0 { - blockchain_height.saturating_sub(self.sync_base_height) + self.sync_base_height + index } else { - blockchain_height + index } } - /// Convert storage height to blockchain height for checkpoint-based sync - fn storage_to_blockchain_height(&self, storage_height: u32) -> u32 { + /// Convert absolute blockchain height to filter header storage index. + /// Storage indexing is base-inclusive for filter headers as well. + fn filter_abs_to_storage_index(&self, height: u32) -> Option { if self.sync_base_height > 0 { - self.sync_base_height + storage_height + height.checked_sub(self.sync_base_height) } else { - storage_height + Some(height) } } + // Note: previously had filter_storage_to_abs_height, but it was unused and removed for clarity. + /// Enable flow control for filter downloads. pub fn enable_flow_control(&mut self) { self.flow_control_enabled = true; @@ -245,33 +291,122 @@ impl storage.get_header(idx).await.ok().flatten().map(|h| h.block_hash()), + None => None, + }; + + // Resolve expected start hash (what we asked for), for clarity + let expected_start_hash_opt = + match self.header_abs_to_storage_index(self.current_sync_height) { + Some(idx) => storage.get_header(idx).await.ok().flatten().map(|h| h.block_hash()), + None => None, + }; + + let prev_height = batch_start_height.saturating_sub(1); + let effective_prev_height = self.current_sync_height.saturating_sub(1); + match (recv_start_hash_opt, expected_start_hash_opt) { + (Some(batch_hash), Some(expected_hash)) => { + tracing::debug!( + "Received CFHeaders batch: batch_start={} (hash={}), msg_prev_header={} at {}, expected_start={} (hash={}), effective_prev_height={}, stop={}, count={}", + batch_start_height, + batch_hash, + cf_headers.previous_filter_header, + prev_height, + self.current_sync_height, + expected_hash, + effective_prev_height, + stop_height, + cf_headers.filter_hashes.len() + ); + } + (None, Some(expected_hash)) => { + tracing::debug!( + "Received CFHeaders batch: batch_start={} (hash=), msg_prev_header={} at {}, expected_start={} (hash={}), effective_prev_height={}, stop={}, count={}", + batch_start_height, + cf_headers.previous_filter_header, + prev_height, + self.current_sync_height, + expected_hash, + effective_prev_height, + stop_height, + cf_headers.filter_hashes.len() + ); + } + (Some(batch_hash), None) => { + tracing::debug!( + "Received CFHeaders batch: batch_start={} (hash={}), msg_prev_header={} at {}, expected_start={} (hash=), effective_prev_height={}, stop={}, count={}", + batch_start_height, + batch_hash, + cf_headers.previous_filter_header, + prev_height, + self.current_sync_height, + effective_prev_height, + stop_height, + cf_headers.filter_hashes.len() + ); + } + (None, None) => { + tracing::debug!( + "Received CFHeaders batch: batch_start={} (hash=), msg_prev_header={} at {}, expected_start={} (hash=), effective_prev_height={}, stop={}, count={}", + batch_start_height, + cf_headers.previous_filter_header, + prev_height, + self.current_sync_height, + effective_prev_height, + stop_height, + cf_headers.filter_hashes.len() + ); + } + } // Check if this is the expected batch or if there's overlap if batch_start_height < self.current_sync_height { + // Special-case benign overlaps around checkpoint boundaries; log at debug level + let benign_checkpoint_overlap = self.sync_base_height > 0 + && ((batch_start_height + 1 == self.sync_base_height + && self.current_sync_height == self.sync_base_height) + || (batch_start_height == self.sync_base_height + && self.current_sync_height == self.sync_base_height + 1)); + // Try to include the peer address for diagnostics let peer_addr = network.get_last_message_peer_addr().await; - match peer_addr { - Some(addr) => { - tracing::warn!( - "📋 Received overlapping filter headers from {}: expected start={}, received start={} (likely from recovery/retry)", - addr, - self.current_sync_height, - batch_start_height - ); + if benign_checkpoint_overlap { + match peer_addr { + Some(addr) => { + tracing::debug!( + "📋 Benign checkpoint overlap from {}: expected start={}, received start={}", + addr, + self.current_sync_height, + batch_start_height + ); + } + None => { + tracing::debug!( + "📋 Benign checkpoint overlap: expected start={}, received start={}", + self.current_sync_height, + batch_start_height + ); + } } - None => { - tracing::warn!( - "📋 Received overlapping filter headers: expected start={}, received start={} (likely from recovery/retry)", - self.current_sync_height, - batch_start_height - ); + } else { + match peer_addr { + Some(addr) => { + tracing::warn!( + "📋 Received overlapping filter headers from {}: expected start={}, received start={} (likely from recovery/retry)", + addr, + self.current_sync_height, + batch_start_height + ); + } + None => { + tracing::warn!( + "📋 Received overlapping filter headers: expected start={}, received start={} (likely from recovery/retry)", + self.current_sync_height, + batch_start_height + ); + } } } @@ -363,8 +498,14 @@ impl header.block_hash(), Ok(None) => { @@ -380,8 +521,11 @@ impl= min_height && found_header_info.is_none() { - let scan_storage_height = - self.blockchain_to_storage_height(scan_height); + let Some(scan_storage_height) = + self.header_abs_to_storage_index(scan_height) + else { + break; + }; match storage.get_header(scan_storage_height).await { Ok(Some(header)) => { tracing::info!( @@ -463,8 +607,14 @@ impl header.block_hash(), Ok(None) if header_tip_height > 0 => { @@ -474,8 +624,15 @@ impl header.block_hash(), Ok(None) => { @@ -581,7 +741,10 @@ impl= min_height && found_recovery_info.is_none() { let scan_storage_height = - self.blockchain_to_storage_height(scan_height); + match self.header_abs_to_storage_index(scan_height) { + Some(v) => v, + None => break, + }; if let Ok(Some(header)) = storage.get_header(scan_storage_height).await { tracing::info!( @@ -639,18 +802,18 @@ impl header.block_hash(), - Ok(None) if storage_tip_height > 0 => { + Ok(None) if storage_tip_index > 0 => { tracing::debug!( "Tip header not found at storage height {} (blockchain height {}) during recovery, trying previous header", - storage_tip_height, + storage_tip_index, header_tip_height ); // Try previous header when at chain tip storage - .get_header(storage_tip_height - 1) + .get_header(storage_tip_index - 1) .await .map_err(|e| { SyncError::Storage(format!( @@ -730,18 +893,18 @@ impl 0 && current_filter_height < self.sync_base_height { - // Start from checkpoint base + 1, not from 1 tracing::info!( "Starting filter sync from checkpoint base {} (current filter height: {})", - self.sync_base_height + 1, + self.sync_base_height, current_filter_height ); - self.sync_base_height + 1 + self.sync_base_height } else { current_filter_height + 1 }; @@ -787,21 +951,21 @@ impl 0 { + let stop_hash = if storage_tip_index > 0 { // Use storage_tip_height directly since get_header expects storage height storage - .get_header(storage_tip_height) + .get_header(storage_tip_index) .await .map_err(|e| { SyncError::Storage(format!( "Failed to get stop header at storage height {} (blockchain height {}): {}", - storage_tip_height, header_tip_height, e + storage_tip_index, header_tip_height, e )) })? .ok_or_else(|| { SyncError::Storage(format!( "Stop header not found at storage height {} (blockchain height {})", - storage_tip_height, header_tip_height + storage_tip_index, header_tip_height )) })? .block_hash() @@ -825,7 +989,9 @@ impl storage height {}", batch_end_height, @@ -854,7 +1020,11 @@ impl= min_height && found_header.is_none() { - let scan_storage_height = self.blockchain_to_storage_height(scan_height); + let Some(scan_storage_height) = + self.header_abs_to_storage_index(scan_height) + else { + break; + }; match storage.get_header(scan_storage_height).await { Ok(Some(header)) => { tracing::info!( @@ -891,12 +1061,15 @@ impl { // If we can't find any headers in the batch range, something is wrong // Don't fall back to tip as that would create an oversized request + let start_idx = + self.header_abs_to_storage_index(self.current_sync_height); + let end_idx = self.header_abs_to_storage_index(batch_end_height); return Err(SyncError::Storage(format!( - "No headers found in batch range {} to {} (storage range {} to {})", + "No headers found in batch range {} to {} (header storage idx {:?} to {:?})", self.current_sync_height, batch_end_height, - self.blockchain_to_storage_height(self.current_sync_height), - self.blockchain_to_storage_height(batch_end_height) + start_idx, + end_idx ))); } } @@ -935,11 +1108,12 @@ impl height, None => { - // No connection found - check if this is overlapping data we can safely ignore - let overlap_end = expected_start_height.saturating_sub(1); - if batch_start_height <= overlap_end && overlap_end <= current_filter_tip { - tracing::warn!( - "📋 Ignoring overlapping headers from different peer view (range {}-{})", - batch_start_height, - stop_height + // Special-case: checkpoint overlap where peer starts at checkpoint height + // and we expect to start at checkpoint+1. We don't store the checkpoint's + // filter header in storage, but CFHeaders provides previous_filter_header + // for (checkpoint-1), allowing us to compute from checkpoint onward and skip one. + if self.sync_base_height > 0 + && ( + // Case A: peer starts at checkpoint, we expect checkpoint+1 + (batch_start_height == self.sync_base_height + && expected_start_height == self.sync_base_height + 1) + || + // Case B: peer starts one before checkpoint, we expect checkpoint + (batch_start_height + 1 == self.sync_base_height + && expected_start_height == self.sync_base_height) + ) + { + tracing::debug!( + "Overlap at checkpoint: synthesizing connection at height {}", + self.sync_base_height - 1 ); - return Ok((0, expected_start_height)); + self.sync_base_height - 1 } else { - return Err(SyncError::Validation( - "Cannot find connection point for overlapping headers".to_string(), - )); + // No connection found - check if this is overlapping data we can safely ignore + let overlap_end = expected_start_height.saturating_sub(1); + if batch_start_height <= overlap_end && overlap_end <= current_filter_tip { + tracing::warn!( + "📋 Ignoring overlapping headers from different peer view (range {}-{})", + batch_start_height, + stop_height + ); + return Ok((0, expected_start_height)); + } else { + return Err(SyncError::Validation( + "Cannot find connection point for overlapping headers".to_string(), + )); + } } } }; @@ -1123,11 +1319,14 @@ impl 0 && start_height == self.sync_base_height + 1) + || (self.sync_base_height > 0 + && (start_height == self.sync_base_height + || start_height == self.sync_base_height + 1)) { tracing::debug!( "Skipping filter header chain verification for first batch (start_height={}, sync_base_height={})", @@ -1240,12 +1439,17 @@ impl storage + .get_header(idx) + .await + .map_err(|e| SyncError::Storage(format!("Failed to get stop header: {}", e)))? + .ok_or_else(|| SyncError::Storage("Stop header not found".to_string()))? + .block_hash(), + None => { + return Err(SyncError::Storage("batch_end below checkpoint base".to_string())) + } + }; self.request_filters(network, current_height, stop_hash).await?; @@ -1371,7 +1575,12 @@ impl { let stop_hash = header.block_hash(); @@ -1809,12 +2023,14 @@ impl SyncResult<()> { // Find the end height for the stop hash - let header_tip_height = storage + let header_tip_index = storage .get_tip_height() .await .map_err(|e| SyncError::Storage(format!("Failed to get header tip height: {}", e)))? .unwrap_or(0); + let header_tip_height = self.header_storage_to_abs_height(header_tip_index); + let end_height = self .find_height_for_block_hash(&stop_hash, storage, start_height, header_tip_height) .await? @@ -1849,13 +2065,16 @@ impl SyncResult> { - // Use the efficient reverse index first - if let Some(height) = storage.get_header_height_by_hash(block_hash).await.map_err(|e| { - SyncError::Storage(format!("Failed to get header height by hash: {}", e)) - })? { - // Check if the height is within the requested range - if height >= start_height && height <= end_height { - return Ok(Some(height)); + // Use the efficient reverse index first. + // Contract: StorageManager::get_header_height_by_hash returns ABSOLUTE blockchain height. + if let Some(abs_height) = + storage.get_header_height_by_hash(block_hash).await.map_err(|e| { + SyncError::Storage(format!("Failed to get header height by hash: {}", e)) + })? + { + // Check if the absolute height is within the requested range + if abs_height >= start_height && abs_height <= end_height { + return Ok(Some(abs_height)); } } Ok(None) @@ -3132,7 +3351,12 @@ impl { let stop_hash = header.block_hash(); @@ -3168,7 +3392,13 @@ impl { let batch_stop_hash = batch_header.block_hash(); diff --git a/dash-spv/src/types.rs b/dash-spv/src/types.rs index 27d0f9401..7fad00d7a 100644 --- a/dash-spv/src/types.rs +++ b/dash-spv/src/types.rs @@ -785,6 +785,23 @@ pub enum SpvEvent { percentage: f64, }, + /// Filter headers progress update. + /// + /// Carries absolute blockchain heights for both the current filter header tip + /// and the current block header tip, along with a convenience percentage + /// (filter_header_height / header_height * 100), clamped to [0, 100]. + /// + /// Consumers who sync from a checkpoint may prefer to recompute a + /// checkpoint-relative percentage using their base height. + FilterHeadersProgress { + /// Current absolute height of synchronized filter headers. + filter_header_height: u32, + /// Current absolute height of synchronized block headers. + header_height: u32, + /// Convenience percentage in [0, 100]. + percentage: f64, + }, + /// ChainLock received and validated. ChainLockReceived { /// Block height of the ChainLock.