Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 94 additions & 24 deletions dash-spv/src/client/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::network::NetworkManager;
use crate::storage::StorageManager;
use crate::sync::sequential::SequentialSyncManager;
use crate::types::{MempoolState, SpvEvent, SpvStats};
use dashcore::bip158::{BlockFilter, BlockFilterWriter};
use key_wallet_manager::wallet_interface::WalletInterface;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -238,10 +239,94 @@ impl<
block.txdata.len()
);

// Process new block (update state, check watched items)
if let Err(e) = self.process_new_block(block).await {
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
return Err(e);
// 1) Ensure header processing and chain tip update for this block
// Route the header through the sequential sync manager as a Headers message
let header = block.header;
let headers_msg = NetworkMessage::Headers(vec![header]);
if let Err(e) = self
.sync_manager
.handle_message(headers_msg, &mut *self.network, &mut *self.storage)
.await
{
tracing::error!(
"❌ Failed to process header for block {} via sync manager: {}",
block_hash,
e
);
return Err(SpvError::Sync(e));
}

// 2) Locally construct a basic compact filter from block outputs only
// and check relevance via the wallet through the block processor
let mut filter_bytes = Vec::new();
{
let mut writer = BlockFilterWriter::new(&mut filter_bytes, &block);
writer.add_output_scripts();
if let Err(e) = writer.finish() {
tracing::warn!(
"Failed to finalize locally constructed compact filter for block {}: {}. Proceeding to process block.",
block_hash,
e
);
}
}

// Send the constructed filter to the block processor to check with wallet
let (cf_tx, cf_rx) = tokio::sync::oneshot::channel();
let cf_task = crate::client::BlockProcessingTask::ProcessCompactFilter {
filter: BlockFilter::new(&filter_bytes),
block_hash,
response_tx: cf_tx,
};
if let Err(e) = self.block_processor_tx.send(cf_task) {
tracing::warn!(
"Failed to send local compact filter for block {} to processor: {}. Proceeding to process block.",
block_hash,
e
);
// Fall through to processing the block anyway
}

let should_process_block = match cf_rx.await {
Ok(Ok(matches)) => {
if matches {
tracing::info!(
"🎯 Locally constructed compact filter matched for block {}",
block_hash
);
true
} else {
tracing::info!(
"🚫 Local compact filter did not match for block {} - skipping block processing",
block_hash
);
false
}
}
Ok(Err(e)) => {
tracing::warn!(
"Compact filter check errored for block {}: {}. Proceeding to process block.",
block_hash,
e
);
true
}
Err(_) => {
// If the receiver is dropped or the worker didn't respond, proceed
tracing::warn!(
"Compact filter check channel dropped for block {}. Proceeding to process block.",
block_hash
);
true
}
};

if should_process_block {
// 3) Process new block (update state, check wallet)
if let Err(e) = self.process_new_block(block).await {
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
return Err(e);
}
}
}
NetworkMessage::Inv(inv) => {
Expand Down Expand Up @@ -434,31 +519,16 @@ impl<
self.network.send_message(getdata).await.map_err(SpvError::Network)?;
}

// Process new blocks immediately when detected
// For blocks announced via inventory during tip sync, request full blocks for privacy
if !blocks_to_request.is_empty() {
tracing::info!(
"🔄 Processing {} new block announcements to stay synchronized",
"📥 Requesting {} new blocks announced via inventory",
blocks_to_request.len()
);

// Extract block hashes
let block_hashes: Vec<dashcore::BlockHash> = blocks_to_request
.iter()
.filter_map(|inv| {
if let Inventory::Block(hash) = inv {
Some(*hash)
} else {
None
}
})
.collect();

// Process each new block
for block_hash in block_hashes {
tracing::info!("📥 Requesting header for new block {}", block_hash);
if let Err(e) = self.process_new_block_hash(block_hash).await {
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
}
let getdata = NetworkMessage::GetData(blocks_to_request);
if let Err(e) = self.network.send_message(getdata).await {
tracing::error!("Failed to request announced blocks: {}", e);
}
}

Expand Down
31 changes: 17 additions & 14 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,14 +880,12 @@ impl<

// Emit detailed progress update
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
// Storage tip is the headers vector index (0-based).
let current_storage_tip = {
// Storage tip now represents the absolute blockchain height.
let current_tip_height = {
let storage = self.storage.lock().await;
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
};
// Convert to absolute blockchain height: base + storage_tip
let sync_base_height = { self.state.read().await.sync_base_height };
let current_height = sync_base_height + current_storage_tip;
let current_height = current_tip_height;
let peer_best = self
.network
.get_peer_best_height()
Expand All @@ -897,9 +895,9 @@ impl<
.unwrap_or(current_height);

// Calculate headers downloaded this second
if current_storage_tip > last_height {
headers_this_second = current_storage_tip - last_height;
last_height = current_storage_tip;
if current_tip_height > last_height {
headers_this_second = current_tip_height - last_height;
last_height = current_tip_height;
}

let headers_per_second = headers_this_second as f64;
Expand Down Expand Up @@ -956,7 +954,7 @@ impl<
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
let filter_tip =
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
(self.state.read().await.sync_base_height + storage_tip, filter_tip)
(storage_tip, filter_tip)
};
if abs_header_height != last_emitted_header_height
|| filter_header_height != last_emitted_filter_header_height
Expand Down Expand Up @@ -1770,8 +1768,13 @@ impl<
let mut loaded_count = 0u32;
let target_height = saved_state.chain_tip.height;

// Start from height 1 (genesis is already in ChainState)
let mut current_height = 1u32;
// Determine first height to load. Skip genesis (already present) unless we started from a checkpoint base.
let mut current_height =
if saved_state.synced_from_checkpoint && saved_state.sync_base_height > 0 {
saved_state.sync_base_height
} else {
1u32
};

while current_height <= target_height {
let end_height = (current_height + BATCH_SIZE - 1).min(target_height);
Expand All @@ -1786,12 +1789,12 @@ impl<
};

if headers.is_empty() {
tracing::error!(
"Failed to load headers for range {}..{} - storage may be corrupted",
tracing::warn!(
"No headers found for range {}..{} when restoring from state",
current_height,
end_height + 1
);
return Ok(false);
break;
}

// Validate headers before adding to chain state
Expand Down
8 changes: 4 additions & 4 deletions dash-spv/src/client/status_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
// For checkpoint sync: height = checkpoint_height + storage_count
let storage = self.storage.lock().await;
if let Ok(Some(storage_tip)) = storage.get_tip_height().await {
let blockchain_height = state.sync_base_height + storage_tip;
let blockchain_height = storage_tip;
if with_logging {
tracing::debug!(
"Status display: storage_tip={}, sync_base={}, blockchain_height={}",
storage_tip,
"Status display: reported tip height={}, sync_base={}, raw_storage_tip={}",
blockchain_height,
state.sync_base_height,
blockchain_height
storage_tip
);
}
blockchain_height
Expand Down
Loading
Loading