Skip to content

Commit 854360d

Browse files
committed
fix(network): add gossip stall detection and force disconnect (TM-B3)
Add a mechanism to detect and recover from GossipSub mesh stalls where TCP connections remain up but gossip messages stop flowing. This occurs after network partitions when listen addresses expire but add_explicit_peer() alone cannot repair the stale mesh.

Changes:
- Add ForceDisconnect SwarmCommand for mesh recovery
- Track per-peer last gossip timestamp for stall detection
- Track consecutive failed mesh reform attempts per peer
- Enhance periodic mesh health check:
  - Detect gossip stall (no messages > 60s despite connection)
  - Force disconnect after 3 failed reform attempts
  - Clear failure count on successful gossip receipt
- Clear mesh reform cooldowns on ExpiredListenAddr event

The force disconnect allows a fresh TCP connection and mesh formation, recovering from stale mesh state that persisted after partition recovery.
1 parent 5f23d31 commit 854360d

File tree

1 file changed

+86
-2
lines changed

1 file changed

+86
-2
lines changed

app/src/actors_v2/network/network_actor.rs

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ pub enum SwarmCommand {
9191
attempt: u32,
9292
response_tx: tokio::sync::oneshot::Sender<Result<bool, String>>,
9393
},
94+
/// TM-B3: Force disconnect for stale mesh recovery
95+
/// When gossip stalls despite TCP connection being up, forcibly disconnect
96+
/// to allow fresh connection and mesh formation
97+
ForceDisconnect {
98+
peer_id: PeerId,
99+
reason: String,
100+
},
94101
}
95102

96103
/// Phase 4: Rate limiter for DOS protection
@@ -253,6 +260,12 @@ pub struct NetworkActor {
253260
/// Maps peer_id -> topics that need mesh reformation
254261
/// ReformMesh is deferred until PeerIdentified event to avoid race condition
255262
pending_mesh_reforms: HashMap<PeerId, Vec<String>>,
263+
/// TM-B3: Per-peer last gossip message timestamp for stall detection
264+
/// Updated whenever we receive a gossip message from a peer
265+
last_peer_gossip: HashMap<PeerId, Instant>,
266+
/// TM-B3: Track consecutive failed ReformMesh attempts per peer
267+
/// Incremented when mesh verification fails, cleared on successful gossip receipt
268+
reform_failure_count: HashMap<PeerId, u32>,
256269
}
257270

258271
/// Pending block request tracking (Phase 4: Task 2.3)
@@ -311,6 +324,8 @@ impl NetworkActor {
311324
startup_time: Instant::now(),
312325
recent_mesh_reforms: HashMap::new(),
313326
pending_mesh_reforms: HashMap::new(),
327+
last_peer_gossip: HashMap::new(),
328+
reform_failure_count: HashMap::new(),
314329
})
315330
}
316331

@@ -556,13 +571,18 @@ impl NetworkActor {
556571
});
557572
}
558573

559-
/// TM-B2: Periodic mesh health check - verifies all connected V2 peers are in mesh
574+
/// TM-B2/TM-B3: Periodic mesh health check - verifies all connected V2 peers are in mesh
560575
///
561576
/// This provides defense-in-depth for mesh formation failures. Even if the deferred
562577
/// ReformMesh fails, this background check will detect and fix stale mesh state
563578
/// within 30 seconds.
579+
///
580+
/// TM-B3 enhancement: Also detects gossip stall (connected but no messages) and
581+
/// forces disconnect after multiple failed reform attempts to allow fresh connection.
564582
fn schedule_periodic_mesh_health_check(&self, ctx: &mut Context<Self>) {
565583
const MESH_HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(30);
584+
const GOSSIP_STALL_THRESHOLD: Duration = Duration::from_secs(60);
585+
const MAX_REFORM_ATTEMPTS_BEFORE_DISCONNECT: u32 = 3;
566586

567587
ctx.run_interval(MESH_HEALTH_CHECK_INTERVAL, |actor, _ctx| {
568588
let Some(cmd_tx) = actor.swarm_cmd_tx.as_ref() else {
@@ -588,6 +608,39 @@ impl NetworkActor {
588608
continue;
589609
};
590610

611+
// TM-B3: Check for gossip stall - connected but no messages for > threshold
612+
let gossip_stalled = actor
613+
.last_peer_gossip
614+
.get(&peer_id)
615+
.map(|t| t.elapsed() > GOSSIP_STALL_THRESHOLD)
616+
.unwrap_or(false);
617+
618+
let reform_attempts = actor
619+
.reform_failure_count
620+
.get(&peer_id)
621+
.copied()
622+
.unwrap_or(0);
623+
624+
// If gossip has stalled and we've exceeded reform attempts, force disconnect
625+
if gossip_stalled && reform_attempts >= MAX_REFORM_ATTEMPTS_BEFORE_DISCONNECT {
626+
tracing::warn!(
627+
peer_id = %peer_id,
628+
reform_attempts = reform_attempts,
629+
stall_duration_secs = actor.last_peer_gossip.get(&peer_id)
630+
.map(|t| t.elapsed().as_secs())
631+
.unwrap_or(0),
632+
"Gossip stall detected - forcing disconnect for mesh recovery"
633+
);
634+
635+
let _ = cmd_tx.try_send(SwarmCommand::ForceDisconnect {
636+
peer_id,
637+
reason: "gossip_stall_recovery".to_string(),
638+
});
639+
actor.reform_failure_count.remove(&peer_id);
640+
actor.last_peer_gossip.remove(&peer_id);
641+
continue;
642+
}
643+
591644
// Check cooldown to avoid spamming - skip if we reformed recently
592645
if let Some(last_reform) = actor.recent_mesh_reforms.get(&peer_id) {
593646
if last_reform.elapsed() < Duration::from_secs(30) {
@@ -642,6 +695,13 @@ impl NetworkActor {
642695
}
643696
}
644697
});
698+
699+
// TM-B3: Increment failure count preemptively (will be cleared on success)
700+
// This tracks how many times we've attempted to reform mesh without
701+
// receiving any gossip messages, indicating potential stall
702+
if gossip_stalled {
703+
*actor.reform_failure_count.entry(peer_id).or_insert(0) += 1;
704+
}
645705
}
646706
}
647707
});
@@ -1022,7 +1082,14 @@ impl NetworkActor {
10221082
}
10231083

10241084
SwarmEvent::ExpiredListenAddr { address, .. } => {
1025-
tracing::info!(address = %address, "Expired listen address");
1085+
tracing::warn!(
1086+
address = %address,
1087+
"Listen address expired - clearing mesh reform cooldowns for recovery"
1088+
);
1089+
// TM-B3: Clear cooldowns to allow immediate mesh health check
1090+
// Listen address expiry often indicates network issues that may have
1091+
// disrupted gossip mesh state
1092+
self.recent_mesh_reforms.clear();
10261093
}
10271094

10281095
SwarmEvent::ListenerClosed { addresses, .. } => {
@@ -1167,6 +1234,13 @@ impl NetworkActor {
11671234
self.metrics.record_message_received(data.len());
11681235
self.metrics.record_gossip_received();
11691236

1237+
// TM-B3: Track last gossip received from this peer for stall detection
1238+
if let Ok(peer_id) = source_peer.parse::<PeerId>() {
1239+
self.last_peer_gossip.insert(peer_id, Instant::now());
1240+
// Clear failure count on successful gossip receipt
1241+
self.reform_failure_count.remove(&peer_id);
1242+
}
1243+
11701244
// Phase 1: Forward block gossip messages to ChainActor for import
11711245
if topic.contains("block") {
11721246
if let Some(ref chain_actor) = self.chain_actor {
@@ -2736,6 +2810,16 @@ impl Handler<NetworkMessage> for NetworkActor {
27362810
let _ = response_tx.send(Ok(all_in_mesh));
27372811
}
27382812

2813+
Some(SwarmCommand::ForceDisconnect { peer_id, reason }) => {
2814+
// TM-B3: Force disconnect for stale mesh recovery
2815+
tracing::warn!(
2816+
peer_id = %peer_id,
2817+
reason = %reason,
2818+
"Force disconnecting peer for mesh recovery"
2819+
);
2820+
let _ = swarm.disconnect_peer_id(peer_id);
2821+
}
2822+
27392823
None => {
27402824
tracing::info!("Command channel closed, stopping swarm poll");
27412825
break;

0 commit comments

Comments
 (0)