Skip to content

Commit 65787e8

Browse files
feat: filter storage using segmentscache (#267)
* Implement segmented filter storage * Store filters during initial sync * Add startup initialization * Add tests * filter data storage uses SegmentCache generic struct to persist the data * fixed clippy warning * correctly setting sync_base_height using the chain state in the new filters segment cache --------- Co-authored-by: xdustinface <xdustinfacex@gmail.com>
1 parent a8a1257 commit 65787e8

File tree

7 files changed

+72
-52
lines changed

7 files changed

+72
-52
lines changed

dash-spv/src/storage/disk/filters.rs

Lines changed: 0 additions & 46 deletions
This file was deleted.

dash-spv/src/storage/disk/manager.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ pub(super) enum WorkerCommand {
2424
SaveFilterHeaderSegmentCache {
2525
segment_id: u32,
2626
},
27+
SaveFilterSegmentCache {
28+
segment_id: u32,
29+
},
2730
SaveIndex {
2831
index: HashMap<BlockHash, u32>,
2932
},
@@ -37,6 +40,7 @@ pub struct DiskStorageManager {
3740
// Segmented header storage
3841
pub(super) block_headers: Arc<RwLock<SegmentCache<BlockHeader>>>,
3942
pub(super) filter_headers: Arc<RwLock<SegmentCache<FilterHeader>>>,
43+
pub(super) filters: Arc<RwLock<SegmentCache<Vec<u8>>>>,
4044

4145
// Reverse index for O(1) lookups
4246
pub(super) header_hash_index: Arc<RwLock<HashMap<BlockHash, u32>>>,
@@ -107,6 +111,9 @@ impl DiskStorageManager {
107111
filter_headers: Arc::new(RwLock::new(
108112
SegmentCache::load_or_new(base_path.clone(), sync_base_height).await?,
109113
)),
114+
filters: Arc::new(RwLock::new(
115+
SegmentCache::load_or_new(base_path.clone(), sync_base_height).await?,
116+
)),
110117
header_hash_index: Arc::new(RwLock::new(HashMap::new())),
111118
worker_tx: None,
112119
worker_handle: None,
@@ -120,6 +127,7 @@ impl DiskStorageManager {
120127
if let Ok(Some(state)) = storage.load_chain_state().await {
121128
storage.filter_headers.write().await.set_sync_base_height(state.sync_base_height);
122129
storage.block_headers.write().await.set_sync_base_height(state.sync_base_height);
130+
storage.filters.write().await.set_sync_base_height(state.sync_base_height);
123131
tracing::debug!("Loaded sync_base_height: {}", state.sync_base_height);
124132
}
125133

@@ -151,6 +159,7 @@ impl DiskStorageManager {
151159

152160
let block_headers = Arc::clone(&self.block_headers);
153161
let filter_headers = Arc::clone(&self.filter_headers);
162+
let cfilters = Arc::clone(&self.filters);
154163

155164
let worker_handle = tokio::spawn(async move {
156165
while let Some(cmd) = worker_rx.recv().await {
@@ -203,6 +212,30 @@ impl DiskStorageManager {
203212
}
204213
}
205214
}
215+
WorkerCommand::SaveFilterSegmentCache {
216+
segment_id,
217+
} => {
218+
let mut cache = cfilters.write().await;
219+
let segment = match cache.get_segment_mut(&segment_id).await {
220+
Ok(segment) => segment,
221+
Err(e) => {
222+
eprintln!("Failed to get segment {}: {}", segment_id, e);
223+
continue;
224+
}
225+
};
226+
227+
match segment.persist(&base_path).await {
228+
Ok(()) => {
229+
tracing::trace!(
230+
"Background worker completed saving filter segment {}",
231+
segment_id
232+
);
233+
}
234+
Err(e) => {
235+
eprintln!("Failed to save segment {}: {}", segment_id, e);
236+
}
237+
}
238+
}
206239
WorkerCommand::SaveIndex {
207240
index,
208241
} => {

dash-spv/src/storage/disk/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
//! - Better concurrency
2020
//! - Simpler code
2121
22-
mod filters;
2322
mod headers;
2423
pub(crate) mod io;
2524
mod lockfile;

dash-spv/src/storage/disk/segments.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ pub trait Persistable: Sized + Encodable + Decodable + PartialEq + Clone {
5151
fn make_save_command(segment: &Segment<Self>) -> WorkerCommand;
5252
}
5353

54+
impl Persistable for Vec<u8> {
55+
const FOLDER_NAME: &'static str = "filters";
56+
57+
fn make_save_command(segment: &Segment<Self>) -> WorkerCommand {
58+
WorkerCommand::SaveFilterSegmentCache {
59+
segment_id: segment.segment_id,
60+
}
61+
}
62+
}
63+
5464
impl Persistable for BlockHeader {
5565
const FOLDER_NAME: &'static str = "block_headers";
5666

dash-spv/src/storage/disk/state.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ impl DiskStorageManager {
2121
// Update our sync_base_height
2222
self.filter_headers.write().await.set_sync_base_height(state.sync_base_height);
2323
self.block_headers.write().await.set_sync_base_height(state.sync_base_height);
24+
self.filters.write().await.set_sync_base_height(state.sync_base_height);
2425

2526
// First store all headers
2627
// For checkpoint sync, we need to store headers starting from the checkpoint height
@@ -355,6 +356,8 @@ impl DiskStorageManager {
355356
// Clear in-memory state
356357
self.block_headers.write().await.clear_in_memory();
357358
self.filter_headers.write().await.clear_in_memory();
359+
self.filters.write().await.clear_in_memory();
360+
358361
self.header_hash_index.write().await.clear();
359362
self.mempool_transactions.write().await.clear();
360363
*self.mempool_state.write().await = None;
@@ -449,8 +452,8 @@ impl DiskStorageManager {
449452
/// Save all dirty segments to disk via background worker.
450453
pub(super) async fn save_dirty(&self) {
451454
self.filter_headers.write().await.persist_dirty(self).await;
452-
453455
self.block_headers.write().await.persist_dirty(self).await;
456+
self.filters.write().await.persist_dirty(self).await;
454457

455458
if let Some(tx) = &self.worker_tx {
456459
// Save the index only if it has grown significantly (every 10k new entries)
@@ -594,11 +597,16 @@ impl StorageManager for DiskStorageManager {
594597
}
595598

596599
async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> {
597-
Self::store_filter(self, height, filter).await
600+
self.filters.write().await.store_items_at_height(&[filter.to_vec()], height, self).await
598601
}
599602

600603
async fn load_filter(&self, height: u32) -> StorageResult<Option<Vec<u8>>> {
601-
Self::load_filter(self, height).await
604+
self.filters
605+
.write()
606+
.await
607+
.get_items(height..height + 1)
608+
.await
609+
.map(|items| items.first().cloned())
602610
}
603611

604612
async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()> {
@@ -614,7 +622,17 @@ impl StorageManager for DiskStorageManager {
614622
}
615623

616624
async fn clear_filters(&mut self) -> StorageResult<()> {
617-
Self::clear_filters(self).await
625+
// Stop worker to prevent concurrent writes to filter directories
626+
self.stop_worker().await;
627+
628+
// Clear in-memory and on-disk filter headers segments
629+
self.filter_headers.write().await.clear_all().await?;
630+
self.filters.write().await.clear_all().await?;
631+
632+
// Restart background worker for future operations
633+
self.start_worker().await;
634+
635+
Ok(())
618636
}
619637

620638
async fn stats(&self) -> StorageResult<StorageStats> {

dash-spv/src/sync/message_handlers.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,12 @@ impl<
611611
return Ok(());
612612
}
613613

614+
// Store the verified filter to disk
615+
storage
616+
.store_filter(height, &cfilter.filter)
617+
.await
618+
.map_err(|e| SyncError::Storage(format!("Failed to store filter: {}", e)))?;
619+
614620
let matches = self
615621
.filter_sync
616622
.check_filter_for_matches(

dash-spv/tests/segmented_storage_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ async fn test_mixed_operations() {
302302
storage.store_filter_headers(&filter_headers).await.unwrap();
303303

304304
// Store some filters
305-
for height in [1000, 5000, 50_000, 70_000] {
305+
for height in 0..75_000 {
306306
let filter_data = vec![height as u8; 100];
307307
storage.store_filter(height, &filter_data).await.unwrap();
308308
}

0 commit comments

Comments
 (0)