Skip to content

Commit 30d68bd

Browse files
committed
Update.
1 parent 84e9e9f commit 30d68bd

File tree

2 files changed

+118
-64
lines changed

2 files changed

+118
-64
lines changed

cmd/dashboard/controller/common_page.go

Lines changed: 114 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import (
66
"fmt"
77
"log"
88
"math"
9+
"net"
910
"net/http"
1011
"runtime"
1112
"runtime/debug"
1213
"strconv"
14+
"sync"
1315
"time"
1416

1517
"code.cloudfoundry.org/bytefmt"
@@ -1141,120 +1143,172 @@ func (cp *commonPage) ws(c *gin.Context) {
11411143
CheckOrigin: func(r *http.Request) bool {
11421144
return true
11431145
},
1144-
// 增加缓冲区大小,提高稳定性
1145-
ReadBufferSize: 1024 * 8,
1146-
WriteBufferSize: 1024 * 8,
1146+
ReadBufferSize: 16384, // 16KB
1147+
WriteBufferSize: 16384, // 16KB
1148+
HandshakeTimeout: 10 * time.Second,
11471149
}
1150+
11481151
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
11491152
if err != nil {
1150-
log.Printf("NG-ERROR: Failed to set websocket upgrade: %+v", err)
1153+
log.Printf("WebSocket升级失败: %v", err)
11511154
return
11521155
}
11531156

1154-
// 使用context控制生命周期
1157+
// 生成连接ID用于跟踪
1158+
connID := fmt.Sprintf("ws_%d", time.Now().UnixNano())
1159+
log.Printf("WebSocket连接建立: %s", connID)
1160+
1161+
// 创建带取消的上下文
11551162
ctx, cancel := context.WithCancel(c.Request.Context())
11561163
defer cancel()
11571164

1158-
// 设置连接参数,提高稳定性
1159-
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
1160-
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
1161-
1162-
// 设置Pong处理函数,更新读取超时
1165+
// 确保连接关闭
1166+
defer func() {
1167+
conn.Close()
1168+
log.Printf("WebSocket连接关闭: %s", connID)
1169+
}()
1170+
1171+
// 设置连接基本参数
1172+
conn.SetReadLimit(32768) // 32KB读取限制
11631173
conn.SetPongHandler(func(string) error {
11641174
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
11651175
return nil
11661176
})
11671177

1168-
// 使用正确的构造函数
1178+
// 创建safe连接
11691179
safeConn := websocketx.NewConn(conn)
1170-
1171-
// 确保连接被关闭
1172-
defer func() {
1173-
cancel() // 取消context,确保所有goroutine退出
1174-
safeConn.Close()
1175-
}()
11761180

1177-
// 使用一个channel来通知写入goroutine退出
1178-
done := make(chan struct{})
1179-
1180-
// 添加心跳ticker
1181-
pingTicker := time.NewTicker(30 * time.Second)
1182-
defer pingTicker.Stop()
1181+
// 使用WaitGroup确保所有goroutine正确退出
1182+
var wg sync.WaitGroup
1183+
wg.Add(2) // 2个goroutine:读取和写入
1184+
1185+
// 错误通道,用于goroutine间通信
1186+
errChan := make(chan error, 2)
11831187

1184-
// Read goroutine
1188+
// 读取goroutine - 处理客户端断开检测和心跳
11851189
go func() {
1190+
defer wg.Done()
11861191
defer func() {
1187-
close(done) // 发送退出信号
1188-
cancel() // 取消context
1192+
if r := recover(); r != nil {
1193+
log.Printf("WebSocket读取goroutine panic恢复 %s: %v", connID, r)
1194+
}
11891195
}()
1196+
11901197
for {
11911198
select {
11921199
case <-ctx.Done():
11931200
return
11941201
default:
1195-
// 设置读取超时
1196-
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
1197-
1198-
// 我们需要从连接中读取,以检测客户端是否已断开连接。
1202+
// 设置读取超时,避免无限阻塞
1203+
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
1204+
11991205
msgType, message, err := safeConn.ReadMessage()
12001206
if err != nil {
1201-
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
1202-
log.Printf("NG-ERROR: websocket read error: %v", err)
1207+
// 检查是否是超时错误
1208+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
1209+
continue // 超时继续循环
12031210
}
1204-
return // 退出循环
1211+
1212+
// 其他错误表示连接问题
1213+
if !websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
1214+
log.Printf("WebSocket读取错误 %s: %v", connID, err)
1215+
}
1216+
1217+
// 通知其他goroutine退出
1218+
select {
1219+
case errChan <- err:
1220+
default:
1221+
}
1222+
return
12051223
}
1206-
1207-
// 处理客户端发送的ping消息
1224+
1225+
// 处理心跳消息
12081226
if msgType == websocket.TextMessage && string(message) == `{"type":"ping"}` {
1209-
// 立即响应pong
1210-
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
1227+
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
12111228
if err := safeConn.WriteMessage(websocket.TextMessage, []byte(`{"type":"pong"}`)); err != nil {
1212-
log.Printf("Failed to send pong: %v", err)
1229+
log.Printf("发送pong失败 %s: %v", connID, err)
1230+
select {
1231+
case errChan <- err:
1232+
default:
1233+
}
12131234
return
12141235
}
12151236
}
12161237
}
12171238
}
12181239
}()
12191240

1220-
// Write goroutine
1241+
// 写入goroutine - 处理数据推送和心跳
12211242
go func() {
1222-
ticker := time.NewTicker(time.Second * 1) // 从5秒改为1秒,提高更新频率
1223-
defer ticker.Stop()
1243+
defer wg.Done()
1244+
defer func() {
1245+
if r := recover(); r != nil {
1246+
log.Printf("WebSocket写入goroutine panic恢复 %s: %v", connID, r)
1247+
}
1248+
}()
1249+
1250+
// 数据推送定时器
1251+
dataTicker := time.NewTicker(1 * time.Second)
1252+
defer dataTicker.Stop()
1253+
1254+
// 心跳定时器
1255+
pingTicker := time.NewTicker(25 * time.Second)
1256+
defer pingTicker.Stop()
1257+
12241258
for {
12251259
select {
12261260
case <-ctx.Done():
12271261
return
1228-
case <-done: // 从读取goroutine接收到退出信号
1262+
case <-errChan:
12291263
return
1230-
case <-pingTicker.C:
1231-
// 发送WebSocket Ping帧
1232-
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
1233-
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
1234-
log.Printf("Failed to send ping: %v", err)
1235-
return
1236-
}
1237-
case <-ticker.C:
1264+
case <-dataTicker.C:
1265+
// 获取并发送服务器状态
12381266
stat, err := cp.getServerStat(c, false)
12391267
if err != nil {
1240-
log.Printf("NG-ERROR: failed to get server stat for websocket: %v", err)
1241-
// 不要退出,让 done channel 处理终止
1242-
continue
1268+
log.Printf("获取服务器状态失败 %s: %v", connID, err)
1269+
continue // 继续运行,不退出
12431270
}
1244-
1245-
// 设置写入超时
1271+
12461272
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
12471273
if err = safeConn.WriteMessage(websocket.TextMessage, stat); err != nil {
1248-
// 写入失败,可能是因为连接已关闭。
1249-
// 读取goroutine将处理清理工作。我们可以在这里退出。
1274+
log.Printf("发送数据失败 %s: %v", connID, err)
1275+
return
1276+
}
1277+
1278+
case <-pingTicker.C:
1279+
// 发送WebSocket Ping帧
1280+
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
1281+
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
1282+
log.Printf("发送ping失败 %s: %v", connID, err)
12501283
return
12511284
}
12521285
}
12531286
}
12541287
}()
1255-
1256-
// 等待context取消
1257-
<-ctx.Done()
1288+
1289+
// 等待任一goroutine完成或超时
1290+
done := make(chan struct{})
1291+
go func() {
1292+
wg.Wait()
1293+
close(done)
1294+
}()
1295+
1296+
select {
1297+
case <-done:
1298+
log.Printf("WebSocket所有goroutine正常退出: %s", connID)
1299+
case <-ctx.Done():
1300+
log.Printf("WebSocket上下文取消: %s", connID)
1301+
case <-time.After(30 * time.Minute):
1302+
log.Printf("WebSocket连接超时: %s", connID)
1303+
cancel()
1304+
}
1305+
1306+
// 等待所有goroutine退出,最多5秒
1307+
select {
1308+
case <-done:
1309+
case <-time.After(5 * time.Second):
1310+
log.Printf("等待goroutine退出超时: %s", connID)
1311+
}
12581312
}
12591313

12601314
func (cp *commonPage) terminal(c *gin.Context) {

service/singleton/singleton.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1712,15 +1712,15 @@ var (
17121712

17131713
func NewMemoryMonitor() *MemoryMonitor {
17141714
return &MemoryMonitor{
1715-
highThreshold: 1200, // 提高到1200MB,减少误触发清理
1716-
warningThreshold: 600, // 提高到600MB警告阈值
1717-
maxGoroutines: 600, // 提高到600个goroutine限制
1715+
highThreshold: 2048, // 2GB才触发高压清理,避免误触发
1716+
warningThreshold: 1024, // 1GB警告阈值,更合理
1717+
maxGoroutines: 1000, // 1000个goroutine限制,避免正常业务被清理
17181718
isHighPressure: false,
17191719
}
17201720
}
17211721

17221722
func (mm *MemoryMonitor) Start() {
1723-
mm.ticker = time.NewTicker(10 * time.Second) // 每10秒检查一次
1723+
mm.ticker = time.NewTicker(30 * time.Second) // 改为30秒检查一次,降低频率
17241724

17251725
go func() {
17261726
defer func() {

0 commit comments

Comments
 (0)