Skip to content

Commit ffd3d2e

Browse files
authored
fix: Use non-blocking TcpStream in dash-spv::network::TcpConnection (#188)
1 parent f5d4677 commit ffd3d2e

File tree

7 files changed

+54
-134
lines changed

7 files changed

+54
-134
lines changed

dash-spv/src/client/config.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ pub struct ClientConfig {
5656
/// Sync timeout.
5757
pub sync_timeout: Duration,
5858

59-
/// Read timeout for TCP socket operations.
60-
pub read_timeout: Duration,
61-
6259
/// Whether to enable filter syncing.
6360
pub enable_filters: bool,
6461

@@ -206,7 +203,6 @@ impl Default for ClientConfig {
206203
connection_timeout: Duration::from_secs(30),
207204
message_timeout: Duration::from_secs(60),
208205
sync_timeout: Duration::from_secs(300),
209-
read_timeout: Duration::from_millis(100),
210206
enable_filters: true,
211207
enable_masternodes: true,
212208
max_peers: 8,
@@ -326,12 +322,6 @@ impl ClientConfig {
326322
self
327323
}
328324

329-
/// Set read timeout for TCP socket operations.
330-
pub fn with_read_timeout(mut self, timeout: Duration) -> Self {
331-
self.read_timeout = timeout;
332-
self
333-
}
334-
335325
/// Set log level.
336326
pub fn with_log_level(mut self, level: &str) -> Self {
337327
self.log_level = level.to_string();

dash-spv/src/client/config_test.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ mod tests {
2222
assert_eq!(config.connection_timeout, Duration::from_secs(30));
2323
assert_eq!(config.message_timeout, Duration::from_secs(60));
2424
assert_eq!(config.sync_timeout, Duration::from_secs(300));
25-
assert_eq!(config.read_timeout, Duration::from_millis(100));
2625
assert!(config.enable_filters);
2726
assert!(config.enable_masternodes);
2827
assert_eq!(config.max_peers, 8);
@@ -65,7 +64,6 @@ mod tests {
6564
.with_storage_path(path.clone())
6665
.with_validation_mode(ValidationMode::Basic)
6766
.with_connection_timeout(Duration::from_secs(10))
68-
.with_read_timeout(Duration::from_secs(5))
6967
.with_log_level("debug")
7068
.with_max_concurrent_filter_requests(32)
7169
.with_filter_flow_control(false)
@@ -80,7 +78,6 @@ mod tests {
8078
assert!(config.enable_persistence);
8179
assert_eq!(config.validation_mode, ValidationMode::Basic);
8280
assert_eq!(config.connection_timeout, Duration::from_secs(10));
83-
assert_eq!(config.read_timeout, Duration::from_secs(5));
8481
assert_eq!(config.log_level, "debug");
8582
assert_eq!(config.max_concurrent_filter_requests, 32);
8683
assert!(!config.enable_filter_flow_control);

dash-spv/src/network/connection.rs

Lines changed: 48 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
//! TCP connection management.
22
33
use std::collections::HashMap;
4-
use std::io::{BufReader, Read, Write};
5-
use std::net::{SocketAddr, TcpStream};
4+
use std::net::SocketAddr;
65
use std::sync::Arc;
76
use std::time::{Duration, SystemTime};
7+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
8+
use tokio::net::TcpStream;
89
use tokio::sync::Mutex;
910

1011
use dashcore::consensus::{encode, Decodable};
@@ -18,7 +19,6 @@ use crate::types::PeerInfo;
1819
/// Internal state for the TCP connection
1920
struct ConnectionState {
2021
stream: TcpStream,
21-
read_buffer: BufReader<TcpStream>,
2222
// Stateful message framing buffer to ensure full frames before decoding
2323
framing_buffer: Vec<u8>,
2424
}
@@ -30,7 +30,6 @@ pub struct TcpConnection {
3030
// This ensures no concurrent access to the underlying socket
3131
state: Option<Arc<Mutex<ConnectionState>>>,
3232
timeout: Duration,
33-
read_timeout: Duration,
3433
connected_at: Option<SystemTime>,
3534
bytes_sent: u64,
3635
network: Network,
@@ -56,17 +55,11 @@ impl TcpConnection {
5655
self.address
5756
}
5857
/// Create a new TCP connection to the given address.
59-
pub fn new(
60-
address: SocketAddr,
61-
timeout: Duration,
62-
read_timeout: Duration,
63-
network: Network,
64-
) -> Self {
58+
pub fn new(address: SocketAddr, timeout: Duration, network: Network) -> Self {
6559
Self {
6660
address,
6761
state: None,
6862
timeout,
69-
read_timeout,
7063
connected_at: None,
7164
bytes_sent: 0,
7265
network,
@@ -88,47 +81,32 @@ impl TcpConnection {
8881
pub async fn connect(
8982
address: SocketAddr,
9083
timeout_secs: u64,
91-
read_timeout: Duration,
9284
network: Network,
9385
) -> NetworkResult<Self> {
9486
let timeout = Duration::from_secs(timeout_secs);
9587

96-
let stream = TcpStream::connect_timeout(&address, timeout).map_err(|e| {
97-
NetworkError::ConnectionFailed(format!("Failed to connect to {}: {}", address, e))
98-
})?;
88+
let stream = tokio::time::timeout(timeout, TcpStream::connect(address))
89+
.await
90+
.map_err(|_| {
91+
NetworkError::ConnectionFailed(format!("Connection to {} timed out", address))
92+
})?
93+
.map_err(|e| {
94+
NetworkError::ConnectionFailed(format!("Failed to connect to {}: {}", address, e))
95+
})?;
9996

10097
stream.set_nodelay(true).map_err(|e| {
10198
NetworkError::ConnectionFailed(format!("Failed to set TCP_NODELAY: {}", e))
10299
})?;
103100

104-
// CRITICAL: Read timeout configuration affects message integrity
105-
//
106-
// WARNING: Timeout values below 100ms risk TCP partial reads causing
107-
// corrupted message framing and checksum validation failures.
108-
// See git commit 16d55f09 for historical context.
109-
//
110-
// Set a read timeout instead of non-blocking mode
111-
// This allows us to return None when no data is available
112-
stream.set_read_timeout(Some(read_timeout)).map_err(|e| {
113-
NetworkError::ConnectionFailed(format!("Failed to set read timeout: {}", e))
114-
})?;
115-
116-
// Clone the stream for the BufReader
117-
let read_stream = stream.try_clone().map_err(|e| {
118-
NetworkError::ConnectionFailed(format!("Failed to clone stream: {}", e))
119-
})?;
120-
121101
let state = ConnectionState {
122102
stream,
123-
read_buffer: BufReader::new(read_stream),
124103
framing_buffer: Vec::new(),
125104
};
126105

127106
Ok(Self {
128107
address,
129108
state: Some(Arc::new(Mutex::new(state))),
130109
timeout,
131-
read_timeout,
132110
connected_at: Some(SystemTime::now()),
133111
bytes_sent: 0,
134112
network,
@@ -148,47 +126,25 @@ impl TcpConnection {
148126

149127
/// Connect to the peer (instance method for compatibility).
150128
pub async fn connect_instance(&mut self) -> NetworkResult<()> {
151-
let stream = TcpStream::connect_timeout(&self.address, self.timeout).map_err(|e| {
152-
NetworkError::ConnectionFailed(format!("Failed to connect to {}: {}", self.address, e))
153-
})?;
154-
155-
// Don't set socket timeouts - we handle timeouts at the application level
156-
// and socket timeouts can interfere with async operations
129+
let stream = tokio::time::timeout(self.timeout, TcpStream::connect(self.address))
130+
.await
131+
.map_err(|_| {
132+
NetworkError::ConnectionFailed(format!("Connection to {} timed out", self.address))
133+
})?
134+
.map_err(|e| {
135+
NetworkError::ConnectionFailed(format!(
136+
"Failed to connect to {}: {}",
137+
self.address, e
138+
))
139+
})?;
157140

158141
// Disable Nagle's algorithm for lower latency
159142
stream.set_nodelay(true).map_err(|e| {
160143
NetworkError::ConnectionFailed(format!("Failed to set TCP_NODELAY: {}", e))
161144
})?;
162145

163-
// CRITICAL: Read timeout configuration affects message integrity
164-
//
165-
// WARNING: DO NOT MODIFY TIMEOUT VALUES WITHOUT UNDERSTANDING THE IMPLICATIONS
166-
//
167-
// Previous bug (git commit 16d55f09): 15ms timeout caused TCP partial reads
168-
// leading to corrupted message framing and checksum validation failures
169-
// with debug output like: "CHECKSUM DEBUG: len=2, checksum=[15, 1d, fc, 66]"
170-
//
171-
// The timeout must be long enough to receive complete network messages
172-
// but short enough to maintain responsiveness. 100ms is the tested value
173-
// that balances performance with correctness.
174-
//
175-
// TODO: Future refactor should eliminate this duplication by having
176-
// connect_instance() delegate to connect() or use a shared connection setup method
177-
//
178-
// Set a read timeout instead of non-blocking mode
179-
// This allows us to return None when no data is available
180-
stream.set_read_timeout(Some(self.read_timeout)).map_err(|e| {
181-
NetworkError::ConnectionFailed(format!("Failed to set read timeout: {}", e))
182-
})?;
183-
184-
// Clone stream for reading
185-
let read_stream = stream.try_clone().map_err(|e| {
186-
NetworkError::ConnectionFailed(format!("Failed to clone stream: {}", e))
187-
})?;
188-
189146
let state = ConnectionState {
190147
stream,
191-
read_buffer: BufReader::new(read_stream),
192148
framing_buffer: Vec::new(),
193149
};
194150

@@ -204,8 +160,8 @@ impl TcpConnection {
204160
pub async fn disconnect(&mut self) -> NetworkResult<()> {
205161
if let Some(state_arc) = self.state.take() {
206162
if let Ok(state_mutex) = Arc::try_unwrap(state_arc) {
207-
let state = state_mutex.into_inner();
208-
let _ = state.stream.shutdown(std::net::Shutdown::Both);
163+
let mut state = state_mutex.into_inner();
164+
let _ = state.stream.shutdown().await;
209165
}
210166
}
211167
self.connected_at = None;
@@ -316,6 +272,19 @@ impl TcpConnection {
316272
);
317273
}
318274

275+
/// Helper function to read some bytes into the framing buffer.
276+
async fn read_some(state: &mut ConnectionState) -> std::io::Result<usize> {
277+
let mut tmp = [0u8; 8192];
278+
match state.stream.read(&mut tmp).await {
279+
Ok(0) => Ok(0),
280+
Ok(n) => {
281+
state.framing_buffer.extend_from_slice(&tmp[..n]);
282+
Ok(n)
283+
}
284+
Err(e) => Err(e),
285+
}
286+
}
287+
319288
/// Send a message to the peer.
320289
pub async fn send_message(&mut self, message: NetworkMessage) -> NetworkResult<()> {
321290
let state_arc = self
@@ -352,24 +321,16 @@ impl TcpConnection {
352321
let mut state = state_arc.lock().await;
353322

354323
// Write with error handling
355-
match state.stream.write_all(&serialized) {
324+
match state.stream.write_all(&serialized).await {
356325
Ok(_) => {
357326
// Flush to ensure data is sent immediately
358-
if let Err(e) = state.stream.flush() {
359-
if e.kind() != std::io::ErrorKind::WouldBlock {
360-
tracing::warn!("Failed to flush socket {}: {}", self.address, e);
361-
}
327+
if let Err(e) = state.stream.flush().await {
328+
tracing::warn!("Failed to flush socket {}: {}", self.address, e);
362329
}
363330
self.bytes_sent += serialized.len() as u64;
364331
tracing::debug!("Sent message to {}: {:?}", self.address, raw_message.payload);
365332
Ok(())
366333
}
367-
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
368-
// For non-blocking writes that would block, we could retry later
369-
// For now, treat as a temporary failure
370-
tracing::debug!("Write would block to {}, socket buffer may be full", self.address);
371-
Err(NetworkError::Timeout)
372-
}
373334
Err(e) => {
374335
tracing::warn!("Disconnecting {} due to write error: {}", self.address, e);
375336
// Drop the lock before clearing connection state
@@ -398,27 +359,14 @@ impl TcpConnection {
398359
const HEADER_LEN: usize = 24; // magic[4] + cmd[12] + length[4] + checksum[4]
399360
const MAX_RESYNC_STEPS_PER_CALL: usize = 64;
400361

401-
let result = (|| -> NetworkResult<Option<NetworkMessage>> {
402-
// Helper: try to read some bytes into framing buffer
403-
let read_some = |state: &mut ConnectionState| -> std::io::Result<usize> {
404-
let mut tmp = [0u8; 8192];
405-
match state.read_buffer.read(&mut tmp) {
406-
Ok(0) => Ok(0),
407-
Ok(n) => {
408-
state.framing_buffer.extend_from_slice(&tmp[..n]);
409-
Ok(n)
410-
}
411-
Err(e) => Err(e),
412-
}
413-
};
414-
362+
let result = async {
415363
let magic_bytes = self.network.magic().to_le_bytes();
416364
let mut resync_steps = 0usize;
417365

418366
loop {
419367
// Ensure header availability
420368
if state.framing_buffer.len() < HEADER_LEN {
421-
match read_some(&mut state) {
369+
match Self::read_some(&mut state).await {
422370
Ok(0) => {
423371
tracing::info!("Peer {} closed connection (EOF)", self.address);
424372
return Err(NetworkError::PeerDisconnected);
@@ -482,7 +430,7 @@ impl TcpConnection {
482430
}
483431
}
484432
// Need more data
485-
match read_some(&mut state) {
433+
match Self::read_some(&mut state).await {
486434
Ok(0) => {
487435
tracing::info!("Peer {} closed connection (EOF)", self.address);
488436
return Err(NetworkError::PeerDisconnected);
@@ -507,7 +455,7 @@ impl TcpConnection {
507455

508456
// Ensure full header
509457
if state.framing_buffer.len() < HEADER_LEN {
510-
match read_some(&mut state) {
458+
match Self::read_some(&mut state).await {
511459
Ok(0) => {
512460
tracing::info!("Peer {} closed connection (EOF)", self.address);
513461
return Err(NetworkError::PeerDisconnected);
@@ -561,7 +509,7 @@ impl TcpConnection {
561509

562510
// Ensure full frame available
563511
if state.framing_buffer.len() < total_len {
564-
match read_some(&mut state) {
512+
match Self::read_some(&mut state).await {
565513
Ok(0) => {
566514
tracing::info!("Peer {} closed connection (EOF)", self.address);
567515
return Err(NetworkError::PeerDisconnected);
@@ -681,7 +629,8 @@ impl TcpConnection {
681629
}
682630
}
683631
}
684-
})();
632+
}
633+
.await;
685634

686635
// Drop the lock before disconnecting
687636
drop(state);

dash-spv/src/network/manager.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ pub struct PeerNetworkManager {
6262
mempool_strategy: MempoolStrategy,
6363
/// Last peer that sent us a message
6464
last_message_peer: Arc<Mutex<Option<SocketAddr>>>,
65-
/// Read timeout for TCP connections
66-
read_timeout: Duration,
6765
/// Track which peers have sent us Headers2 messages
6866
peers_sent_headers2: Arc<Mutex<HashSet<SocketAddr>>>,
6967
/// Optional user agent to advertise
@@ -121,7 +119,6 @@ impl PeerNetworkManager {
121119
data_dir,
122120
mempool_strategy: config.mempool_strategy,
123121
last_message_peer: Arc::new(Mutex::new(None)),
124-
read_timeout: config.read_timeout,
125122
peers_sent_headers2: Arc::new(Mutex::new(HashSet::new())),
126123
user_agent: config.user_agent.clone(),
127124
exclusive_mode,
@@ -210,7 +207,6 @@ impl PeerNetworkManager {
210207
let shutdown = self.shutdown.clone();
211208
let reputation_manager = self.reputation_manager.clone();
212209
let mempool_strategy = self.mempool_strategy;
213-
let read_timeout = self.read_timeout;
214210
let user_agent = self.user_agent.clone();
215211
let connected_peer_count = self.connected_peer_count.clone();
216212

@@ -219,9 +215,7 @@ impl PeerNetworkManager {
219215
tasks.spawn(async move {
220216
log::debug!("Attempting to connect to {}", addr);
221217

222-
match TcpConnection::connect(addr, CONNECTION_TIMEOUT.as_secs(), read_timeout, network)
223-
.await
224-
{
218+
match TcpConnection::connect(addr, CONNECTION_TIMEOUT.as_secs(), network).await {
225219
Ok(mut conn) => {
226220
// Perform handshake
227221
let mut handshake_manager =
@@ -999,7 +993,6 @@ impl Clone for PeerNetworkManager {
999993
data_dir: self.data_dir.clone(),
1000994
mempool_strategy: self.mempool_strategy,
1001995
last_message_peer: self.last_message_peer.clone(),
1002-
read_timeout: self.read_timeout,
1003996
peers_sent_headers2: self.peers_sent_headers2.clone(),
1004997
user_agent: self.user_agent.clone(),
1005998
exclusive_mode: self.exclusive_mode,

0 commit comments

Comments
 (0)