44 changes: 23 additions & 21 deletions dash-spv/src/client/message_handler.rs
@@ -7,6 +7,7 @@ use crate::network::NetworkManager;
use crate::storage::StorageManager;
use crate::sync::sequential::SequentialSyncManager;
use crate::types::{MempoolState, SpvEvent, SpvStats};
// Removed local ad-hoc compact filter construction in favor of always processing full blocks
use key_wallet_manager::wallet_interface::WalletInterface;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -238,7 +239,23 @@ impl<
block.txdata.len()
);

// Process new block (update state, check watched items)
// 1) Ensure header processing and chain tip update for this block
// Route the header through the sequential sync manager as a Headers message
let headers_msg = NetworkMessage::Headers(vec![block.header]);
if let Err(e) = self
.sync_manager
.handle_message(headers_msg, &mut *self.network, &mut *self.storage)
.await
{
tracing::error!(
"❌ Failed to process header for block {} via sync manager: {}",
block_hash,
e
);
return Err(SpvError::Sync(e));
}

// 2) Always process the full block (privacy and correctness)
if let Err(e) = self.process_new_block(block).await {
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
return Err(e);
@@ -434,31 +451,16 @@ impl<
self.network.send_message(getdata).await.map_err(SpvError::Network)?;
}

// Process new blocks immediately when detected
// For blocks announced via inventory during tip sync, request full blocks for privacy
if !blocks_to_request.is_empty() {
tracing::info!(
"🔄 Processing {} new block announcements to stay synchronized",
"📥 Requesting {} new blocks announced via inventory",
blocks_to_request.len()
);

// Extract block hashes
let block_hashes: Vec<dashcore::BlockHash> = blocks_to_request
.iter()
.filter_map(|inv| {
if let Inventory::Block(hash) = inv {
Some(*hash)
} else {
None
}
})
.collect();

// Process each new block
for block_hash in block_hashes {
tracing::info!("📥 Requesting header for new block {}", block_hash);
if let Err(e) = self.process_new_block_hash(block_hash).await {
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
}
let getdata = NetworkMessage::GetData(blocks_to_request);
if let Err(e) = self.network.send_message(getdata).await {
tracing::error!("Failed to request announced blocks: {}", e);
}
}

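A minimal sketch of the new inventory handling above: announced blocks are collected into a single GetData request instead of being fetched header-by-header. The module paths and message constructors are assumptions based on dashcore's rust-bitcoin-style API, not taken verbatim from this PR.

use dashcore::network::message::NetworkMessage;
use dashcore::network::message_blockdata::Inventory;

/// Build one GetData message covering every block announced in an inv payload.
/// Returns None when the inventory contains no block announcements.
fn getdata_for_announced_blocks(inv: &[Inventory]) -> Option<NetworkMessage> {
    let blocks: Vec<Inventory> = inv
        .iter()
        .filter_map(|item| match item {
            Inventory::Block(hash) => Some(Inventory::Block(*hash)),
            _ => None,
        })
        .collect();
    if blocks.is_empty() {
        None
    } else {
        // One request for all announced blocks, mirroring the handler change above.
        Some(NetworkMessage::GetData(blocks))
    }
}

The caller would hand the resulting message to the network layer, as the handler does with blocks_to_request.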
31 changes: 17 additions & 14 deletions dash-spv/src/client/mod.rs
@@ -880,14 +880,12 @@ impl<

// Emit detailed progress update
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
// Storage tip is the headers vector index (0-based).
let current_storage_tip = {
// Storage tip now represents the absolute blockchain height.
let current_tip_height = {
let storage = self.storage.lock().await;
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
};
// Convert to absolute blockchain height: base + storage_tip
let sync_base_height = { self.state.read().await.sync_base_height };
let current_height = sync_base_height + current_storage_tip;
let current_height = current_tip_height;
let peer_best = self
.network
.get_peer_best_height()
@@ -897,9 +895,9 @@
.unwrap_or(current_height);

// Calculate headers downloaded this second
if current_storage_tip > last_height {
headers_this_second = current_storage_tip - last_height;
last_height = current_storage_tip;
if current_tip_height > last_height {
headers_this_second = current_tip_height - last_height;
last_height = current_tip_height;
}

let headers_per_second = headers_this_second as f64;
@@ -956,7 +954,7 @@
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
let filter_tip =
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
(self.state.read().await.sync_base_height + storage_tip, filter_tip)
(storage_tip, filter_tip)
};
if abs_header_height != last_emitted_header_height
|| filter_header_height != last_emitted_filter_header_height
@@ -1770,8 +1768,13 @@ impl<
let mut loaded_count = 0u32;
let target_height = saved_state.chain_tip.height;

// Start from height 1 (genesis is already in ChainState)
let mut current_height = 1u32;
// Determine first height to load. Skip genesis (already present) unless we started from a checkpoint base.
let mut current_height =
if saved_state.synced_from_checkpoint && saved_state.sync_base_height > 0 {
saved_state.sync_base_height
} else {
1u32
};

while current_height <= target_height {
let end_height = (current_height + BATCH_SIZE - 1).min(target_height);
@@ -1786,12 +1789,12 @@
};

if headers.is_empty() {
tracing::error!(
"Failed to load headers for range {}..{} - storage may be corrupted",
tracing::warn!(
"No headers found for range {}..{} when restoring from state",
current_height,
end_height + 1
);
return Ok(false);
break;
}

// Validate headers before adding to chain state
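A minimal sketch of the restore-start decision introduced above, assuming the saved-state field names shown in the diff (synced_from_checkpoint, sync_base_height): when the chain was synced from a checkpoint, the header-restore loop begins at the checkpoint base rather than at height 1.

/// Where the header-restore loop begins (illustrative sketch, not the PR's exact code).
fn restore_start_height(synced_from_checkpoint: bool, sync_base_height: u32) -> u32 {
    if synced_from_checkpoint && sync_base_height > 0 {
        // Headers below the checkpoint base were never downloaded, so start at the base.
        sync_base_height
    } else {
        // Genesis (height 0) is already part of ChainState, so start at height 1.
        1
    }
}

#[cfg(test)]
mod restore_start_tests {
    use super::restore_start_height;

    #[test]
    fn picks_checkpoint_base_or_height_one() {
        assert_eq!(restore_start_height(true, 1_100_000), 1_100_000);
        assert_eq!(restore_start_height(false, 0), 1);
        assert_eq!(restore_start_height(true, 0), 1);
    }
}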
8 changes: 4 additions & 4 deletions dash-spv/src/client/status_display.rs
@@ -48,13 +48,13 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
// For checkpoint sync: height = checkpoint_height + storage_count
let storage = self.storage.lock().await;
if let Ok(Some(storage_tip)) = storage.get_tip_height().await {
let blockchain_height = state.sync_base_height + storage_tip;
let blockchain_height = storage_tip;
if with_logging {
tracing::debug!(
"Status display: storage_tip={}, sync_base={}, blockchain_height={}",
storage_tip,
"Status display: reported tip height={}, sync_base={}, raw_storage_tip={}",
blockchain_height,
state.sync_base_height,
blockchain_height
storage_tip
);
}
blockchain_height
116 changes: 82 additions & 34 deletions dash-spv/src/storage/disk.rs
@@ -1159,25 +1159,36 @@ impl StorageManager for DiskStorageManager {
async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>> {
let mut headers = Vec::new();

let start_segment = Self::get_segment_id(range.start);
let end_segment = Self::get_segment_id(range.end.saturating_sub(1));
// Convert blockchain height range to storage index range using sync_base_height
let sync_base_height = *self.sync_base_height.read().await;
let storage_start = if sync_base_height > 0 && range.start >= sync_base_height {
range.start - sync_base_height
} else {
range.start
};

let storage_end = if sync_base_height > 0 && range.end > sync_base_height {
range.end - sync_base_height
} else {
range.end
};

let start_segment = Self::get_segment_id(storage_start);
let end_segment = Self::get_segment_id(storage_end.saturating_sub(1));

for segment_id in start_segment..=end_segment {
self.ensure_segment_loaded(segment_id).await?;

let segments = self.active_segments.read().await;
if let Some(segment) = segments.get(&segment_id) {
let _segment_start_height = segment_id * HEADERS_PER_SEGMENT;
let _segment_end_height = _segment_start_height + segment.headers.len() as u32;

let start_idx = if segment_id == start_segment {
Self::get_segment_offset(range.start)
Self::get_segment_offset(storage_start)
} else {
0
};

let end_idx = if segment_id == end_segment {
Self::get_segment_offset(range.end.saturating_sub(1)) + 1
Self::get_segment_offset(storage_end.saturating_sub(1)) + 1
} else {
segment.headers.len()
};
@@ -1198,17 +1209,31 @@
}

async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>> {
// TODO: This method currently expects storage-relative heights (0-based from sync_base_height).
// Consider refactoring to accept blockchain heights and handle conversion internally for better UX.
// Accept blockchain (absolute) height and convert to storage index using sync_base_height.
let sync_base_height = *self.sync_base_height.read().await;

// First check if this height is within our known range
let tip_height = self.cached_tip_height.read().await;
if let Some(tip) = *tip_height {
if height > tip {
// Convert absolute height to storage index (base-inclusive mapping)
let storage_index = if sync_base_height > 0 {
if height >= sync_base_height {
height - sync_base_height
} else {
// If caller passes a small value (likely a pre-conversion storage index), use it directly
height
}
} else {
height
};

// First check if this storage index is within our known range
let tip_index_opt = *self.cached_tip_height.read().await;
if let Some(tip_index) = tip_index_opt {
if storage_index > tip_index {
tracing::trace!(
"Requested header at height {} is beyond tip height {}",
"Requested header at storage index {} is beyond tip index {} (abs height {} base {})",
storage_index,
tip_index,
height,
tip
sync_base_height
);
return Ok(None);
}
@@ -1217,8 +1242,8 @@
return Ok(None);
}

let segment_id = Self::get_segment_id(height);
let offset = Self::get_segment_offset(height);
let segment_id = Self::get_segment_id(storage_index);
let offset = Self::get_segment_offset(storage_index);

self.ensure_segment_loaded(segment_id).await?;

Expand All @@ -1235,18 +1260,30 @@ impl StorageManager for DiskStorageManager {

if header.is_none() {
tracing::debug!(
"Header not found at height {} (segment: {}, offset: {})",
height,
"Header not found at storage index {} (segment: {}, offset: {}, abs height {}, base {})",
storage_index,
segment_id,
offset
offset,
height,
sync_base_height
);
}

Ok(header)
}

async fn get_tip_height(&self) -> StorageResult<Option<u32>> {
Ok(*self.cached_tip_height.read().await)
let tip_index_opt = *self.cached_tip_height.read().await;
if let Some(tip_index) = tip_index_opt {
let base = *self.sync_base_height.read().await;
if base > 0 {
Ok(Some(base + tip_index))
} else {
Ok(Some(tip_index))
}
} else {
Ok(None)
}
}

async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
@@ -1487,7 +1524,12 @@

// Load all headers
if let Some(tip_height) = self.get_tip_height().await? {
state.headers = self.load_headers(0..tip_height + 1).await?;
let range_start = if state.synced_from_checkpoint && state.sync_base_height > 0 {
state.sync_base_height
} else {
0
};
state.headers = self.load_headers(range_start..tip_height + 1).await?;
}

// Load all filter headers
@@ -2032,16 +2074,22 @@ mod tests {
// Store headers using checkpoint sync method
storage.store_headers_from_height(&headers, checkpoint_height).await?;

// Verify headers are stored at correct storage indices
// Header at blockchain height 1,100,000 should be at storage index 0
let header_at_0 = storage.get_header(0).await?;
assert!(header_at_0.is_some(), "Header at storage index 0 should exist");
assert_eq!(header_at_0.unwrap(), headers[0]);
// Set sync base height so storage interprets heights as blockchain heights
let mut base_state = ChainState::new();
base_state.sync_base_height = checkpoint_height;
base_state.synced_from_checkpoint = true;
storage.store_chain_state(&base_state).await?;

// Verify headers are stored at correct blockchain heights
// Header at blockchain height 1,100,000 should be retrievable by that height
let header_at_base = storage.get_header(checkpoint_height).await?;
assert!(header_at_base.is_some(), "Header at base blockchain height should exist");
assert_eq!(header_at_base.unwrap(), headers[0]);

// Header at blockchain height 1,100,099 should be at storage index 99
let header_at_99 = storage.get_header(99).await?;
assert!(header_at_99.is_some(), "Header at storage index 99 should exist");
assert_eq!(header_at_99.unwrap(), headers[99]);
// Header at blockchain height 1,100,099 should be retrievable by that height
let header_at_ending = storage.get_header(checkpoint_height + 99).await?;
assert!(header_at_ending.is_some(), "Header at ending blockchain height should exist");
assert_eq!(header_at_ending.unwrap(), headers[99]);

// Test the reverse index (hash -> blockchain height)
let hash_0 = headers[0].block_hash();
@@ -2081,11 +2129,11 @@
"After index rebuild, hash should still map to blockchain height 1,100,000"
);

// Verify headers can still be retrieved by storage index
let header_after_reload = storage2.get_header(0).await?;
// Verify header can still be retrieved by blockchain height after reload
let header_after_reload = storage2.get_header(checkpoint_height).await?;
assert!(
header_after_reload.is_some(),
"Header at storage index 0 should exist after reload"
"Header at base blockchain height should exist after reload"
);
assert_eq!(header_after_reload.unwrap(), headers[0]);

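The disk.rs changes all reduce to one mapping between absolute blockchain heights and the 0-based storage indices rooted at sync_base_height. A self-contained sketch of that mapping and the segment math follows; the HEADERS_PER_SEGMENT value is assumed for illustration, and the real DiskStorageManager keeps sync_base_height behind an async RwLock rather than taking it as a parameter.

const HEADERS_PER_SEGMENT: u32 = 10_000; // assumed segment size, for illustration only

/// Map an absolute blockchain height onto a 0-based storage index.
/// Values below the base are treated as already-converted storage indices,
/// matching the fallback in get_header above.
fn to_storage_index(abs_height: u32, sync_base_height: u32) -> u32 {
    if sync_base_height > 0 && abs_height >= sync_base_height {
        abs_height - sync_base_height
    } else {
        abs_height
    }
}

/// Map a 0-based storage index back to an absolute blockchain height,
/// as get_tip_height now does when reporting the tip.
fn to_absolute_height(storage_index: u32, sync_base_height: u32) -> u32 {
    if sync_base_height > 0 {
        sync_base_height + storage_index
    } else {
        storage_index
    }
}

/// Segment addressing operates on storage indices, never on absolute heights.
fn segment_id(storage_index: u32) -> u32 {
    storage_index / HEADERS_PER_SEGMENT
}

fn segment_offset(storage_index: u32) -> u32 {
    storage_index % HEADERS_PER_SEGMENT
}

#[cfg(test)]
mod height_mapping_tests {
    use super::*;

    #[test]
    fn checkpoint_base_round_trips() {
        let base = 1_100_000;
        let idx = to_storage_index(1_100_099, base);
        assert_eq!(idx, 99);
        assert_eq!(to_absolute_height(idx, base), 1_100_099);
        assert_eq!(segment_id(idx), 0);
        assert_eq!(segment_offset(idx), 99);
    }

    #[test]
    fn genesis_sync_is_identity() {
        assert_eq!(to_storage_index(42, 0), 42);
        assert_eq!(to_absolute_height(42, 0), 42);
    }
}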