
Conversation

PastaPastaPasta (Member) commented Jun 17, 2025

feat: Add Complete dash-spv SPV Client + Modular Wallet Architecture

Summary

This PR introduces a new dash-spv crate (26,000+ lines) implementing a complete SPV (Simplified Payment Verification) client for the Dash network, along with a modular wallet architecture. The client provides advanced features such as BIP157 filter synchronization, masternode list management, and wallet functionality. Additionally, wallet functionality has been extracted into standalone key-wallet components with cross-platform FFI bindings.

Key Features Added

Complete dash-spv SPV Client - Full-featured SPV client with 26,000+ lines of new code
Advanced Filter Synchronization - BIP157 compact block filters with gap detection and auto-restart (gap detection is sketched just after this list)
Wallet Integration - UTXO tracking, address monitoring, and transaction processing
Masternode Support - Synchronization and validation of Dash masternode lists
Multi-peer Networking - Robust P2P networking with connection management and message routing
Comprehensive Storage - Both memory and disk-based storage with segmented architecture
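
To illustrate the gap-detection idea, here is an invented sketch (not the crate's actual code): given the sorted heights for which filters have already been stored, report the missing inclusive ranges up to the tip.

use std::vec::Vec;

/// Illustrative gap detection over synced filter heights.
fn find_filter_gaps(synced_heights: &[u32], start: u32, tip: u32) -> Vec<(u32, u32)> {
    let mut gaps = Vec::new();
    let mut expected = start;
    for &h in synced_heights {
        if h > expected {
            gaps.push((expected, h - 1)); // a missing range was skipped
        }
        expected = h + 1;
    }
    if expected <= tip {
        gaps.push((expected, tip)); // trailing gap up to the chain tip
    }
    gaps
}

Each reported range can then be re-requested, which is essentially what the auto-restart behavior amounts to.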

New dash-spv Architecture

Client Layer:

  • High-level DashSpvClient with comprehensive configuration options (usage sketched after this list)
  • Block processing with dedicated worker threads and async task handling
  • Message handling with request tracking and timeout management
  • Filter synchronization with flow control and progress reporting
  • Wallet utilities and watch manager for address monitoring
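
A minimal sketch of how such a client is typically driven, matching the sequence diagram later in this description; the type and method names here are assumptions, not the crate's verified API:

use dash_spv::{ClientConfig, DashSpvClient};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure network, peer list, storage path, validation mode, etc.
    let config = ClientConfig::default();
    let mut client = DashSpvClient::new(config).await?;

    client.start().await?;          // connect peers, load state, begin header/filter sync
    tokio::signal::ctrl_c().await?; // run until the user interrupts
    client.stop().await?;           // flush storage and disconnect cleanly
    Ok(())
}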

Network Layer:

  • Multi-peer connection management with exclusive peer mode (a simplified pool sketch follows this list)
  • TCP connection handling with handshake protocols
  • Peer discovery and connection pooling
  • Message routing and network monitoring
  • Address v2 support and peer persistence
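
A simplified picture of the connection pool behind this layer; the real pool also tracks handshake state, ping nonces, and peer metadata:

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;

struct PeerConnection; // stand-in for the real connection type

/// Shared map from peer address to its connection handle.
struct ConnectionPool {
    peers: RwLock<HashMap<SocketAddr, Arc<RwLock<PeerConnection>>>>,
}

impl ConnectionPool {
    async fn get_connection(&self, addr: &SocketAddr) -> Option<Arc<RwLock<PeerConnection>>> {
        self.peers.read().await.get(addr).cloned()
    }

    async fn remove_connection(&self, addr: &SocketAddr) {
        self.peers.write().await.remove(addr);
    }
}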

Storage Layer:

  • Memory and disk-based storage implementations
  • Segmented storage for headers (10,000-header segments; the height-to-segment mapping is sketched after this list)
  • Separate storage for filter headers and compact filters
  • State persistence and consistency management
  • Reverse indexing for efficient lookups
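
The 10,000-header segmentation implies a simple height-to-file mapping; a sketch (the file naming is hypothetical):

const SEGMENT_SIZE: u32 = 10_000;

/// Maps a block height to (segment index, offset within the segment).
fn segment_for(height: u32) -> (u32, u32) {
    (height / SEGMENT_SIZE, height % SEGMENT_SIZE)
}

// e.g. height 2_278_524 lands in segment 227 at offset 8_524,
// which might live in a file like "headers_00227.dat".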

Synchronization Layer:

  • Header synchronization with validation
  • Filter header and compact filter sync coordination
  • Masternode list synchronization
  • Interleaved sync processes with state management
  • Timeout handling and recovery mechanisms
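
The timeout handling boils down to tracking when each component last made progress; a minimal sketch with an illustrative timeout value:

use std::time::{Duration, Instant};

const SYNC_TIMEOUT: Duration = Duration::from_secs(30); // illustrative value

/// Returns true when a sync component has made no progress for too long
/// and should be retried or restarted.
fn sync_stalled(last_progress: Instant) -> bool {
    last_progress.elapsed() > SYNC_TIMEOUT
}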

Validation Layer:

  • Header validation with proof-of-work verification (a conceptual sketch follows this list)
  • ChainLock validation for enhanced security
  • InstantLock validation for InstantSend transactions
  • Comprehensive validation framework
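
At its core, header validation checks that each header extends its predecessor before the more expensive proof-of-work check (Dash uses X11 rather than double-SHA256). A conceptual sketch with simplified types:

struct Header {
    prev_hash: [u8; 32],
    hash: [u8; 32], // in reality computed from the header fields via X11
}

fn validate_link(header: &Header, prev: &Header) -> Result<(), String> {
    if header.prev_hash != prev.hash {
        return Err("header does not extend the current tip".into());
    }
    // A full validator would also check the PoW target encoded in `bits`
    // and consult ChainLocks for finality.
    Ok(())
}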

Wallet Layer:

  • UTXO tracking with in-memory cache and disk persistence
  • Transaction processing and balance calculations (balance derivation sketched after this list)
  • Address indexing and monitoring
  • Watch item management for payment detection
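
To make the balance accounting concrete, a simplified sketch of how the reported buckets can be derived from the tracked UTXO set; the real types carry addresses, outpoints, and Amount values, and these field names are illustrative:

struct Utxo {
    value: u64, // satoshis
    is_confirmed: bool,
    is_instantlocked: bool,
}

/// Returns (confirmed, pending, instantlocked) totals in satoshis.
fn balances(utxos: &[Utxo]) -> (u64, u64, u64) {
    let (mut confirmed, mut pending, mut instantlocked) = (0, 0, 0);
    for u in utxos {
        if u.is_instantlocked {
            instantlocked += u.value;
        } else if u.is_confirmed {
            confirmed += u.value;
        } else {
            pending += u.value;
        }
    }
    (confirmed, pending, instantlocked)
}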

Core Library Enhancements

Enhanced Special Transactions:

  • Improved coinbase payload handling with version-specific parsing
  • Enhanced ProTx parsing with BLS version support
  • Updated provider registration/update service transactions
  • Better error handling and validation

Network Protocol Improvements:

  • Updated network constants and genesis block configuration
  • Enhanced masternode list and quorum management
  • Improved message handling and serialization

Summary by CodeRabbit

  • New Features

    • Introduced a modular, async Dash SPV client library with support for block header sync, BIP157 filter sync, masternode list sync, ChainLock and InstantLock validation, wallet monitoring, and persistent storage.
    • Added a command-line interface for configuring and running the client, including terminal UI status display.
    • Provided comprehensive configuration management, error handling, and flexible storage backends (memory and disk).
    • Implemented multi-peer networking, DNS peer discovery, and peer persistence.
    • Enabled wallet UTXO tracking, address watching, transaction/block processing, and detailed balance reporting.
    • Added extensive documentation, usage guides, and a test plan.
  • Bug Fixes

    • Corrected version-aware serialization/deserialization of special transaction payloads to prevent over-reading and ensure compatibility.
    • Updated Dash mainnet genesis block constants for accuracy.
    • Fixed deserialization logic for special transaction types to avoid incorrect segwit interpretation.
  • Tests

    • Added a wide suite of unit, integration, and edge case tests covering network, storage, sync, validation, wallet, and transaction logic, including real-node integration and performance benchmarks.
  • Documentation

    • Added detailed README, architecture, integration test guides, and a comprehensive test plan.
  • Chores

    • Updated workspace configuration to include the new Dash SPV package.

coderabbitai bot (Contributor) commented Jun 17, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This change introduces a new "dash-spv" Rust package implementing a modular, async Dash SPV client library, including its configuration, documentation, CLI, core modules, and comprehensive test suite. It also updates Dash mainnet genesis block parameters, fixes version-aware coinbase payload decoding, and adds support for the MNHF Signal special transaction type. The update includes major new modules for networking, storage, sync, validation, wallet, and error handling.

Changes

  • Cargo.toml: Added "dash-spv" to workspace members.
  • dash-spv/Cargo.toml, dash-spv/README.md, dash-spv/CLAUDE.md, dash-spv/run_integration_tests.md: Added new package manifest, documentation, and integration test instructions for the Dash SPV client.
  • dash-spv/src/lib.rs, dash-spv/src/main.rs: Introduced library and CLI entrypoint for the Dash SPV client.
  • dash-spv/src/client/*: Implemented high-level client API, configuration, block processing, consistency checks, filter sync, message handling, status display, wallet utilities, and watch management.
  • dash-spv/src/error.rs: Added comprehensive error handling enums and result types.
  • dash-spv/src/filters/mod.rs: Added BIP157 filter manager placeholder.
  • dash-spv/src/network/*: Implemented full networking stack: TCP/multi-peer, handshake, peer management, address exchange, DNS discovery, connection pooling, persistence, and message routing.
  • dash-spv/src/storage/*: Added async, segmented disk and in-memory storage managers, with trait abstraction, UTXO/index handling, and storage types.
  • dash-spv/src/sync/*: Added synchronization managers for headers, filters, masternodes, with state tracking and flow control.
  • dash-spv/src/terminal.rs: Added terminal UI/status bar utilities.
  • dash-spv/src/types.rs: Defined common types: sync progress, chain state, validation modes, peer info, filter matches, watch items, stats, balances.
  • dash-spv/src/validation/*: Added modular validation for headers, chainlocks, instantlocks, with a central validation manager.
  • dash-spv/src/wallet/*: Implemented wallet with watched addresses, UTXO tracking, transaction/block processing, and balance calculation.
  • dash-spv/examples/*: Added example programs for header and filter synchronization.
  • dash-spv/tests/*: Added extensive async/unit/integration tests for networking, storage, sync, wallet, filter headers, transactions, and overall client behavior.
  • block_with_pro_reg_tx.data: Added raw block data file with ProRegTx for test purposes.
  • dash-network/src/lib.rs: Updated Regtest network magic number constant.
  • dash/src/blockdata/constants.rs: Updated Dash mainnet genesis block header fields (time, bits, nonce).
  • dash/src/blockdata/transaction/mod.rs: Fixed deserialization logic for MnhfSignal special transaction type (non-segwit).
  • dash/src/blockdata/transaction/special_transaction/coinbase.rs: Made merkle_root_quorums field version-aware in coinbase payload encoding/decoding; added regression and versioning tests.
  • dash/src/blockdata/transaction/special_transaction/mnhf_signal.rs: Added new module for MNHF Signal special transaction type and payload, with encoding/decoding and tests.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI/Main
    participant DashSpvClient
    participant NetworkManager
    participant StorageManager
    participant Wallet
    participant SyncManager

    User->>CLI/Main: Run dash-spv with config
    CLI/Main->>DashSpvClient: new(config)
    DashSpvClient->>NetworkManager: connect()
    DashSpvClient->>StorageManager: load headers, state
    DashSpvClient->>Wallet: load watched addresses, UTXOs
    CLI/Main->>DashSpvClient: start()
    DashSpvClient->>SyncManager: sync_to_tip()
    SyncManager->>NetworkManager: request headers/filters
    NetworkManager->>DashSpvClient: deliver network messages
    DashSpvClient->>SyncManager: handle headers/filters
    SyncManager->>StorageManager: store headers/filters
    DashSpvClient->>Wallet: update UTXOs/balances on relevant txs
    DashSpvClient->>CLI/Main: report sync progress
    User->>CLI/Main: Ctrl-C (shutdown)
    CLI/Main->>DashSpvClient: stop()
    DashSpvClient->>NetworkManager: disconnect
    DashSpvClient->>StorageManager: shutdown

Possibly related PRs

  • dashpay/rust-dashcore#50: Adds new masternode list-related network messages (getmnlistd, mnlistdiff) to Dash core library, which are directly used by the SPV client’s masternode sync features introduced in this PR.

Poem

(\(\
( -.-) A dash of code, a hop, a leap—
o_(")(") Now SPV bunnies never sleep!

With headers, wallets, blocks in tow,
Through network fields we swiftly go.

Modular, async, tests abound—
The Dashy future’s rabbit-bound!

🥕✨



PastaPastaPasta and others added 23 commits June 16, 2025 19:36
- Replace placeholder filter_matches_scripts with real BIP158 GCS implementation
- Add comprehensive integration test framework with Docker support
- Implement network monitoring for ChainLocks and InstantLocks with signature verification
- Enhance masternode engine with proper block header feeding and state management
- Add watch item persistence and improved transaction discovery
- Increase filter search range from 50 to 1000 blocks for better coverage
- Enable X11 hashing and BLS signature verification in dependencies
- Add proper error handling and logging throughout the sync pipeline

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Ping and Pong Handling: Added mechanisms to send periodic pings and handle incoming pings/pongs, enhancing network reliability.
- Block Processing: Implemented functions to process new block hashes immediately and manage block headers and filters effectively.
- Filter Headers and Filters: Added logic to handle CFHeaders and CFilter network messages and check them against watch items.
- Logging Enhancements: Improved logging for better traceability, including filter matches and network message receipt.
- Error Handling: Strengthened error handling for network messages and block processing errors.

This update enhances network responsiveness and block synchronization, enabling better SPV client performance.
- Add get_header_height_by_hash() method for O(1) hash-to-height lookups
- Add get_headers_batch() method for efficient bulk header loading
- Implement reverse index in both disk and memory storage
- Add as_any_mut() trait for storage downcasting
- Leverage existing segmented file structure for batch operations

These optimizations enable efficient masternode sync by reducing
individual storage reads from millions to thousands.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
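
The reverse index described in this commit is, at its core, a hash-to-height map kept alongside the segmented header files; a simplified, self-contained sketch:

use std::collections::HashMap;

type BlockHash = [u8; 32]; // stand-in for the real hash type

struct HeaderIndex {
    by_hash: HashMap<BlockHash, u32>, // block hash → height
}

impl HeaderIndex {
    /// O(1) lookup corresponding to the get_header_height_by_hash() described above.
    fn height_by_hash(&self, hash: &BlockHash) -> Option<u32> {
        self.by_hash.get(hash).copied()
    }
}
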
Replace inefficient strategy that fed ALL 2.2+ million headers individually
with selective feeding of only required headers:

- Use reverse index for O(1) hash-to-height lookups
- Feed only target, base, and quorum block hashes
- Use batch loading for recent header ranges (~1000 headers)
- Eliminate "Feeding 2278524 block headers" bottleneck

Performance improvement: ~2.2M individual reads → ~1K batch operations

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Implement a status bar showing sync progress at the bottom of the terminal:
- Headers count and filter headers count
- Latest ChainLock height and peer count
- Network name (Dash/Testnet/Regtest)
- Updates every 100ms without interfering with log output

Features:
- Uses crossterm for cross-platform terminal control
- RAII cleanup with TerminalGuard
- Logs stream normally above persistent status bar
- Optional --no-terminal-ui flag to disable

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
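
The status-bar technique can be sketched with crossterm roughly as follows; this is a minimal illustration, while the actual implementation also throttles redraws to 100ms and restores the terminal through the RAII TerminalGuard:

use std::io::stdout;
use crossterm::{
    cursor::{MoveTo, RestorePosition, SavePosition},
    execute,
    style::Print,
    terminal::{size, Clear, ClearType},
};

/// Draw a one-line status bar on the bottom row without disturbing the
/// normal log output above it.
fn draw_status_bar(status: &str) -> std::io::Result<()> {
    let (_cols, rows) = size()?;
    execute!(
        stdout(),
        SavePosition,                      // remember where logs are printing
        MoveTo(0, rows.saturating_sub(1)), // jump to the bottom row
        Clear(ClearType::CurrentLine),
        Print(status),
        RestorePosition                    // return control to the log stream
    )
}
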
Add comprehensive terminal UI integration to the SPV client:
- enable_terminal_ui() and get_terminal_ui() methods
- Real-time status updates after network connections
- Status updates after header processing and ChainLock events
- update_status_display() method with storage data integration
- Proper shutdown sequence ensuring storage persistence
- Network configuration getter for UI display

The client now displays live sync progress including header counts
from storage, peer connections, and ChainLock heights.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
CLI improvements:
- Add --no-terminal-ui flag to disable status bar
- Proper terminal UI initialization timing
- Network name display integration
- Remove unused Arc import

Logging improvements:
- Fix log level handling in init_logging()
- Improve tracing-subscriber configuration
- Remove thread IDs for cleaner output

The CLI now provides a modern terminal experience with optional
real-time status display alongside streaming logs.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Small enhancements to header and filter sync:
- Improve logging and error handling
- Better progress reporting during sync operations
- Consistent formatting across sync modules

These changes support the terminal UI integration and provide
better visibility into sync progress.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
PastaPastaPasta and others added 5 commits June 16, 2025 20:44
- Add thread-safe Mutex wrapper around BufReader to prevent race conditions
- Implement sticky peer selection for sync consistency during operations
- Increase peer count limits (2-5 peers) for better network resilience
- Add single-peer message routing for sync operations requiring consistency
- Improve connection error handling and peer disconnection detection
- Add timeout-based message receiving to prevent indefinite blocking
- Reduce log verbosity for common sync messages to improve readability

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Add comprehensive sync state management with timeout detection
- Implement overlapping header handling for improved sync reliability
- Add coordinated message routing between sync managers and main client
- Enhance filter sync with batch processing and progress tracking
- Add sync timeout detection and recovery mechanisms
- Improve masternode sync coordination and state management
- Add detailed sync progress logging and error handling
- Implement proper chain validation during sync operations

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Add centralized network message handling to prevent race conditions
- Implement message routing between monitoring loop and sync operations
- Add comprehensive sync timeout detection and recovery mechanisms
- Enhance filter sync coordination with monitoring loop management
- Add detailed documentation for network message architecture
- Improve sync progress reporting and status updates
- Reduce debug noise from transaction input checking
- Add sync_and_check_filters_with_monitoring method for better coordination

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Add filter header verification test for chain validation
- Enhance multi-peer test with better error handling and timeouts
- Add checksum utility for data integrity verification
- Improve consensus encoding with better error messages
- Add test infrastructure for sync coordination scenarios

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Remove premature finish_sync() calls that were marking header and filter
header synchronization as complete immediately after starting. The sync
should only be marked as finished when handle_*_message() returns false,
indicating actual sync completion.

- Remove finish_sync() calls after starting header sync
- Remove finish_sync() calls after starting filter header sync
- Add sync_state_mut() accessor for proper state management
- Add proper sync completion in client message handlers

This fixes the issue where sync would complete with 0 headers because
the sync state was marked as finished before any headers were processed.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
coderabbitai bot left a comment

Review continued from previous batch...

Comment on lines +288 to +316
for (i, header) in headers.iter().enumerate() {
    // Get the previous header for validation
    let prev_header = if i == 0 {
        // First header in batch - get from storage
        let current_tip_height = storage.get_tip_height().await
            .map_err(|e| SyncError::SyncFailed(format!("Failed to get tip height: {}", e)))?;

        if let Some(height) = current_tip_height {
            storage.get_header(height).await
                .map_err(|e| SyncError::SyncFailed(format!("Failed to get previous header: {}", e)))?
        } else {
            None
        }
    } else {
        Some(headers[i - 1])
    };

    // Validate the header
    // tracing::trace!("Validating header {} at index {}", header.block_hash(), i);
    // if let Some(prev) = prev_header.as_ref() {
    //     tracing::trace!("Previous header: {}", prev.block_hash());
    // }

    self.validation.validate_header(header, prev_header.as_ref())
        .map_err(|e| SyncError::SyncFailed(format!("Header validation failed for block {}: {}", header.block_hash(), e)))?;

    validated.push(*header);
}


⚠️ Potential issue

Fix ownership errors in validate_headers()

BlockHeader isn’t Copy; dereferencing or moving it out of the slice won’t compile.

-            let prev_header = if i == 0 {
-                /* … */
-            } else {
-                Some(headers[i - 1])
-            };
+            let prev_header = if i == 0 {
+                /* … */
+            } else {
+                Some(headers[i - 1].clone())
+            };

-            validated.push(*header);
+            validated.push(header.clone());

(Or carry &BlockHeader throughout and clone only once before storage.)

📝 Committable suggestion


Suggested change

Before:

for (i, header) in headers.iter().enumerate() {
    // Get the previous header for validation
    let prev_header = if i == 0 {
        // First header in batch - get from storage
        let current_tip_height = storage.get_tip_height().await
            .map_err(|e| SyncError::SyncFailed(format!("Failed to get tip height: {}", e)))?;
        if let Some(height) = current_tip_height {
            storage.get_header(height).await
                .map_err(|e| SyncError::SyncFailed(format!("Failed to get previous header: {}", e)))?
        } else {
            None
        }
    } else {
        Some(headers[i - 1])
    };
    // Validate the header
    // tracing::trace!("Validating header {} at index {}", header.block_hash(), i);
    // if let Some(prev) = prev_header.as_ref() {
    //     tracing::trace!("Previous header: {}", prev.block_hash());
    // }
    self.validation.validate_header(header, prev_header.as_ref())
        .map_err(|e| SyncError::SyncFailed(format!("Header validation failed for block {}: {}", header.block_hash(), e)))?;
    validated.push(*header);
}

After:

for (i, header) in headers.iter().enumerate() {
    // Get the previous header for validation
    let prev_header = if i == 0 {
        // First header in batch - get from storage
        let current_tip_height = storage.get_tip_height().await
            .map_err(|e| SyncError::SyncFailed(format!("Failed to get tip height: {}", e)))?;
        if let Some(height) = current_tip_height {
            storage.get_header(height).await
                .map_err(|e| SyncError::SyncFailed(format!("Failed to get previous header: {}", e)))?
        } else {
            None
        }
    } else {
        // clone the slice item instead of moving it
        Some(headers[i - 1].clone())
    };
    // Validate the header
    self.validation.validate_header(header, prev_header.as_ref())
        .map_err(|e| SyncError::SyncFailed(format!(
            "Header validation failed for block {}: {}",
            header.block_hash(),
            e
        )))?;
    // clone here as well instead of `*header`
    validated.push(header.clone());
}
🤖 Prompt for AI Agents
In dash-spv/src/sync/headers.rs around lines 288 to 316, the code attempts to
dereference or move BlockHeader values from a slice, but BlockHeader is not
Copy, causing ownership errors. To fix this, change the code to work with
references (&BlockHeader) throughout the validation loop instead of moving or
copying headers. Only clone the BlockHeader once when pushing it into the
validated vector to maintain ownership rules and avoid compilation errors.

Comment on lines +173 to +343
/// Start reading messages from a peer
async fn start_peer_reader(
    addr: SocketAddr,
    pool: Arc<ConnectionPool>,
    message_tx: mpsc::Sender<(SocketAddr, NetworkMessage)>,
    addrv2_handler: Arc<AddrV2Handler>,
    shutdown: Arc<AtomicBool>,
) {
    tokio::spawn(async move {
        log::debug!("Starting peer reader loop for {}", addr);
        let mut loop_iteration = 0;

        while !shutdown.load(Ordering::Relaxed) {
            loop_iteration += 1;
            log::trace!("Peer reader loop iteration {} for {}", loop_iteration, addr);

            // Check shutdown signal first with detailed logging
            if shutdown.load(Ordering::Relaxed) {
                log::info!("Breaking peer reader loop for {} - shutdown signal received (iteration {})", addr, loop_iteration);
                break;
            }

            // Get connection
            let conn = match pool.get_connection(&addr).await {
                Some(conn) => conn,
                None => {
                    log::warn!("Breaking peer reader loop for {} - connection no longer in pool (iteration {})", addr, loop_iteration);
                    break;
                }
            };

            // Read message with minimal lock time
            let msg_result = {
                // Try to get a read lock first to check if connection is available
                let conn_guard = conn.read().await;
                if !conn_guard.is_connected() {
                    log::warn!("Breaking peer reader loop for {} - connection no longer connected (iteration {})", addr, loop_iteration);
                    drop(conn_guard);
                    break;
                }
                drop(conn_guard);

                // Now get write lock only for the duration of the read
                let mut conn_guard = conn.write().await;
                conn_guard.receive_message().await
            };

            match msg_result {
                Ok(Some(msg)) => {
                    log::trace!("Received {:?} from {}", msg.cmd(), addr);

                    // Handle some messages directly
                    match &msg {
                        NetworkMessage::SendAddrV2 => {
                            addrv2_handler.handle_sendaddrv2(addr).await;
                            continue; // Don't forward to client
                        }
                        NetworkMessage::AddrV2(addresses) => {
                            addrv2_handler.handle_addrv2(addresses.clone()).await;
                            continue; // Don't forward to client
                        }
                        NetworkMessage::GetAddr => {
                            log::trace!("Received GetAddr from {}, sending known addresses", addr);
                            // Send our known addresses
                            let response = addrv2_handler.build_addr_response().await;
                            let mut conn_guard = conn.write().await;
                            if let Err(e) = conn_guard.send_message(response).await {
                                log::error!("Failed to send addr response to {}: {}", addr, e);
                            }
                            continue; // Don't forward GetAddr to client
                        }
                        NetworkMessage::Ping(nonce) => {
                            // Handle ping directly
                            let mut conn_guard = conn.write().await;
                            if let Err(e) = conn_guard.handle_ping(*nonce).await {
                                log::error!("Failed to handle ping from {}: {}", addr, e);
                                // If we can't send pong, connection is likely broken
                                if matches!(e, NetworkError::ConnectionFailed(_)) {
                                    log::warn!("Breaking peer reader loop for {} - failed to send pong response (iteration {})", addr, loop_iteration);
                                    break;
                                }
                            }
                            continue; // Don't forward ping to client
                        }
                        NetworkMessage::Pong(nonce) => {
                            // Handle pong directly
                            let mut conn_guard = conn.write().await;
                            if let Err(e) = conn_guard.handle_pong(*nonce) {
                                log::error!("Failed to handle pong from {}: {}", addr, e);
                            }
                            continue; // Don't forward pong to client
                        }
                        NetworkMessage::Version(_) | NetworkMessage::Verack => {
                            // These are handled during handshake, ignore here
                            log::trace!("Ignoring handshake message {:?} from {}", msg.cmd(), addr);
                            continue;
                        }
                        NetworkMessage::Addr(_) => {
                            // Handle legacy addr messages (convert to AddrV2 if needed)
                            log::trace!("Received legacy addr message from {}", addr);
                            continue;
                        }
                        _ => {
                            // Forward other messages to client
                            log::trace!("Forwarding {:?} from {} to client", msg.cmd(), addr);
                        }
                    }

                    // Forward message to client
                    if message_tx.send((addr, msg)).await.is_err() {
                        log::warn!("Breaking peer reader loop for {} - failed to send message to client channel (iteration {})", addr, loop_iteration);
                        break;
                    }
                }
                Ok(None) => {
                    // No message available, brief pause to avoid aggressive polling but stay responsive
                    time::sleep(MESSAGE_POLL_INTERVAL).await;
                }
                Err(e) => {
                    match e {
                        NetworkError::PeerDisconnected => {
                            log::info!("Peer {} disconnected", addr);
                            break;
                        }
                        NetworkError::Timeout => {
                            log::debug!("Timeout reading from {}, continuing...", addr);
                            continue;
                        }
                        _ => {
                            log::error!("Fatal error reading from {}: {}", addr, e);

                            // Check if this is a serialization error that might have context
                            if let NetworkError::Serialization(ref decode_error) = e {
                                let error_msg = decode_error.to_string();
                                if error_msg.contains("unknown special transaction type") {
                                    log::warn!("Peer {} sent block with unsupported transaction type: {}", addr, decode_error);
                                    log::error!("BLOCK DECODE FAILURE - Error details: {}", error_msg);
                                } else if error_msg.contains("Failed to decode transactions for block") {
                                    // The error now includes the block hash
                                    log::error!("Peer {} sent block that failed transaction decoding: {}", addr, decode_error);
                                    // Try to extract the block hash from the error message
                                    if let Some(hash_start) = error_msg.find("block ") {
                                        if let Some(hash_end) = error_msg[hash_start + 6..].find(':') {
                                            let block_hash = &error_msg[hash_start + 6..hash_start + 6 + hash_end];
                                            log::error!("FAILING BLOCK HASH: {}", block_hash);
                                        }
                                    }
                                } else if error_msg.contains("IO error") {
                                    // This might be our wrapped error - log it prominently
                                    log::error!("BLOCK DECODE FAILURE - IO error (possibly unknown transaction type) from peer {}", addr);
                                    log::error!("Serialization error from {}: {}", addr, decode_error);
                                } else {
                                    log::error!("Serialization error from {}: {}", addr, decode_error);
                                }
                            }

                            // For other errors, wait a bit then break
                            tokio::time::sleep(Duration::from_secs(1)).await;
                            break;
                        }
                    }
                }
            }
        }

        // Remove from pool
        log::warn!("Disconnecting from {} (peer reader loop ended)", addr);
        pool.remove_connection(&addr).await;
    });
}

🛠️ Refactor suggestion

Track and await peer-reader tasks

start_peer_reader spawns detached tasks, so shutdown() never waits for them, leaving sockets alive beyond shutdown and causing flappy tests.

Add them to the existing JoinSet:

-        tokio::spawn(async move {
-            /* reader loop … */
-        });
+        // spawn into the JoinSet held in `self.tasks` so shutdown() can await it
+        if let Some(set) = pool.task_set() {   // pseudo-helper returning Arc<Mutex<JoinSet<()>>>
+            set.lock().await.spawn(async move {
+                /* reader loop … */
+            });
+        }

(Note: JoinSet::spawn takes the future itself; a detached JoinHandle cannot be pushed into a JoinSet after the fact.)

Or pass a JoinSet reference when spawning.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In dash-spv/src/network/multi_peer.rs between lines 173 and 343, the
start_peer_reader function spawns peer reader tasks without tracking or awaiting
them, causing these tasks to remain alive after shutdown and leading to flappy
tests. Modify the function to accept a mutable reference to a JoinSet and add
each spawned task to this JoinSet instead of spawning detached tasks. This
ensures that shutdown can await all peer reader tasks properly, allowing clean
termination of connections and preventing lingering sockets.

Comment on lines +27 to +35
/// Storage manager for persistence.
storage: Arc<RwLock<dyn StorageManager>>,

/// Set of addresses being watched.
watched_addresses: Arc<RwLock<HashSet<Address>>>,

/// Current UTXO set indexed by outpoint.
utxo_set: Arc<RwLock<HashMap<OutPoint, Utxo>>>,
}

⚠️ Potential issue

Trait object must be Send + Sync for cross-task use

All accesses to storage occur across tokio tasks; the trait object should be bounded accordingly:

-    storage: Arc<RwLock<dyn StorageManager>>,
+    storage: Arc<RwLock<dyn StorageManager + Send + Sync>>,

Mirror this bound wherever the trait is taken by reference.

🤖 Prompt for AI Agents
In dash-spv/src/wallet/mod.rs around lines 27 to 35, the trait object for
storage must be bounded by Send + Sync to allow safe access across tokio tasks.
Update the declaration of storage to use Arc<RwLock<dyn StorageManager + Send +
Sync>> and ensure that all references to this trait object also include these
bounds to maintain thread safety.
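
For context on why the bound matters: tokio::spawn requires the spawned future to be Send, and that requirement propagates to everything the future captures. A compressed illustration (trait body elided for brevity):

use std::sync::Arc;
use tokio::sync::RwLock;

trait StorageManager {} // body elided

fn spawn_storage_task(storage: Arc<RwLock<dyn StorageManager + Send + Sync>>) {
    // Without `+ Send + Sync` on the trait object, this future would not
    // be Send and tokio::spawn would refuse to compile.
    tokio::spawn(async move {
        let _guard = storage.read().await;
        // ... use storage here ...
    });
}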

Comment on lines +322 to +333
println!("🐛 BUG REPRODUCTION TEST - This test should FAIL to demonstrate the bug!");

// This test simulates the REAL production scenario that causes crashes:
// - Peer A sends heights 1000-2000
// - Peer B sends heights 1500-2500 (overlapping!)
// Each peer provides different (but potentially valid) previous_filter_header values
//
// The system should handle this gracefully, but currently it crashes.
// This test will FAIL until we implement the fix.

let mut storage = MemoryStorageManager::new().await
.expect("Failed to create storage");

🛠️ Refactor suggestion

Failing-by-design tests will break CI

The test is intended to fail (panic!("🚨 BUG REPRODUCED …")).
Mark it #[ignore] or #[should_panic] (with an explanatory message) so the suite can pass until the underlying fix lands.

🤖 Prompt for AI Agents
In dash-spv/tests/filter_header_verification_test.rs around lines 322 to 333,
the test is designed to fail to demonstrate a bug but currently causes CI
failures. To fix this, annotate the test function with #[ignore] or
#[should_panic(expected = "explanatory message")] to indicate the failure is
intentional, allowing the test suite to pass until the bug is fixed.
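
For example (the attribute value and message shown here are illustrative):

#[test]
#[should_panic(expected = "BUG REPRODUCED")]
fn overlapping_filter_headers_from_two_peers() {
    // ... test setup elided ...
    panic!("🚨 BUG REPRODUCED: overlapping filter header ranges crash the sync");
}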

Comment on lines +181 to +188
let initial_headers = create_test_headers_range(1000, 5000); // Headers 1000-4999
storage.store_headers(&initial_headers).await
.expect("Failed to store initial headers");

let tip_height = storage.get_tip_height().await.unwrap().unwrap();
println!("Initial header chain stored: tip height = {}", tip_height);
assert_eq!(tip_height, 4999);


⚠️ Potential issue

Off-by-one: tip_height assertion is wrong

create_test_headers_range(1000, 5000) generates headers 1000 … 5999 (5000 total), therefore the tip should be 5999, not 4999.
The assertion will always fail.

-assert_eq!(tip_height, 4999);
+assert_eq!(tip_height, 5999);
📝 Committable suggestion


Suggested change

Before:

let initial_headers = create_test_headers_range(1000, 5000); // Headers 1000-4999
storage.store_headers(&initial_headers).await
    .expect("Failed to store initial headers");

let tip_height = storage.get_tip_height().await.unwrap().unwrap();
println!("Initial header chain stored: tip height = {}", tip_height);
assert_eq!(tip_height, 4999);

After:

let initial_headers = create_test_headers_range(1000, 5000);
storage.store_headers(&initial_headers).await
    .expect("Failed to store initial headers");

let tip_height = storage.get_tip_height().await.unwrap().unwrap();
println!("Initial header chain stored: tip height = {}", tip_height);
assert_eq!(tip_height, 5999);
🤖 Prompt for AI Agents
In dash-spv/tests/filter_header_verification_test.rs around lines 181 to 188,
the assertion for tip_height is off by one because
create_test_headers_range(1000, 5000) generates headers from 1000 to 5999, not
4999. Update the assert_eq line to expect 5999 as the tip height instead of 4999
to correctly reflect the last header in the range.

Comment on lines +231 to +236
let (block_processor_tx, _block_processor_rx) = mpsc::unbounded_channel();

// Create a placeholder wallet - will be properly initialized in start()
let placeholder_storage = Arc::new(RwLock::new(crate::storage::MemoryStorageManager::new().await.map_err(|e| SpvError::Storage(e))?));
let wallet = Arc::new(RwLock::new(crate::wallet::Wallet::new(placeholder_storage)));


🛠️ Refactor suggestion

Wallet is wired to volatile storage even when persistence is enabled

DashSpvClient::new builds the main storage (which may be a DiskStorageManager) but then creates

let placeholder_storage = Arc::new(RwLock::new(MemoryStorageManager::new().await?));
let wallet = Wallet::new(placeholder_storage);

The wallet therefore always uses in-memory storage and will lose data between runs, defeating enable_persistence.
Pass the already-created self.storage (or a wallet-specific on-disk store) instead of a fresh memory backend.

- let placeholder_storage = Arc::new(RwLock::new(...MemoryStorageManager::new()...));
- let wallet = Arc::new(RwLock::new(Wallet::new(placeholder_storage)));
+ let wallet_storage = Arc::clone(&self.storage); // ensure StorageManager is Arc<RwLock<dyn>>
+ let wallet = Arc::new(RwLock::new(Wallet::new(wallet_storage)));

Without this, users will observe empty wallets after restart.

🤖 Prompt for AI Agents
In dash-spv/src/client/mod.rs around lines 231 to 236, the wallet is incorrectly
initialized with a new in-memory storage instead of the existing persistent
storage. To fix this, replace the placeholder in-memory storage with the
already-created persistent storage instance (self.storage or equivalent) when
constructing the wallet, ensuring the wallet uses the correct storage backend
and retains data across restarts.

Comment on lines +920 to +932
// Check if this input spends a UTXO from our watched addresses
if let Ok(Some(spent_utxo)) = self.wallet.read().await.remove_utxo(&input.previous_output).await {
transaction_relevant = true;
let amount = spent_utxo.value();

tracing::info!("💸 Found relevant input: {}:{} spending UTXO {} (value: {})",
txid, vin, input.previous_output, amount);

// Update balance change for this address (subtract)
*balance_changes.entry(spent_utxo.address.clone()).or_insert(0) -= amount.to_sat() as i64;
}

// Also check against explicitly watched outpoints

🛠️ Refactor suggestion

⚠️ Potential issue

RwLock guard is held across .await – code will not compile

Expressions like

self.wallet.read().await.remove_utxo(&input.previous_output).await

hold the RwLockReadGuard returned by read().await across another .await.
The guard is not Send, so the compiler will reject this (borrow of guard across await).
Even if it compiled, it would risk dead-locks by keeping the lock while waiting on I/O.

Apply a pattern that releases the guard before the await, e.g.:

let removed = {
    let wallet = self.wallet.clone();      // Arc clone, no lock yet
    wallet                                 // `Wallet` internal APIs do their own locking
        .remove_utxo(&input.previous_output)
        .await
}?;  // guard free while awaiting

Refactor every similar occurrence (add_utxo, get_balance*, update_confirmation_status, etc.) in this file.
This is a build-blocking issue.

Also applies to: 968-978, 1228-1236, 1284-1294, 1624-1634

🤖 Prompt for AI Agents
In dash-spv/src/client/mod.rs around lines 920 to 932, the code holds an
RwLockReadGuard across an await point when calling
self.wallet.read().await.remove_utxo(...).await, which is not allowed and causes
compilation errors. To fix this, refactor the code to acquire the lock and
extract the needed data before the await, or clone the Arc to the wallet and
call async methods that internally handle locking without holding the guard
across await. Apply this pattern to all similar cases in the file, including
lines 968-978, 1228-1236, 1284-1294, and 1624-1634, ensuring no RwLock guards
are held across await points.
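
A generic illustration of the fix pattern, independent of the wallet types: copy what is needed while the lock is held, drop the guard, and only then await:

use std::sync::Arc;
use tokio::sync::RwLock;

async fn read_then_io(shared: Arc<RwLock<Vec<u8>>>) {
    let snapshot = {
        let guard = shared.read().await; // lock taken...
        guard.clone()                    // ...needed data copied out
    }; // guard dropped here, before any further await

    // Simulated I/O: safe because no lock is held across this await.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    println!("processed {} bytes without holding the lock", snapshot.len());
}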

Comment on lines +126 to +134
let mut storage = MemoryStorageManager::new().await.unwrap();
let block_result = processor.process_block(&block, 100, &wallet, &mut storage).await.unwrap();

// Verify block processing results
assert_eq!(block_result.height, 100);
assert_eq!(block_result.relevant_transaction_count, 1);
assert_eq!(block_result.total_utxos_added, 1);
assert_eq!(block_result.total_utxos_spent, 0);


🛠️ Refactor suggestion

Processor and wallet use different MemoryStorageManager instances

Each process_block call receives a fresh MemoryStorageManager, while the Wallet was constructed with its own storage handle. This means block/chain state written during processing is invisible to the wallet, potentially masking bugs and producing false positives in the tests. Re-use the wallet’s internal storage (e.g. wallet.storage.clone()) for the processor to ensure the same view of chain data.

Also applies to: 198-201

🤖 Prompt for AI Agents
In dash-spv/tests/wallet_integration_test.rs around lines 126 to 134, the
processor and wallet are using different MemoryStorageManager instances, causing
the processor's block/chain state changes to be invisible to the wallet. To fix
this, replace the new MemoryStorageManager instance passed to process_block with
a clone of the wallet's internal storage (wallet.storage.clone()) so both share
the same storage view. Apply the same fix to lines 198-201 as well.

Comment on lines +150 to +158
assert!(!utxo.is_confirmed); // Should start unconfirmed
assert!(!utxo.is_instantlocked);

// Verify wallet state after payment discovery
let final_balance = wallet.get_balance().await.unwrap();
assert_eq!(final_balance.confirmed, Amount::from_sat(payment_amount)); // Will be confirmed due to high mock current height
assert_eq!(final_balance.pending, Amount::ZERO);
assert_eq!(final_balance.instantlocked, Amount::ZERO);
assert_eq!(final_balance.total(), Amount::from_sat(payment_amount));

⚠️ Potential issue

Inconsistent confirmation expectations – test will likely fail

You assert !utxo.is_confirmed (line 150) yet immediately afterwards expect the same amount to appear in final_balance.confirmed (line 155). Both cannot be true at the same time with a single-block height of 100. Decide whether the payment should be treated as confirmed or pending and align both assertions accordingly.

🤖 Prompt for AI Agents
In dash-spv/tests/wallet_integration_test.rs around lines 150 to 158, the test
inconsistently asserts that the UTXO is unconfirmed while expecting the final
balance to show the payment amount as confirmed. To fix this, decide if the
payment should be confirmed or pending at the test height, then update the
assertion on utxo.is_confirmed to match the expected final_balance.confirmed
state, ensuring both reflect the same confirmation status.

Comment on lines 420 to 444
// Start masternode sync
let sync_started = self.masternode_sync.start_sync(network, storage).await?;

if !sync_started {
    // Already up to date
    self.state.finish_sync(SyncComponent::Masternodes);

    let final_height = match storage.load_masternode_state().await {
        Ok(Some(state)) => state.last_height,
        _ => 0,
    };

    return Ok(SyncProgress {
        masternode_height: final_height,
        masternodes_synced: true,
        ..SyncProgress::default()
    });
}

// Note: The actual sync now happens through the monitoring loop
// calling handle_mnlistdiff_message() and check_sync_timeout()
tracing::info!("Masternode sync started - will be completed through monitoring loop");

self.state.finish_sync(SyncComponent::Masternodes);


⚠️ Potential issue

sync_masternodes marks the component finished immediately

self.state.finish_sync(SyncComponent::Masternodes) is called right after start_sync, even when the sync is still running asynchronously. This clears the “syncing” flag, so is_syncing() will return false while work is in progress, breaking timeout/recovery logic. Defer finish_sync until the masternode sync manager reports completion (e.g. when handle_mnlistdiff_message returns false).

🤖 Prompt for AI Agents
In dash-spv/src/sync/mod.rs around lines 420 to 444, the call to
self.state.finish_sync(SyncComponent::Masternodes) happens immediately after
starting the masternode sync, which incorrectly clears the syncing flag while
the sync is still running asynchronously. To fix this, remove or defer this
finish_sync call here and instead call finish_sync only when the masternode sync
manager signals completion, such as when handle_mnlistdiff_message returns
false, ensuring the syncing state accurately reflects ongoing work and preserves
timeout/recovery logic.

QuantumExplorer merged commit fb6ecde into v0.40-dev on Jun 17, 2025
17 checks passed
QuantumExplorer (Member) commented:

Good job!
