Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dash-spv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ tempfile = "3.0"
tokio-test = "0.4"
env_logger = "0.10"
hex = "0.4"
test-case = "3.3"

[[bin]]
name = "dash-spv"
Expand Down
97 changes: 9 additions & 88 deletions dash-spv/src/sync/headers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,116 +102,37 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync

/// Load headers from storage into the chain state
pub async fn load_headers_from_storage(&mut self, storage: &S) -> SyncResult<u32> {
let start_time = std::time::Instant::now();
let mut loaded_count = 0;
let mut tip_height = 0;
// First, try to load the persisted chain state which may contain sync_base_height
if let Ok(Some(stored_chain_state)) = storage.load_chain_state().await {
tracing::info!(
"Loaded chain state from storage with sync_base_height: {}",
stored_chain_state.sync_base_height,
);
// Update our chain state with the loaded one to preserve sync_base_height
// Update our chain state with the loaded one
{
loaded_count = stored_chain_state.headers.len();
tip_height = stored_chain_state.tip_height();
self.cached_sync_base_height = stored_chain_state.sync_base_height;
let mut cs = self.chain_state.write().await;
*cs = stored_chain_state;
}
}

// Get the current tip height from storage
let tip_height = storage
.get_tip_height()
.await
.map_err(|e| SyncError::Storage(format!("Failed to get tip height: {}", e)))?;

let Some(tip_height) = tip_height else {
tracing::debug!("No headers found in storage");
// If we're syncing from a checkpoint, this is expected
if self.is_synced_from_checkpoint() {
tracing::info!("No headers in storage for checkpoint sync - this is expected");
return Ok(0);
}
return Ok(0);
};

if tip_height == 0 && !self.is_synced_from_checkpoint() {
tracing::debug!("Only genesis block in storage");
return Ok(0);
}

tracing::info!("Loading {} headers from storage into HeaderSyncManager", tip_height);
let start_time = std::time::Instant::now();

// Load headers in batches
const BATCH_SIZE: u32 = 10_000;
let mut loaded_count = 0u32;

// For checkpoint syncs we start at the checkpoint base; otherwise we skip genesis (already present).
let mut current_height = self.get_sync_base_height().max(1);

while current_height <= tip_height {
let end_height = (current_height + BATCH_SIZE - 1).min(tip_height);

// Load batch from storage
let headers_result = storage.load_headers(current_height..end_height + 1).await;

match headers_result {
Ok(headers) if !headers.is_empty() => {
// Add headers to chain state
{
let mut cs = self.chain_state.write().await;
for header in headers {
cs.add_header(header);
loaded_count += 1;
}
}
}
Ok(_) => {
// Empty headers - this can happen for checkpoint sync with minimal headers
tracing::debug!(
"No headers found for range {}..{} - continuing",
current_height,
end_height + 1
);
// Break out of the loop since we've reached the end of available headers
break;
}
Err(e) => {
// For checkpoint sync with only 1 header stored, this is expected
if self.is_synced_from_checkpoint() && loaded_count == 0 && tip_height == 0 {
tracing::info!(
"No additional headers to load for checkpoint sync - this is expected"
);
return Ok(0);
}
return Err(SyncError::Storage(format!("Failed to load headers: {}", e)));
}
}

// Progress logging
if loaded_count.is_multiple_of(50_000) || loaded_count == tip_height {
let elapsed = start_time.elapsed();
let headers_per_sec = loaded_count as f64 / elapsed.as_secs_f64();
tracing::info!(
"Loaded {}/{} headers ({:.0} headers/sec)",
loaded_count,
tip_height,
headers_per_sec
);
}

current_height = end_height + 1;
}

self.total_headers_synced = tip_height;

let elapsed = start_time.elapsed();
tracing::info!(
"✅ Loaded {} headers into HeaderSyncManager in {:.2}s ({:.0} headers/sec)",
"✅ Loaded {} headers for tip height {} into HeaderSyncManager in {:.2}s ({:.0} headers/sec)",
loaded_count,
tip_height,
elapsed.as_secs_f64(),
loaded_count as f64 / elapsed.as_secs_f64()
);

Ok(loaded_count)
Ok(loaded_count as u32)
}

/// Handle a Headers message
Expand Down
47 changes: 46 additions & 1 deletion dash-spv/tests/header_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;
use dash_spv::{
client::{ClientConfig, DashSpvClient},
network::PeerNetworkManager,
storage::{MemoryStorageManager, StorageManager},
storage::{DiskStorageManager, MemoryStorageManager, StorageManager},
sync::{HeaderSyncManager, ReorgConfig},
types::{ChainState, ValidationMode},
};
use dashcore::{block::Header as BlockHeader, block::Version, Network};
Expand All @@ -14,6 +15,8 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
use key_wallet_manager::wallet_manager::WalletManager;
use log::{debug, info};
use std::sync::Arc;
use tempfile::TempDir;
use test_case::test_case;
use tokio::sync::RwLock;

#[tokio::test]
Expand Down Expand Up @@ -332,3 +335,45 @@ async fn test_header_storage_consistency() {

info!("Header storage consistency test completed");
}

#[test_case(0, 0 ; "genesis_0_blocks")]
#[test_case(0, 1 ; "genesis_1_block")]
#[test_case(0, 60000 ; "genesis_60000_blocks")]
#[test_case(100, 0 ; "checkpoint_0_blocks")]
#[test_case(170000, 1 ; "checkpoint_1_block")]
#[test_case(12345, 60000 ; "checkpoint_60000_blocks")]
#[tokio::test]
/// Verifies that `HeaderSyncManager::load_headers_from_storage` restores
/// exactly the headers persisted in the stored chain state, for both
/// genesis-based and checkpoint-based (`sync_base_height > 0`) syncs.
async fn test_load_headers_from_storage(sync_base_height: u32, header_count: usize) {
    // Setup: disk-backed storage seeded with `header_count` synthetic headers
    // at the given sync base height.
    let temp_dir = TempDir::new().expect("Failed to create temp dir");
    let mut storage = DiskStorageManager::new(temp_dir.path().to_path_buf())
        .await
        .expect("Failed to create storage");

    let test_headers = create_test_header_chain(header_count);

    // Persist a chain state carrying the test headers and sync_base_height.
    // (No clone needed: the header vector is not used again after this.)
    let mut stored_state = ChainState::new_for_network(Network::Dash);
    stored_state.sync_base_height = sync_base_height;
    stored_state.headers = test_headers;
    storage.store_chain_state(&stored_state).await.expect("Failed to store chain state");

    // Create a HeaderSyncManager around a fresh, empty chain state so the
    // only headers it can report are the ones loaded from storage.
    let config = ClientConfig::new(Network::Dash);
    let shared_chain_state = Arc::new(RwLock::new(ChainState::new_for_network(Network::Dash)));
    let mut header_sync = HeaderSyncManager::<DiskStorageManager, PeerNetworkManager>::new(
        &config,
        ReorgConfig::default(),
        shared_chain_state.clone(),
    )
    .expect("Failed to create HeaderSyncManager");

    // Exercise: load headers from storage.
    let loaded_count =
        header_sync.load_headers_from_storage(&storage).await.expect("Failed to load headers");

    // Verify both the reported count and the in-memory chain state agree
    // with what was persisted.
    let cs = shared_chain_state.read().await;

    assert_eq!(loaded_count as usize, header_count, "Loaded count mismatch");
    assert_eq!(header_count, cs.headers.len(), "Chain state count mismatch");
}