Skip to content

Commit 974e19e

Browse files
KaboomFoxclaude
andcommitted
Fix hot switchover connection recovery and metrics race
- Pass switchover_tx to hot switchover connections so they can request further hot switchovers on data timeout (was None, causing silent exits) - Fix metrics race where old connection's disconnect sets is_connected=false after new connection already set it true. Now re-sets after old closes. - Add logging when hot switchover connection task exits normally for debugging Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent fcc2cc0 commit 974e19e

File tree

2 files changed

+20
-5
lines changed

2 files changed

+20
-5
lines changed

src/connection.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ impl<H: WebSocketHandler> Connection<H> {
110110
command_rx: mpsc::Receiver<ConnectionCommand>,
111111
ready_tx: oneshot::Sender<()>,
112112
subscriptions: Vec<H::Subscription>,
113+
switchover_tx: mpsc::Sender<usize>,
113114
) -> Self {
114115
Self {
115116
shard_id,
@@ -121,7 +122,7 @@ impl<H: WebSocketHandler> Connection<H> {
121122
command_rx,
122123
ready_tx: Some(ready_tx),
123124
initial_subscriptions: Some(subscriptions),
124-
switchover_tx: None,
125+
switchover_tx: Some(switchover_tx),
125126
}
126127
}
127128

src/manager.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ impl<H: WebSocketHandler> ShardManager<H> {
818818
let (new_tx, new_rx) = mpsc::channel::<ConnectionCommand>(DEFAULT_CHANNEL_SIZE);
819819
let (ready_tx, ready_rx) = oneshot::channel();
820820

821-
// Create new connection with ready signal and explicit subscriptions
821+
// Create new connection with ready signal, explicit subscriptions, and switchover channel
822822
let new_connection = Connection::with_ready_signal(
823823
shard_id,
824824
self.handler.clone(),
@@ -829,12 +829,18 @@ impl<H: WebSocketHandler> ShardManager<H> {
829829
new_rx,
830830
ready_tx,
831831
subscriptions,
832+
self.switchover_tx.clone(),
832833
);
833834

834-
// Spawn new connection task
835+
// Spawn new connection task with proper logging
835836
let handle = tokio::spawn(async move {
836-
if let Err(e) = new_connection.run().await {
837-
error!("[SHARD-{}] Hot switchover connection failed: {}", shard_id, e);
837+
match new_connection.run().await {
838+
Ok(()) => {
839+
debug!("[SHARD-{}] Hot switchover connection task exited normally", shard_id);
840+
}
841+
Err(e) => {
842+
error!("[SHARD-{}] Hot switchover connection failed: {}", shard_id, e);
843+
}
838844
}
839845
});
840846

@@ -878,6 +884,14 @@ impl<H: WebSocketHandler> ShardManager<H> {
878884
}
879885
}
880886

887+
// Fix metrics race: old connection's disconnect will set is_connected=false,
888+
// but the new connection is already running. Re-set to true after close is sent.
889+
// Use a small delay to let the old connection's metrics update complete first.
890+
tokio::time::sleep(Duration::from_millis(10)).await;
891+
self.metrics.update_shard(shard_id, |s| {
892+
s.is_connected = true;
893+
});
894+
881895
// Replace handle using HashMap lookup
882896
{
883897
let mut handles = self.shard_handles.write();

0 commit comments

Comments
 (0)