Skip to content

Commit a86c5b6

Browse files
fix: Fix compressed headers protocol compatibility with Dash Core (#256)
* fix(headers2): Fix compressed headers protocol compatibility with Dash Core This commit fixes critical incompatibilities between the Rust headers2 implementation and the C++ Dash Core reference implementation (DIP-0025). - C++ uses offset=0 for "version not in cache" (full version present) - C++ uses offset=1-7 for "version at position offset-1 in cache" - Rust incorrectly used offset=7 for uncompressed, offset=0-6 for cached - Now matches C++ semantics exactly - C++ uses std::list with MRU (Most Recently Used) reordering - Rust used a circular buffer without MRU reordering - Changed to Vec<i32> with proper MRU behavior matching C++ - Fixed Decodable impl to read version when offset=0 (not offset=7) - Added MissingVersion error variant for proper error handling - Rewrote CompressionState to use Vec with MRU reordering - Fixed compress() to use offset=0 for uncompressed versions - Fixed decompress() to handle C++ offset semantics - Updated Decodable to read version when offset=0 - Added comprehensive tests for C++ compatibility - Enabled headers2 in handshake negotiation - Enabled headers2 in sync manager - Fixed phase transition when receiving empty headers2 response - Re-enabled has_headers2_peer() check - Added headers2_compatibility_test.rs with 12 tests verifying: - Version offset C++ semantics - MRU cache reordering behavior - Flag bit semantics - Serialization format compatibility - Cross-implementation compatibility - All existing tests pass 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]> * feat(network): Add support for Headers2 in peer selection logic This commit enhances the PeerNetworkManager to include support for the Headers2 protocol. It introduces logic to prefer peers that advertise Headers2 support when selecting a sync peer. The changes ensure that the current sync peer is updated accordingly, improving compatibility and efficiency in header synchronization. - Added logic to check for Headers2 support in peer selection. - Updated existing sync peer selection to prioritize peers with Headers2 capabilities. - Ensured proper logging when a new sync peer is selected for Headers2. This update aligns with ongoing efforts to improve protocol compatibility and performance in the network manager. * refactor(headers2): Address code review feedback - Consolidate redundant if/else blocks in compression state initialization (reduces 5 async lock acquisitions to 1) - Simplify block locator construction (both branches returned vec![hash]) - Change process_headers to take &[CompressedHeader] instead of Vec to avoid cloning headers during sync 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]> * refactor(sync): Unify header sync finalization for regular and compressed headers - Extract `finalize_headers_sync` helper to eliminate duplicated phase update logic - Return decompressed header count from `handle_headers2_message` so both paths can track `headers_downloaded` and `headers_per_second` stats uniformly - Remove special-case handling that skipped stats for compressed headers 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]> * refactor(sync): Change handle_headers_message to take &[BlockHeader] instead of Vec Avoids unnecessary cloning of headers vector when passing to the handler. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]> * refactor(headers2): Remove speculative genesis decompression comment The defensive fallback mechanism (headers2_failed flag) is kept, but the speculative comment about genesis compression state issues is removed. With the C++ compatibility fix, this scenario should not occur. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]> * fix(clippy): Remove needless borrows after slice refactor Variables are already references from slice/iter, no need to borrow again. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]> * fix: clippy warning --------- Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 705cdf6 commit a86c5b6

File tree

7 files changed

+799
-237
lines changed

7 files changed

+799
-237
lines changed

dash-spv/src/network/handshake.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,8 @@ impl HandshakeManager {
252252
.unwrap_or(Duration::from_secs(0))
253253
.as_secs() as i64;
254254

255-
// SPV client doesn't advertise any special services since headers2 is disabled
256-
let services = ServiceFlags::NONE;
255+
// Advertise headers2 support (NODE_HEADERS_COMPRESSED)
256+
let services = ServiceFlags::NONE | NODE_HEADERS_COMPRESSED;
257257

258258
// Parse the local address safely
259259
let local_addr = "127.0.0.1:0"
@@ -310,10 +310,13 @@ impl HandshakeManager {
310310

311311
/// Negotiate headers2 support with the peer after handshake completion.
312312
async fn negotiate_headers2(&self, connection: &mut Peer) -> NetworkResult<()> {
313-
// Headers2 is currently disabled due to protocol compatibility issues
314-
// Always send SendHeaders regardless of peer support
315-
tracing::info!("Headers2 is disabled - sending SendHeaders only");
316-
connection.send_message(NetworkMessage::SendHeaders).await?;
313+
if self.peer_supports_headers2() {
314+
tracing::info!("Peer supports headers2 - sending SendHeaders2");
315+
connection.send_message(NetworkMessage::SendHeaders2).await?;
316+
} else {
317+
tracing::info!("Peer does not support headers2 - sending SendHeaders");
318+
connection.send_message(NetworkMessage::SendHeaders).await?;
319+
}
317320
Ok(())
318321
}
319322
}

dash-spv/src/network/manager.rs

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -423,19 +423,20 @@ impl PeerNetworkManager {
423423
headers.len()
424424
);
425425
// Check if peer supports headers2
426-
// TODO: Re-enable this warning once headers2 is fixed
427-
// Currently suppressed since headers2 is disabled
428-
/*
429426
let peer_guard = peer.read().await;
430-
if peer_guard.peer_info().services.map(|s| {
431-
dashcore::network::constants::ServiceFlags::from(s).has(
432-
dashcore::network::constants::ServiceFlags::from(2048u64)
433-
)
434-
}).unwrap_or(false) {
427+
if peer_guard
428+
.peer_info()
429+
.services
430+
.map(|s| {
431+
dashcore::network::constants::ServiceFlags::from(s).has(
432+
dashcore::network::constants::NODE_HEADERS_COMPRESSED,
433+
)
434+
})
435+
.unwrap_or(false)
436+
{
435437
log::warn!("⚠️ Peer {} supports headers2 but sent regular headers - possible protocol issue", addr);
436438
}
437439
drop(peer_guard);
438-
*/
439440
// Forward to client
440441
}
441442
NetworkMessage::Headers2(headers2) => {
@@ -779,6 +780,7 @@ impl PeerNetworkManager {
779780
// For filter-related messages, we need a peer that supports compact filters
780781
let requires_compact_filters =
781782
matches!(&message, NetworkMessage::GetCFHeaders(_) | NetworkMessage::GetCFilters(_));
783+
let requires_headers2 = matches!(&message, NetworkMessage::GetHeaders2(_));
782784

783785
let selected_peer = if requires_compact_filters {
784786
// Find a peer that supports compact filters
@@ -806,6 +808,37 @@ impl PeerNetworkManager {
806808
));
807809
}
808810
}
811+
} else if requires_headers2 {
812+
// Prefer a peer that advertises headers2 support
813+
let mut current_sync_peer = self.current_sync_peer.lock().await;
814+
let mut selected: Option<SocketAddr> = None;
815+
816+
if let Some(current_addr) = *current_sync_peer {
817+
if let Some((_, peer)) = peers.iter().find(|(addr, _)| *addr == current_addr) {
818+
let peer_guard = peer.read().await;
819+
if peer_guard.peer_info().supports_headers2() {
820+
selected = Some(current_addr);
821+
}
822+
}
823+
}
824+
825+
if selected.is_none() {
826+
for (addr, peer) in &peers {
827+
let peer_guard = peer.read().await;
828+
if peer_guard.peer_info().supports_headers2() {
829+
selected = Some(*addr);
830+
break;
831+
}
832+
}
833+
}
834+
835+
let chosen = selected.unwrap_or(peers[0].0);
836+
if Some(chosen) != *current_sync_peer {
837+
log::info!("Sync peer selected for Headers2: {}", chosen);
838+
*current_sync_peer = Some(chosen);
839+
}
840+
drop(current_sync_peer);
841+
chosen
809842
} else {
810843
// For non-filter messages, use the sticky sync peer
811844
let mut current_sync_peer = self.current_sync_peer.lock().await;
@@ -1060,6 +1093,7 @@ impl NetworkManager for PeerNetworkManager {
10601093
// For sync messages that require consistent responses, send to only one peer
10611094
match &message {
10621095
NetworkMessage::GetHeaders(_)
1096+
| NetworkMessage::GetHeaders2(_)
10631097
| NetworkMessage::GetCFHeaders(_)
10641098
| NetworkMessage::GetCFilters(_)
10651099
| NetworkMessage::GetData(_)
@@ -1296,9 +1330,7 @@ impl NetworkManager for PeerNetworkManager {
12961330
}
12971331

12981332
async fn has_headers2_peer(&self) -> bool {
1299-
// Headers2 is currently disabled due to protocol compatibility issues
1300-
// TODO: Fix headers2 decompression before re-enabling
1301-
false
1333+
self.has_peer_with_service(dashcore::network::constants::NODE_HEADERS_COMPRESSED).await
13021334
}
13031335

13041336
async fn get_last_message_peer_id(&self) -> crate::types::PeerId {

dash-spv/src/sync/headers/manager.rs

Lines changed: 49 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -318,32 +318,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
318318
base_hash: Option<BlockHash>,
319319
) -> SyncResult<()> {
320320
let block_locator = match base_hash {
321-
Some(hash) => {
322-
// When syncing from a checkpoint, we need to create a proper locator
323-
// that helps the peer understand we want headers AFTER this point
324-
if self.is_synced_from_checkpoint() {
325-
// For checkpoint sync, only include the checkpoint hash
326-
// Including genesis would allow peers to fall back to sending headers from genesis
327-
// if they don't recognize the checkpoint, which is exactly what we want to avoid
328-
tracing::debug!(
329-
"📍 Using checkpoint-only locator for height {}: [{}]",
330-
self.get_sync_base_height(),
331-
hash
332-
);
333-
vec![hash]
334-
} else if network.has_headers2_peer().await && !self.headers2_failed {
335-
// Check if this is genesis and we're using headers2
336-
let genesis_hash = self.config.network.known_genesis_block_hash();
337-
if genesis_hash == Some(hash) {
338-
tracing::info!("📍 Using empty locator for headers2 genesis sync");
339-
vec![]
340-
} else {
341-
vec![hash]
342-
}
343-
} else {
344-
vec![hash]
345-
}
346-
}
321+
Some(hash) => vec![hash],
347322
None => {
348323
// Check if we're syncing from a checkpoint
349324
if self.is_synced_from_checkpoint()
@@ -381,9 +356,8 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
381356
getheaders_msg.stop_hash
382357
);
383358

384-
// Headers2 is currently disabled due to protocol compatibility issues
385-
// TODO: Fix headers2 decompression before re-enabling
386-
let use_headers2 = false; // Disabled until headers2 implementation is fixed
359+
// Use headers2 if peer supports it and we haven't had failures
360+
let use_headers2 = network.has_headers2_peer().await && !self.headers2_failed;
387361

388362
// Log details about the request
389363
tracing::info!(
@@ -454,64 +428,39 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
454428
&mut self,
455429
headers2: &dashcore::network::message_headers2::Headers2Message,
456430
peer_id: crate::types::PeerId,
457-
_storage: &mut S,
458-
_network: &mut N,
459-
) -> SyncResult<bool> {
460-
tracing::warn!(
461-
"⚠️ Headers2 support is currently NON-FUNCTIONAL. Received {} compressed headers from peer {} but cannot process them.",
431+
storage: &mut S,
432+
network: &mut N,
433+
) -> SyncResult<(bool, usize)> {
434+
tracing::info!(
435+
"📦 Received {} compressed headers from peer {}",
462436
headers2.headers.len(),
463437
peer_id
464438
);
465439

466-
// Mark headers2 as failed for this session to avoid retrying
467-
self.headers2_failed = true;
468-
469-
// Return an error to trigger fallback to regular headers
470-
return Err(SyncError::Headers2DecompressionFailed(
471-
"Headers2 is currently disabled due to protocol compatibility issues".to_string(),
472-
));
473-
474-
#[allow(unreachable_code)]
475-
{
476-
// If this is the first headers2 message, and we need to initialize compression state
477-
if !headers2.headers.is_empty() {
478-
// Check if we need to initialize the compression state
479-
let state = self.headers2_state.get_state(peer_id);
480-
if state.prev_header.is_none() {
481-
// If we're syncing from genesis (height 0), initialize with genesis header
482-
if self.chain_state.read().await.tip_height() == 0 {
483-
// We have genesis header at index 0
484-
if let Some(genesis_header) =
485-
self.chain_state.read().await.header_at_height(0)
486-
{
487-
tracing::info!(
488-
"Initializing headers2 compression state for peer {} with genesis header",
489-
peer_id
490-
);
491-
self.headers2_state.init_peer_state(peer_id, *genesis_header);
492-
}
493-
} else if self.chain_state.read().await.tip_height() > 0 {
494-
// Get our current tip to use as the base for compression
495-
if let Some(tip_header) = self.chain_state.read().await.get_tip_header() {
496-
tracing::info!(
497-
"Initializing headers2 compression state for peer {} with tip header at height {}",
498-
peer_id,
499-
self.chain_state.read().await.tip_height()
500-
);
501-
self.headers2_state.init_peer_state(peer_id, tip_header);
502-
}
503-
}
440+
// If this is the first headers2 message, and we need to initialize compression state
441+
if !headers2.headers.is_empty() {
442+
// Check if we need to initialize the compression state
443+
let state = self.headers2_state.get_state(peer_id);
444+
if state.prev_header.is_none() {
445+
// Initialize with header at current tip height (works for both genesis and later)
446+
let chain_state = self.chain_state.read().await;
447+
let tip_height = chain_state.tip_height();
448+
if let Some(tip_header) = chain_state.header_at_height(tip_height) {
449+
tracing::info!(
450+
"Initializing headers2 compression state for peer {} with header at height {}",
451+
peer_id,
452+
tip_height
453+
);
454+
self.headers2_state.init_peer_state(peer_id, *tip_header);
504455
}
505456
}
457+
}
506458

507-
// Decompress headers using the peer's compression state
508-
let headers = match self
509-
.headers2_state
510-
.process_headers(peer_id, headers2.headers.clone())
511-
{
512-
Ok(headers) => headers,
513-
Err(e) => {
514-
tracing::error!(
459+
// Decompress headers using the peer's compression state
460+
let headers = match self.headers2_state.process_headers(peer_id, &headers2.headers) {
461+
Ok(headers) => headers,
462+
Err(e) => {
463+
tracing::error!(
515464
"Failed to decompress headers2 from peer {}: {}. Headers count: {}, first header compressed: {}, chain height: {}",
516465
peer_id,
517466
e,
@@ -524,37 +473,29 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
524473
self.chain_state.read().await.tip_height()
525474
);
526475

527-
// If we failed due to missing previous header, and we're at genesis,
528-
// this might be a protocol issue where peer expects us to have genesis in compression state
529-
if matches!(e, crate::sync::headers2::ProcessError::DecompressionError(0, _))
530-
&& self.chain_state.read().await.tip_height() == 0
531-
{
532-
tracing::warn!(
533-
"Headers2 decompression failed at genesis. Peer may be sending compressed headers that reference genesis. Consider falling back to regular headers."
534-
);
535-
}
476+
// Mark that headers2 failed for this sync session to trigger fallback to regular headers
477+
self.headers2_failed = true;
478+
return Err(SyncError::Headers2DecompressionFailed(format!(
479+
"Failed to decompress headers: {}",
480+
e
481+
)));
482+
}
483+
};
536484

537-
// Return a specific error that can trigger fallback
538-
// Mark that headers2 failed for this sync session
539-
self.headers2_failed = true;
540-
return Err(SyncError::Headers2DecompressionFailed(format!(
541-
"Failed to decompress headers: {}",
542-
e
543-
)));
544-
}
545-
};
485+
// Log compression statistics
486+
let stats = self.headers2_state.get_stats();
487+
tracing::info!(
488+
"📊 Headers2 compression stats: {:.1}% bandwidth saved, {:.1}% compression ratio",
489+
stats.bandwidth_savings,
490+
stats.compression_ratio * 100.0
491+
);
546492

547-
// Log compression statistics
548-
let stats = self.headers2_state.get_stats();
549-
tracing::info!(
550-
"📊 Headers2 compression stats: {:.1}% bandwidth saved, {:.1}% compression ratio",
551-
stats.bandwidth_savings,
552-
stats.compression_ratio * 100.0
553-
);
493+
let headers_count = headers.len();
554494

555-
// Process decompressed headers through the normal flow
556-
self.handle_headers_message(&headers, _storage, _network).await
557-
}
495+
// Process decompressed headers through the normal flow
496+
let continue_sync = self.handle_headers_message(&headers, storage, network).await?;
497+
498+
Ok((continue_sync, headers_count))
558499
}
559500

560501
/// Prepare sync state without sending network requests.

dash-spv/src/sync/headers2/manager.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl Headers2StateManager {
9696
pub fn process_headers(
9797
&mut self,
9898
peer_id: PeerId,
99-
headers: Vec<CompressedHeader>,
99+
headers: &[CompressedHeader],
100100
) -> Result<Vec<Header>, ProcessError> {
101101
if headers.is_empty() {
102102
return Ok(Vec::new());
@@ -116,7 +116,7 @@ impl Headers2StateManager {
116116
let mut decompressed = Vec::with_capacity(headers.len());
117117

118118
// Process headers and collect statistics
119-
for (i, compressed) in headers.into_iter().enumerate() {
119+
for (i, compressed) in headers.iter().enumerate() {
120120
// Update statistics
121121
self.total_headers_received += 1;
122122
self.total_bytes_received += compressed.encoded_size() as u64;
@@ -128,9 +128,8 @@ impl Headers2StateManager {
128128

129129
// Get state and decompress
130130
let state = self.get_state(peer_id);
131-
let header = state
132-
.decompress(&compressed)
133-
.map_err(|e| ProcessError::DecompressionError(i, e))?;
131+
let header =
132+
state.decompress(compressed).map_err(|e| ProcessError::DecompressionError(i, e))?;
134133

135134
decompressed.push(header);
136135
}
@@ -230,7 +229,7 @@ mod tests {
230229
let compressed2 = compress_state.compress(&header2);
231230

232231
// Process headers
233-
let result = manager.process_headers(peer_id, vec![compressed1, compressed2]);
232+
let result = manager.process_headers(peer_id, &[compressed1, compressed2]);
234233
assert!(result.is_ok());
235234

236235
let decompressed = result.expect("decompression should succeed in test");
@@ -261,7 +260,7 @@ mod tests {
261260

262261
// Try to process it as first header - should fail with DecompressionError
263262
// because the peer doesn't have the previous header state
264-
let result = manager.process_headers(peer_id, vec![compressed]);
263+
let result = manager.process_headers(peer_id, &[compressed]);
265264
assert!(matches!(result, Err(ProcessError::DecompressionError(0, _))));
266265
}
267266

0 commit comments

Comments
 (0)