Skip to content

Commit dc5fed4

Browse files
committed
Merge branch 'fix/filter-sync' into feat/dash-spv-client-interface
2 parents c586be9 + c26abcc commit dc5fed4

File tree

8 files changed

+368
-41
lines changed

8 files changed

+368
-41
lines changed

dash-spv/src/storage/disk/filters.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::ops::Range;
55
use dashcore::hash_types::FilterHeader;
66
use dashcore_hashes::Hash;
77

8-
use crate::error::StorageResult;
8+
use crate::error::{StorageError, StorageResult};
9+
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;
910

1011
use super::manager::DiskStorageManager;
1112
use super::segments::SegmentState;
@@ -216,6 +217,17 @@ impl DiskStorageManager {
216217
}
217218
tokio::fs::create_dir_all(&filters_dir).await?;
218219

220+
// Remove trusted checkpoint predecessor filter header metadata if present
221+
let metadata_path =
222+
self.base_path.join(format!("state/{}.dat", CHECKPOINT_PREV_FILTER_HEADER_KEY));
223+
if metadata_path.exists() {
224+
if let Err(e) = tokio::fs::remove_file(&metadata_path).await {
225+
if e.kind() != std::io::ErrorKind::NotFound {
226+
return Err(StorageError::Io(e));
227+
}
228+
}
229+
}
230+
219231
// Restart background worker for future operations
220232
self.start_worker().await;
221233

dash-spv/src/storage/memory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::ops::Range;
77
use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Txid};
88

99
use crate::error::{StorageError, StorageResult};
10+
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;
1011
use crate::storage::{MasternodeState, StorageManager, StorageStats};
1112
use crate::types::{ChainState, MempoolState, UnconfirmedTransaction};
1213

@@ -310,6 +311,7 @@ impl StorageManager for MemoryStorageManager {
310311
async fn clear_filters(&mut self) -> StorageResult<()> {
311312
self.filter_headers.clear();
312313
self.filters.clear();
314+
self.metadata.remove(CHECKPOINT_PREV_FILTER_HEADER_KEY);
313315
Ok(())
314316
}
315317

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
//! Common metadata keys stored by storage backends.
2+
3+
/// Metadata key storing the filter header for the block immediately before a trusted checkpoint.
4+
pub const CHECKPOINT_PREV_FILTER_HEADER_KEY: &str = "checkpoint_prev_filter_header_v1";

dash-spv/src/storage/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
pub mod disk;
44
pub mod memory;
5+
pub mod metadata_keys;
56
pub mod sync_state;
67
pub mod sync_storage;
78
pub mod types;

dash-spv/src/sync/filters/download.rs

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use dashcore::{
1616
BlockHash,
1717
};
1818

19+
use super::manager::TrustedCheckpointFilterHeader;
1920
use super::types::*;
2021
use crate::error::{SyncError, SyncResult};
2122
use crate::network::NetworkManager;
@@ -38,19 +39,50 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
3839
return Ok(true);
3940
}
4041

41-
// Load previous and expected headers
42-
let prev_header = storage.get_filter_header(height - 1).await.map_err(|e| {
43-
SyncError::Storage(format!("Failed to load previous filter header: {}", e))
44-
})?;
42+
let prev_height = height - 1;
43+
let prev_header = match storage.get_filter_header(prev_height).await.map_err(|e| {
44+
SyncError::Storage(format!(
45+
"Failed to load previous filter header at height {}: {}",
46+
prev_height, e
47+
))
48+
})? {
49+
Some(header) => header,
50+
None if self.sync_base_height > 0 && height == self.sync_base_height => {
51+
match self.load_checkpoint_prev_filter_header(storage).await? {
52+
Some(trusted) if trusted.height == prev_height => trusted.header,
53+
Some(trusted) => {
54+
tracing::error!(
55+
"Checkpoint predecessor header height mismatch: expected {}, stored {}",
56+
prev_height,
57+
trusted.height
58+
);
59+
return Ok(false);
60+
}
61+
None => {
62+
tracing::warn!(
63+
"Missing trusted checkpoint predecessor filter header at height {}",
64+
prev_height
65+
);
66+
return Ok(false);
67+
}
68+
}
69+
}
70+
None => {
71+
tracing::warn!(
72+
"Missing filter header at height {} required for verifying cfilter at {}",
73+
prev_height,
74+
height
75+
);
76+
return Ok(false);
77+
}
78+
};
79+
4580
let expected_header = storage.get_filter_header(height).await.map_err(|e| {
4681
SyncError::Storage(format!("Failed to load expected filter header: {}", e))
4782
})?;
4883

49-
let (Some(prev_header), Some(expected_header)) = (prev_header, expected_header) else {
50-
tracing::warn!(
51-
"Missing filter headers in storage for height {} (prev and/or expected)",
52-
height
53-
);
84+
let Some(expected_header) = expected_header else {
85+
tracing::warn!("Missing filter headers in storage for height {}", height);
5486
return Ok(false);
5587
};
5688

@@ -582,8 +614,21 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
582614
);
583615
}
584616

585-
// If this is the first batch after a checkpoint, store the checkpoint filter header
586-
if self.sync_base_height > 0
617+
// If this is the first batch that begins at the checkpoint base, persist the
618+
// trusted predecessor filter header so we can verify the checkpoint filter.
619+
if self.sync_base_height > 0 && start_height == self.sync_base_height {
620+
if let Some(prev_height) = self.sync_base_height.checked_sub(1) {
621+
let record = TrustedCheckpointFilterHeader {
622+
height: prev_height,
623+
header: cfheaders.previous_filter_header,
624+
};
625+
self.persist_checkpoint_prev_filter_header(storage, record).await?;
626+
tracing::info!(
627+
"Stored trusted checkpoint predecessor filter header at height {}",
628+
prev_height
629+
);
630+
}
631+
} else if self.sync_base_height > 0
587632
&& start_height == self.sync_base_height + 1
588633
&& current_filter_tip < self.sync_base_height
589634
{

dash-spv/src/sync/filters/manager.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,51 @@
66
use dashcore::{hash_types::FilterHeader, network::message_filter::CFHeaders, BlockHash};
77
use dashcore_hashes::{sha256d, Hash};
88
use std::collections::{HashMap, HashSet, VecDeque};
9+
use tokio::sync::Mutex;
910

1011
use crate::client::ClientConfig;
1112
use crate::error::{SyncError, SyncResult};
1213
use crate::network::NetworkManager;
14+
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;
1315
use crate::storage::StorageManager;
1416
use crate::types::SharedFilterHeights;
1517

1618
// Import types and constants from the types module
1719
use super::types::*;
1820

21+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
22+
pub(super) struct TrustedCheckpointFilterHeader {
23+
pub(super) height: u32,
24+
pub(super) header: FilterHeader,
25+
}
26+
27+
impl TrustedCheckpointFilterHeader {
28+
fn to_bytes(self) -> [u8; 36] {
29+
let mut buf = [0u8; 36];
30+
buf[..4].copy_from_slice(&self.height.to_le_bytes());
31+
buf[4..].copy_from_slice(self.header.as_byte_array());
32+
buf
33+
}
34+
35+
fn from_bytes(bytes: &[u8]) -> Option<Self> {
36+
if bytes.len() != 36 {
37+
return None;
38+
}
39+
40+
let mut height_bytes = [0u8; 4];
41+
height_bytes.copy_from_slice(&bytes[..4]);
42+
let height = u32::from_le_bytes(height_bytes);
43+
44+
let mut header_bytes = [0u8; 32];
45+
header_bytes.copy_from_slice(&bytes[4..36]);
46+
47+
Some(Self {
48+
height,
49+
header: FilterHeader::from_byte_array(header_bytes),
50+
})
51+
}
52+
}
53+
1954
/// Manages BIP157 compact block filter synchronization.
2055
///
2156
/// # Generic Parameters
@@ -102,6 +137,8 @@ pub struct FilterSyncManager<S: StorageManager, N: NetworkManager> {
102137
pub(super) max_concurrent_cfheader_requests: usize,
103138
/// Timeout for CFHeaders requests
104139
pub(super) cfheader_request_timeout: std::time::Duration,
140+
/// Trusted predecessor filter header for the configured checkpoint base
141+
checkpoint_prev_filter_header: Mutex<Option<TrustedCheckpointFilterHeader>>,
105142
}
106143

107144
impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync + 'static>
@@ -148,6 +185,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
148185
cfheader_request_timeout: std::time::Duration::from_secs(
149186
config.cfheaders_request_timeout_secs,
150187
),
188+
checkpoint_prev_filter_header: Mutex::new(None),
151189
_phantom_s: std::marker::PhantomData,
152190
_phantom_n: std::marker::PhantomData,
153191
}
@@ -264,6 +302,54 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
264302
Ok(new_filter_headers)
265303
}
266304

305+
pub(super) async fn persist_checkpoint_prev_filter_header(
306+
&self,
307+
storage: &mut S,
308+
header: TrustedCheckpointFilterHeader,
309+
) -> SyncResult<()> {
310+
storage
311+
.store_metadata(CHECKPOINT_PREV_FILTER_HEADER_KEY, &header.to_bytes())
312+
.await
313+
.map_err(|e| {
314+
SyncError::Storage(format!(
315+
"Failed to persist checkpoint predecessor filter header: {}",
316+
e
317+
))
318+
})?;
319+
320+
let mut guard = self.checkpoint_prev_filter_header.lock().await;
321+
*guard = Some(header);
322+
Ok(())
323+
}
324+
325+
pub(super) async fn load_checkpoint_prev_filter_header(
326+
&self,
327+
storage: &S,
328+
) -> SyncResult<Option<TrustedCheckpointFilterHeader>> {
329+
let mut guard = self.checkpoint_prev_filter_header.lock().await;
330+
if guard.is_none() {
331+
if let Some(bytes) =
332+
storage.load_metadata(CHECKPOINT_PREV_FILTER_HEADER_KEY).await.map_err(|e| {
333+
SyncError::Storage(format!(
334+
"Failed to load checkpoint predecessor filter header: {}",
335+
e
336+
))
337+
})?
338+
{
339+
if let Some(record) = TrustedCheckpointFilterHeader::from_bytes(&bytes) {
340+
*guard = Some(record);
341+
} else {
342+
tracing::warn!(
343+
"Stored checkpoint predecessor filter header has unexpected format ({} bytes)",
344+
bytes.len()
345+
);
346+
}
347+
}
348+
}
349+
350+
Ok(*guard)
351+
}
352+
267353
/// Handle overlapping filter headers by skipping already processed ones.
268354
pub fn has_pending_downloads(&self) -> bool {
269355
!self.pending_block_downloads.is_empty() || !self.downloading_blocks.is_empty()

0 commit comments

Comments
 (0)