Skip to content

Commit df751bf

Browse files
committed
Update.
1 parent 9c67849 commit df751bf

File tree

2 files changed

+134
-6
lines changed

2 files changed

+134
-6
lines changed

service/rpc/server.go

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"math"
88
"net"
9+
"runtime"
910
"strings"
1011
"sync"
1112
"time"
@@ -47,8 +48,53 @@ func isConnectionError(err error) bool {
4748
return false
4849
}
4950

51+
// GetGoroutineStats 获取 goroutine 统计信息
52+
func GetGoroutineStats() (total int, requestTasks int64) {
53+
return runtime.NumGoroutine(), activeRequestTaskGoroutines
54+
}
55+
56+
// ForceCleanupStaleConnections 强制清理僵尸连接(紧急情况使用)
57+
func ForceCleanupStaleConnections() int {
58+
cleaned := 0
59+
singleton.ServerLock.Lock()
60+
defer singleton.ServerLock.Unlock()
61+
62+
for serverID, server := range singleton.ServerList {
63+
if server != nil && server.TaskClose != nil {
64+
// 检查连接是否长时间无活动
65+
if time.Since(server.LastActive) > 10*time.Minute {
66+
server.TaskCloseLock.Lock()
67+
if server.TaskClose != nil {
68+
// 强制关闭僵尸连接
69+
select {
70+
case server.TaskClose <- fmt.Errorf("force cleanup stale connection"):
71+
default:
72+
}
73+
server.TaskClose = nil
74+
server.TaskStream = nil
75+
cleaned++
76+
log.Printf("强制清理服务器 %d 的僵尸连接", serverID)
77+
}
78+
server.TaskCloseLock.Unlock()
79+
}
80+
}
81+
}
82+
83+
if cleaned > 0 {
84+
log.Printf("强制清理了 %d 个僵尸连接", cleaned)
85+
}
86+
87+
return cleaned
88+
}
89+
5090
var ServerHandlerSingleton *ServerHandler
5191

92+
// goroutine 计数器,用于监控 RequestTask goroutine 数量
93+
var (
94+
activeRequestTaskGoroutines int64
95+
maxRequestTaskGoroutines int64 = 500 // 最大允许的 RequestTask goroutine 数量
96+
)
97+
5298
type ServerHandler struct {
5399
Auth *authHandler
54100
ioStreams map[string]*ioStreamContext
@@ -157,6 +203,38 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
157203
return err
158204
}
159205

206+
// 检查 goroutine 数量限制,防止 goroutine 泄漏导致程序崩溃
207+
currentGoroutines := func() int64 {
208+
// 使用原子操作获取当前活跃的 RequestTask goroutine 数量
209+
current := activeRequestTaskGoroutines
210+
total := int64(runtime.NumGoroutine())
211+
212+
// 如果总 goroutine 数量过多,拒绝新连接
213+
if total > 1000 {
214+
log.Printf("警告:总 goroutine 数量过多 (%d),拒绝新的 RequestTask 连接", total)
215+
return -1
216+
}
217+
218+
// 如果 RequestTask goroutine 数量超过限制,拒绝新连接
219+
if current >= maxRequestTaskGoroutines {
220+
log.Printf("警告:RequestTask goroutine 数量达到限制 (%d/%d),拒绝新连接", current, maxRequestTaskGoroutines)
221+
return -1
222+
}
223+
224+
return current
225+
}()
226+
227+
if currentGoroutines == -1 {
228+
return fmt.Errorf("服务器负载过高,请稍后重试")
229+
}
230+
231+
// 增加活跃 goroutine 计数
232+
activeRequestTaskGoroutines++
233+
defer func() {
234+
// 确保在函数退出时减少计数
235+
activeRequestTaskGoroutines--
236+
}()
237+
160238
// 使用带缓冲的通道避免阻塞
161239
closeCh := make(chan error, 1)
162240
// 使用done channel来安全地通知监控goroutine停止
@@ -183,8 +261,8 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
183261
singleton.ServerList[clientID].TaskCloseLock.Unlock()
184262
singleton.ServerLock.RUnlock()
185263

186-
// 创建一个带超时的上下文,减少超时时间避免goroutine泄漏
187-
ctx, cancel := context.WithTimeout(stream.Context(), 5*time.Minute)
264+
// 创建一个带超时的上下文,大幅减少超时时间避免goroutine泄漏
265+
ctx, cancel := context.WithTimeout(stream.Context(), 2*time.Minute)
188266
defer cancel()
189267

190268
// 监听连接状态,当连接断开时自动清理
@@ -196,8 +274,8 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
196274
}
197275
}()
198276

199-
// 使用定时器避免无限等待
200-
ticker := time.NewTicker(30 * time.Second)
277+
// 使用定时器避免无限等待,缩短检查间隔
278+
ticker := time.NewTicker(15 * time.Second)
201279
defer ticker.Stop()
202280

203281
for {
@@ -251,6 +329,14 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
251329
// 服务器已被删除,退出监控
252330
return
253331
}
332+
333+
// 定期记录 goroutine 状态,帮助监控泄漏
334+
totalGoroutines := runtime.NumGoroutine()
335+
activeRequestTasks := activeRequestTaskGoroutines
336+
if totalGoroutines > 800 || activeRequestTasks > 400 {
337+
log.Printf("Goroutine 监控 - 总数: %d, RequestTask: %d, 服务器: %d",
338+
totalGoroutines, activeRequestTasks, clientID)
339+
}
254340
}
255341
}
256342
}()
@@ -270,8 +356,8 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
270356
singleton.ServerLock.RUnlock()
271357
}()
272358

273-
// 等待连接关闭或超时,使用定时器避免无限等待
274-
ticker := time.NewTicker(1 * time.Minute)
359+
// 等待连接关闭或超时,使用定时器避免无限等待,缩短检查间隔
360+
ticker := time.NewTicker(30 * time.Second)
275361
defer ticker.Stop()
276362

277363
for {
@@ -310,6 +396,14 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
310396
// 服务器已被删除,正常退出,移除频繁的退出日志
311397
return nil
312398
}
399+
400+
// 检查 goroutine 泄漏情况,如果过多则强制退出
401+
totalGoroutines := runtime.NumGoroutine()
402+
if totalGoroutines > 1200 {
403+
log.Printf("严重警告:goroutine 数量过多 (%d),强制断开服务器 %d 的连接以防止崩溃",
404+
totalGoroutines, clientID)
405+
return fmt.Errorf("goroutine 数量过多,强制断开连接")
406+
}
313407
}
314408
}
315409
}

service/singleton/singleton.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,6 +1799,9 @@ func (mm *MemoryMonitor) moderateCleanup() {
17991799
// 清理报警数据
18001800
cleanupAlertMemoryData()
18011801

1802+
// 清理僵尸 goroutine 连接
1803+
cleanupStaleGoroutineConnections()
1804+
18021805
// 适度的GC
18031806
runtime.GC()
18041807

@@ -1835,6 +1838,37 @@ func GetGoroutineCount() int64 {
18351838
return atomic.LoadInt64(&goroutineCount)
18361839
}
18371840

1841+
// cleanupStaleGoroutineConnections 清理僵尸 goroutine 连接
1842+
func cleanupStaleGoroutineConnections() {
1843+
cleaned := 0
1844+
ServerLock.Lock()
1845+
defer ServerLock.Unlock()
1846+
1847+
for _, server := range ServerList {
1848+
if server != nil && server.TaskClose != nil {
1849+
// 检查连接是否长时间无活动(超过5分钟)
1850+
if time.Since(server.LastActive) > 5*time.Minute {
1851+
server.TaskCloseLock.Lock()
1852+
if server.TaskClose != nil {
1853+
// 强制关闭僵尸连接
1854+
select {
1855+
case server.TaskClose <- fmt.Errorf("cleanup stale connection due to memory pressure"):
1856+
default:
1857+
}
1858+
server.TaskClose = nil
1859+
server.TaskStream = nil
1860+
cleaned++
1861+
}
1862+
server.TaskCloseLock.Unlock()
1863+
}
1864+
}
1865+
}
1866+
1867+
if cleaned > 0 {
1868+
log.Printf("内存清理:清理了 %d 个僵尸连接", cleaned)
1869+
}
1870+
}
1871+
18381872
// OptimizeDatabase 定期优化数据库性能
18391873
func OptimizeDatabase() {
18401874
if isSystemBusy() {

0 commit comments

Comments
 (0)