Skip to content

Commit d95b4d3

Browse files
KaboomFoxclaude
andcommitted
Fix dead shard recovery and subscription replay on reconnect
Replace initial_subscriptions with a subscription_provider callback that reads current subs from manager state on every connect/reconnect. Wrap connection run in a panic retry loop (max 3 attempts) that recreates the channel pair and updates state before retrying. Remove redundant subscription sends from start() since the provider now handles replay automatically. Add pre-commit hook for local CI. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0d2ceeb commit d95b4d3

File tree

3 files changed

+236
-103
lines changed

3 files changed

+236
-103
lines changed

.githooks/pre-commit

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#!/usr/bin/env bash
2+
set -e
3+
4+
# Pre-commit hook: runs the same checks as CI to catch issues before pushing.
5+
# Install: git config core.hooksPath .githooks
6+
7+
RED='\033[0;31m'
8+
GREEN='\033[0;32m'
9+
YELLOW='\033[0;33m'
10+
BOLD='\033[1m'
11+
NC='\033[0m'
12+
13+
failed=0
14+
15+
step() {
16+
printf "${BOLD}[pre-commit]${NC} %s... " "$1"
17+
}
18+
19+
pass() {
20+
printf "${GREEN}ok${NC} (%.1fs)\n" "$1"
21+
}
22+
23+
fail() {
24+
printf "${RED}FAILED${NC} (%.1fs)\n" "$1"
25+
failed=1
26+
}
27+
28+
# 1. Format check (fastest, catches trivial issues)
29+
step "cargo fmt --check"
30+
start=$(date +%s.%N)
31+
if cargo fmt --all -- --check >/dev/null 2>&1; then
32+
pass "$(echo "$(date +%s.%N) - $start" | bc)"
33+
else
34+
fail "$(echo "$(date +%s.%N) - $start" | bc)"
35+
echo ""
36+
printf " ${YELLOW}Fix: cargo fmt --all${NC}\n"
37+
echo ""
38+
fi
39+
40+
# 2. Clippy (catches warnings/lint issues)
41+
step "cargo clippy"
42+
start=$(date +%s.%N)
43+
if cargo clippy --all-features -- -D warnings 2>&1 | tail -20 > /tmp/pre-commit-clippy.log 2>&1; then
44+
pass "$(echo "$(date +%s.%N) - $start" | bc)"
45+
else
46+
fail "$(echo "$(date +%s.%N) - $start" | bc)"
47+
echo ""
48+
cat /tmp/pre-commit-clippy.log
49+
echo ""
50+
fi
51+
52+
# 3. Tests (catches regressions)
53+
step "cargo test"
54+
start=$(date +%s.%N)
55+
if cargo test --all-features 2>&1 | tail -20 > /tmp/pre-commit-test.log 2>&1; then
56+
pass "$(echo "$(date +%s.%N) - $start" | bc)"
57+
else
58+
fail "$(echo "$(date +%s.%N) - $start" | bc)"
59+
echo ""
60+
cat /tmp/pre-commit-test.log
61+
echo ""
62+
fi
63+
64+
# Result
65+
echo ""
66+
if [ "$failed" -ne 0 ]; then
67+
printf "${RED}${BOLD}Pre-commit checks failed.${NC} Fix the issues above or skip with --no-verify\n"
68+
exit 1
69+
else
70+
printf "${GREEN}${BOLD}All pre-commit checks passed.${NC}\n"
71+
fi

src/connection.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ use tokio_tungstenite::{
1919
use tracing::{debug, error, info, trace, warn};
2020
use url::Url;
2121

22+
/// A callback that provides the current subscriptions for a shard.
23+
/// Called on every connect/reconnect to get the latest subscription state.
24+
pub(crate) type SubscriptionProvider<S> = Arc<dyn Fn() -> Vec<S> + Send + Sync>;
25+
2226
/// Commands that can be sent to a connection
2327
#[derive(Debug)]
2428
pub enum ConnectionCommand {
@@ -75,8 +79,8 @@ pub struct Connection<H: WebSocketHandler> {
7579
command_rx: mpsc::Receiver<ConnectionCommand>,
7680
/// Optional channel to signal when connection is ready
7781
ready_tx: Option<oneshot::Sender<()>>,
78-
/// Explicit subscriptions for this shard (used during hot switchover)
79-
initial_subscriptions: Option<Vec<H::Subscription>>,
82+
/// Provides current subscriptions for this shard on connect/reconnect
83+
subscription_provider: SubscriptionProvider<H::Subscription>,
8084
/// Optional channel to request hot switchover from manager
8185
switchover_tx: Option<mpsc::Sender<usize>>,
8286
}
@@ -93,6 +97,7 @@ impl<H: WebSocketHandler> Connection<H> {
9397
metrics: Arc<Metrics>,
9498
command_rx: mpsc::Receiver<ConnectionCommand>,
9599
switchover_tx: mpsc::Sender<usize>,
100+
subscription_provider: SubscriptionProvider<H::Subscription>,
96101
) -> Self {
97102
Self {
98103
shard_id,
@@ -103,7 +108,7 @@ impl<H: WebSocketHandler> Connection<H> {
103108
metrics,
104109
command_rx,
105110
ready_tx: None,
106-
initial_subscriptions: None,
111+
subscription_provider,
107112
switchover_tx: Some(switchover_tx),
108113
}
109114
}
@@ -119,7 +124,7 @@ impl<H: WebSocketHandler> Connection<H> {
119124
metrics: Arc<Metrics>,
120125
command_rx: mpsc::Receiver<ConnectionCommand>,
121126
ready_tx: oneshot::Sender<()>,
122-
subscriptions: Vec<H::Subscription>,
127+
subscription_provider: SubscriptionProvider<H::Subscription>,
123128
switchover_tx: mpsc::Sender<usize>,
124129
) -> Self {
125130
Self {
@@ -131,7 +136,7 @@ impl<H: WebSocketHandler> Connection<H> {
131136
metrics,
132137
command_rx,
133138
ready_tx: Some(ready_tx),
134-
initial_subscriptions: Some(subscriptions),
139+
subscription_provider,
135140
switchover_tx: Some(switchover_tx),
136141
}
137142
}
@@ -391,18 +396,17 @@ impl<H: WebSocketHandler> Connection<H> {
391396
self.metrics.record_message_sent();
392397
}
393398

394-
// If we have explicit subscriptions (hot switchover), send subscription message
395-
if let Some(ref subs) = self.initial_subscriptions {
396-
if !subs.is_empty() {
397-
if let Some(msg) = self.handler.subscription_message(subs) {
398-
write.send(msg).await?;
399-
self.metrics.record_message_sent();
400-
info!(
401-
"[SHARD-{}] Sent {} subscriptions from hot switchover",
402-
self.shard_id,
403-
subs.len()
404-
);
405-
}
399+
// Replay current subscriptions from provider (works for both reconnect and hot switchover)
400+
let subs = (self.subscription_provider)();
401+
if !subs.is_empty() {
402+
if let Some(msg) = self.handler.subscription_message(&subs) {
403+
write.send(msg).await?;
404+
self.metrics.record_message_sent();
405+
info!(
406+
"[SHARD-{}] Replayed {} subscriptions",
407+
self.shard_id,
408+
subs.len()
409+
);
406410
}
407411
}
408412

0 commit comments

Comments
 (0)