Skip to content

Commit a927a4c

Browse files
authored
refactor: move Headers2 processing into network layer (#369)
This PR: - drops the `Headers2StateManager` and instead creates the state per peer in the peer run loop. - moves `Headers2StateManager.process_headers()` to `CompressionState.process_headers()` - upgrades `GetHeaders` to `GetHeaders2` in `send_message` when peer supports it - decompresses `Headers2` to `Headers` before forwarding to sync layer This simplifies the sync code and centralizes compression handling.
1 parent 831b889 commit a927a4c

File tree

7 files changed

+157
-445
lines changed

7 files changed

+157
-445
lines changed

dash-spv/src/network/manager.rs

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@ use tokio::sync::{mpsc, Mutex};
1010
use tokio::task::JoinSet;
1111
use tokio::time;
1212

13-
use async_trait::async_trait;
14-
use dashcore::network::constants::ServiceFlags;
15-
use dashcore::network::message::NetworkMessage;
16-
use dashcore::Network;
17-
use tokio_util::sync::CancellationToken;
18-
1913
use crate::client::config::MempoolStrategy;
2014
use crate::client::ClientConfig;
2115
use crate::error::{NetworkError, NetworkResult, SpvError as Error};
@@ -29,6 +23,12 @@ use crate::network::reputation::{
2923
};
3024
use crate::network::{HandshakeManager, NetworkManager, Peer};
3125
use crate::types::PeerInfo;
26+
use async_trait::async_trait;
27+
use dashcore::network::constants::ServiceFlags;
28+
use dashcore::network::message::NetworkMessage;
29+
use dashcore::network::message_headers2::CompressionState;
30+
use dashcore::Network;
31+
use tokio_util::sync::CancellationToken;
3232

3333
/// Peer network manager
3434
pub struct PeerNetworkManager {
@@ -71,6 +71,8 @@ pub struct PeerNetworkManager {
7171
exclusive_mode: bool,
7272
/// Cached count of currently connected peers for fast, non-blocking queries
7373
connected_peer_count: Arc<AtomicUsize>,
74+
/// Disable headers2 after decompression failure
75+
headers2_disabled: Arc<Mutex<HashSet<SocketAddr>>>,
7476
}
7577

7678
impl PeerNetworkManager {
@@ -124,6 +126,7 @@ impl PeerNetworkManager {
124126
user_agent: config.user_agent.clone(),
125127
exclusive_mode,
126128
connected_peer_count: Arc::new(AtomicUsize::new(0)),
129+
headers2_disabled: Arc::new(Mutex::new(HashSet::new())),
127130
})
128131
}
129132

@@ -210,6 +213,7 @@ impl PeerNetworkManager {
210213
let mempool_strategy = self.mempool_strategy;
211214
let user_agent = self.user_agent.clone();
212215
let connected_peer_count = self.connected_peer_count.clone();
216+
let headers2_disabled = self.headers2_disabled.clone();
213217

214218
// Spawn connection task
215219
let mut tasks = self.tasks.lock().await;
@@ -249,6 +253,7 @@ impl PeerNetworkManager {
249253
shutdown_token,
250254
reputation_manager.clone(),
251255
connected_peer_count.clone(),
256+
headers2_disabled.clone(),
252257
)
253258
.await;
254259
}
@@ -283,6 +288,7 @@ impl PeerNetworkManager {
283288
}
284289

285290
/// Start reading messages from a peer
291+
#[allow(clippy::too_many_arguments)] // TODO: refactor to reduce arguments
286292
async fn start_peer_reader(
287293
addr: SocketAddr,
288294
pool: Arc<PeerPool>,
@@ -291,10 +297,12 @@ impl PeerNetworkManager {
291297
shutdown_token: CancellationToken,
292298
reputation_manager: Arc<PeerReputationManager>,
293299
connected_peer_count: Arc<AtomicUsize>,
300+
headers2_disabled: Arc<Mutex<HashSet<SocketAddr>>>,
294301
) {
295302
tokio::spawn(async move {
296303
log::debug!("Starting peer reader loop for {}", addr);
297304
let mut loop_iteration = 0;
305+
let mut headers2_state = CompressionState::default();
298306

299307
loop {
300308
loop_iteration += 1;
@@ -440,9 +448,49 @@ impl PeerNetworkManager {
440448
// Forward to client
441449
}
442450
NetworkMessage::Headers2(headers2) => {
443-
// Log compressed headers messages specifically
444-
log::info!("📨 Received Headers2 message from {} with {} compressed headers!", addr, headers2.headers.len());
445-
// Forward to client (decompression handled by sync manager)
451+
// Decompress headers in network layer and forward as regular Headers
452+
log::info!(
453+
"Received Headers2 from {} with {} compressed headers - decompressing",
454+
addr,
455+
headers2.headers.len()
456+
);
457+
458+
match headers2_state.process_headers(&headers2.headers) {
459+
Ok(headers) => {
460+
log::info!(
461+
"Decompressed {} headers from {} - forwarding as regular Headers",
462+
headers.len(),
463+
addr
464+
);
465+
// Forward as regular Headers message
466+
let headers_msg = NetworkMessage::Headers(headers);
467+
if message_tx.send((addr, headers_msg)).await.is_err() {
468+
log::warn!(
469+
"Breaking peer reader loop for {} - failed to send decompressed headers",
470+
addr
471+
);
472+
break;
473+
}
474+
continue; // Already sent, don't forward the original Headers2
475+
}
476+
Err(e) => {
477+
log::error!(
478+
"Headers2 decompression failed from {}: {} - disabling headers2",
479+
addr,
480+
e
481+
);
482+
headers2_disabled.lock().await.insert(addr);
483+
// Apply reputation penalty
484+
reputation_manager
485+
.update_reputation(
486+
addr,
487+
misbehavior_scores::INVALID_MESSAGE,
488+
"Headers2 decompression failed",
489+
)
490+
.await;
491+
continue; // Don't forward corrupted message
492+
}
493+
}
446494
}
447495
NetworkMessage::GetHeaders(_) => {
448496
// SPV clients don't serve headers to peers
@@ -573,6 +621,8 @@ impl PeerNetworkManager {
573621
connected_peer_count.fetch_sub(1, Ordering::Relaxed);
574622
}
575623

624+
headers2_disabled.lock().await.remove(&addr);
625+
576626
// Give small positive reputation if peer maintained long connection
577627
let conn_duration = Duration::from_secs(60 * loop_iteration); // Rough estimate
578628
if conn_duration > Duration::from_secs(3600) {
@@ -780,7 +830,8 @@ impl PeerNetworkManager {
780830
// For filter-related messages, we need a peer that supports compact filters
781831
let requires_compact_filters =
782832
matches!(&message, NetworkMessage::GetCFHeaders(_) | NetworkMessage::GetCFilters(_));
783-
let requires_headers2 = matches!(&message, NetworkMessage::GetHeaders2(_));
833+
let check_headers2 =
834+
matches!(&message, NetworkMessage::GetHeaders(_) | NetworkMessage::GetHeaders2(_));
784835

785836
let selected_peer = if requires_compact_filters {
786837
// Find a peer that supports compact filters
@@ -808,7 +859,7 @@ impl PeerNetworkManager {
808859
));
809860
}
810861
}
811-
} else if requires_headers2 {
862+
} else if check_headers2 {
812863
// Prefer a peer that advertises headers2 support
813864
let mut current_sync_peer = self.current_sync_peer.lock().await;
814865
let mut selected: Option<SocketAddr> = None;
@@ -875,6 +926,25 @@ impl PeerNetworkManager {
875926
.find(|(a, _)| *a == selected_peer)
876927
.ok_or_else(|| NetworkError::ConnectionFailed("Selected peer not found".to_string()))?;
877928

929+
// Upgrade GetHeaders to GetHeaders2 if this specific peer supports it and not disabled
930+
let peer_supports_headers2 = {
931+
let peer_guard = peer.read().await;
932+
peer_guard.can_request_headers2()
933+
};
934+
let message = match message {
935+
NetworkMessage::GetHeaders(get_headers)
936+
if !self.headers2_disabled.lock().await.contains(addr)
937+
&& peer_supports_headers2 =>
938+
{
939+
log::debug!(
940+
"Upgrading GetHeaders to GetHeaders2 for peer {}: {:?}",
941+
addr,
942+
get_headers
943+
);
944+
NetworkMessage::GetHeaders2(get_headers)
945+
}
946+
other => other,
947+
};
878948
// Reduce verbosity for common sync messages
879949
match &message {
880950
NetworkMessage::GetHeaders(_)
@@ -1069,6 +1139,7 @@ impl Clone for PeerNetworkManager {
10691139
user_agent: self.user_agent.clone(),
10701140
exclusive_mode: self.exclusive_mode,
10711141
connected_peer_count: self.connected_peer_count.clone(),
1142+
headers2_disabled: self.headers2_disabled.clone(),
10721143
}
10731144
}
10741145
}

dash-spv/src/sync/headers/manager.rs

Lines changed: 7 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use crate::error::{SyncError, SyncResult};
1313
use crate::network::NetworkManager;
1414
use crate::storage::StorageManager;
1515
use crate::sync::headers::validate_headers;
16-
use crate::sync::headers2::Headers2StateManager;
1716
use crate::types::{ChainState, HashedBlockHeader};
1817
use crate::ValidationMode;
1918
use std::sync::Arc;
@@ -51,11 +50,8 @@ pub struct HeaderSyncManager<S: StorageManager, N: NetworkManager> {
5150
checkpoint_manager: CheckpointManager,
5251
reorg_config: ReorgConfig,
5352
chain_state: Arc<RwLock<ChainState>>,
54-
// WalletState removed - wallet functionality is now handled externally
55-
headers2_state: Headers2StateManager,
5653
syncing_headers: bool,
5754
last_sync_progress: std::time::Instant,
58-
headers2_failed: bool,
5955
// Cached flag for quick access without locking
6056
cached_sync_base_height: u32,
6157
}
@@ -83,11 +79,8 @@ impl<S: StorageManager, N: NetworkManager> HeaderSyncManager<S, N> {
8379
checkpoint_manager,
8480
reorg_config,
8581
chain_state,
86-
// WalletState removed
87-
headers2_state: Headers2StateManager::new(),
8882
syncing_headers: false,
8983
last_sync_progress: std::time::Instant::now(),
90-
headers2_failed: false,
9184
cached_sync_base_height: 0,
9285
_phantom_s: std::marker::PhantomData,
9386
_phantom_n: std::marker::PhantomData,
@@ -324,144 +317,22 @@ impl<S: StorageManager, N: NetworkManager> HeaderSyncManager<S, N> {
324317
getheaders_msg.stop_hash
325318
);
326319

327-
// Use headers2 if peer supports it and we haven't had failures
328-
let use_headers2 = network.has_headers2_peer().await && !self.headers2_failed;
329-
330320
// Log details about the request
331321
tracing::info!(
332-
"Preparing headers request - height: {}, base_hash: {:?}, headers2_supported: {}",
322+
"Preparing headers request - height: {}, base_hash: {:?}",
333323
storage.get_tip_height().await.unwrap_or(0),
334-
base_hash,
335-
use_headers2
324+
base_hash
336325
);
337326

338-
// Try GetHeaders2 first if peer supports it, with fallback to regular GetHeaders
339-
if use_headers2 {
340-
tracing::info!("📤 Sending GetHeaders2 message (compressed headers)");
341-
tracing::debug!(
342-
"GetHeaders2 details: version={}, locator_hashes={:?}, stop_hash={}",
343-
getheaders_msg.version,
344-
getheaders_msg.locator_hashes,
345-
getheaders_msg.stop_hash
346-
);
347-
348-
// Log the raw message bytes for debugging
349-
let msg_bytes = dashcore::consensus::encode::serialize(&getheaders_msg);
350-
tracing::debug!(
351-
"GetHeaders2 raw bytes ({}): {:02x?}",
352-
msg_bytes.len(),
353-
&msg_bytes[..std::cmp::min(100, msg_bytes.len())]
354-
);
355-
356-
// Send GetHeaders2 message for compressed headers
357-
let result =
358-
network.send_message(NetworkMessage::GetHeaders2(getheaders_msg.clone())).await;
359-
360-
match result {
361-
Ok(_) => {
362-
// TODO: Implement timeout and fallback mechanism
363-
// For now, we rely on the network layer's timeout handling
364-
// In the future, we should:
365-
// 1. Track the request with a unique ID
366-
// 2. Set a specific timeout for GetHeaders2 response
367-
// 3. Fall back to GetHeaders if no response within timeout
368-
// 4. Mark peers that don't respond to GetHeaders2 properly
369-
}
370-
Err(e) => {
371-
tracing::warn!("Failed to send GetHeaders2, falling back to GetHeaders: {}", e);
372-
// Fall back to regular GetHeaders
373-
network
374-
.send_message(NetworkMessage::GetHeaders(getheaders_msg))
375-
.await
376-
.map_err(|e| {
377-
SyncError::Network(format!("Failed to send GetHeaders: {}", e))
378-
})?;
379-
}
380-
}
381-
} else {
382-
tracing::info!("📤 Sending GetHeaders message (uncompressed headers)");
383-
// Send regular GetHeaders message
384-
network
385-
.send_message(NetworkMessage::GetHeaders(getheaders_msg))
386-
.await
387-
.map_err(|e| SyncError::Network(format!("Failed to send GetHeaders: {}", e)))?;
388-
}
327+
tracing::debug!("Sending GetHeaders message");
328+
network
329+
.send_message(NetworkMessage::GetHeaders(getheaders_msg))
330+
.await
331+
.map_err(|e| SyncError::Network(format!("Failed to send GetHeaders: {}", e)))?;
389332

390333
Ok(())
391334
}
392335

393-
/// Handle a Headers2 message with compressed headers.
394-
/// Returns true if the message was processed and sync should continue, false if sync is complete.
395-
pub async fn handle_headers2_message(
396-
&mut self,
397-
headers2: &dashcore::network::message_headers2::Headers2Message,
398-
peer_id: crate::types::PeerId,
399-
storage: &mut S,
400-
network: &mut N,
401-
) -> SyncResult<(bool, usize)> {
402-
tracing::info!(
403-
"📦 Received {} compressed headers from peer {}",
404-
headers2.headers.len(),
405-
peer_id
406-
);
407-
408-
// If this is the first headers2 message, and we need to initialize compression state
409-
if !headers2.headers.is_empty() {
410-
// Check if we need to initialize the compression state
411-
let state = self.headers2_state.get_state(peer_id);
412-
if state.prev_header.is_none() {
413-
// Initialize with header at current tip height (works for both genesis and later)
414-
let tip_height = storage.get_tip_height().await.unwrap_or(0);
415-
if let Some(tip_header) = storage.get_header(tip_height).await.map_err(|e| {
416-
SyncError::Storage(format!(
417-
"Error trying to get block at height {} from storage: {}",
418-
tip_height, e
419-
))
420-
})? {
421-
tracing::info!(
422-
"Initializing headers2 compression state for peer {} with header at height {}",
423-
peer_id,
424-
tip_height
425-
);
426-
self.headers2_state.init_peer_state(peer_id, tip_header);
427-
}
428-
}
429-
}
430-
431-
// Decompress headers using the peer's compression state
432-
let headers = match self.headers2_state.process_headers(peer_id, &headers2.headers) {
433-
Ok(headers) => headers,
434-
Err(e) => {
435-
tracing::error!(
436-
"Failed to decompress headers2 from peer {}: {}. Headers count: {}, first header compressed: {}, chain height: {}",
437-
peer_id,
438-
e,
439-
headers2.headers.len(),
440-
if headers2.headers.is_empty() {
441-
"N/A (empty)".to_string()
442-
} else {
443-
(!headers2.headers[0].is_full()).to_string()
444-
},
445-
storage.get_tip_height().await.unwrap_or(0)
446-
);
447-
448-
// Mark that headers2 failed for this sync session to trigger fallback to regular headers
449-
self.headers2_failed = true;
450-
return Err(SyncError::Headers2DecompressionFailed(format!(
451-
"Failed to decompress headers: {}",
452-
e
453-
)));
454-
}
455-
};
456-
457-
let headers_count = headers.len();
458-
459-
// Process decompressed headers through the normal flow
460-
let continue_sync = self.handle_headers_message(&headers, storage, network).await?;
461-
462-
Ok((continue_sync, headers_count))
463-
}
464-
465336
/// Prepare sync state without sending network requests.
466337
/// This allows monitoring to be set up before requests are sent.
467338
pub async fn prepare_sync(&mut self, storage: &mut S) -> SyncResult<Option<BlockHash>> {

0 commit comments

Comments
 (0)