
Commit a8a1257

refactor: storage segments cleanup (#244)
* segments cache struct and evict method generalized
* save segments to disk made generic inside the segments cache struct
* save_dirty_segments logic refactored
* tbh I don't know what I refactored here
* removed io module inside storage/disk
* you are right, rabbit
* store in segment moved to SegmentCache
* sentinel headers creation moved to the Persistable trait and encapsulated there
* unified sentinel item behaviour - no longer compiles because of the tip_height calculation
* renames
* new struct to manage the hashmap of segments, and tip height moved - doesn't compile yet, wip
* get_headers generalized in the SegmentsCache struct - wip, doesn't compile
* store headers logic moved to the SegmentsCache and introduced a method to better handle tip_height and storage index - wip, doesn't compile
* store_headers_impl using precomputed_hashes didn't provide real benefits with the current usage - wip, doesn't compile
* removed unused StorageManager::get_headers_batch method - wip, doesn't compile
* removed warnings
* ensure segment loaded moved inside the SegmentsCache with a logic change: we ask for a segment, and if it doesn't exist in memory we load it from disk - wip, doesn't compile
* const MAX_ACTIVE_SEGMENTS encapsulated - wip, doesn't compile
* removed one commit as it is fixed
* created SegmentsCache::store_headers_at_height - wip, doesn't compile
* removed inconsistency when loading segments metadata
* removed two methods, unused now that the behaviour is encapsulated
* building SegmentCache tip_height when creating the struct
* removed unused function
* some refactoring and removed the notification enum and related behaviour - wip, doesn't compile
* disk storage manager worker no longer needs cloned headers to work
* renamed segments storage fields
* removed new unused function
* evict logic removed
* save dirty segments logic moved into SegmentsCache
* clippy warnings fixed
* save dirty is now an instance method of DiskStorageManager
* when persisting segments we try to create the parent dirs to ensure they exist
* improved logic to ensure different clear behaviour for SegmentsCache
* correctly rebuilding the block reverse index
* fixed bug found by test test_checkpoint_storage_indexing
* fixed bug updating tip_height in SegmentCache, spotted by test test_filter_header_segments
* fixed bug: we stop persisting segments after we find the first sentinel, to correctly initialize valid_count - bug spotted by test test_filter_header_persistence
* refactor: HEADER_PER_SEGMENT encapsulated inside segment and renamed to ITEMS_PER_SEGMENT - wip, doesn't compile
* block index rebuild logic moved into SegmentCache<BlockHeader>, and load_segment_metadata renamed in favor of a name that better reflects its current behaviour as the block index constructor
* added some cool inlines
* fixed test that was creating a sentinel filter header at height 0, making the segment not persist entirely
* renamed header references to item in segments.rs so it's clear that the new struct can work with any struct
* clippy warning fixed
* logging when storing multiple items simplified
* removed sentinel headers from the segments logic
* unit tests for the segment and segment_cache structs after the refactor (#259)
* removed unused methods after rebase
* renamed and removed old documentation for store_headers_impl
* refactored and adjusted docs for the conversion methods between storage index, height and offset
* removed old comments and now requiring sync_base_height when creating the SegmentCache
* quick fix to load sync_base_height if persisted before
* review comments addressed
* read block index operation made async
* using atomic write where we write to disk
1 parent 8f97c4b commit a8a1257
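
For orientation, the sketch below shows roughly the shape of the cache these bullets describe: a struct generic over the persisted item type that owns the map of loaded segments together with tip_height and sync_base_height, and that converts a blockchain height into a segment id plus an offset. Only the names SegmentsCache, Persistable, tip_height, sync_base_height and ITEMS_PER_SEGMENT come from the commit message; every signature, the Default-based padding, and the ITEMS_PER_SEGMENT value are assumptions made here for illustration, not the actual dash-spv API.

// Illustrative sketch only - names follow the commit message, but the
// real SegmentsCache in dash-spv may be shaped differently.
use std::collections::HashMap;

/// Items that can live inside a fixed-size segment file (block headers,
/// filter headers, ...). `Default` stands in for whatever padding value
/// the real implementation uses.
trait Persistable: Clone + Default {}

const ITEMS_PER_SEGMENT: u32 = 10_000; // hypothetical value

struct Segment<T: Persistable> {
    items: Vec<T>,
    dirty: bool,
}

struct SegmentsCache<T: Persistable> {
    /// Loaded segments keyed by segment id.
    segments: HashMap<u32, Segment<T>>,
    /// Highest blockchain height stored so far, if any.
    tip_height: Option<u32>,
    /// First height covered by storage (checkpoint sync base).
    sync_base_height: u32,
}

impl<T: Persistable> SegmentsCache<T> {
    /// Map a blockchain height to (segment id, offset inside the segment).
    /// The real code guards against heights below `sync_base_height`.
    fn locate(&self, height: u32) -> (u32, usize) {
        let storage_index = height - self.sync_base_height;
        (
            storage_index / ITEMS_PER_SEGMENT,
            (storage_index % ITEMS_PER_SEGMENT) as usize,
        )
    }

    /// Store one item at a blockchain height, growing the segment if needed
    /// and keeping `tip_height` up to date.
    fn store_at_height(&mut self, height: u32, item: T) {
        let (segment_id, offset) = self.locate(height);
        let segment = self
            .segments
            .entry(segment_id)
            .or_insert_with(|| Segment { items: Vec::new(), dirty: false });
        if offset >= segment.items.len() {
            segment.items.resize(offset + 1, T::default());
        }
        segment.items[offset] = item;
        segment.dirty = true;
        self.tip_height = Some(self.tip_height.map_or(height, |t| t.max(height)));
    }
}

The filters.rs diff below shows the consumer side of this change: the per-type active_filter_segments and cached_filter_tip_height fields disappear in favour of a single filter_headers cache that encapsulates clearing and persistence.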

File tree

15 files changed: +863, -1410 lines


dash-spv/src/storage/disk/filters.rs

Lines changed: 3 additions & 181 deletions
@@ -1,188 +1,11 @@
 //! Filter storage operations for DiskStorageManager.
 
-use std::ops::Range;
-
-use dashcore::hash_types::FilterHeader;
-use dashcore_hashes::Hash;
-
 use crate::error::StorageResult;
 
 use super::io::atomic_write;
 use super::manager::DiskStorageManager;
-use super::segments::SegmentState;
 
 impl DiskStorageManager {
-    /// Store filter headers.
-    pub async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
-        let sync_base_height = *self.sync_base_height.read().await;
-
-        // Determine the next blockchain height
-        let mut next_blockchain_height = {
-            let current_tip = self.cached_filter_tip_height.read().await;
-            match *current_tip {
-                Some(tip) => tip + 1,
-                None => {
-                    // If we have a checkpoint, start from there, otherwise from 0
-                    if sync_base_height > 0 {
-                        sync_base_height
-                    } else {
-                        0
-                    }
-                }
-            }
-        };
-
-        for header in headers {
-            // Convert blockchain height to storage index
-            let storage_index = if sync_base_height > 0 {
-                // For checkpoint sync, storage index is relative to sync_base_height
-                if next_blockchain_height >= sync_base_height {
-                    next_blockchain_height - sync_base_height
-                } else {
-                    // This shouldn't happen in normal operation
-                    tracing::warn!(
-                        "Attempting to store filter header at height {} below sync_base_height {}",
-                        next_blockchain_height,
-                        sync_base_height
-                    );
-                    next_blockchain_height
-                }
-            } else {
-                // For genesis sync, storage index equals blockchain height
-                next_blockchain_height
-            };
-
-            let segment_id = Self::get_segment_id(storage_index);
-            let offset = Self::get_segment_offset(storage_index);
-
-            // Ensure segment is loaded
-            super::segments::ensure_filter_segment_loaded(self, segment_id).await?;
-
-            // Update segment
-            {
-                let mut segments = self.active_filter_segments.write().await;
-                if let Some(segment) = segments.get_mut(&segment_id) {
-                    // Ensure we have space in the segment
-                    if offset >= segment.filter_headers.len() {
-                        // Fill with zero filter headers up to the offset
-                        let zero_filter_header = FilterHeader::from_byte_array([0u8; 32]);
-                        segment.filter_headers.resize(offset + 1, zero_filter_header);
-                    }
-                    segment.filter_headers[offset] = *header;
-                    // Transition to Dirty state (from Clean, Dirty, or Saving)
-                    segment.state = SegmentState::Dirty;
-                    segment.last_accessed = std::time::Instant::now();
-                }
-            }
-
-            next_blockchain_height += 1;
-        }
-
-        // Update cached tip height with blockchain height
-        if next_blockchain_height > 0 {
-            *self.cached_filter_tip_height.write().await = Some(next_blockchain_height - 1);
-        }
-
-        // Save dirty segments periodically (every 1000 filter headers)
-        if headers.len() >= 1000 || next_blockchain_height % 1000 == 0 {
-            super::segments::save_dirty_segments(self).await?;
-        }
-
-        Ok(())
-    }
-
-    /// Load filter headers for a blockchain height range.
-    pub async fn load_filter_headers(&self, range: Range<u32>) -> StorageResult<Vec<FilterHeader>> {
-        let sync_base_height = *self.sync_base_height.read().await;
-        let mut filter_headers = Vec::new();
-
-        // Convert blockchain height range to storage index range
-        let storage_start = if sync_base_height > 0 && range.start >= sync_base_height {
-            range.start - sync_base_height
-        } else {
-            range.start
-        };
-
-        let storage_end = if sync_base_height > 0 && range.end > sync_base_height {
-            range.end - sync_base_height
-        } else {
-            range.end
-        };
-
-        let start_segment = Self::get_segment_id(storage_start);
-        let end_segment = Self::get_segment_id(storage_end.saturating_sub(1));
-
-        for segment_id in start_segment..=end_segment {
-            super::segments::ensure_filter_segment_loaded(self, segment_id).await?;
-
-            let segments = self.active_filter_segments.read().await;
-            if let Some(segment) = segments.get(&segment_id) {
-                let start_idx = if segment_id == start_segment {
-                    Self::get_segment_offset(storage_start)
-                } else {
-                    0
-                };
-
-                let end_idx = if segment_id == end_segment {
-                    Self::get_segment_offset(storage_end.saturating_sub(1)) + 1
-                } else {
-                    segment.filter_headers.len()
-                };
-
-                if start_idx < segment.filter_headers.len()
-                    && end_idx <= segment.filter_headers.len()
-                {
-                    filter_headers.extend_from_slice(&segment.filter_headers[start_idx..end_idx]);
-                }
-            }
-        }
-
-        Ok(filter_headers)
-    }
-
-    /// Get a filter header at a specific blockchain height.
-    pub async fn get_filter_header(
-        &self,
-        blockchain_height: u32,
-    ) -> StorageResult<Option<FilterHeader>> {
-        let sync_base_height = *self.sync_base_height.read().await;
-
-        // Convert blockchain height to storage index
-        let storage_index = if sync_base_height > 0 {
-            // For checkpoint sync, storage index is relative to sync_base_height
-            if blockchain_height >= sync_base_height {
-                blockchain_height - sync_base_height
-            } else {
-                // This shouldn't happen in normal operation, but handle it gracefully
-                tracing::warn!(
-                    "Attempting to get filter header at height {} below sync_base_height {}",
-                    blockchain_height,
-                    sync_base_height
-                );
-                return Ok(None);
-            }
-        } else {
-            // For genesis sync, storage index equals blockchain height
-            blockchain_height
-        };
-
-        let segment_id = Self::get_segment_id(storage_index);
-        let offset = Self::get_segment_offset(storage_index);
-
-        super::segments::ensure_filter_segment_loaded(self, segment_id).await?;
-
-        let segments = self.active_filter_segments.read().await;
-        Ok(segments
-            .get(&segment_id)
-            .and_then(|segment| segment.filter_headers.get(offset))
-            .copied())
-    }
-
-    /// Get the blockchain height of the filter tip.
-    pub async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>> {
-        Ok(*self.cached_filter_tip_height.read().await)
-    }
-
     /// Store a compact filter.
     pub async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> {
         let path = self.base_path.join(format!("filters/{}.dat", height));
@@ -205,11 +28,10 @@ impl DiskStorageManager {
         // Stop worker to prevent concurrent writes to filter directories
         self.stop_worker().await;
 
-        // Clear in-memory filter state
-        self.active_filter_segments.write().await.clear();
-        *self.cached_filter_tip_height.write().await = None;
+        // Clear in-memory and on-disk filter headers segments
+        self.filter_headers.write().await.clear_all().await?;
 
-        // Remove filter headers and compact filter files
+        // Remove on-disk compact filter files
         let filters_dir = self.base_path.join("filters");
         if filters_dir.exists() {
             tokio::fs::remove_dir_all(&filters_dir).await?;

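The retained import super::io::atomic_write and the final commit bullet ("using atomic write where we write to disk") refer to the usual write-to-a-temp-file-then-rename pattern for persisting segments. The helper below is only a generic sketch of that pattern, assuming tokio's filesystem API; the name atomic_write_sketch and its signature are invented here and are not the actual dash-spv helper.

// Generic sketch of an atomic-write helper: write the payload to a
// temporary sibling file, then rename it over the target so readers never
// observe a partially written segment. Illustrative only.
use std::path::Path;

async fn atomic_write_sketch(path: &Path, data: &[u8]) -> std::io::Result<()> {
    // Make sure the parent directory exists (the commit message notes the
    // refactor now creates parent dirs before persisting segments).
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }

    // Write to a temporary file next to the final destination.
    let tmp = path.with_extension("tmp");
    tokio::fs::write(&tmp, data).await?;

    // Rename is atomic on a single filesystem, so the destination is either
    // the old file or the complete new file, never a torn write. A hardened
    // version would also fsync the file and its parent directory.
    tokio::fs::rename(&tmp, path).await?;
    Ok(())
}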