11using System ;
2+ using System . Collections . Concurrent ;
23using System . IO ;
34using System . Net . WebSockets ;
45using System . Text . Json ;
@@ -31,6 +32,7 @@ public class CqWsSession : CqSession, ICqPostSession, ICqActionSession, IDisposa
3132
3233 // 主循环线程
3334 private Task ? mainLoopTask ;
35+ private Task ? mainPostLoopTask ;
3436 private Task ? standaloneActionLoopTask ;
3537
3638 // 三个接入点的套接字
@@ -39,6 +41,8 @@ public class CqWsSession : CqSession, ICqPostSession, ICqActionSession, IDisposa
3941 private WebSocket ? apiWebSocket ;
4042 private WebSocket ? eventWebSocket ;
4143
44+ private ConcurrentQueue < CqPostModel > postQueue ;
45+
4246 /// <summary>
4347 /// 已连接
4448 /// </summary>
@@ -96,6 +100,7 @@ public CqWsSession(CqWsSessionOptions options)
96100 // 初始化 action 发送器 和 post 管道
97101 actionSender = new CqWsActionSender ( this , apiWebSocket ?? webSocket ?? throw new InvalidOperationException ( "This would never happened" ) ) ;
98102 postPipeline = new CqPostPipeline ( ) ;
103+ postQueue = new ConcurrentQueue < CqPostModel > ( ) ;
99104 }
100105
101106 internal CqWsSession ( WebSocket remoteWebSocket , Uri baseUri , string ? accessToken , int bufferSize )
@@ -108,34 +113,20 @@ internal CqWsSession(WebSocket remoteWebSocket, Uri baseUri, string? accessToken
108113
109114 actionSender = new CqWsActionSender ( this , remoteWebSocket ) ;
110115 postPipeline = new CqPostPipeline ( ) ;
111- }
112-
113- internal async Task ProcPostModelAsync ( CqPostModel postModel )
114- {
115- CqPostContext ? postContext = CqPostContext . FromModel ( postModel ) ;
116- postContext ? . SetSession ( this ) ;
117-
118- // 如果 post 上下文不为空, 则使用 PostPipeline 处理该事件
119- if ( postContext != null )
120- {
121- await postPipeline . ExecuteAsync ( postContext ) ;
122-
123- // WebSocket 需要模拟 QuickAction
124- await actionSender . HandleQuickAction ( postContext , postModel ) ;
125- }
116+ postQueue = new ConcurrentQueue < CqPostModel > ( ) ;
126117 }
127118
128119 /// <summary>
129120 /// 处理 WebSocket 数据
130121 /// </summary>
131122 /// <param name="wsDataModel"></param>
132123 /// <returns></returns>
133- private async Task ProcWsDataAsync ( CqWsDataModel ? wsDataModel )
124+ private void ProcWsDataAsync ( CqWsDataModel ? wsDataModel )
134125 {
135126 // 如果是 post 上报
136127 if ( wsDataModel is CqPostModel postModel )
137128 {
138- await ProcPostModelAsync ( postModel ) ;
129+ postQueue . Enqueue ( postModel ) ;
139130 }
140131 // 否则如果是 action 请求响应
141132 else if ( wsDataModel is CqActionResultRaw actionResultRaw )
@@ -156,12 +147,10 @@ private async Task WebSocketLoop(WebSocket webSocket)
156147 MemoryStream ms = new MemoryStream ( ) ;
157148 while ( ! disposed )
158149 {
159- IsConnected = webSocket . State == WebSocketState . Open ;
150+ IsConnected & = webSocket . State == WebSocketState . Open ;
160151
161152 if ( ! IsConnected )
162- {
163153 return ;
164- }
165154
166155 try
167156 {
@@ -181,12 +170,16 @@ private async Task WebSocketLoop(WebSocket webSocket)
181170 try // 直接捕捉 JSON 反序列化异常
182171 {
183172#endif
184- // 反序列化为 WebSocket 数据 (自己抽的类
185- string json = GlobalConfig . TextEncoding . GetString ( ms . ToArray ( ) ) ;
186- CqWsDataModel ? wsDataModel = JsonSerializer . Deserialize < CqWsDataModel > ( json , JsonHelper . Options ) ;
173+ #if DEBUG
174+ // 反序列化为 WebSocket 数据 (自己抽的类
175+ string json = GlobalConfig . TextEncoding . GetString ( ms . ToArray ( ) ) ;
176+ #endif
177+
178+ ms . Seek ( 0 , SeekOrigin . Begin ) ;
179+ CqWsDataModel ? wsDataModel = JsonSerializer . Deserialize < CqWsDataModel > ( ms , JsonHelper . Options ) ;
187180
188181 // 处理 WebSocket 数据
189- await ProcWsDataAsync ( wsDataModel ) ;
182+ ProcWsDataAsync ( wsDataModel ) ;
190183
191184#if DEBUG
192185 if ( wsDataModel is not CqPostModel )
@@ -202,6 +195,34 @@ private async Task WebSocketLoop(WebSocket webSocket)
202195 }
203196 }
204197
198+ private async Task PostProcLoop ( )
199+ {
200+ while ( ! disposed )
201+ {
202+ if ( ! IsConnected )
203+ return ;
204+
205+ if ( postQueue . TryDequeue ( out var postModel ) )
206+ {
207+ CqPostContext ? postContext = CqPostContext . FromModel ( postModel ) ;
208+ postContext ? . SetSession ( this ) ;
209+
210+ // 如果 post 上下文不为空, 则使用 PostPipeline 处理该事件
211+ if ( postContext != null )
212+ {
213+ await postPipeline . ExecuteAsync ( postContext ) ;
214+
215+ // WebSocket 需要模拟 QuickAction
216+ await actionSender . HandleQuickAction ( postContext , postModel ) ;
217+ }
218+ }
219+ else
220+ {
221+ await Task . Delay ( 1 ) ;
222+ }
223+ }
224+ }
225+
205226 /// <summary>
206227 /// 连接
207228 /// </summary>
@@ -283,6 +304,9 @@ public async Task StartAsync()
283304 // 当使用单独的 API 套接字的时候, 我们需要监听 API 套接字
284305 if ( apiWebSocket != null )
285306 standaloneActionLoopTask = WebSocketLoop ( apiWebSocket ) ;
307+
308+ // 单独线程处理上报
309+ mainPostLoopTask = PostProcLoop ( ) ;
286310 }
287311
288312 /// <summary>
@@ -303,7 +327,7 @@ public async Task WaitForShutdownAsync()
303327 if ( mainLoopTask == null )
304328 throw new InvalidOperationException ( "Session is not started yet" ) ;
305329
306- await mainLoopTask ;
330+ await Task . WhenAll ( mainLoopTask , mainPostLoopTask ) ;
307331 }
308332
309333 /// <summary>
0 commit comments