Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 15 additions & 94 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,13 @@ impl<
// Create sync manager
let received_filter_heights = stats.read().await.received_filter_heights.clone();
tracing::info!("Creating sequential sync manager");
let sync_manager =
SequentialSyncManager::new(&config, received_filter_heights, wallet.clone())
.map_err(SpvError::Sync)?;
let sync_manager = SequentialSyncManager::new(
&config,
received_filter_heights,
wallet.clone(),
state.clone(),
)
.map_err(SpvError::Sync)?;

// Create validation manager
let validation = ValidationManager::new(config.validation_mode);
Expand Down Expand Up @@ -343,20 +347,6 @@ impl<
match loaded_count {
Ok(loaded_count) => {
tracing::info!("✅ Sync manager loaded {} headers from storage", loaded_count);

// IMPORTANT: Also load headers into the client's ChainState for normal sync
// This is needed because the status display reads from the client's ChainState
let state = self.state.read().await;
let is_normal_sync = !state.synced_from_checkpoint;
drop(state); // Release the lock before loading headers

if is_normal_sync && loaded_count > 0 {
tracing::info!("Loading headers into client ChainState for normal sync...");
if let Err(e) = self.load_headers_into_client_state(tip_height).await {
tracing::error!("Failed to load headers into client ChainState: {}", e);
// This is not critical for normal sync, continue anyway
}
}
}
Err(e) => {
tracing::error!("Failed to load headers into sync manager: {}", e);
Expand Down Expand Up @@ -1940,79 +1930,6 @@ impl<
Ok(true)
}

/// Load headers from storage into the client's ChainState.
/// This is used during normal sync to ensure the status display shows correct header count.
async fn load_headers_into_client_state(&mut self, tip_height: u32) -> Result<()> {
if tip_height == 0 {
return Ok(());
}

tracing::debug!("Loading {} headers from storage into client ChainState", tip_height);
let start_time = Instant::now();

// Load headers in batches to avoid memory spikes
const BATCH_SIZE: u32 = 10_000;
let mut loaded_count = 0u32;

// Start from height 1 (genesis is already in ChainState)
let mut current_height = 1u32;

while current_height <= tip_height {
let end_height = (current_height + BATCH_SIZE - 1).min(tip_height);

// Load batch of headers from storage
let headers = {
let storage = self.storage.lock().await;
storage
.load_headers(current_height..end_height + 1)
.await
.map_err(SpvError::Storage)?
};

if headers.is_empty() {
tracing::warn!(
"No headers found for range {}..{} - storage may be incomplete",
current_height,
end_height + 1
);
break;
}

// Add headers to client's chain state
{
let mut state = self.state.write().await;
for header in headers {
state.add_header(header);
loaded_count += 1;
}
}

// Progress logging for large header counts
if loaded_count % 50_000 == 0 || loaded_count == tip_height {
let elapsed = start_time.elapsed();
let headers_per_sec = loaded_count as f64 / elapsed.as_secs_f64();
tracing::debug!(
"Loaded {}/{} headers into client ChainState ({:.0} headers/sec)",
loaded_count,
tip_height,
headers_per_sec
);
}

current_height = end_height + 1;
}

let elapsed = start_time.elapsed();
tracing::info!(
"✅ Loaded {} headers into client ChainState in {:.2}s ({:.0} headers/sec)",
loaded_count,
elapsed.as_secs_f64(),
loaded_count as f64 / elapsed.as_secs_f64()
);

Ok(())
}

/// Rollback chain state to a specific height.
async fn rollback_to_height(&mut self, target_height: u32) -> Result<()> {
tracing::info!("Rolling back chain state to height {}", target_height);
Expand Down Expand Up @@ -2255,9 +2172,9 @@ impl<
self.config.network,
);

// Clone the chain state for storage and sync manager
// Clone the chain state for storage
let chain_state_for_storage = (*chain_state).clone();
let checkpoint_chain_state = (*chain_state).clone();
let headers_len = chain_state_for_storage.headers.len() as u32;
drop(chain_state);

// Update storage with chain state including sync_base_height
Expand All @@ -2278,8 +2195,12 @@ impl<
checkpoint.height
);

// Update the sync manager's chain state with the checkpoint-initialized state
self.sync_manager.set_chain_state(checkpoint_chain_state);
// Update the sync manager's cached flags from the checkpoint-initialized state
self.sync_manager.update_chain_state_cache(
true,
checkpoint.height,
headers_len,
);
tracing::info!(
"Updated sync manager with checkpoint-initialized chain state"
);
Expand Down
Loading
Loading