Skip to content

Commit a7f38de

Browse files
authored
Introduce QUIC connection lifecycle tracing and dead connection pruning (#50)
* Introduce verbose QUIC connection lifecycle tracing

  Log close_reason when detecting dead connections on the client side and when the server-side stream accept loop terminates. Log when the server replaces a validator connection during re-handshake.

* Introduce dead connection pruning and synapse-level query tracing

  Investigate systematic QUIC synapse query failures by adding diagnostic tracing to send_synapse_packet and server dispatch paths. Resolve stale connections persisting in the registry by pruning dead connections during update_miner_registry, and report actual connection health status in get_connection_stats.

* Resolve all clippy warnings and replace fixed sleep with bounded poll

  Eliminate field_reassign_with_default in client config tests using struct update syntax. Replace unnecessary_get_then_check in registry with contains_key. Reorder signing.rs to place feature-gated items before the test module. Replace the unconditional 5s sleep in the integration test with a bounded poll loop that retries at 250ms intervals with a 10s deadline.
1 parent 5f879e0 commit a7f38de

File tree

6 files changed

+312
-58
lines changed

6 files changed

+312
-58
lines changed

crates/btlightning/src/client.rs

Lines changed: 91 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020
use std::time::Duration;
2121
use tokio::sync::RwLock;
2222
use tokio::time::Instant;
23-
use tracing::{error, info, instrument, warn};
23+
use tracing::{debug, error, info, instrument, warn};
2424

2525
#[cfg(feature = "subtensor")]
2626
use crate::metagraph::{Metagraph, MetagraphMonitorConfig};
@@ -556,9 +556,26 @@ impl LightningClient {
556556
let max_fp = self.config.max_frame_payload_bytes;
557557
match connection {
558558
Some(conn) if conn.close_reason().is_none() => {
559+
debug!(
560+
addr = %addr_key,
561+
stable_id = conn.stable_id(),
562+
"query_axon: connection alive, sending synapse"
563+
);
559564
send_synapse_packet(&conn, request, max_fp).await
560565
}
561-
_ => {
566+
Some(conn) => {
567+
let reason = conn.close_reason();
568+
warn!(
569+
addr = %addr_key,
570+
stable_id = conn.stable_id(),
571+
close_reason = ?reason,
572+
"QUIC connection closed, triggering reconnect"
573+
);
574+
self.try_reconnect_and_query(&addr_key, &axon_info, request)
575+
.await
576+
}
577+
None => {
578+
debug!(addr = %addr_key, "query_axon: no connection in registry");
562579
self.try_reconnect_and_query(&addr_key, &axon_info, request)
563580
.await
564581
}
@@ -605,7 +622,17 @@ impl LightningClient {
605622
)
606623
.await
607624
}
608-
_ => {
625+
Some(conn) => {
626+
let reason = conn.close_reason();
627+
warn!(
628+
addr = %addr_key,
629+
close_reason = ?reason,
630+
"QUIC connection closed, triggering reconnect (stream)"
631+
);
632+
self.try_reconnect_and_stream(&addr_key, &axon_info, request)
633+
.await
634+
}
635+
None => {
609636
self.try_reconnect_and_stream(&addr_key, &axon_info, request)
610637
.await
611638
}
@@ -843,7 +870,17 @@ impl LightningClient {
843870
);
844871

845872
for addr_key in state.registry.connection_addrs() {
846-
stats.insert(format!("connection_{}", addr_key), "active".to_string());
873+
let status = match state.registry.get_connection(addr_key) {
874+
Some(conn) => {
875+
if let Some(reason) = conn.close_reason() {
876+
format!("closed({:?})", reason)
877+
} else {
878+
"active".to_string()
879+
}
880+
}
881+
None => "missing".to_string(),
882+
};
883+
stats.insert(format!("connection_{}", addr_key), status);
847884
}
848885

849886
Ok(stats)
@@ -1078,6 +1115,30 @@ async fn update_miner_registry_inner(
10781115
}
10791116
}
10801117

1118+
let dead_addrs: Vec<PeerAddr> = active_addrs
1119+
.iter()
1120+
.filter(|addr| {
1121+
st.registry
1122+
.get_connection(addr)
1123+
.is_some_and(|c| c.close_reason().is_some())
1124+
})
1125+
.cloned()
1126+
.collect();
1127+
for addr_key in &dead_addrs {
1128+
if let Some(conn) = st.registry.remove_connection(addr_key) {
1129+
let hotkeys = st.registry.hotkeys_at_addr(addr_key);
1130+
info!(
1131+
addr = %addr_key,
1132+
close_reason = ?conn.close_reason(),
1133+
hotkeys = ?hotkeys,
1134+
"Pruning dead connection and deregistering miners"
1135+
);
1136+
for hk in &hotkeys {
1137+
st.registry.deregister(hk);
1138+
}
1139+
}
1140+
}
1141+
10811142
for new_miner in new_by_hotkey.values() {
10821143
if let Some(old_miner) = st.registry.active_miner(&new_miner.hotkey) {
10831144
let old_addr = old_miner.addr_key();
@@ -1500,16 +1561,24 @@ async fn send_synapse_packet(
15001561
request: QuicRequest,
15011562
max_frame_payload: usize,
15021563
) -> Result<QuicResponse> {
1564+
let stable_id = connection.stable_id();
1565+
debug!(stable_id, "send_synapse_packet: opening bi stream");
15031566
let (mut send, mut recv) = connection
15041567
.open_bi()
15051568
.await
15061569
.map_err(|e| LightningError::Connection(format!("Failed to open stream: {}", e)))?;
1570+
debug!(stable_id, "send_synapse_packet: bi stream opened");
15071571

15081572
let start = Instant::now();
15091573

15101574
send_synapse_frame(&mut send, request).await?;
1575+
debug!(
1576+
stable_id,
1577+
"send_synapse_packet: frame sent, awaiting response"
1578+
);
15111579

15121580
let (msg_type, payload) = read_frame(&mut recv, max_frame_payload).await?;
1581+
debug!(stable_id, msg_type = ?msg_type, elapsed_ms = start.elapsed().as_millis() as u64, "send_synapse_packet: response received");
15131582

15141583
match msg_type {
15151584
MessageType::SynapseResponse => {
@@ -1685,8 +1754,10 @@ mod tests {
16851754

16861755
#[test]
16871756
fn with_config_rejects_frame_payload_below_minimum() {
1688-
let mut cfg = LightningClientConfig::default();
1689-
cfg.max_frame_payload_bytes = 512;
1757+
let cfg = LightningClientConfig {
1758+
max_frame_payload_bytes: 512,
1759+
..LightningClientConfig::default()
1760+
};
16901761
assert!(LightningClient::with_config("hk".into(), cfg).is_err());
16911762
}
16921763

@@ -1697,23 +1768,30 @@ mod tests {
16971768
Ok(v) => v,
16981769
Err(_) => return,
16991770
};
1700-
let mut cfg = LightningClientConfig::default();
1701-
cfg.max_frame_payload_bytes = val;
1702-
cfg.max_stream_payload_bytes = val;
1771+
let cfg = LightningClientConfig {
1772+
max_frame_payload_bytes: val,
1773+
max_stream_payload_bytes: val,
1774+
..LightningClientConfig::default()
1775+
};
17031776
assert!(LightningClient::with_config("hk".into(), cfg).is_err());
17041777
}
17051778

17061779
#[test]
17071780
fn with_config_rejects_stream_below_frame() {
1708-
let mut cfg = LightningClientConfig::default();
1709-
cfg.max_stream_payload_bytes = cfg.max_frame_payload_bytes - 1;
1781+
let base = LightningClientConfig::default();
1782+
let cfg = LightningClientConfig {
1783+
max_stream_payload_bytes: base.max_frame_payload_bytes - 1,
1784+
..base
1785+
};
17101786
assert!(LightningClient::with_config("hk".into(), cfg).is_err());
17111787
}
17121788

17131789
#[test]
17141790
fn with_config_rejects_zero_stream_chunk_timeout() {
1715-
let mut cfg = LightningClientConfig::default();
1716-
cfg.stream_chunk_timeout = Some(Duration::ZERO);
1791+
let cfg = LightningClientConfig {
1792+
stream_chunk_timeout: Some(Duration::ZERO),
1793+
..LightningClientConfig::default()
1794+
};
17171795
assert!(LightningClient::with_config("hk".into(), cfg).is_err());
17181796
}
17191797

crates/btlightning/src/registry.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,10 +329,10 @@ mod tests {
329329
reg.register(QuicAxonInfo::new("hk1".into(), "1.2.3.4".into(), 8080, 4));
330330
let rs = reg.reconnect_state_or_insert(old_addr.clone());
331331
rs.attempts = 3;
332-
assert!(reg.reconnect_states.get(&old_addr).is_some());
332+
assert!(reg.reconnect_states.contains_key(&old_addr));
333333

334334
reg.register(QuicAxonInfo::new("hk1".into(), "5.6.7.8".into(), 9090, 4));
335-
assert!(reg.reconnect_states.get(&old_addr).is_none());
335+
assert!(!reg.reconnect_states.contains_key(&old_addr));
336336
reg.assert_invariants();
337337
}
338338

crates/btlightning/src/server/dispatch.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ use crate::util::unix_timestamp_secs;
88
use quinn::{Connection, RecvStream, SendStream};
99
use std::sync::Arc;
1010
use std::time::Duration;
11-
use tracing::{debug, error, warn};
11+
use tracing::{debug, error, info, warn};
1212

1313
pub(super) async fn handle_connection(connection: Connection, ctx: ServerContext) {
1414
let connection = Arc::new(connection);
15+
let stable_id = connection.stable_id();
16+
let remote = connection.remote_address();
17+
debug!(stable_id, %remote, "handle_connection: entering accept_bi loop");
1518

1619
loop {
1720
match connection.accept_bi().await {
1821
Ok((send, recv)) => {
22+
debug!(stable_id, "handle_connection: accepted bi stream");
1923
let conn = connection.clone();
2024
let ctx = ctx.clone();
2125

@@ -24,7 +28,13 @@ pub(super) async fn handle_connection(connection: Connection, ctx: ServerContext
2428
});
2529
}
2630
Err(e) => {
27-
debug!("Connection ended: {}", e);
31+
let close_reason = connection.close_reason();
32+
info!(
33+
remote = %connection.remote_address(),
34+
error = %e,
35+
close_reason = ?close_reason,
36+
"QUIC connection stream loop ended"
37+
);
2838
break;
2939
}
3040
}
@@ -74,6 +84,7 @@ async fn handle_stream(
7484
}
7585
};
7686

87+
debug!(msg_type = ?frame.0, payload_len = frame.1.len(), "handle_stream: frame received");
7788
match frame {
7889
(MessageType::SynapsePacket, payload) => {
7990
let packet: SynapsePacket = match rmp_serde::from_slice(&payload) {

crates/btlightning/src/server/handshake.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,12 @@ pub(super) async fn process_handshake(
111111
connections_guard.insert(request.validator_hotkey.clone(), validator_conn)
112112
{
113113
if !Arc::ptr_eq(&prev_conn.connection, &connection) {
114+
warn!(
115+
validator = %request.validator_hotkey,
116+
prev_addr = %prev_conn.connection.remote_address(),
117+
new_addr = %remote_addr,
118+
"closing previous connection for validator (replaced by new handshake)"
119+
);
114120
prev_conn.connection.close(0u32.into(), b"replaced");
115121
let prev_addr = prev_conn.connection.remote_address();
116122
if prev_addr != remote_addr {

crates/btlightning/src/signing.rs

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,47 @@ impl<F: Fn(&[u8]) -> Result<Vec<u8>> + Send + Sync> Signer for CallbackSigner<F>
8686
}
8787
}
8888

89+
/// [`Signer`] backed by a `btwallet` keypair loaded from the Bittensor wallet directory.
90+
///
91+
/// Requires the `btwallet` feature.
92+
#[cfg(feature = "btwallet")]
93+
pub struct BtWalletSigner {
94+
keypair: bittensor_wallet::Keypair,
95+
}
96+
97+
#[cfg(feature = "btwallet")]
98+
impl BtWalletSigner {
99+
/// Wraps an existing `btwallet::Keypair`.
100+
pub fn new(keypair: bittensor_wallet::Keypair) -> Self {
101+
Self { keypair }
102+
}
103+
104+
/// Loads the hotkey keypair from `~/<path>/<name>/hotkeys/<hotkey_name>`.
105+
pub fn from_wallet(name: &str, path: &str, hotkey_name: &str) -> Result<Self> {
106+
let wallet = bittensor_wallet::Wallet::new(
107+
Some(name.to_string()),
108+
Some(path.to_string()),
109+
Some(hotkey_name.to_string()),
110+
None,
111+
);
112+
let keypair = wallet
113+
.get_hotkey(Some(hotkey_name.to_string()))
114+
.map_err(|e| {
115+
LightningError::Config(format!("failed to load hotkey from wallet: {}", e))
116+
})?;
117+
Ok(Self { keypair })
118+
}
119+
}
120+
121+
#[cfg(feature = "btwallet")]
122+
impl Signer for BtWalletSigner {
123+
fn sign(&self, message: &[u8]) -> Result<Vec<u8>> {
124+
self.keypair
125+
.sign(message.to_vec())
126+
.map_err(LightningError::Signing)
127+
}
128+
}
129+
89130
#[cfg(test)]
90131
mod tests {
91132
use super::*;
@@ -157,44 +198,3 @@ mod tests {
157198
assert!(err.to_string().contains("network down"));
158199
}
159200
}
160-
161-
/// [`Signer`] backed by a `btwallet` keypair loaded from the Bittensor wallet directory.
162-
///
163-
/// Requires the `btwallet` feature.
164-
#[cfg(feature = "btwallet")]
165-
pub struct BtWalletSigner {
166-
keypair: bittensor_wallet::Keypair,
167-
}
168-
169-
#[cfg(feature = "btwallet")]
170-
impl BtWalletSigner {
171-
/// Wraps an existing `btwallet::Keypair`.
172-
pub fn new(keypair: bittensor_wallet::Keypair) -> Self {
173-
Self { keypair }
174-
}
175-
176-
/// Loads the hotkey keypair from `~/<path>/<name>/hotkeys/<hotkey_name>`.
177-
pub fn from_wallet(name: &str, path: &str, hotkey_name: &str) -> Result<Self> {
178-
let wallet = bittensor_wallet::Wallet::new(
179-
Some(name.to_string()),
180-
Some(path.to_string()),
181-
Some(hotkey_name.to_string()),
182-
None,
183-
);
184-
let keypair = wallet
185-
.get_hotkey(Some(hotkey_name.to_string()))
186-
.map_err(|e| {
187-
LightningError::Config(format!("failed to load hotkey from wallet: {}", e))
188-
})?;
189-
Ok(Self { keypair })
190-
}
191-
}
192-
193-
#[cfg(feature = "btwallet")]
194-
impl Signer for BtWalletSigner {
195-
fn sign(&self, message: &[u8]) -> Result<Vec<u8>> {
196-
self.keypair
197-
.sign(message.to_vec())
198-
.map_err(LightningError::Signing)
199-
}
200-
}

0 commit comments

Comments
 (0)