Skip to content

Commit 8ea1a2c

Browse files
authored
refactor: rework storage to periodically persist (#278)
* the disk storage manager worker is now a time based check, removed old command communication * tests updated * thanks coderabbit * clippy warnings * taking the background worker handle before aborting to ensure it is not used again
1 parent aacd841 commit 8ea1a2c

File tree

5 files changed

+132
-278
lines changed

5 files changed

+132
-278
lines changed

dash-spv/src/storage/headers.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ impl DiskStorageManager {
2020
) -> StorageResult<()> {
2121
let hashes = headers.iter().map(|header| header.block_hash()).collect::<Vec<_>>();
2222

23-
self.block_headers.write().await.store_items(headers, height, self).await?;
23+
self.block_headers.write().await.store_items_at_height(headers, height).await?;
2424

2525
// Update reverse index
2626
let mut reverse_index = self.header_hash_index.write().await;
@@ -30,9 +30,6 @@ impl DiskStorageManager {
3030
height += 1;
3131
}
3232

33-
// Release locks before saving (to avoid deadlocks during background saves)
34-
drop(reverse_index);
35-
3633
Ok(())
3734
}
3835

dash-spv/src/storage/manager.rs

Lines changed: 15 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
use std::collections::HashMap;
44
use std::path::PathBuf;
55
use std::sync::Arc;
6-
use tokio::sync::{mpsc, RwLock};
6+
use std::time::Duration;
7+
use tokio::sync::RwLock;
78

89
use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Txid};
910

@@ -14,24 +15,6 @@ use crate::types::{MempoolState, UnconfirmedTransaction};
1415

1516
use super::lockfile::LockFile;
1617

17-
/// Commands for the background worker
18-
#[derive(Debug, Clone)]
19-
pub(super) enum WorkerCommand {
20-
SaveBlockHeaderSegmentCache {
21-
segment_id: u32,
22-
},
23-
SaveFilterHeaderSegmentCache {
24-
segment_id: u32,
25-
},
26-
SaveFilterSegmentCache {
27-
segment_id: u32,
28-
},
29-
SaveIndex {
30-
index: HashMap<BlockHash, u32>,
31-
},
32-
Shutdown,
33-
}
34-
3518
/// Disk-based storage manager with segmented files and async background saving.
3619
pub struct DiskStorageManager {
3720
pub(super) base_path: PathBuf,
@@ -45,12 +28,8 @@ pub struct DiskStorageManager {
4528
pub(super) header_hash_index: Arc<RwLock<HashMap<BlockHash, u32>>>,
4629

4730
// Background worker
48-
pub(super) worker_tx: Option<mpsc::Sender<WorkerCommand>>,
4931
pub(super) worker_handle: Option<tokio::task::JoinHandle<()>>,
5032

51-
// Index save tracking to avoid redundant saves
52-
pub(super) last_index_save_count: Arc<RwLock<usize>>,
53-
5433
// Mempool storage
5534
pub(super) mempool_transactions: Arc<RwLock<HashMap<Txid, UnconfirmedTransaction>>>,
5635
pub(super) mempool_state: Arc<RwLock<Option<MempoolState>>>,
@@ -94,9 +73,7 @@ impl DiskStorageManager {
9473
)),
9574
filters: Arc::new(RwLock::new(SegmentCache::load_or_new(base_path.clone()).await?)),
9675
header_hash_index: Arc::new(RwLock::new(HashMap::new())),
97-
worker_tx: None,
9876
worker_handle: None,
99-
last_index_save_count: Arc::new(RwLock::new(0)),
10077
mempool_transactions: Arc::new(RwLock::new(HashMap::new())),
10178
mempool_state: Arc::new(RwLock::new(None)),
10279
_lock_file: lock_file,
@@ -107,7 +84,8 @@ impl DiskStorageManager {
10784
tracing::debug!("Loaded sync_base_height: {}", state.sync_base_height);
10885
}
10986

110-
// Start background worker
87+
// Start background worker that
88+
// persists data when appropriate
11189
storage.start_worker().await;
11290

11391
// Rebuild index
@@ -136,118 +114,29 @@ impl DiskStorageManager {
136114

137115
/// Start the background worker
138116
pub(super) async fn start_worker(&mut self) {
139-
let (worker_tx, mut worker_rx) = mpsc::channel::<WorkerCommand>(100);
140-
141-
let worker_base_path = self.base_path.clone();
142-
let base_path = self.base_path.clone();
143-
144117
let block_headers = Arc::clone(&self.block_headers);
145118
let filter_headers = Arc::clone(&self.filter_headers);
146-
let cfilters = Arc::clone(&self.filters);
119+
let filters = Arc::clone(&self.filters);
147120

148121
let worker_handle = tokio::spawn(async move {
149-
while let Some(cmd) = worker_rx.recv().await {
150-
match cmd {
151-
WorkerCommand::SaveBlockHeaderSegmentCache {
152-
segment_id,
153-
} => {
154-
let mut cache = block_headers.write().await;
155-
let segment = match cache.get_segment_mut(&segment_id).await {
156-
Ok(segment) => segment,
157-
Err(e) => {
158-
eprintln!("Failed to get segment {}: {}", segment_id, e);
159-
continue;
160-
}
161-
};
162-
163-
match segment.persist(&base_path).await {
164-
Ok(()) => {
165-
tracing::trace!(
166-
"Background worker completed saving header segment {}",
167-
segment_id
168-
);
169-
}
170-
Err(e) => {
171-
eprintln!("Failed to save segment {}: {}", segment_id, e);
172-
}
173-
}
174-
}
175-
WorkerCommand::SaveFilterHeaderSegmentCache {
176-
segment_id,
177-
} => {
178-
let mut cache = filter_headers.write().await;
179-
let segment = match cache.get_segment_mut(&segment_id).await {
180-
Ok(segment) => segment,
181-
Err(e) => {
182-
eprintln!("Failed to get segment {}: {}", segment_id, e);
183-
continue;
184-
}
185-
};
186-
187-
match segment.persist(&base_path).await {
188-
Ok(()) => {
189-
tracing::trace!(
190-
"Background worker completed saving header segment {}",
191-
segment_id
192-
);
193-
}
194-
Err(e) => {
195-
eprintln!("Failed to save segment {}: {}", segment_id, e);
196-
}
197-
}
198-
}
199-
WorkerCommand::SaveFilterSegmentCache {
200-
segment_id,
201-
} => {
202-
let mut cache = cfilters.write().await;
203-
let segment = match cache.get_segment_mut(&segment_id).await {
204-
Ok(segment) => segment,
205-
Err(e) => {
206-
eprintln!("Failed to get segment {}: {}", segment_id, e);
207-
continue;
208-
}
209-
};
210-
211-
match segment.persist(&base_path).await {
212-
Ok(()) => {
213-
tracing::trace!(
214-
"Background worker completed saving filter segment {}",
215-
segment_id
216-
);
217-
}
218-
Err(e) => {
219-
eprintln!("Failed to save segment {}: {}", segment_id, e);
220-
}
221-
}
222-
}
223-
WorkerCommand::SaveIndex {
224-
index,
225-
} => {
226-
let path = worker_base_path.join("headers/index.dat");
227-
if let Err(e) = super::headers::save_index_to_disk(&path, &index).await {
228-
eprintln!("Failed to save index: {}", e);
229-
} else {
230-
tracing::trace!("Background worker completed saving index");
231-
}
232-
}
233-
WorkerCommand::Shutdown => {
234-
break;
235-
}
236-
}
122+
let mut ticker = tokio::time::interval(Duration::from_secs(5));
123+
124+
loop {
125+
ticker.tick().await;
126+
127+
block_headers.write().await.persist_evicted().await;
128+
filter_headers.write().await.persist_evicted().await;
129+
filters.write().await.persist_evicted().await;
237130
}
238131
});
239132

240-
self.worker_tx = Some(worker_tx);
241133
self.worker_handle = Some(worker_handle);
242134
}
243135

244136
/// Stop the background worker without forcing a save.
245-
pub(super) async fn stop_worker(&mut self) {
246-
if let Some(tx) = self.worker_tx.take() {
247-
let _ = tx.send(WorkerCommand::Shutdown).await;
248-
}
137+
pub(super) fn stop_worker(&mut self) {
249138
if let Some(handle) = self.worker_handle.take() {
250-
let _ = handle.await;
139+
handle.abort();
251140
}
252141
}
253142
}

0 commit comments

Comments
 (0)