Skip to content

Commit 189a66d

Browse files
authored
refactor: storage manager trait splitted into multiple subtraits (#311)
* traits created * the disk storage manager worker is now a time based check, removed old command communciation * tests updated * replaced header_at_height * removed unused methods * init_from_checkpoint sync * removed two methos that where invovled in the same process * fixed clippy warnings * dropped unuseed code * everything moved where I want it to be * general structure made * persist segments caches now requires the directory where the user wants to write the data * using rwlock to allow segmentcache mutability behind inmutable ref * clear method fixed * default method implementations in storage traits * storage manager trait implemented * fixed code to pass the tests * storage documentation updated * rebase conflicts resolved * masternodestate storage was not being persisted following the pattern other storages do * replaced write() locks where a read() can be used
1 parent 42cd94e commit 189a66d

32 files changed

+1265
-1170
lines changed

dash-spv/benches/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::time::Duration;
22

33
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
44
use dash_spv::{
5-
storage::{DiskStorageManager, StorageManager},
5+
storage::{BlockHeaderStorage, DiskStorageManager, StorageManager},
66
Hash,
77
};
88
use dashcore::{block::Version, BlockHash, CompactTarget, Header};

dash-spv/examples/filter_sync.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2828
let network_manager = PeerNetworkManager::new(&config).await?;
2929

3030
// Create storage manager
31-
let storage_manager =
32-
DiskStorageManager::new("./.tmp/filter-sync-example-storage".into()).await?;
31+
let storage_manager = DiskStorageManager::new("./.tmp/filter-sync-example-storage").await?;
3332

3433
// Create wallet manager
3534
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

dash-spv/examples/simple_sync.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2424
let network_manager = PeerNetworkManager::new(&config).await?;
2525

2626
// Create storage manager
27-
let storage_manager =
28-
DiskStorageManager::new("./.tmp/simple-sync-example-storage".into()).await?;
27+
let storage_manager = DiskStorageManager::new("./.tmp/simple-sync-example-storage").await?;
2928

3029
// Create wallet manager
3130
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

dash-spv/examples/spv_with_wallet.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2626
let network_manager = PeerNetworkManager::new(&config).await?;
2727

2828
// Create storage manager - use disk storage for persistence
29-
let storage_manager =
30-
DiskStorageManager::new("./.tmp/spv-with-wallet-example-storage".into()).await?;
29+
let storage_manager = DiskStorageManager::new("./.tmp/spv-with-wallet-example-storage").await?;
3130

3231
// Create wallet manager
3332
let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));

dash-spv/src/client/block_processor_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
mod tests {
55
use crate::client::block_processor::{BlockProcessingTask, BlockProcessor};
66

7-
use crate::storage::DiskStorageManager;
7+
use crate::storage::{BlockHeaderStorage, DiskStorageManager};
88
use crate::types::{SpvEvent, SpvStats};
99
use dashcore::{blockdata::constants::genesis_block, Block, Network, Transaction};
1010

dash-spv/src/client/lifecycle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
219219
// Shutdown storage to ensure all data is persisted
220220
{
221221
let mut storage = self.storage.lock().await;
222-
storage.shutdown().await.map_err(SpvError::Storage)?;
222+
storage.shutdown().await;
223223
tracing::info!("Storage shutdown completed - all data persisted");
224224
}
225225

dash-spv/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
//!
3131
//! // Create the required components
3232
//! let network = PeerNetworkManager::new(&config).await?;
33-
//! let storage = DiskStorageManager::new("./.tmp/example-storage".into()).await?;
33+
//! let storage = DiskStorageManager::new("./.tmp/example-storage").await?;
3434
//! let wallet = Arc::new(RwLock::new(WalletManager::<ManagedWalletInfo>::new(config.network)));
3535
//!
3636
//! // Create and start the client

dash-spv/src/storage/blocks.rs

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
//! Header storage operations for DiskStorageManager.
2+
3+
use std::collections::HashMap;
4+
use std::ops::Range;
5+
use std::path::PathBuf;
6+
7+
use async_trait::async_trait;
8+
use dashcore::block::Header as BlockHeader;
9+
use dashcore::BlockHash;
10+
use tokio::sync::RwLock;
11+
12+
use crate::error::StorageResult;
13+
use crate::storage::io::atomic_write;
14+
use crate::storage::segments::SegmentCache;
15+
use crate::storage::PersistentStorage;
16+
use crate::StorageError;
17+
18+
#[async_trait]
19+
pub trait BlockHeaderStorage {
20+
async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()>;
21+
22+
async fn store_headers_at_height(
23+
&mut self,
24+
headers: &[BlockHeader],
25+
height: u32,
26+
) -> StorageResult<()>;
27+
28+
async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>>;
29+
30+
async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>> {
31+
if let Some(tip_height) = self.get_tip_height().await {
32+
if height > tip_height {
33+
return Ok(None);
34+
}
35+
} else {
36+
return Ok(None);
37+
}
38+
39+
if let Some(start_height) = self.get_start_height().await {
40+
if height < start_height {
41+
return Ok(None);
42+
}
43+
} else {
44+
return Ok(None);
45+
}
46+
47+
Ok(self.load_headers(height..height + 1).await?.first().copied())
48+
}
49+
50+
async fn get_tip_height(&self) -> Option<u32>;
51+
52+
async fn get_start_height(&self) -> Option<u32>;
53+
54+
async fn get_stored_headers_len(&self) -> u32;
55+
56+
async fn get_header_height_by_hash(
57+
&self,
58+
hash: &dashcore::BlockHash,
59+
) -> StorageResult<Option<u32>>;
60+
}
61+
62+
pub struct PersistentBlockHeaderStorage {
63+
block_headers: RwLock<SegmentCache<BlockHeader>>,
64+
header_hash_index: HashMap<BlockHash, u32>,
65+
}
66+
67+
impl PersistentBlockHeaderStorage {
68+
const FOLDER_NAME: &str = "block_headers";
69+
const INDEX_FILE_NAME: &str = "index.dat";
70+
}
71+
72+
#[async_trait]
73+
impl PersistentStorage for PersistentBlockHeaderStorage {
74+
async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
75+
let storage_path = storage_path.into();
76+
let segments_folder = storage_path.join(Self::FOLDER_NAME);
77+
78+
let index_path = segments_folder.join(Self::INDEX_FILE_NAME);
79+
80+
let mut block_headers = SegmentCache::load_or_new(&segments_folder).await?;
81+
82+
let header_hash_index = match tokio::fs::read(&index_path)
83+
.await
84+
.ok()
85+
.and_then(|content| bincode::deserialize(&content).ok())
86+
{
87+
Some(index) => index,
88+
_ => {
89+
if segments_folder.exists() {
90+
block_headers.build_block_index_from_segments().await?
91+
} else {
92+
HashMap::new()
93+
}
94+
}
95+
};
96+
97+
Ok(Self {
98+
block_headers: RwLock::new(block_headers),
99+
header_hash_index,
100+
})
101+
}
102+
103+
async fn persist(&mut self, storage_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
104+
let block_headers_folder = storage_path.into().join(Self::FOLDER_NAME);
105+
let index_path = block_headers_folder.join(Self::INDEX_FILE_NAME);
106+
107+
tokio::fs::create_dir_all(&block_headers_folder).await?;
108+
109+
self.block_headers.write().await.persist(&block_headers_folder).await;
110+
111+
let data = bincode::serialize(&self.header_hash_index)
112+
.map_err(|e| StorageError::WriteFailed(format!("Failed to serialize index: {}", e)))?;
113+
114+
atomic_write(&index_path, &data).await
115+
}
116+
}
117+
118+
#[async_trait]
119+
impl BlockHeaderStorage for PersistentBlockHeaderStorage {
120+
async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()> {
121+
let height = self.block_headers.read().await.next_height();
122+
self.store_headers_at_height(headers, height).await
123+
}
124+
125+
async fn store_headers_at_height(
126+
&mut self,
127+
headers: &[BlockHeader],
128+
height: u32,
129+
) -> StorageResult<()> {
130+
let mut height = height;
131+
132+
let hashes = headers.iter().map(|header| header.block_hash()).collect::<Vec<_>>();
133+
134+
self.block_headers.write().await.store_items_at_height(headers, height).await?;
135+
136+
for hash in hashes {
137+
self.header_hash_index.insert(hash, height);
138+
height += 1;
139+
}
140+
141+
Ok(())
142+
}
143+
144+
async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>> {
145+
self.block_headers.write().await.get_items(range).await
146+
}
147+
148+
async fn get_tip_height(&self) -> Option<u32> {
149+
self.block_headers.read().await.tip_height()
150+
}
151+
152+
async fn get_start_height(&self) -> Option<u32> {
153+
self.block_headers.read().await.start_height()
154+
}
155+
156+
async fn get_stored_headers_len(&self) -> u32 {
157+
let block_headers = self.block_headers.read().await;
158+
159+
let start_height = if let Some(start_height) = block_headers.start_height() {
160+
start_height
161+
} else {
162+
return 0;
163+
};
164+
165+
let end_height = if let Some(end_height) = block_headers.tip_height() {
166+
end_height
167+
} else {
168+
return 0;
169+
};
170+
171+
end_height - start_height + 1
172+
}
173+
174+
async fn get_header_height_by_hash(
175+
&self,
176+
hash: &dashcore::BlockHash,
177+
) -> StorageResult<Option<u32>> {
178+
Ok(self.header_hash_index.get(hash).copied())
179+
}
180+
}

dash-spv/src/storage/chainstate.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::path::PathBuf;
2+
3+
use async_trait::async_trait;
4+
5+
use crate::{
6+
error::StorageResult,
7+
storage::{io::atomic_write, PersistentStorage},
8+
ChainState,
9+
};
10+
11+
#[async_trait]
12+
pub trait ChainStateStorage {
13+
async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()>;
14+
15+
async fn load_chain_state(&self) -> StorageResult<Option<ChainState>>;
16+
}
17+
18+
pub struct PersistentChainStateStorage {
19+
storage_path: PathBuf,
20+
}
21+
22+
impl PersistentChainStateStorage {
23+
const FOLDER_NAME: &str = "chainstate";
24+
const FILE_NAME: &str = "chainstate.json";
25+
}
26+
27+
#[async_trait]
28+
impl PersistentStorage for PersistentChainStateStorage {
29+
async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
30+
Ok(PersistentChainStateStorage {
31+
storage_path: storage_path.into(),
32+
})
33+
}
34+
35+
async fn persist(&mut self, _storage_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
36+
// Current implementation persists data everytime data is stored
37+
Ok(())
38+
}
39+
}
40+
41+
#[async_trait]
42+
impl ChainStateStorage for PersistentChainStateStorage {
43+
async fn store_chain_state(&mut self, state: &ChainState) -> StorageResult<()> {
44+
let state_data = serde_json::json!({
45+
"last_chainlock_height": state.last_chainlock_height,
46+
"last_chainlock_hash": state.last_chainlock_hash,
47+
"current_filter_tip": state.current_filter_tip,
48+
"last_masternode_diff_height": state.last_masternode_diff_height,
49+
"sync_base_height": state.sync_base_height,
50+
});
51+
52+
let chainstate_folder = self.storage_path.join(Self::FOLDER_NAME);
53+
let path = chainstate_folder.join(Self::FILE_NAME);
54+
55+
tokio::fs::create_dir_all(chainstate_folder).await?;
56+
57+
let json = state_data.to_string();
58+
atomic_write(&path, json.as_bytes()).await?;
59+
60+
Ok(())
61+
}
62+
63+
async fn load_chain_state(&self) -> StorageResult<Option<ChainState>> {
64+
let path = self.storage_path.join(Self::FOLDER_NAME).join(Self::FILE_NAME);
65+
if !path.exists() {
66+
return Ok(None);
67+
}
68+
69+
let content = tokio::fs::read_to_string(path).await?;
70+
let value: serde_json::Value = serde_json::from_str(&content).map_err(|e| {
71+
crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e))
72+
})?;
73+
74+
let state = ChainState {
75+
last_chainlock_height: value
76+
.get("last_chainlock_height")
77+
.and_then(|v| v.as_u64())
78+
.map(|h| h as u32),
79+
last_chainlock_hash: value
80+
.get("last_chainlock_hash")
81+
.and_then(|v| v.as_str())
82+
.and_then(|s| s.parse().ok()),
83+
current_filter_tip: value
84+
.get("current_filter_tip")
85+
.and_then(|v| v.as_str())
86+
.and_then(|s| s.parse().ok()),
87+
masternode_engine: None,
88+
last_masternode_diff_height: value
89+
.get("last_masternode_diff_height")
90+
.and_then(|v| v.as_u64())
91+
.map(|h| h as u32),
92+
sync_base_height: value
93+
.get("sync_base_height")
94+
.and_then(|v| v.as_u64())
95+
.map(|h| h as u32)
96+
.unwrap_or(0),
97+
};
98+
99+
Ok(Some(state))
100+
}
101+
}

0 commit comments

Comments
 (0)