Skip to content

Commit 927453a

Browse files
committed
fix: Rework sync state validation
1 parent ff09ef2 commit 927453a

File tree

5 files changed

+128
-347
lines changed

5 files changed

+128
-347
lines changed

dash-spv/src/client/sync_coordinator.rs

Lines changed: 83 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@
1313
1414
use super::{BlockProcessingTask, DashSpvClient, MessageHandler};
1515
use crate::client::interface::DashSpvClientCommand;
16-
use crate::error::{Result, SpvError};
16+
use crate::error::{Result, SpvError, StorageResult};
1717
use crate::network::constants::MESSAGE_RECEIVE_TIMEOUT;
1818
use crate::network::NetworkManager;
19-
use crate::storage::StorageManager;
19+
use crate::storage::{PersistentSyncState, RecoverySuggestion, StorageManager};
2020
use crate::sync::headers::validate_headers;
2121
use crate::types::{CachedHeader, DetailedSyncProgress, SyncProgress};
22+
use crate::StorageError;
2223
use key_wallet_manager::wallet_interface::WalletInterface;
2324
use std::time::{Duration, Instant, SystemTime};
2425
use tokio::sync::mpsc::UnboundedReceiver;
@@ -725,183 +726,140 @@ impl<
725726
/// Returns true if state was successfully restored, false if no state was found.
726727
pub(super) async fn restore_sync_state(&mut self) -> Result<bool> {
727728
// Load and validate sync state
728-
let (saved_state, should_continue) = self.load_and_validate_sync_state().await?;
729-
if !should_continue {
730-
return Ok(false);
731-
}
732-
733-
let saved_state = saved_state.unwrap();
729+
let loaded_state = match self.load_and_validate_sync_state().await {
730+
Ok(state) => state,
731+
Err(StorageError::NotFound(_)) => {
732+
// No saved state found - this is normal for first run
733+
return Ok(false);
734+
}
735+
Err(e) => return Err(e.into()),
736+
};
734737

735-
tracing::info!(
736-
"Restoring sync state from height {} (saved at {:?})",
737-
saved_state.chain_tip.height,
738-
saved_state.saved_at
739-
);
738+
tracing::info!("Restoring sync state from height {}", loaded_state.chain_tip.height,);
740739

741740
// Restore headers from state
742-
if !self.restore_headers_from_state(&saved_state).await? {
741+
if !self.restore_headers_from_state(&loaded_state).await? {
743742
return Ok(false);
744743
}
745744

746745
// Restore filter headers from state
747-
self.restore_filter_headers_from_state(&saved_state).await?;
746+
self.restore_filter_headers_from_state(&loaded_state).await?;
748747

749748
// Update stats from state
750-
self.update_stats_from_state(&saved_state).await;
749+
self.update_stats_from_state(&loaded_state).await;
751750

752751
// Restore sync manager state
753-
if !self.restore_sync_manager_state(&saved_state).await? {
752+
if !self.restore_sync_manager_state(&loaded_state).await? {
754753
return Ok(false);
755754
}
756755

757756
tracing::info!(
758757
"Sync state restored: headers={}, filter_headers={}, filters_downloaded={}",
759-
saved_state.sync_progress.header_height,
760-
saved_state.sync_progress.filter_header_height,
761-
saved_state.filter_sync.filters_downloaded
758+
loaded_state.sync_progress.header_height,
759+
loaded_state.sync_progress.filter_header_height,
760+
loaded_state.filter_sync.filters_downloaded
762761
);
763762

764763
Ok(true)
765764
}
766765

767766
/// Load sync state from storage and validate it, handling recovery if needed.
767+
/// Returns NotFound error if no saved state exists.
768768
pub(super) async fn load_and_validate_sync_state(
769769
&mut self,
770-
) -> Result<(Option<crate::storage::PersistentSyncState>, bool)> {
770+
) -> StorageResult<PersistentSyncState> {
771771
// Load sync state from storage
772772
let sync_state = {
773773
let storage = self.storage.lock().await;
774-
storage.load_sync_state().await.map_err(SpvError::Storage)?
774+
storage.load_sync_state().await?
775775
};
776776

777-
let Some(saved_state) = sync_state else {
778-
return Ok((None, false));
777+
let Some(loaded_state) = sync_state else {
778+
return Err(StorageError::NotFound("No saved sync state".to_string()));
779779
};
780780

781781
// Validate the sync state
782-
let validation = saved_state.validate(self.config.network);
783-
784-
if !validation.is_valid {
785-
tracing::error!("Sync state validation failed:");
786-
for error in &validation.errors {
787-
tracing::error!(" - {}", error);
788-
}
789-
790-
// Handle recovery based on suggestion
791-
if let Some(suggestion) = validation.recovery_suggestion {
792-
return match suggestion {
793-
crate::storage::RecoverySuggestion::StartFresh => {
794-
tracing::warn!("Recovery: Starting fresh sync");
795-
Ok((None, false))
796-
}
797-
crate::storage::RecoverySuggestion::RollbackToHeight(height) => {
798-
let recovered = self.handle_rollback_recovery(height).await?;
799-
Ok((None, recovered))
800-
}
801-
crate::storage::RecoverySuggestion::UseCheckpoint(height) => {
802-
let recovered = self.handle_checkpoint_recovery(height).await?;
803-
Ok((None, recovered))
782+
let result = loaded_state.validate(self.config.network);
783+
match result {
784+
Err(StorageError::InconsistentState(error, recovery)) => {
785+
tracing::error!(
786+
"Sync state validation failed: {}, recovery: {:?}",
787+
error,
788+
recovery
789+
);
790+
match recovery {
791+
RecoverySuggestion::StartFresh => Err(StorageError::Corruption(format!(
792+
"Invalid sync state, starting fresh: {}",
793+
error
794+
))),
795+
RecoverySuggestion::RollbackToHeight(height) => {
796+
match self.handle_rollback_recovery(height).await {
797+
Ok(()) => Ok(loaded_state),
798+
Err(error) => Err(StorageError::Corruption(format!(
799+
"Failed to rollback sync state to height {}, error: {}",
800+
height, error
801+
))),
802+
}
804803
}
805-
crate::storage::RecoverySuggestion::PartialRecovery => {
806-
tracing::warn!("Recovery: Attempting partial recovery");
804+
RecoverySuggestion::PartialRecovery => {
807805
// For partial recovery, we keep headers but reset filter sync
808806
if let Err(e) = self.reset_filter_sync_state().await {
809807
tracing::error!("Failed to reset filter sync state: {}", e);
810808
}
811-
Ok((Some(saved_state), true))
809+
Ok(loaded_state)
812810
}
813-
};
811+
}
814812
}
815-
816-
return Ok((None, false));
813+
Err(error) => Err(error),
814+
Ok(()) => Ok(loaded_state),
817815
}
818-
819-
// Log any warnings
820-
for warning in &validation.warnings {
821-
tracing::warn!("Sync state warning: {}", warning);
822-
}
823-
824-
Ok((Some(saved_state), true))
825816
}
826817

827818
/// Handle rollback recovery to a specific height.
828-
pub(super) async fn handle_rollback_recovery(&mut self, height: u32) -> Result<bool> {
819+
pub(super) async fn handle_rollback_recovery(&mut self, height: u32) -> StorageResult<()> {
829820
tracing::warn!("Recovery: Rolling back to height {}", height);
830821

831822
// Validate the rollback height
832823
if height == 0 {
833-
tracing::error!("Cannot rollback to genesis block (height 0)");
834-
return Ok(false);
824+
let error = "Cannot rollback to genesis block (height 0)";
825+
tracing::error!(error);
826+
return Err(StorageError::Corruption(error.to_string()));
835827
}
836828

837829
// Get current height from storage to validate against
838830
let current_height = {
839831
let storage = self.storage.lock().await;
840-
storage.get_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0)
832+
storage.get_tip_height().await?.unwrap_or(0)
841833
};
842834

843835
if height > current_height {
844-
tracing::error!(
836+
let error = format!(
845837
"Cannot rollback to height {} which is greater than current height {}",
846-
height,
847-
current_height
838+
height, current_height
848839
);
849-
return Ok(false);
840+
tracing::error!(error);
841+
return Err(StorageError::Corruption(error.to_string()));
850842
}
851843

852844
match self.rollback_to_height(height).await {
853845
Ok(_) => {
854846
tracing::info!("Successfully rolled back to height {}", height);
855-
Ok(false) // Start fresh sync from rollback point
847+
Ok(()) // Continue sync from rollback point
856848
}
857849
Err(e) => {
858850
tracing::error!("Failed to rollback to height {}: {}", height, e);
859-
Ok(false) // Start fresh sync
860-
}
861-
}
862-
}
863-
864-
/// Handle checkpoint recovery at a specific height.
865-
pub(super) async fn handle_checkpoint_recovery(&mut self, height: u32) -> Result<bool> {
866-
tracing::warn!("Recovery: Using checkpoint at height {}", height);
867-
868-
// Validate the checkpoint height
869-
if height == 0 {
870-
tracing::error!("Cannot use checkpoint at genesis block (height 0)");
871-
return Ok(false);
872-
}
873-
874-
// Check if checkpoint height is reasonable (not in the future)
875-
let current_height = {
876-
let storage = self.storage.lock().await;
877-
storage.get_tip_height().await.map_err(SpvError::Storage)?.unwrap_or(0)
878-
};
879-
880-
if current_height > 0 && height > current_height {
881-
tracing::error!(
882-
"Cannot use checkpoint at height {} which is greater than current height {}",
883-
height,
884-
current_height
885-
);
886-
return Ok(false);
887-
}
888-
889-
match self.recover_from_checkpoint(height).await {
890-
Ok(_) => {
891-
tracing::info!("Successfully recovered from checkpoint at height {}", height);
892-
Ok(true) // State restored from checkpoint
893-
}
894-
Err(e) => {
895-
tracing::error!("Failed to recover from checkpoint {}: {}", height, e);
896-
Ok(false) // Start fresh sync
851+
Err(StorageError::Corruption(format!(
852+
"Failed to rollback to height {}: {}",
853+
height, e
854+
)))
897855
}
898856
}
899857
}
900858

901859
/// Restore headers from saved state into ChainState.
902860
pub(super) async fn restore_headers_from_state(
903861
&mut self,
904-
saved_state: &crate::storage::PersistentSyncState,
862+
saved_state: &PersistentSyncState,
905863
) -> Result<bool> {
906864
if saved_state.chain_tip.height == 0 {
907865
return Ok(true);
@@ -1017,7 +975,7 @@ impl<
1017975
/// Restore filter headers from saved state.
1018976
pub(super) async fn restore_filter_headers_from_state(
1019977
&mut self,
1020-
saved_state: &crate::storage::PersistentSyncState,
978+
saved_state: &PersistentSyncState,
1021979
) -> Result<()> {
1022980
if saved_state.sync_progress.filter_header_height == 0 {
1023981
return Ok(());
@@ -1045,10 +1003,7 @@ impl<
10451003
}
10461004

10471005
/// Update stats from saved state.
1048-
pub(super) async fn update_stats_from_state(
1049-
&mut self,
1050-
saved_state: &crate::storage::PersistentSyncState,
1051-
) {
1006+
pub(super) async fn update_stats_from_state(&mut self, saved_state: &PersistentSyncState) {
10521007
let mut stats = self.stats.write().await;
10531008
stats.headers_downloaded = saved_state.sync_progress.header_height as u64;
10541009
stats.filter_headers_downloaded = saved_state.sync_progress.filter_header_height as u64;
@@ -1154,43 +1109,6 @@ impl<
11541109
Ok(())
11551110
}
11561111

1157-
/// Recover from a saved checkpoint.
1158-
pub(super) async fn recover_from_checkpoint(&mut self, checkpoint_height: u32) -> Result<()> {
1159-
tracing::info!("Recovering from checkpoint at height {}", checkpoint_height);
1160-
1161-
// Load checkpoints around the target height
1162-
let checkpoints = {
1163-
let storage = self.storage.lock().await;
1164-
storage
1165-
.get_sync_checkpoints(checkpoint_height, checkpoint_height)
1166-
.await
1167-
.map_err(SpvError::Storage)?
1168-
};
1169-
1170-
if checkpoints.is_empty() {
1171-
return Err(SpvError::Config(format!(
1172-
"No checkpoint found at height {}",
1173-
checkpoint_height
1174-
)));
1175-
}
1176-
1177-
let checkpoint = &checkpoints[0];
1178-
1179-
// Verify the checkpoint is validated
1180-
if !checkpoint.validated {
1181-
return Err(SpvError::Config(format!(
1182-
"Checkpoint at height {} is not validated",
1183-
checkpoint_height
1184-
)));
1185-
}
1186-
1187-
// Rollback to checkpoint height
1188-
self.rollback_to_height(checkpoint_height).await?;
1189-
1190-
tracing::info!("Successfully recovered from checkpoint at height {}", checkpoint_height);
1191-
Ok(())
1192-
}
1193-
11941112
/// Reset filter sync state while keeping headers.
11951113
pub(super) async fn reset_filter_sync_state(&mut self) -> Result<()> {
11961114
tracing::info!("Resetting filter sync state");
@@ -1233,39 +1151,28 @@ impl<
12331151
let chain_state = self.state.read().await;
12341152

12351153
// Create persistent sync state
1236-
let persistent_state = crate::storage::PersistentSyncState::from_chain_state(
1154+
let Some(state) = PersistentSyncState::from_chain_state(
12371155
&chain_state,
12381156
&sync_progress,
12391157
self.config.network,
1240-
);
1241-
1242-
if let Some(state) = persistent_state {
1243-
// Check if we should create a checkpoint
1244-
if state.should_checkpoint(state.chain_tip.height) {
1245-
if let Some(checkpoint) = state.checkpoints.last() {
1246-
let mut storage = self.storage.lock().await;
1247-
storage
1248-
.store_sync_checkpoint(checkpoint.height, checkpoint)
1249-
.await
1250-
.map_err(SpvError::Storage)?;
1251-
tracing::info!("Created sync checkpoint at height {}", checkpoint.height);
1252-
}
1253-
}
1254-
1255-
// Save the sync state
1256-
{
1257-
let mut storage = self.storage.lock().await;
1258-
storage.store_sync_state(&state).await.map_err(SpvError::Storage)?;
1259-
}
1158+
) else {
1159+
tracing::warn!("Cannot save sync state: chain state has no tip");
1160+
return Ok(());
1161+
};
12601162

1261-
tracing::debug!(
1262-
"Saved sync state: headers={}, filter_headers={}, filters={}",
1263-
state.sync_progress.header_height,
1264-
state.sync_progress.filter_header_height,
1265-
state.filter_sync.filters_downloaded
1266-
);
1163+
// Save the sync state
1164+
{
1165+
let mut storage = self.storage.lock().await;
1166+
storage.store_sync_state(&state).await.map_err(SpvError::Storage)?;
12671167
}
12681168

1169+
tracing::debug!(
1170+
"Saved sync state: headers={}, filter_headers={}, filters={}",
1171+
state.sync_progress.header_height,
1172+
state.sync_progress.filter_header_height,
1173+
state.filter_sync.filters_downloaded
1174+
);
1175+
12691176
Ok(())
12701177
}
12711178
}

0 commit comments

Comments
 (0)