Skip to content

Commit 522b3df

Browse files
committed
Update.
1 parent 7bc4323 commit 522b3df

File tree

2 files changed

+43
-21
lines changed

2 files changed

+43
-21
lines changed

service/rpc/server.go

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
9191

9292
// 使用带缓冲的通道避免阻塞
9393
closeCh := make(chan error, 1)
94+
// 使用done channel来安全地通知监控goroutine停止
95+
done := make(chan struct{})
9496

9597
singleton.ServerLock.RLock()
9698
if singleton.ServerList[clientID] == nil {
@@ -115,34 +117,51 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
115117

116118
// 监听连接状态,当连接断开时自动清理
117119
go func() {
118-
<-stream.Context().Done()
119-
// 连接断开时清理资源
120-
singleton.ServerLock.RLock()
121-
if singleton.ServerList[clientID] != nil {
122-
singleton.ServerList[clientID].TaskCloseLock.Lock()
123-
if singleton.ServerList[clientID].TaskClose == closeCh {
124-
// 只有当前连接才清理
125-
singleton.ServerList[clientID].TaskStream = nil
126-
singleton.ServerList[clientID].TaskClose = nil
127-
}
128-
singleton.ServerList[clientID].TaskCloseLock.Unlock()
129-
}
130-
singleton.ServerLock.RUnlock()
131-
132-
// 发送关闭信号
120+
defer close(done) // 确保done channel被关闭
121+
133122
select {
134-
case closeCh <- stream.Context().Err():
135-
default:
123+
case <-stream.Context().Done():
124+
// 连接断开时清理资源
125+
singleton.ServerLock.RLock()
126+
if singleton.ServerList[clientID] != nil {
127+
singleton.ServerList[clientID].TaskCloseLock.Lock()
128+
if singleton.ServerList[clientID].TaskClose == closeCh {
129+
// 只有当前连接才清理
130+
singleton.ServerList[clientID].TaskStream = nil
131+
singleton.ServerList[clientID].TaskClose = nil
132+
}
133+
singleton.ServerList[clientID].TaskCloseLock.Unlock()
134+
}
135+
singleton.ServerLock.RUnlock()
136+
137+
// 安全地发送关闭信号,使用非阻塞发送
138+
select {
139+
case closeCh <- stream.Context().Err():
140+
case <-done: // 如果主goroutine已经退出,停止发送
141+
return
142+
default:
143+
// 通道可能已关闭或已满,忽略
144+
}
145+
case <-done:
146+
// 主goroutine要求停止监控
147+
return
136148
}
137-
close(closeCh)
138149
}()
139150

140151
// 设置超时防止无限等待
152+
ctx, cancel := context.WithTimeout(stream.Context(), 30*time.Minute)
153+
defer cancel()
154+
defer close(done) // 确保监控goroutine被通知停止
155+
141156
select {
142157
case err := <-closeCh:
143158
return err
144-
case <-time.After(30 * time.Minute): // 30分钟超时
145-
return fmt.Errorf("request task timeout")
159+
case <-ctx.Done():
160+
// 超时或连接取消
161+
if ctx.Err() == context.DeadlineExceeded {
162+
return fmt.Errorf("request task timeout after 30 minutes")
163+
}
164+
return ctx.Err()
146165
}
147166
}
148167

service/singleton/singleton.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,14 @@ func CheckServerOnlineStatus() {
260260
// 清理任务连接资源,防止内存泄漏
261261
server.TaskCloseLock.Lock()
262262
if server.TaskClose != nil {
263+
// 安全发送关闭信号,避免向已关闭的通道发送
263264
select {
264265
case server.TaskClose <- fmt.Errorf("server offline"):
265266
default:
267+
// 通道可能已关闭或已满,忽略
266268
}
267-
close(server.TaskClose)
269+
// 标记通道为nil,但不在这里关闭
270+
// 让RequestTask方法的goroutine负责关闭
268271
server.TaskClose = nil
269272
}
270273
server.TaskStream = nil

0 commit comments

Comments
 (0)