Skip to content

Commit a605406

Browse files
committed
Squashed commit of the following:
commit 2e7ea89 Author: xdustinface <[email protected]> Date: Sun Nov 16 23:51:16 2025 +1000 fix: Use non-blocking `TcpStream` in `dash-spv::network::TcpConnection`
1 parent f5d4677 commit a605406

File tree

16 files changed

+352
-399
lines changed

16 files changed

+352
-399
lines changed

dash-spv-ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dashcore = { path = "../dash", package = "dashcore" }
1717
libc = "0.2"
1818
once_cell = "1.19"
1919
tokio = { version = "1", features = ["full"] }
20+
tokio-util = "0.7"
2021
serde = { version = "1.0", features = ["derive"] }
2122
serde_json = "1.0"
2223
log = "0.4"

dash-spv-ffi/src/client.rs

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ use std::collections::HashMap;
1717
use std::ffi::{CStr, CString};
1818
use std::os::raw::{c_char, c_void};
1919
use std::str::FromStr;
20-
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20+
use std::sync::atomic::{AtomicU64, Ordering};
2121
use std::sync::{Arc, Mutex};
2222
use std::time::Duration;
2323
use tokio::runtime::Runtime;
2424
use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver};
25+
use tokio_util::sync::CancellationToken;
2526

2627
/// Global callback registry for thread-safe callback management
2728
static CALLBACK_REGISTRY: Lazy<Arc<Mutex<CallbackRegistry>>> =
@@ -104,12 +105,6 @@ struct SyncCallbackData {
104105
_marker: std::marker::PhantomData<()>,
105106
}
106107

107-
async fn wait_for_shutdown_signal(signal: Arc<AtomicBool>) {
108-
while !signal.load(Ordering::Relaxed) {
109-
tokio::time::sleep(Duration::from_millis(50)).await;
110-
}
111-
}
112-
113108
/// FFIDashSpvClient structure
114109
type InnerClient = DashSpvClient<
115110
key_wallet_manager::wallet_manager::WalletManager<
@@ -126,7 +121,7 @@ pub struct FFIDashSpvClient {
126121
event_callbacks: Arc<Mutex<FFIEventCallbacks>>,
127122
active_threads: Arc<Mutex<Vec<std::thread::JoinHandle<()>>>>,
128123
sync_callbacks: Arc<Mutex<Option<SyncCallbackData>>>,
129-
shutdown_signal: Arc<AtomicBool>,
124+
shutdown_token: CancellationToken,
130125
// Stored event receiver for pull-based draining (no background thread by default)
131126
event_rx: Arc<Mutex<Option<UnboundedReceiver<dash_spv::types::SpvEvent>>>>,
132127
}
@@ -197,7 +192,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new(
197192
event_callbacks: Arc::new(Mutex::new(FFIEventCallbacks::default())),
198193
active_threads: Arc::new(Mutex::new(Vec::new())),
199194
sync_callbacks: Arc::new(Mutex::new(None)),
200-
shutdown_signal: Arc::new(AtomicBool::new(false)),
195+
shutdown_token: CancellationToken::new(),
201196
event_rx: Arc::new(Mutex::new(None)),
202197
};
203198
Box::into_raw(Box::new(ffi_client))
@@ -378,8 +373,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_drain_events(client: *mut FFIDashSp
378373
FFIErrorCode::Success as i32
379374
}
380375

381-
fn stop_client_internal(client: &FFIDashSpvClient) -> Result<(), dash_spv::SpvError> {
382-
client.shutdown_signal.store(true, Ordering::Relaxed);
376+
fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> {
377+
client.shutdown_token.cancel();
383378

384379
// Ensure callbacks are cleared so no further progress/completion notifications fire.
385380
{
@@ -411,7 +406,7 @@ fn stop_client_internal(client: &FFIDashSpvClient) -> Result<(), dash_spv::SpvEr
411406
res
412407
});
413408

414-
client.shutdown_signal.store(false, Ordering::Relaxed);
409+
client.shutdown_token = CancellationToken::new();
415410

416411
result
417412
}
@@ -525,7 +520,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_start(client: *mut FFIDashSpvClient
525520
pub unsafe extern "C" fn dash_spv_ffi_client_stop(client: *mut FFIDashSpvClient) -> i32 {
526521
null_check!(client);
527522

528-
let client = &(*client);
523+
let client = &mut (*client);
529524
match stop_client_internal(client) {
530525
Ok(()) => FFIErrorCode::Success as i32,
531526
Err(e) => {
@@ -785,7 +780,6 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
785780
let inner = client.inner.clone();
786781
let runtime = client.runtime.clone();
787782
let sync_callbacks = client.sync_callbacks.clone();
788-
let shutdown_signal = client.shutdown_signal.clone();
789783

790784
// Take progress receiver from client
791785
let progress_receiver = {
@@ -797,7 +791,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
797791
if let Some(mut receiver) = progress_receiver {
798792
let runtime_handle = runtime.handle().clone();
799793
let sync_callbacks_clone = sync_callbacks.clone();
800-
let shutdown_signal_clone = shutdown_signal.clone();
794+
let shutdown_token_monitor = client.shutdown_token.clone();
801795

802796
let handle = std::thread::spawn(move || {
803797
runtime_handle.block_on(async move {
@@ -859,7 +853,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
859853
None => break,
860854
}
861855
}
862-
_ = wait_for_shutdown_signal(shutdown_signal_clone.clone()) => {
856+
_ = shutdown_token_monitor.cancelled() => {
863857
break;
864858
}
865859
}
@@ -874,15 +868,12 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
874868
// Spawn sync task in a separate thread with safe callback access
875869
let runtime_handle = runtime.handle().clone();
876870
let sync_callbacks_clone = sync_callbacks.clone();
877-
let shutdown_signal_for_thread = shutdown_signal.clone();
878-
let stop_triggered_for_thread = Arc::new(AtomicBool::new(false));
871+
let shutdown_token_sync = client.shutdown_token.clone();
879872
let sync_handle = std::thread::spawn(move || {
880-
let stop_triggered_for_callback = stop_triggered_for_thread.clone();
873+
let shutdown_token_callback = shutdown_token_sync.clone();
881874
// Run monitoring loop
882875
let monitor_result = runtime_handle.block_on({
883876
let inner = inner.clone();
884-
let shutdown_signal_for_thread = shutdown_signal_for_thread.clone();
885-
let stop_triggered_for_thread = stop_triggered_for_callback.clone();
886877
async move {
887878
let mut spv_client = {
888879
let mut guard = inner.lock().unwrap();
@@ -903,8 +894,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
903894
Ok(inner) => inner,
904895
Err(_) => Ok(()),
905896
},
906-
_ = wait_for_shutdown_signal(shutdown_signal_for_thread.clone()) => {
907-
stop_triggered_for_thread.store(true, Ordering::Relaxed);
897+
_ = shutdown_token_sync.cancelled() => {
908898
abort_handle.abort();
909899
match monitor_future.as_mut().await {
910900
Ok(inner) => inner,
@@ -930,7 +920,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
930920
..
931921
}) = registry.unregister(callback_data.callback_id)
932922
{
933-
if stop_triggered_for_callback.load(Ordering::Relaxed) {
923+
if shutdown_token_callback.is_cancelled() {
934924
let msg = CString::new("Sync stopped by request").unwrap_or_else(|_| {
935925
CString::new("Sync stopped").expect("hardcoded string is safe")
936926
});
@@ -984,7 +974,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
984974
pub unsafe extern "C" fn dash_spv_ffi_client_cancel_sync(client: *mut FFIDashSpvClient) -> i32 {
985975
null_check!(client);
986976

987-
let client = &(*client);
977+
let client = &mut (*client);
988978

989979
match stop_client_internal(client) {
990980
Ok(()) => FFIErrorCode::Success as i32,
@@ -1318,8 +1308,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie
13181308
if !client.is_null() {
13191309
let client = Box::from_raw(client);
13201310

1321-
// Set shutdown signal to stop all threads
1322-
client.shutdown_signal.store(true, Ordering::Relaxed);
1311+
// Cancel shutdown token to stop all threads
1312+
client.shutdown_token.cancel();
13231313

13241314
// Clean up any registered callbacks
13251315
if let Some(ref callback_data) = *client.sync_callbacks.lock().unwrap() {

dash-spv/ARCHITECTURE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ The network module handles all P2P communication with the Dash network.
680680
- Graceful shutdown
681681

682682
**Complex Types Used**:
683-
- `HashMap<PeerId, Arc<TcpConnection>>` - **JUSTIFIED**: Efficient peer lookup
683+
- `HashMap<PeerId, Arc<Peer>>` - **JUSTIFIED**: Efficient peer lookup
684684
- Multiple tokio::sync primitives - **JUSTIFIED**: Complex concurrent operations
685685

686686
**Critical Issues**:

dash-spv/CLAUDE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ TCP-based networking with proper Dash protocol implementation:
116116
- **DNS-first peer discovery**: Automatically uses DNS seeds (`dnsseed.dash.org`, `testnet-seed.dashdot.io`) when no explicit peers are configured
117117
- **Immediate startup**: No delay for initial peer discovery (10-second delay only for subsequent searches)
118118
- **Exclusive mode**: When explicit peers are provided, uses only those peers (no DNS discovery)
119-
- Connection management via `TcpConnection`
119+
- Connection management via `Peer`
120120
- Handshake handling via `HandshakeManager`
121121
- Message routing via `MessageHandler`
122122
- Peer support via `PeerNetworkManager`

dash-spv/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ clap = { version = "4.0", features = ["derive"] }
2323

2424
# Async runtime
2525
tokio = { version = "1.0", features = ["full"] }
26+
tokio-util = "0.7"
2627
async-trait = "0.1"
2728

2829
# Error handling

dash-spv/src/client/config.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ pub struct ClientConfig {
5656
/// Sync timeout.
5757
pub sync_timeout: Duration,
5858

59-
/// Read timeout for TCP socket operations.
60-
pub read_timeout: Duration,
61-
6259
/// Whether to enable filter syncing.
6360
pub enable_filters: bool,
6461

@@ -206,7 +203,6 @@ impl Default for ClientConfig {
206203
connection_timeout: Duration::from_secs(30),
207204
message_timeout: Duration::from_secs(60),
208205
sync_timeout: Duration::from_secs(300),
209-
read_timeout: Duration::from_millis(100),
210206
enable_filters: true,
211207
enable_masternodes: true,
212208
max_peers: 8,
@@ -326,12 +322,6 @@ impl ClientConfig {
326322
self
327323
}
328324

329-
/// Set read timeout for TCP socket operations.
330-
pub fn with_read_timeout(mut self, timeout: Duration) -> Self {
331-
self.read_timeout = timeout;
332-
self
333-
}
334-
335325
/// Set log level.
336326
pub fn with_log_level(mut self, level: &str) -> Self {
337327
self.log_level = level.to_string();

dash-spv/src/client/config_test.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ mod tests {
2222
assert_eq!(config.connection_timeout, Duration::from_secs(30));
2323
assert_eq!(config.message_timeout, Duration::from_secs(60));
2424
assert_eq!(config.sync_timeout, Duration::from_secs(300));
25-
assert_eq!(config.read_timeout, Duration::from_millis(100));
2625
assert!(config.enable_filters);
2726
assert!(config.enable_masternodes);
2827
assert_eq!(config.max_peers, 8);
@@ -65,7 +64,6 @@ mod tests {
6564
.with_storage_path(path.clone())
6665
.with_validation_mode(ValidationMode::Basic)
6766
.with_connection_timeout(Duration::from_secs(10))
68-
.with_read_timeout(Duration::from_secs(5))
6967
.with_log_level("debug")
7068
.with_max_concurrent_filter_requests(32)
7169
.with_filter_flow_control(false)
@@ -80,7 +78,6 @@ mod tests {
8078
assert!(config.enable_persistence);
8179
assert_eq!(config.validation_mode, ValidationMode::Basic);
8280
assert_eq!(config.connection_timeout, Duration::from_secs(10));
83-
assert_eq!(config.read_timeout, Duration::from_secs(5));
8481
assert_eq!(config.log_level, "debug");
8582
assert_eq!(config.max_concurrent_filter_requests, 32);
8683
assert!(!config.enable_filter_flow_control);

dash-spv/src/network/handshake.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use dashcore::Network;
1212

1313
use crate::client::config::MempoolStrategy;
1414
use crate::error::{NetworkError, NetworkResult};
15-
use crate::network::connection::TcpConnection;
15+
use crate::network::peer::Peer;
1616

1717
/// Handshake state.
1818
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -65,7 +65,7 @@ impl HandshakeManager {
6565
}
6666

6767
/// Perform the handshake with a peer.
68-
pub async fn perform_handshake(&mut self, connection: &mut TcpConnection) -> NetworkResult<()> {
68+
pub async fn perform_handshake(&mut self, connection: &mut Peer) -> NetworkResult<()> {
6969
use tokio::time::{timeout, Duration};
7070

7171
// Send version message
@@ -145,7 +145,7 @@ impl HandshakeManager {
145145
/// Handle a handshake message.
146146
async fn handle_handshake_message(
147147
&mut self,
148-
connection: &mut TcpConnection,
148+
connection: &mut Peer,
149149
message: NetworkMessage,
150150
) -> NetworkResult<Option<HandshakeState>> {
151151
match message {
@@ -238,7 +238,7 @@ impl HandshakeManager {
238238
}
239239

240240
/// Send version message.
241-
async fn send_version(&mut self, connection: &mut TcpConnection) -> NetworkResult<()> {
241+
async fn send_version(&mut self, connection: &mut Peer) -> NetworkResult<()> {
242242
let version_message = self.build_version_message(connection.peer_info().address)?;
243243
connection.send_message(NetworkMessage::Version(version_message)).await?;
244244
tracing::debug!("Sent version message");
@@ -309,7 +309,7 @@ impl HandshakeManager {
309309
}
310310

311311
/// Negotiate headers2 support with the peer after handshake completion.
312-
async fn negotiate_headers2(&self, connection: &mut TcpConnection) -> NetworkResult<()> {
312+
async fn negotiate_headers2(&self, connection: &mut Peer) -> NetworkResult<()> {
313313
// Headers2 is currently disabled due to protocol compatibility issues
314314
// Always send SendHeaders regardless of peer support
315315
tracing::info!("Headers2 is disabled - sending SendHeaders only");

0 commit comments

Comments
 (0)