Skip to content

Commit e81aeda

Browse files
fuziontechclaude
andcommitted
Protect HealthCheckLoop from same nil-pointer crash, fix test nits
- Add recoverWorkerPanic to the doHealthCheck call in HealthCheckLoop. Same race: w.client.Close() from a concurrent crash/retire nils out FlightServiceClient while the health check goroutine calls DoAction. - Replace nil contexts with context.Background() in tests. - Add TestDestroySessionAfterOnWorkerCrash to verify the exact production sequence: OnWorkerCrash cleans up, then the deferred DestroySession is a safe no-op. - Add comment explaining intentional double-close of TCP connection (OnWorkerCrash + handleConnection defer). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4ad573b commit e81aeda

File tree

4 files changed

+62
-10
lines changed

4 files changed

+62
-10
lines changed

controlplane/session_mgr.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,10 @@ func (sm *SessionManager) OnWorkerCrash(workerID int, errorFn func(pid int32)) {
171171
}
172172
// Close the TCP connection to unblock the message loop's read.
173173
// This causes the session goroutine to exit instead of looping
174-
// with ErrWorkerDead on every query.
174+
// with ErrWorkerDead on every query. The deferred close in
175+
// handleConnection will also call Close() on the same conn;
176+
// that's harmless (net.Conn.Close on a closed socket returns
177+
// an error which is discarded).
175178
if session.connCloser != nil {
176179
_ = session.connCloser.Close()
177180
}

controlplane/session_mgr_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,43 @@ func TestRecoverWorkerPanic_RuntimeErrorRePanics(t *testing.T) {
211211

212212
t.Fatal("should not reach here")
213213
}
214+
215+
func TestDestroySessionAfterOnWorkerCrash(t *testing.T) {
216+
// Verify that DestroySession is a safe no-op when OnWorkerCrash already
217+
// cleaned up the session. This is the exact production sequence:
218+
// OnWorkerCrash runs from the health check, then the deferred
219+
// DestroySession runs when handleConnection returns.
220+
pool := &FlightWorkerPool{
221+
workers: make(map[int]*ManagedWorker),
222+
}
223+
sm := NewSessionManager(pool, nil)
224+
225+
conn := &mockCloser{}
226+
executor := &server.FlightExecutor{}
227+
pid := int32(1010)
228+
229+
sm.mu.Lock()
230+
sm.sessions[pid] = &ManagedSession{
231+
PID: pid,
232+
WorkerID: 9,
233+
Executor: executor,
234+
connCloser: conn,
235+
}
236+
sm.byWorker[9] = []int32{pid}
237+
sm.mu.Unlock()
238+
239+
// Simulate crash cleanup
240+
sm.OnWorkerCrash(9, func(pid int32) {})
241+
242+
if sm.SessionCount() != 0 {
243+
t.Fatal("expected 0 sessions after OnWorkerCrash")
244+
}
245+
246+
// Now DestroySession runs (from deferred call in handleConnection).
247+
// Should be a no-op — no panic, no double-close of worker resources.
248+
sm.DestroySession(pid)
249+
250+
if sm.SessionCount() != 0 {
251+
t.Fatal("expected 0 sessions after DestroySession")
252+
}
253+
}

controlplane/worker_mgr.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -484,10 +484,18 @@ func (p *FlightWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Du
484484
}
485485
_ = os.Remove(w.socketPath)
486486
default:
487-
// Worker is alive, do a health check
488-
hctx, cancel := context.WithTimeout(ctx, 3*time.Second)
489-
err := doHealthCheck(hctx, w.client)
490-
cancel()
487+
// Worker is alive, do a health check.
488+
// Recover nil-pointer panics: w.client.Close() (from a
489+
// concurrent crash/retire) nils out FlightServiceClient,
490+
// racing with the DoAction call inside doHealthCheck.
491+
var healthErr error
492+
func() {
493+
defer recoverWorkerPanic(&healthErr)
494+
hctx, cancel := context.WithTimeout(ctx, 3*time.Second)
495+
healthErr = doHealthCheck(hctx, w.client)
496+
cancel()
497+
}()
498+
err := healthErr
491499

492500
if err != nil {
493501
mu.Lock()

server/flight_executor_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package server
22

33
import (
4+
"context"
45
"errors"
56
"runtime"
67
"strings"
@@ -12,7 +13,7 @@ func TestFlightExecutorMarkDead_QueryContext(t *testing.T) {
1213
e := &FlightExecutor{} // client is nil — would panic if accessed
1314
e.MarkDead()
1415

15-
_, err := e.QueryContext(nil, "SELECT 1")
16+
_, err := e.QueryContext(context.Background(), "SELECT 1")
1617
if !errors.Is(err, ErrWorkerDead) {
1718
t.Fatalf("expected ErrWorkerDead, got %v", err)
1819
}
@@ -22,7 +23,7 @@ func TestFlightExecutorMarkDead_ExecContext(t *testing.T) {
2223
e := &FlightExecutor{}
2324
e.MarkDead()
2425

25-
_, err := e.ExecContext(nil, "SET x = 1")
26+
_, err := e.ExecContext(context.Background(), "SET x = 1")
2627
if !errors.Is(err, ErrWorkerDead) {
2728
t.Fatalf("expected ErrWorkerDead, got %v", err)
2829
}
@@ -33,7 +34,7 @@ func TestFlightExecutorMarkDeadIdempotent(t *testing.T) {
3334
e.MarkDead()
3435
e.MarkDead() // should not panic
3536

36-
_, err := e.QueryContext(nil, "SELECT 1")
37+
_, err := e.QueryContext(context.Background(), "SELECT 1")
3738
if !errors.Is(err, ErrWorkerDead) {
3839
t.Fatalf("expected ErrWorkerDead after double MarkDead, got %v", err)
3940
}
@@ -109,7 +110,7 @@ func TestFlightExecutorNilClient_QueryContextRecovers(t *testing.T) {
109110
client: nil, // simulates closed client
110111
}
111112

112-
_, err := e.QueryContext(nil, "SELECT 1")
113+
_, err := e.QueryContext(context.Background(), "SELECT 1")
113114
if err == nil {
114115
t.Fatal("expected error from nil client")
115116
}
@@ -123,7 +124,7 @@ func TestFlightExecutorNilClient_ExecContextRecovers(t *testing.T) {
123124
client: nil,
124125
}
125126

126-
_, err := e.ExecContext(nil, "SET x = 1")
127+
_, err := e.ExecContext(context.Background(), "SET x = 1")
127128
if err == nil {
128129
t.Fatal("expected error from nil client")
129130
}

0 commit comments

Comments
 (0)