Skip to content

Commit 98f387d

Browse files
authored
Merge pull request #32 from tstromberg/main
Remove shutdown message to avoid panics
2 parents 1d61827 + ac1da86 commit 98f387d

File tree

4 files changed

+501
-44
lines changed

4 files changed

+501
-44
lines changed

pkg/srv/client.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strings"
77
"sync"
8+
"sync/atomic"
89
"time"
910

1011
"golang.org/x/net/websocket"
@@ -13,11 +14,34 @@ import (
1314
)
1415

1516
// Client represents a connected WebSocket client with their subscription preferences.
17+
//
1618
// Connection management follows a simple pattern:
1719
// - ONE goroutine (Run) handles ALL writes to avoid concurrent write issues
1820
// - Server sends pings every pingInterval to detect dead connections
1921
// - Client responds with pongs; read loop resets deadline on any message
2022
// - Read loop (in websocket.go) detects disconnects and closes the connection
23+
//
24+
// Cleanup coordination (CRITICAL FOR THREAD SAFETY):
25+
// Multiple goroutines can trigger cleanup concurrently:
26+
// 1. Handle() defer in websocket.go calls Hub.Unregister() (async via channel)
27+
// 2. Handle() defer in websocket.go calls closeWebSocket() (closes WS connection)
28+
// 3. Client.Run() defer calls client.Close() when context is cancelled
29+
// 4. Hub.Run() processes unregister message and calls client.Close()
30+
// 5. Hub.cleanup() during shutdown calls client.Close() for all clients
31+
//
32+
// Thread safety is ensured by:
33+
// - Close() uses sync.Once to ensure channels are closed exactly once
34+
// - closed atomic flag allows checking if client is closing (safe from any goroutine)
35+
// - Hub checks closed flag before sending to avoid race with channel close
36+
// - closeWebSocket() does NOT send to client channels (would race with Close)
37+
//
38+
// Cleanup flow when a client disconnects:
39+
// 1. Handle() read loop exits (EOF, timeout, or error)
40+
// 2. defer cancel() signals Client.Run() via context
41+
// 3. defer Hub.Unregister(clientID) sends message to hub (returns immediately)
42+
// 4. defer closeWebSocket() closes the WebSocket connection only
43+
// 5. Client.Run() sees context cancellation, exits, calls defer client.Close()
44+
// 6. Hub.Run() processes unregister, calls client.Close() (idempotent via sync.Once)
2145
type Client struct {
2246
conn *websocket.Conn
2347
send chan Event
@@ -28,6 +52,7 @@ type Client struct {
2852
ID string
2953
subscription Subscription
3054
closeOnce sync.Once
55+
closed uint32 // Atomic flag: 1 if closed, 0 if open
3156
}
3257

3358
// NewClient creates a new client.
@@ -156,8 +181,18 @@ func (c *Client) write(msg any, timeout time.Duration) error {
156181
// Close gracefully closes the client.
157182
func (c *Client) Close() {
158183
c.closeOnce.Do(func() {
184+
// Set closed flag BEFORE closing channels
185+
// This allows other goroutines to check if client is closing
186+
atomic.StoreUint32(&c.closed, 1)
187+
159188
close(c.done)
160189
close(c.send)
161190
close(c.control)
162191
})
163192
}
193+
194+
// IsClosed returns true if the client is closed or closing.
195+
// Safe to call from any goroutine.
196+
func (c *Client) IsClosed() bool {
197+
return atomic.LoadUint32(&c.closed) != 0
198+
}

pkg/srv/hub.go

Lines changed: 77 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,24 @@ type Event struct {
2424
// Hub manages WebSocket clients and event broadcasting.
2525
// It runs in its own goroutine and handles client registration,
2626
// unregistration, and event distribution.
27+
//
28+
// Thread safety design:
29+
// - Single-goroutine pattern: Only Run() modifies the clients map
30+
// - All external operations (Register, Unregister, Broadcast) send to buffered channels
31+
// - ClientCount() uses RLock for safe concurrent reads
32+
// - Client snapshot pattern in broadcast minimizes lock time
33+
//
34+
// Unregister coordination:
35+
// - Unregister(clientID) sends message to channel and returns immediately (async)
36+
// - Run() processes unregister messages in order
37+
// - Calls client.Close() which is idempotent (sync.Once)
38+
// - Multiple concurrent unregisters for same client are safe
39+
//
40+
// Broadcast safety:
41+
// - Creates client snapshot with RLock, then releases lock
42+
// - Non-blocking send to client.send channel prevents deadlocks
43+
// - If client disconnects during iteration, send fails gracefully (channel full or closed)
44+
// - Client.Close() is safe to call multiple times during this window
2745
type Hub struct {
2846
clients map[string]*Client
2947
register chan *Client
@@ -142,9 +160,8 @@ func (h *Hub) Run(ctx context.Context) {
142160
dropped := 0
143161
for _, client := range clientSnapshot {
144162
if matches(client.subscription, msg.event, msg.payload, client.userOrgs) {
145-
// Non-blocking send
146-
select {
147-
case client.send <- msg.event:
163+
// Try to send (safe against closed channels)
164+
if h.trySendEvent(client, msg.event) {
148165
matched++
149166
logger.Info("delivered event to client", logger.Fields{
150167
"client_id": client.ID,
@@ -154,9 +171,9 @@ func (h *Hub) Run(ctx context.Context) {
154171
"pr_url": msg.event.URL,
155172
"delivery_id": msg.event.DeliveryID,
156173
})
157-
default:
174+
} else {
158175
dropped++
159-
logger.Warn("dropped event for client: buffer full", logger.Fields{
176+
logger.Warn("dropped event for client: channel full or closed", logger.Fields{
160177
"client_id": client.ID,
161178
})
162179
}
@@ -225,38 +242,72 @@ func (h *Hub) ClientCount() int {
225242
return len(h.clients)
226243
}
227244

245+
// trySendEvent attempts to send an event to a client's send channel.
246+
// Returns true if sent successfully, false if channel is full or closed.
247+
//
248+
// CRITICAL: This function checks the client's closed flag before sending.
249+
// This prevents race conditions where Client.Close() is called while Hub is broadcasting.
250+
//
251+
// Race scenario this handles:
252+
// 1. Hub takes client snapshot (client in map, channels open)
253+
// 2. Client.Close() is called (sets closed=1, then closes send channel)
254+
// 3. Hub checks client.IsClosed() before sending
255+
// 4. If closed=1, we don't attempt to send (avoiding panic)
256+
//
257+
// Note: There's still a tiny window between IsClosed() check and send where
258+
// Close() could be called, so we keep recover() as a safety net.
259+
func (h *Hub) trySendEvent(client *Client, event Event) (sent bool) {
260+
// Check if client is closed before attempting send
261+
// This prevents most races with client.Close()
262+
if client.IsClosed() {
263+
return false
264+
}
265+
266+
defer func() {
267+
if r := recover(); r != nil {
268+
// Channel was closed between IsClosed() check and send
269+
// This is a very rare race but possible, so we catch it
270+
sent = false
271+
}
272+
}()
273+
274+
// Non-blocking send with panic protection
275+
select {
276+
case client.send <- event:
277+
return true
278+
default:
279+
return false
280+
}
281+
}
282+
228283
// cleanup closes all client connections during shutdown.
284+
//
285+
// CRITICAL THREADING NOTE:
286+
// This function MUST NOT send to client channels (send/control) because of race conditions:
287+
// - Client.Close() can be called concurrently from multiple places (Handle defer, Run defer, etc.)
288+
// - Once Close() starts, it closes all channels atomically
289+
// - Trying to send to a closed channel panics, even with select/default
290+
// - select/default only protects against FULL channels, not CLOSED channels
291+
//
292+
// Instead, we rely on:
293+
// - WebSocket connection close will signal the client
294+
// - Client.Run() will detect context cancellation and exit gracefully
295+
// - client.Close() is idempotent (sync.Once) so safe to call multiple times
229296
func (h *Hub) cleanup() {
230297
h.mu.Lock()
231298
defer h.mu.Unlock()
232299

233-
logger.Info("Hub cleanup: closing client connections gracefully", logger.Fields{
300+
logger.Info("Hub cleanup: closing client connections", logger.Fields{
234301
"client_count": len(h.clients),
235302
})
236303

304+
// Close all clients. DO NOT try to send shutdown messages - race with client.Close()
305+
// The WebSocket connection close and context cancellation are sufficient signals.
237306
for id, client := range h.clients {
238-
// Try to send shutdown message (non-blocking)
239-
select {
240-
case client.send <- Event{Type: "shutdown"}:
241-
logger.Info("sent shutdown notice to client", logger.Fields{"client_id": id})
242-
default:
243-
logger.Warn("could not send shutdown notice to client: channel full", logger.Fields{"client_id": id})
244-
}
245-
}
246-
247-
// Give clients a moment to process shutdown messages and close gracefully
248-
// This allows time for proper WebSocket close frames to be sent
249-
if len(h.clients) > 0 {
250-
logger.Info("waiting for clients to receive shutdown messages", logger.Fields{
251-
"client_count": len(h.clients),
252-
})
253-
time.Sleep(200 * time.Millisecond)
254-
}
255-
256-
// Now close all clients
257-
for _, client := range h.clients {
258307
client.Close()
308+
logger.Info("closed client during hub cleanup", logger.Fields{"client_id": id})
259309
}
310+
260311
h.clients = nil
261312
logger.Info("Hub cleanup complete", nil)
262313
}

0 commit comments

Comments
 (0)