Skip to content

Commit a15ad68

Browse files
committed
fix(websocket): increase buffer size for websocket channels and improve client handling
1 parent 78a3580 commit a15ad68

File tree

3 files changed

+42
-12
lines changed

3 files changed

+42
-12
lines changed

api/cluster/websocket.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
"time"
1111

1212
"github.com/0xJacky/Nginx-UI/internal/analytic"
13-
"github.com/0xJacky/Nginx-UI/internal/kernel"
1413
"github.com/0xJacky/Nginx-UI/internal/helper"
14+
"github.com/0xJacky/Nginx-UI/internal/kernel"
1515
"github.com/0xJacky/Nginx-UI/model"
1616
"github.com/gin-gonic/gin"
1717
"github.com/gorilla/websocket"
@@ -51,7 +51,7 @@ func GetHub() *Hub {
5151
hubOnce.Do(func() {
5252
hub = &Hub{
5353
clients: make(map[*Client]bool),
54-
broadcast: make(chan WebSocketMessage, 256),
54+
broadcast: make(chan WebSocketMessage, 1024), // Increased buffer size
5555
register: make(chan *Client),
5656
unregister: make(chan *Client),
5757
}
@@ -81,15 +81,35 @@ func (h *Hub) run() {
8181

8282
case message := <-h.broadcast:
8383
h.mutex.RLock()
84+
deadClients := make([]*Client, 0)
8485
for client := range h.clients {
8586
select {
8687
case client.send <- message:
88+
case <-time.After(100 * time.Millisecond):
89+
// Client is too slow, mark for removal
90+
logger.Debug("Client send channel timeout, marking for removal")
91+
deadClients = append(deadClients, client)
8792
default:
88-
close(client.send)
89-
delete(h.clients, client)
93+
// Channel is full, mark for removal
94+
logger.Debug("Client send channel full, marking for removal")
95+
deadClients = append(deadClients, client)
9096
}
9197
}
9298
h.mutex.RUnlock()
99+
100+
// Clean up dead clients
101+
if len(deadClients) > 0 {
102+
h.mutex.Lock()
103+
for _, client := range deadClients {
104+
if _, ok := h.clients[client]; ok {
105+
close(client.send)
106+
delete(h.clients, client)
107+
client.cancel() // Trigger client cleanup
108+
}
109+
}
110+
h.mutex.Unlock()
111+
logger.Info("Cleaned up slow/unresponsive clients", "count", len(deadClients))
112+
}
93113
}
94114
}
95115
}
@@ -135,7 +155,7 @@ func GetAllEnabledEnvironmentWS(c *gin.Context) {
135155

136156
client := &Client{
137157
conn: ws,
138-
send: make(chan WebSocketMessage, 256),
158+
send: make(chan WebSocketMessage, 1024), // Increased buffer size
139159
ctx: ctx,
140160
cancel: cancel,
141161
}
@@ -222,7 +242,7 @@ func (c *Client) handleEnvironmentMonitoring() {
222242
}
223243
}
224244

225-
// sendMessage sends a message to the client
245+
// sendMessage sends a message to the client with timeout and better error handling
226246
func (c *Client) sendMessage(event string, data any) {
227247
message := WebSocketMessage{
228248
Event: event,
@@ -231,8 +251,16 @@ func (c *Client) sendMessage(event string, data any) {
231251

232252
select {
233253
case c.send <- message:
254+
case <-time.After(5 * time.Second):
255+
logger.Warn("Client send channel full, message dropped after timeout", "event", event)
256+
// Force disconnect slow clients to prevent resource leakage
257+
c.cancel()
234258
default:
235-
logger.Warn("Client send channel full, message dropped")
259+
logger.Warn("Client send channel full, message dropped immediately", "event", event)
260+
// For non-critical messages, we can drop them immediately
261+
if event != "heartbeat" {
262+
logger.Info("Dropping non-critical message due to full channel", "event", event)
263+
}
236264
}
237265
}
238266

@@ -290,7 +318,7 @@ func (c *Client) readPump() {
290318
if helper.IsUnexpectedWebsocketError(err) {
291319
logger.Error("Websocket error:", err)
292320
}
293-
return
321+
return
294322
}
295323
}
296324
}()

api/event/websocket.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func GetHub() *Hub {
4949
hubOnce.Do(func() {
5050
hub = &Hub{
5151
clients: make(map[*Client]bool),
52-
broadcast: make(chan WebSocketMessage, 256),
52+
broadcast: make(chan WebSocketMessage, 1024), // Increased buffer size
5353
register: make(chan *Client),
5454
unregister: make(chan *Client),
5555
}
@@ -69,8 +69,10 @@ func (h *Hub) BroadcastMessage(event string, data interface{}) {
6969
}
7070
select {
7171
case h.broadcast <- message:
72+
case <-time.After(1 * time.Second):
73+
logger.Warn("Broadcast channel full, message dropped after timeout", "event", event)
7274
default:
73-
logger.Warn("Broadcast channel full, message dropped")
75+
logger.Warn("Broadcast channel full, message dropped immediately", "event", event)
7476
}
7577
}
7678

@@ -131,7 +133,7 @@ func EventBus(c *gin.Context) {
131133

132134
client := &Client{
133135
conn: ws,
134-
send: make(chan WebSocketMessage, 256),
136+
send: make(chan WebSocketMessage, 1024), // Increased buffer size
135137
ctx: ctx,
136138
cancel: cancel,
137139
}

api/nginx/websocket.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func StreamDetailStatusWS(c *gin.Context) {
153153

154154
client := &NginxPerformanceClient{
155155
conn: ws,
156-
send: make(chan interface{}, 256),
156+
send: make(chan interface{}, 1024), // Increased buffer size
157157
ctx: ctx,
158158
cancel: cancel,
159159
}

0 commit comments

Comments
 (0)