Skip to content

Commit e364976

Browse files
committed
refactor: move manager initialization from trait method to constructors
- Moves initialization logic out of the `SyncManager::initialize()` trait method and into each manager's constructor. - Genesis block is now stored before managers are created, so they can read the tip during construction. - The `SyncCoordinator` seeds initial progress from actual manager state instead of default and uses `WatchStream::from_changes` to avoid re-emitting all initial progresses.
1 parent 7cf2284 commit e364976

File tree

15 files changed

+226
-223
lines changed

15 files changed

+226
-223
lines changed

dash-spv/ARCHITECTURE.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,6 @@ pub trait SyncManager: Send + Sync + Debug {
10131013
fn state(&self) -> SyncState;
10141014
fn wanted_message_types(&self) -> &'static [MessageType];
10151015

1016-
async fn initialize(&mut self) -> SyncResult<()>;
10171016
async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>>;
10181017
async fn handle_message(&mut self, msg: Message, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>>;
10191018
async fn handle_sync_event(&mut self, event: &SyncEvent, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>>;

dash-spv/src/client/lifecycle.rs

Lines changed: 86 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
3636
pub async fn new(
3737
config: ClientConfig,
3838
network: N,
39-
storage: S,
39+
mut storage: S,
4040
wallet: Arc<RwLock<W>>,
4141
) -> Result<Self> {
4242
// Validate configuration
4343
config.validate().map_err(SpvError::Config)?;
4444

45+
// Initialize genesis block or checkpoint before creating managers,
46+
// so they can read the tip from storage during construction.
47+
Self::initialize_genesis_block(&config, &mut storage).await?;
48+
4549
let masternode_engine = {
4650
if config.enable_masternodes {
4751
Some(Arc::new(RwLock::new(MasternodeListEngine::default_for_network(
@@ -68,49 +72,60 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
6872
};
6973
let checkpoint_manager = Arc::new(CheckpointManager::new(checkpoints));
7074
managers.block_headers =
71-
Some(BlockHeadersManager::new(storage.block_headers(), checkpoint_manager));
75+
Some(BlockHeadersManager::new(storage.block_headers(), checkpoint_manager).await?);
7276

7377
if config.enable_filters {
74-
managers.filter_headers =
75-
Some(FilterHeadersManager::new(storage.block_headers(), storage.filter_headers()));
76-
managers.filters = Some(FiltersManager::new(
77-
wallet.clone(),
78-
storage.block_headers(),
79-
storage.filter_headers(),
80-
storage.filters(),
81-
));
82-
managers.blocks =
83-
Some(BlocksManager::new(wallet.clone(), storage.block_headers(), storage.blocks()));
78+
managers.filter_headers = Some(
79+
FilterHeadersManager::new(storage.block_headers(), storage.filter_headers())
80+
.await?,
81+
);
82+
managers.filters = Some(
83+
FiltersManager::new(
84+
wallet.clone(),
85+
storage.block_headers(),
86+
storage.filter_headers(),
87+
storage.filters(),
88+
)
89+
.await,
90+
);
91+
managers.blocks = Some(
92+
BlocksManager::new(wallet.clone(), storage.block_headers(), storage.blocks()).await,
93+
);
8494
}
8595

8696
// Build masternode manager if enabled
8797
if config.enable_masternodes {
8898
let masternode_list_engine = masternode_engine
8999
.clone()
90100
.expect("Masternode list engine must exist if masternodes are enabled");
91-
managers.masternode = Some(MasternodesManager::new(
92-
storage.block_headers(),
93-
masternode_list_engine.clone(),
94-
config.network,
95-
));
96-
managers.chainlock = Some(ChainLockManager::new(
97-
storage.block_headers(),
98-
storage.metadata(),
99-
masternode_list_engine.clone(),
100-
));
101+
managers.masternode = Some(
102+
MasternodesManager::new(
103+
storage.block_headers(),
104+
masternode_list_engine.clone(),
105+
config.network,
106+
)
107+
.await,
108+
);
109+
managers.chainlock = Some(
110+
ChainLockManager::new(
111+
storage.block_headers(),
112+
storage.metadata(),
113+
masternode_list_engine.clone(),
114+
)
115+
.await,
116+
);
101117
managers.instantsend = Some(InstantSendManager::new(masternode_list_engine.clone()));
102118
}
103119

104-
// Create sync coordinator (managers are passed to start() later)
105-
let sync_coordinator = SyncCoordinator::new(managers);
120+
let sync_coordinator = SyncCoordinator::new(managers).await;
106121

107122
// Create mempool state
108123
let mempool_state = Arc::new(RwLock::new(MempoolState::default()));
109124

110125
// Wrap storage in Arc<Mutex>
111126
let storage = Arc::new(Mutex::new(storage));
112127

113-
Ok(Self {
128+
let client = Self {
114129
config: Arc::new(RwLock::new(config)),
115130
network: Arc::new(Mutex::new(network)),
116131
storage,
@@ -120,56 +135,51 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
120135
running: Arc::new(RwLock::new(false)),
121136
mempool_state,
122137
mempool_filter: Arc::new(RwLock::new(None)),
123-
})
124-
}
125-
126-
/// Start the SPV client.
127-
pub(super) async fn start(&self) -> Result<()> {
128-
{
129-
let running = self.running.read().await;
130-
if *running {
131-
return Err(SpvError::Config("Client already running".to_string()));
132-
}
133-
}
138+
};
134139

135140
// Load wallet data from storage
136-
self.load_wallet_data().await?;
137-
138-
let config = self.config.read().await;
141+
client.load_wallet_data().await?;
139142

140143
// Initialize mempool filter if mempool tracking is enabled
141-
if config.enable_mempool_tracking {
142-
// TODO: Get monitored addresses from wallet
143-
let filter = Arc::new(MempoolFilter::new(
144-
config.mempool_strategy,
145-
config.max_mempool_transactions,
146-
self.mempool_state.clone(),
147-
HashSet::new(), // Will be populated from wallet's monitored addresses
148-
config.network,
149-
));
150-
151-
*self.mempool_filter.write().await = Some(filter);
152-
153-
// Load mempool state from storage if persistence is enabled
154-
if config.persist_mempool {
155-
if let Some(state) = self
156-
.storage
157-
.lock()
158-
.await
159-
.load_mempool_state()
160-
.await
161-
.map_err(SpvError::Storage)?
162-
{
163-
*self.mempool_state.write().await = state;
144+
{
145+
let config = client.config.read().await;
146+
if config.enable_mempool_tracking {
147+
let filter = Arc::new(MempoolFilter::new(
148+
config.mempool_strategy,
149+
config.max_mempool_transactions,
150+
client.mempool_state.clone(),
151+
HashSet::new(), // TODO: populate from wallet's monitored addresses
152+
config.network,
153+
));
154+
*client.mempool_filter.write().await = Some(filter);
155+
156+
// Load mempool state from storage if persistence is enabled
157+
if config.persist_mempool {
158+
if let Some(state) = client
159+
.storage
160+
.lock()
161+
.await
162+
.load_mempool_state()
163+
.await
164+
.map_err(SpvError::Storage)?
165+
{
166+
*client.mempool_state.write().await = state;
167+
}
164168
}
165169
}
166170
}
167171

168-
// Drop config before calling methods that also read it
169-
drop(config);
172+
Ok(client)
173+
}
170174

171-
// Initialize genesis block if not already present
172-
self.initialize_genesis_block().await?;
175+
/// Start the SPV client: spawn sync tasks and connect to the network.
176+
pub(super) async fn start(&self) -> Result<()> {
177+
{
178+
let running = self.running.read().await;
179+
if *running {
180+
return Err(SpvError::Config("Client already running".to_string()));
181+
}
182+
}
173183

174184
// Start the storage background worker for periodic persistence
175185
self.storage.lock().await.start().await;
@@ -233,15 +243,12 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
233243
self.stop().await
234244
}
235245

236-
/// Initialize genesis block or checkpoint.
237-
pub(super) async fn initialize_genesis_block(&self) -> Result<()> {
238-
let config = self.config.read().await;
239-
246+
/// Initialize genesis block or checkpoint in storage.
247+
///
248+
/// Called before creating managers so they can read the tip during construction.
249+
async fn initialize_genesis_block(config: &ClientConfig, storage: &mut S) -> Result<()> {
240250
// Check if we already have any headers in storage
241-
let current_tip = {
242-
let storage = self.storage.lock().await;
243-
storage.get_tip_height().await
244-
};
251+
let current_tip = storage.get_tip_height().await;
245252

246253
if current_tip.is_some() {
247254
// We already have headers, genesis block should be at height 0
@@ -301,12 +308,9 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
301308
calculated_hash
302309
);
303310
} else {
304-
{
305-
let mut storage = self.storage.lock().await;
306-
storage
307-
.store_headers_at_height(&[checkpoint_header], checkpoint.height)
308-
.await?;
309-
}
311+
storage
312+
.store_headers_at_height(&[checkpoint_header], checkpoint.height)
313+
.await?;
310314

311315
tracing::info!(
312316
"✅ Initialized from checkpoint at height {}, skipping {} headers",
@@ -346,17 +350,10 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
346350
tracing::debug!("Using genesis block header with hash: {}", calculated_hash);
347351

348352
// Store the genesis header at height 0
349-
let genesis_headers = vec![genesis_header];
350-
{
351-
let mut storage = self.storage.lock().await;
352-
storage.store_headers(&genesis_headers).await.map_err(SpvError::Storage)?;
353-
}
353+
storage.store_headers(&[genesis_header]).await.map_err(SpvError::Storage)?;
354354

355355
// Verify it was stored correctly
356-
let stored_height = {
357-
let storage = self.storage.lock().await;
358-
storage.get_tip_height().await
359-
};
356+
let stored_height = storage.get_tip_height().await;
360357
tracing::info!(
361358
"✅ Genesis block initialized at height 0, storage reports tip height: {:?}",
362359
stored_height

dash-spv/src/sync/block_headers/manager.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,30 @@ impl<H: BlockHeaderStorage> std::fmt::Debug for BlockHeadersManager<H> {
5252

5353
impl<H: BlockHeaderStorage> BlockHeadersManager<H> {
5454
/// Create a new headers manager with the given storage and checkpoint manager.
55-
pub fn new(header_storage: Arc<RwLock<H>>, checkpoint_manager: Arc<CheckpointManager>) -> Self {
56-
Self {
57-
progress: BlockHeadersProgress::default(),
55+
pub async fn new(
56+
header_storage: Arc<RwLock<H>>,
57+
checkpoint_manager: Arc<CheckpointManager>,
58+
) -> SyncResult<Self> {
59+
let tip = header_storage
60+
.read()
61+
.await
62+
.get_tip()
63+
.await
64+
.ok_or_else(|| SyncError::MissingDependency("No tip in storage".to_string()))?;
65+
66+
let mut initial_progress = BlockHeadersProgress::default();
67+
initial_progress.set_state(SyncState::WaitingForConnections);
68+
initial_progress.update_tip_height(tip.height());
69+
initial_progress.update_target_height(tip.height());
70+
71+
tracing::info!("BlockHeadersManager initialized at height {}", tip.height());
72+
73+
Ok(Self {
74+
progress: initial_progress,
5875
header_storage,
59-
pipeline: HeadersPipeline::new(checkpoint_manager.clone()),
76+
pipeline: HeadersPipeline::new(checkpoint_manager),
6077
pending_announcements: HashMap::new(),
61-
}
78+
})
6279
}
6380

6481
pub(super) async fn tip(&self) -> SyncResult<BlockHeaderTip> {
@@ -227,16 +244,21 @@ mod tests {
227244
}
228245

229246
async fn create_test_manager() -> TestBlockHeadersManager {
230-
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
247+
let mut storage = DiskStorageManager::with_temp_dir().await.unwrap();
248+
// Store a genesis header so the manager can initialize
249+
let genesis = Header::dummy_batch(0..1);
250+
storage.store_headers(&genesis).await.unwrap();
231251
let checkpoint_manager = create_test_checkpoint_manager();
232252
BlockHeadersManager::new(storage.block_headers(), checkpoint_manager)
253+
.await
254+
.expect("Failed to create BlockHeadersManager")
233255
}
234256

235257
#[tokio::test]
236258
async fn test_block_headers_manager_new() {
237259
let manager = create_test_manager().await;
238260
assert_eq!(manager.identifier(), ManagerIdentifier::BlockHeader);
239-
assert_eq!(manager.state(), SyncState::WaitForEvents);
261+
assert_eq!(manager.state(), SyncState::WaitingForConnections);
240262
assert_eq!(manager.wanted_message_types(), vec![MessageType::Headers, MessageType::Inv]);
241263
}
242264

@@ -249,7 +271,7 @@ mod tests {
249271

250272
let progress = manager.progress();
251273
if let SyncManagerProgress::BlockHeaders(progress) = progress {
252-
assert_eq!(progress.state(), SyncState::WaitForEvents);
274+
assert_eq!(progress.state(), SyncState::WaitingForConnections);
253275
assert_eq!(progress.tip_height(), 100);
254276
assert_eq!(progress.target_height(), 200);
255277
assert_eq!(progress.processed(), 50);

dash-spv/src/sync/block_headers/sync_manager.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use crate::sync::{
66
BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager,
77
SyncManagerProgress, SyncState,
88
};
9-
use crate::SyncError;
109
use async_trait::async_trait;
1110
use dashcore::network::message::NetworkMessage;
1211
use dashcore::BlockHash;
@@ -37,24 +36,6 @@ impl<H: BlockHeaderStorage> SyncManager for BlockHeadersManager<H> {
3736
&[MessageType::Headers, MessageType::Inv]
3837
}
3938

40-
async fn initialize(&mut self) -> SyncResult<()> {
41-
let tip = self
42-
.header_storage
43-
.read()
44-
.await
45-
.get_tip()
46-
.await
47-
.ok_or_else(|| SyncError::MissingDependency("No tip in storage".to_string()))?;
48-
49-
self.progress.set_state(SyncState::WaitingForConnections);
50-
self.progress.update_tip_height(tip.height());
51-
self.progress.update_target_height(tip.height());
52-
53-
tracing::info!("BlockHeadersManager initialized at height {}", tip.height());
54-
55-
Ok(())
56-
}
57-
5839
async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
5940
ensure_not_started(self.state(), self.identifier())?;
6041
self.progress.set_state(SyncState::Syncing);

dash-spv/src/sync/blocks/manager.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,18 @@ pub struct BlocksManager<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterf
4343

4444
impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface> BlocksManager<H, B, W> {
4545
/// Create a new blocks manager with the given storage references.
46-
pub fn new(
46+
pub async fn new(
4747
wallet: Arc<RwLock<W>>,
4848
header_storage: Arc<RwLock<H>>,
4949
block_storage: Arc<RwLock<B>>,
5050
) -> Self {
51+
let synced_height = wallet.read().await.synced_height();
52+
53+
let mut initial_progress = BlocksProgress::default();
54+
initial_progress.update_last_processed(synced_height);
55+
5156
Self {
52-
progress: BlocksProgress::default(),
57+
progress: initial_progress,
5358
header_storage,
5459
block_storage,
5560
wallet,
@@ -170,7 +175,7 @@ mod tests {
170175
async fn create_test_manager() -> TestBlocksManager {
171176
let storage = DiskStorageManager::with_temp_dir().await.unwrap();
172177
let wallet = Arc::new(RwLock::new(MockWallet::new()));
173-
BlocksManager::new(wallet, storage.block_headers(), storage.blocks())
178+
BlocksManager::new(wallet, storage.block_headers(), storage.blocks()).await
174179
}
175180

176181
#[tokio::test]

0 commit comments

Comments
 (0)