Skip to content

Commit a71aef0

Browse files
feat: implement request timeout handling and tracking for network messages
1 parent 9836113 commit a71aef0

File tree

4 files changed

+317
-2
lines changed

4 files changed

+317
-2
lines changed

dash-spv/src/client/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ impl DashSpvClient {
241241
let mut last_status_update = Instant::now();
242242
let status_update_interval = std::time::Duration::from_secs(5);
243243

244+
// Timer for request timeout checking
245+
let mut last_timeout_check = Instant::now();
246+
let timeout_check_interval = std::time::Duration::from_secs(1);
247+
244248
loop {
245249
// Check if we should stop
246250
let running = self.running.read().await;
@@ -303,6 +307,14 @@ impl DashSpvClient {
303307
// Check for sync timeouts and handle recovery
304308
let _ = self.sync_manager.check_sync_timeouts(&mut *self.storage, &mut *self.network).await;
305309

310+
// Check for request timeouts and handle retries
311+
if last_timeout_check.elapsed() >= timeout_check_interval {
312+
if let Err(e) = self.handle_request_timeouts().await {
313+
tracing::error!("Error handling request timeouts: {}", e);
314+
}
315+
last_timeout_check = Instant::now();
316+
}
317+
306318
// Listen for network messages from the per-peer message channel
307319
match self.network.receive_message().await {
308320
Ok(Some(message)) => {
@@ -452,6 +464,11 @@ impl DashSpvClient {
452464
let block_hash = block.header.block_hash();
453465
tracing::info!("Received new block: {}", block_hash);
454466
tracing::debug!("📋 Block {} contains {} transactions", block_hash, block.txdata.len());
467+
468+
// Store this as the last successfully received block
469+
// This helps identify what block comes next when decoding fails
470+
tracing::info!("LAST SUCCESSFUL BLOCK BEFORE POTENTIAL FAILURE: {}", block_hash);
471+
455472
// Process new block (update state, check watched items)
456473
self.process_new_block(block).await?;
457474
}
@@ -1541,4 +1558,21 @@ impl DashSpvClient {
15411558
tracing::info!("✅ Completed post-sync filter header requests for {} new blocks", headers.len());
15421559
Ok(())
15431560
}
1561+
1562+
/// Handle request timeouts by calling the network manager's timeout handler.
1563+
/// This checks for requests that have timed out and initiates retries with exponential backoff.
1564+
async fn handle_request_timeouts(&mut self) -> Result<()> {
1565+
// Downcast the network manager to MultiPeerNetworkManager to access request tracker
1566+
let network = self.network.as_any()
1567+
.downcast_ref::<crate::network::multi_peer::MultiPeerNetworkManager>()
1568+
.ok_or_else(|| SpvError::Config("Network manager does not support request tracking".to_string()))?;
1569+
1570+
// Handle timeouts and retries
1571+
network.handle_request_timeouts().await
1572+
.map_err(|e| SpvError::Network(crate::error::NetworkError::ConnectionFailed(
1573+
format!("Failed to handle request timeouts: {}", e)
1574+
)))?;
1575+
1576+
Ok(())
1577+
}
15441578
}

dash-spv/src/network/connection.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,13 @@ impl TcpConnection {
194194

195195
// Message received successfully
196196
tracing::trace!("Successfully decoded message from {}: {:?}", self.address, raw_message.payload.cmd());
197+
198+
// Log block messages specifically for debugging
199+
if let NetworkMessage::Block(ref block) = raw_message.payload {
200+
let block_hash = block.block_hash();
201+
tracing::info!("Successfully decoded block {} from {}", block_hash, self.address);
202+
}
203+
197204
Ok(Some(raw_message.payload))
198205
}
199206
Err(encode::Error::Io(ref e)) if e.kind() == std::io::ErrorKind::WouldBlock => {
@@ -226,8 +233,23 @@ impl TcpConnection {
226233
tracing::error!("Failed to decode message from {}: {}", self.address, e);
227234

228235
// Check if this is the specific "unknown special transaction type" error
229-
if e.to_string().contains("unknown special transaction type") {
236+
let error_msg = e.to_string();
237+
if error_msg.contains("unknown special transaction type") {
230238
tracing::warn!("Peer {} sent block with unsupported transaction type: {}", self.address, e);
239+
tracing::error!("BLOCK DECODE FAILURE - Error details: {}", error_msg);
240+
} else if error_msg.contains("Failed to decode transactions for block") {
241+
// Extract block hash from the enhanced error message
242+
tracing::error!("Peer {} sent block that failed transaction decoding: {}", self.address, e);
243+
if let Some(hash_start) = error_msg.find("block ") {
244+
if let Some(hash_end) = error_msg[hash_start + 6..].find(':') {
245+
let block_hash = &error_msg[hash_start + 6..hash_start + 6 + hash_end];
246+
tracing::error!("FAILING BLOCK HASH: {}", block_hash);
247+
}
248+
}
249+
} else if error_msg.contains("IO error") {
250+
// This might be our wrapped error - log it prominently
251+
tracing::error!("BLOCK DECODE FAILURE - IO error (possibly unknown transaction type) from peer {}", self.address);
252+
tracing::error!("Raw error details: {:?}", e);
231253
}
232254

233255
Err(NetworkError::Serialization(e))

dash-spv/src/network/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod multi_peer;
1010
pub mod peer;
1111
pub mod persist;
1212
pub mod pool;
13+
pub mod request_tracker;
1314

1415
#[cfg(test)]
1516
mod tests;

0 commit comments

Comments
 (0)