@@ -2,14 +2,19 @@ package handler
22
33import (
44 "bufio"
5+ "encoding/json"
56 "io"
67 "log"
78 "net/http"
89 "os"
10+ "strconv"
911 "strings"
1012 "sync"
13+ "sync/atomic"
14+ "time"
1115
1216 "github.com/awsl-project/maxx/internal/domain"
17+ "github.com/awsl-project/maxx/internal/event"
1318 "github.com/gorilla/websocket"
1419)
1520
@@ -28,8 +33,13 @@ type WebSocketHub struct {
2833 clients map [* websocket.Conn ]bool
2934 broadcast chan WSMessage
3035 mu sync.RWMutex
36+
37+ // broadcast channel 满时的丢弃计数(热路径:只做原子累加)
38+ broadcastDroppedTotal atomic.Uint64
3139}
3240
41+ const websocketWriteTimeout = 5 * time .Second
42+
3343func NewWebSocketHub () * WebSocketHub {
3444 hub := & WebSocketHub {
3545 clients : make (map [* websocket.Conn ]bool ),
@@ -41,15 +51,47 @@ func NewWebSocketHub() *WebSocketHub {
4151
4252func (h * WebSocketHub ) run () {
4353 for msg := range h .broadcast {
54+ // 避免在持锁状态下进行网络写入;同时修复 RLock 下 delete map 的数据竞争风险
4455 h .mu .RLock ()
56+ clients := make ([]* websocket.Conn , 0 , len (h .clients ))
4557 for client := range h .clients {
46- err := client .WriteJSON (msg )
47- if err != nil {
48- client .Close ()
58+ clients = append (clients , client )
59+ }
60+ h .mu .RUnlock ()
61+
62+ var toRemove []* websocket.Conn
63+ for _ , client := range clients {
64+ _ = client .SetWriteDeadline (time .Now ().Add (websocketWriteTimeout ))
65+ if err := client .WriteJSON (msg ); err != nil {
66+ _ = client .Close ()
67+ toRemove = append (toRemove , client )
68+ }
69+ }
70+
71+ if len (toRemove ) > 0 {
72+ h .mu .Lock ()
73+ for _ , client := range toRemove {
4974 delete (h .clients , client )
5075 }
76+ h .mu .Unlock ()
77+ }
78+ }
79+ }
80+
81+ func (h * WebSocketHub ) tryEnqueueBroadcast (msg WSMessage , meta string ) {
82+ select {
83+ case h .broadcast <- msg :
84+ default :
85+ dropped := h .broadcastDroppedTotal .Add (1 )
86+ // 避免日志刷屏:首次 + 每100次打印一次,确保可观测性但不拖慢热路径。
87+ if dropped == 1 || dropped % 100 == 0 {
88+ meta = strings .TrimSpace (meta )
89+ if meta != "" {
90+ log .Printf ("[WebSocket] drop broadcast message type=%s %s dropped_total=%d" , msg .Type , meta , dropped )
91+ } else {
92+ log .Printf ("[WebSocket] drop broadcast message type=%s dropped_total=%d" , msg .Type , dropped )
93+ }
5194 }
52- h .mu .RUnlock ()
5395 }
5496}
5597
@@ -81,33 +123,84 @@ func (h *WebSocketHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
81123}
82124
83125func (h * WebSocketHub ) BroadcastProxyRequest (req * domain.ProxyRequest ) {
84- h .broadcast <- WSMessage {
126+ sanitized := event .SanitizeProxyRequestForBroadcast (req )
127+ var data interface {} = sanitized
128+ var meta string
129+ if sanitized != nil {
130+ // 无论 Sanitize 是否返回原指针,都强制做一次浅拷贝快照,避免异步消费者读到后续可变更的数据。
131+ snapshot := * sanitized
132+ data = snapshot
133+ meta = "requestID=" + snapshot .RequestID
134+ if snapshot .ID != 0 {
135+ meta += " requestDbID=" + strconv .FormatUint (snapshot .ID , 10 )
136+ }
137+ }
138+ msg := WSMessage {
85139 Type : "proxy_request_update" ,
86- Data : req ,
140+ Data : data ,
87141 }
142+ h .tryEnqueueBroadcast (msg , meta )
88143}
89144
90145func (h * WebSocketHub ) BroadcastProxyUpstreamAttempt (attempt * domain.ProxyUpstreamAttempt ) {
91- h .broadcast <- WSMessage {
146+ sanitized := event .SanitizeProxyUpstreamAttemptForBroadcast (attempt )
147+ var data interface {} = sanitized
148+ var meta string
149+ if sanitized != nil {
150+ snapshot := * sanitized
151+ data = snapshot
152+ if snapshot .ProxyRequestID != 0 {
153+ meta = "proxyRequestID=" + strconv .FormatUint (snapshot .ProxyRequestID , 10 )
154+ }
155+ if snapshot .ID != 0 {
156+ if meta != "" {
157+ meta += " "
158+ }
159+ meta += "attemptDbID=" + strconv .FormatUint (snapshot .ID , 10 )
160+ }
161+ }
162+ msg := WSMessage {
92163 Type : "proxy_upstream_attempt_update" ,
93- Data : attempt ,
164+ Data : data ,
94165 }
166+ h .tryEnqueueBroadcast (msg , meta )
95167}
96168
97169// BroadcastMessage sends a custom message with specified type to all connected clients
98170func (h * WebSocketHub ) BroadcastMessage (messageType string , data interface {}) {
99- h .broadcast <- WSMessage {
171+ // 约定:BroadcastMessage 允许调用方传入 map/struct/指针等可变对象。
172+ //
173+ // 但由于实际发送是异步的(入队后由 run() 写到各连接),如果这里直接把可变指针放进 channel,
174+ // 调用方在入队后继续修改数据,会导致与 BroadcastProxyRequest 类似的数据竞态。
175+ //
176+ // 因此这里先把 data 预先序列化为 json.RawMessage,形成不可变快照;后续 WriteJSON 会直接写入该快照。
177+ var snapshot interface {} = data
178+ if data != nil {
179+ if raw , ok := data .(json.RawMessage ); ok {
180+ snapshot = raw
181+ } else {
182+ b , err := json .Marshal (data )
183+ if err != nil {
184+ log .Printf ("[WebSocket] drop broadcast message type=%s: marshal snapshot failed: %v" , messageType , err )
185+ return
186+ }
187+ snapshot = json .RawMessage (b )
188+ }
189+ }
190+ msg := WSMessage {
100191 Type : messageType ,
101- Data : data ,
192+ Data : snapshot ,
102193 }
194+ h .tryEnqueueBroadcast (msg , "" )
103195}
104196
105197// BroadcastLog sends a log message to all connected clients
106198func (h * WebSocketHub ) BroadcastLog (message string ) {
107- h . broadcast <- WSMessage {
199+ msg := WSMessage {
108200 Type : "log_message" ,
109201 Data : message ,
110202 }
203+ h .tryEnqueueBroadcast (msg , "" )
111204}
112205
113206// WebSocketLogWriter implements io.Writer to capture logs and broadcast via WebSocket
0 commit comments