Skip to content

Commit ad7a6e8

Browse files
authored
refactor: Less cloning in SPV message handling (#268)
We currently clone all network messages in `DashSpvClient::handle_network_message` when passed to `MessageHandler::handle_network_message`. Also some get cloned further down the track again. So this PR changes it to pass references.
1 parent 65787e8 commit ad7a6e8

File tree

8 files changed

+46
-55
lines changed

8 files changed

+46
-55
lines changed

dash-spv/src/client/message_handler.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ impl<
6060
/// Handle incoming network messages during monitoring.
6161
pub async fn handle_network_message(
6262
&mut self,
63-
message: dashcore::network::message::NetworkMessage,
63+
message: &dashcore::network::message::NetworkMessage,
6464
) -> Result<()> {
6565
use dashcore::network::message::NetworkMessage;
6666

67-
tracing::debug!("Client handling network message: {:?}", std::mem::discriminant(&message));
67+
tracing::debug!("Client handling network message: {:?}", std::mem::discriminant(message));
6868

6969
// First check if this is a message that ONLY the sync manager handles
7070
// These messages can be moved to the sync manager without cloning
@@ -147,17 +147,11 @@ impl<
147147
SpvError::Sync(e)
148148
});
149149
}
150-
_ => {}
151-
}
152-
153-
// Handle messages that may need sync manager processing
154-
// We optimize to avoid cloning expensive messages like blocks
155-
match &message {
156150
NetworkMessage::Headers(_) | NetworkMessage::CFilter(_) => {
157151
// Headers and CFilters are relatively small, cloning is acceptable
158152
if let Err(e) = self
159153
.sync_manager
160-
.handle_message(message.clone(), &mut *self.network, &mut *self.storage)
154+
.handle_message(message, &mut *self.network, &mut *self.storage)
161155
.await
162156
{
163157
tracing::error!("Sequential sync manager error handling message: {}", e);
@@ -183,13 +177,10 @@ impl<
183177
}
184178
}
185179
NetworkMessage::Block(_) => {
186-
// Blocks can be large - avoid cloning unless necessary
187-
// Check if sync manager actually needs to process this block
188180
if self.sync_manager.is_in_downloading_blocks_phase() {
189-
// Only clone if we're in the downloading blocks phase
190181
if let Err(e) = self
191182
.sync_manager
192-
.handle_message(message.clone(), &mut *self.network, &mut *self.storage)
183+
.handle_message(message, &mut *self.network, &mut *self.storage)
193184
.await
194185
{
195186
tracing::error!(
@@ -241,7 +232,7 @@ impl<
241232
let headers_msg = NetworkMessage::Headers(vec![block.header]);
242233
if let Err(e) = self
243234
.sync_manager
244-
.handle_message(headers_msg, &mut *self.network, &mut *self.storage)
235+
.handle_message(&headers_msg, &mut *self.network, &mut *self.storage)
245236
.await
246237
{
247238
tracing::error!(
@@ -261,7 +252,7 @@ impl<
261252
NetworkMessage::Inv(inv) => {
262253
tracing::debug!("Received inventory message with {} items", inv.len());
263254
// Handle inventory messages (new blocks, transactions, etc.)
264-
self.handle_inventory(inv).await?;
255+
self.handle_inventory(inv.clone()).await?;
265256
}
266257
NetworkMessage::Tx(tx) => {
267258
tracing::info!("📨 Received transaction: {}", tx.txid());
@@ -293,7 +284,7 @@ impl<
293284
// Emit event
294285
let event = SpvEvent::MempoolTransactionAdded {
295286
txid,
296-
transaction: Box::new(tx),
287+
transaction: Box::new(tx.clone()),
297288
amount,
298289
addresses,
299290
is_instant_send,
@@ -339,7 +330,7 @@ impl<
339330
NetworkMessage::SendDsq(wants_dsq) => {
340331
tracing::info!("Received SendDsq message - peer wants DSQ messages: {}", wants_dsq);
341332
// Store peer's DSQ preference
342-
if let Err(e) = self.network.update_peer_dsq_preference(wants_dsq).await {
333+
if let Err(e) = self.network.update_peer_dsq_preference(*wants_dsq).await {
343334
tracing::error!("Failed to update peer DSQ preference: {}", e);
344335
}
345336

@@ -351,7 +342,7 @@ impl<
351342
}
352343
_ => {
353344
// Ignore other message types for now
354-
tracing::debug!("Received network message: {:?}", std::mem::discriminant(&message));
345+
tracing::debug!("Received network message: {:?}", std::mem::discriminant(message));
355346
}
356347
}
357348

@@ -468,7 +459,7 @@ impl<
468459
// We just need to send them through the unified message interface
469460
let headers_msg = dashcore::network::message::NetworkMessage::Headers(headers);
470461
self.sync_manager
471-
.handle_message(headers_msg, &mut *self.network, &mut *self.storage)
462+
.handle_message(&headers_msg, &mut *self.network, &mut *self.storage)
472463
.await
473464
.map_err(SpvError::Sync)?;
474465

@@ -506,7 +497,7 @@ impl<
506497
// For sequential sync, route through the message handler
507498
let cfheaders_msg = dashcore::network::message::NetworkMessage::CFHeaders(cfheaders);
508499
self.sync_manager
509-
.handle_message(cfheaders_msg, &mut *self.network, &mut *self.storage)
500+
.handle_message(&cfheaders_msg, &mut *self.network, &mut *self.storage)
510501
.await
511502
.map_err(SpvError::Sync)?;
512503

@@ -520,15 +511,15 @@ impl<
520511
}
521512

522513
/// Process a new block.
523-
pub async fn process_new_block(&mut self, block: dashcore::Block) -> Result<()> {
514+
pub async fn process_new_block(&mut self, block: &dashcore::Block) -> Result<()> {
524515
let block_hash = block.block_hash();
525516

526517
tracing::info!("📦 Routing block {} to async block processor", block_hash);
527518

528519
// Send block to the background processor without waiting for completion
529520
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
530521
let task = crate::client::BlockProcessingTask::ProcessBlock {
531-
block: Box::new(block),
522+
block: Box::new(block.clone()),
532523
response_tx,
533524
};
534525

@@ -564,7 +555,7 @@ impl<
564555
// The sequential sync manager's handle_new_headers method will automatically
565556
// request filter headers and filters as needed
566557
self.sync_manager
567-
.handle_new_headers(headers.to_vec(), &mut *self.network, &mut *self.storage)
558+
.handle_new_headers(headers, &mut *self.network, &mut *self.storage)
568559
.await
569560
.map_err(SpvError::Sync)?;
570561

dash-spv/src/client/sync_coordinator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ impl<
633633
);
634634

635635
// Delegate message handling to the MessageHandler
636-
handler.handle_network_message(message.clone()).await
636+
handler.handle_network_message(&message).await
637637
};
638638

639639
// Handle result and process special messages after releasing storage lock

dash-spv/src/sync/filters/download.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
246246

247247
pub async fn store_filter_headers(
248248
&mut self,
249-
cfheaders: dashcore::network::message_filter::CFHeaders,
249+
cfheaders: &dashcore::network::message_filter::CFHeaders,
250250
storage: &mut S,
251251
) -> SyncResult<()> {
252252
if cfheaders.filter_hashes.is_empty() {
@@ -256,7 +256,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
256256

257257
// Get the height range for this batch
258258
let (start_height, stop_height, _header_tip_height) =
259-
self.get_batch_height_range(&cfheaders, storage).await?;
259+
self.get_batch_height_range(cfheaders, storage).await?;
260260

261261
tracing::info!(
262262
"Received {} filter headers from height {} to {}",
@@ -308,7 +308,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
308308
// Use the handle_overlapping_headers method which properly handles the chain continuity
309309
let expected_start = current_filter_tip + 1;
310310

311-
match self.handle_overlapping_headers(&cfheaders, expected_start, storage).await {
311+
match self.handle_overlapping_headers(cfheaders, expected_start, storage).await {
312312
Ok((stored_count, _)) => {
313313
if stored_count > 0 {
314314
tracing::info!("✅ Successfully handled overlapping filter headers");
@@ -328,7 +328,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
328328
}
329329
} else {
330330
// Process the filter headers to convert them to the proper format
331-
match self.process_filter_headers(&cfheaders, start_height, storage).await {
331+
match self.process_filter_headers(cfheaders, start_height, storage).await {
332332
Ok(new_filter_headers) => {
333333
if !new_filter_headers.is_empty() {
334334
// If this is the first batch (starting at height 1), store the genesis filter header first

dash-spv/src/sync/filters/headers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
671671
);
672672

673673
// Store the verified filter headers
674-
self.store_filter_headers(cf_headers.clone(), storage).await?;
674+
self.store_filter_headers(&cf_headers, storage).await?;
675675

676676
// Update next expected height
677677
self.next_cfheader_height_to_process = stop_height + 1;

dash-spv/src/sync/headers/manager.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
139139
/// Handle a Headers message
140140
pub async fn handle_headers_message(
141141
&mut self,
142-
headers: Vec<BlockHeader>,
142+
headers: &[BlockHeader],
143143
storage: &mut S,
144144
network: &mut N,
145145
) -> SyncResult<bool> {
@@ -251,7 +251,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
251251
// Update Chain State: Add all headers to in-memory chain_state
252252
{
253253
let mut cs = self.chain_state.write().await;
254-
for header in &headers {
254+
for header in headers {
255255
cs.add_header(*header);
256256
}
257257
}
@@ -261,13 +261,13 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
261261
storage.as_any_mut().downcast_mut::<crate::storage::disk::DiskStorageManager>()
262262
{
263263
disk_storage
264-
.store_headers(&headers)
264+
.store_headers(headers)
265265
.await
266266
.map_err(|e| SyncError::Storage(format!("Failed to store headers batch: {}", e)))?;
267267
} else {
268268
// Fallback to standard store_headers for other storage backends
269269
storage
270-
.store_headers(&headers)
270+
.store_headers(headers)
271271
.await
272272
.map_err(|e| SyncError::Storage(format!("Failed to store headers batch: {}", e)))?;
273273
}
@@ -452,7 +452,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
452452
/// Returns true if the message was processed and sync should continue, false if sync is complete.
453453
pub async fn handle_headers2_message(
454454
&mut self,
455-
headers2: dashcore::network::message_headers2::Headers2Message,
455+
headers2: &dashcore::network::message_headers2::Headers2Message,
456456
peer_id: crate::types::PeerId,
457457
_storage: &mut S,
458458
_network: &mut N,
@@ -553,7 +553,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
553553
);
554554

555555
// Process decompressed headers through the normal flow
556-
self.handle_headers_message(headers, _storage, _network).await
556+
self.handle_headers_message(&headers, _storage, _network).await
557557
}
558558
}
559559

dash-spv/src/sync/masternodes/manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -443,11 +443,11 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
443443
/// Handle incoming MnListDiff message
444444
pub async fn handle_mnlistdiff_message(
445445
&mut self,
446-
diff: MnListDiff,
446+
diff: &MnListDiff,
447447
storage: &mut S,
448448
_network: &mut dyn NetworkManager,
449449
) -> SyncResult<bool> {
450-
self.insert_mn_list_diff(&diff, storage).await;
450+
self.insert_mn_list_diff(diff, storage).await;
451451

452452
// Decrement pending request counter if we were expecting this response
453453
if self.pending_mnlistdiff_requests > 0 {

dash-spv/src/sync/message_handlers.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl<
2525
/// Handle incoming network messages with phase filtering
2626
pub async fn handle_message(
2727
&mut self,
28-
message: NetworkMessage,
28+
message: &NetworkMessage,
2929
network: &mut N,
3030
storage: &mut S,
3131
) -> SyncResult<()> {
@@ -55,10 +55,10 @@ impl<
5555
}
5656

5757
// Check if this message is expected in the current phase
58-
if !self.is_message_expected_in_phase(&message) {
58+
if !self.is_message_expected_in_phase(message) {
5959
tracing::debug!(
6060
"Ignoring unexpected {:?} message in phase {}",
61-
std::mem::discriminant(&message),
61+
std::mem::discriminant(message),
6262
self.current_phase.name()
6363
);
6464
return Ok(());
@@ -286,7 +286,7 @@ impl<
286286

287287
pub(super) async fn handle_headers2_message(
288288
&mut self,
289-
headers2: dashcore::network::message_headers2::Headers2Message,
289+
headers2: &dashcore::network::message_headers2::Headers2Message,
290290
peer_id: PeerId,
291291
network: &mut N,
292292
storage: &mut S,
@@ -345,12 +345,12 @@ impl<
345345

346346
pub(super) async fn handle_headers_message(
347347
&mut self,
348-
headers: Vec<dashcore::block::Header>,
348+
headers: &[dashcore::block::Header],
349349
network: &mut N,
350350
storage: &mut S,
351351
) -> SyncResult<()> {
352352
let continue_sync =
353-
self.header_sync.handle_headers_message(headers.clone(), storage, network).await?;
353+
self.header_sync.handle_headers_message(headers, storage, network).await?;
354354

355355
// Calculate blockchain height before borrowing self.current_phase
356356
let blockchain_height = self.get_blockchain_height_from_storage(storage).await.unwrap_or(0);
@@ -400,7 +400,7 @@ impl<
400400

401401
pub(super) async fn handle_mnlistdiff_message(
402402
&mut self,
403-
diff: dashcore::network::message_sml::MnListDiff,
403+
diff: &dashcore::network::message_sml::MnListDiff,
404404
network: &mut N,
405405
storage: &mut S,
406406
) -> SyncResult<()> {
@@ -451,7 +451,7 @@ impl<
451451

452452
pub(super) async fn handle_qrinfo_message(
453453
&mut self,
454-
qr_info: dashcore::network::message_qrinfo::QRInfo,
454+
qr_info: &dashcore::network::message_qrinfo::QRInfo,
455455
network: &mut N,
456456
storage: &mut S,
457457
) -> SyncResult<()> {
@@ -509,7 +509,7 @@ impl<
509509

510510
pub(super) async fn handle_cfheaders_message(
511511
&mut self,
512-
cfheaders: dashcore::network::message_filter::CFHeaders,
512+
cfheaders: &dashcore::network::message_filter::CFHeaders,
513513
network: &mut N,
514514
storage: &mut S,
515515
) -> SyncResult<()> {
@@ -563,7 +563,7 @@ impl<
563563

564564
pub(super) async fn handle_cfilter_message(
565565
&mut self,
566-
cfilter: dashcore::network::message_filter::CFilter,
566+
cfilter: &dashcore::network::message_filter::CFilter,
567567
network: &mut N,
568568
storage: &mut S,
569569
) -> SyncResult<()> {
@@ -752,7 +752,7 @@ impl<
752752

753753
pub(super) async fn handle_block_message(
754754
&mut self,
755-
block: Block,
755+
block: &Block,
756756
network: &mut N,
757757
storage: &mut S,
758758
) -> SyncResult<()> {
@@ -768,7 +768,7 @@ impl<
768768
.map_err(|e| SyncError::Storage(format!("Failed to get block height: {}", e)))?
769769
.unwrap_or(0);
770770

771-
let relevant_txids = wallet.process_block(&block, block_height, self.config.network).await;
771+
let relevant_txids = wallet.process_block(block, block_height, self.config.network).await;
772772

773773
drop(wallet);
774774

0 commit comments

Comments
 (0)