Skip to content

Commit bb7a3f1

Browse files
committed
Merge branch 'dev'
2 parents f0932ea + a5133c3 commit bb7a3f1

File tree

2 files changed

+56
-63
lines changed

2 files changed

+56
-63
lines changed

src/EleCho.GoCqHttpSdk/CqWsSession.cs

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.IO;
34
using System.Net.WebSockets;
45
using System.Text.Json;
@@ -31,6 +32,7 @@ public class CqWsSession : CqSession, ICqPostSession, ICqActionSession, IDisposa
3132

3233
// 主循环线程
3334
private Task? mainLoopTask;
35+
private Task? mainPostLoopTask;
3436
private Task? standaloneActionLoopTask;
3537

3638
// 三个接入点的套接字
@@ -39,6 +41,8 @@ public class CqWsSession : CqSession, ICqPostSession, ICqActionSession, IDisposa
3941
private WebSocket? apiWebSocket;
4042
private WebSocket? eventWebSocket;
4143

44+
private ConcurrentQueue<CqPostModel> postQueue;
45+
4246
/// <summary>
4347
/// 已连接
4448
/// </summary>
@@ -96,6 +100,7 @@ public CqWsSession(CqWsSessionOptions options)
96100
// 初始化 action 发送器 和 post 管道
97101
actionSender = new CqWsActionSender(this, apiWebSocket ?? webSocket ?? throw new InvalidOperationException("This would never happened"));
98102
postPipeline = new CqPostPipeline();
103+
postQueue = new ConcurrentQueue<CqPostModel>();
99104
}
100105

101106
internal CqWsSession(WebSocket remoteWebSocket, Uri baseUri, string? accessToken, int bufferSize)
@@ -108,34 +113,20 @@ internal CqWsSession(WebSocket remoteWebSocket, Uri baseUri, string? accessToken
108113

109114
actionSender = new CqWsActionSender(this, remoteWebSocket);
110115
postPipeline = new CqPostPipeline();
111-
}
112-
113-
internal async Task ProcPostModelAsync(CqPostModel postModel)
114-
{
115-
CqPostContext? postContext = CqPostContext.FromModel(postModel);
116-
postContext?.SetSession(this);
117-
118-
// 如果 post 上下文不为空, 则使用 PostPipeline 处理该事件
119-
if (postContext != null)
120-
{
121-
await postPipeline.ExecuteAsync(postContext);
122-
123-
// WebSocket 需要模拟 QuickAction
124-
await actionSender.HandleQuickAction(postContext, postModel);
125-
}
116+
postQueue = new ConcurrentQueue<CqPostModel>();
126117
}
127118

128119
/// <summary>
129120
/// 处理 WebSocket 数据
130121
/// </summary>
131122
/// <param name="wsDataModel"></param>
132123
/// <returns></returns>
133-
private async Task ProcWsDataAsync(CqWsDataModel? wsDataModel)
124+
private void ProcWsDataAsync(CqWsDataModel? wsDataModel)
134125
{
135126
// 如果是 post 上报
136127
if (wsDataModel is CqPostModel postModel)
137128
{
138-
await ProcPostModelAsync(postModel);
129+
postQueue.Enqueue(postModel);
139130
}
140131
// 否则如果是 action 请求响应
141132
else if (wsDataModel is CqActionResultRaw actionResultRaw)
@@ -156,12 +147,10 @@ private async Task WebSocketLoop(WebSocket webSocket)
156147
MemoryStream ms = new MemoryStream();
157148
while (!disposed)
158149
{
159-
IsConnected = webSocket.State == WebSocketState.Open;
150+
IsConnected &= webSocket.State == WebSocketState.Open;
160151

161152
if (!IsConnected)
162-
{
163153
return;
164-
}
165154

166155
try
167156
{
@@ -181,12 +170,16 @@ private async Task WebSocketLoop(WebSocket webSocket)
181170
try // 直接捕捉 JSON 反序列化异常
182171
{
183172
#endif
184-
// 反序列化为 WebSocket 数据 (自己抽的类
185-
string json = GlobalConfig.TextEncoding.GetString(ms.ToArray());
186-
CqWsDataModel? wsDataModel = JsonSerializer.Deserialize<CqWsDataModel>(json, JsonHelper.Options);
173+
#if DEBUG
174+
// 反序列化为 WebSocket 数据 (自己抽的类
175+
string json = GlobalConfig.TextEncoding.GetString(ms.ToArray());
176+
#endif
177+
178+
ms.Seek(0, SeekOrigin.Begin);
179+
CqWsDataModel? wsDataModel = JsonSerializer.Deserialize<CqWsDataModel>(ms, JsonHelper.Options);
187180

188181
// 处理 WebSocket 数据
189-
await ProcWsDataAsync(wsDataModel);
182+
ProcWsDataAsync(wsDataModel);
190183

191184
#if DEBUG
192185
if (wsDataModel is not CqPostModel)
@@ -202,6 +195,34 @@ private async Task WebSocketLoop(WebSocket webSocket)
202195
}
203196
}
204197

198+
private async Task PostProcLoop()
199+
{
200+
while (!disposed)
201+
{
202+
if (!IsConnected)
203+
return;
204+
205+
if (postQueue.TryDequeue(out var postModel))
206+
{
207+
CqPostContext? postContext = CqPostContext.FromModel(postModel);
208+
postContext?.SetSession(this);
209+
210+
// 如果 post 上下文不为空, 则使用 PostPipeline 处理该事件
211+
if (postContext != null)
212+
{
213+
await postPipeline.ExecuteAsync(postContext);
214+
215+
// WebSocket 需要模拟 QuickAction
216+
await actionSender.HandleQuickAction(postContext, postModel);
217+
}
218+
}
219+
else
220+
{
221+
await Task.Delay(1);
222+
}
223+
}
224+
}
225+
205226
/// <summary>
206227
/// 连接
207228
/// </summary>
@@ -283,6 +304,9 @@ public async Task StartAsync()
283304
// 当使用单独的 API 套接字的时候, 我们需要监听 API 套接字
284305
if (apiWebSocket != null)
285306
standaloneActionLoopTask = WebSocketLoop(apiWebSocket);
307+
308+
// 单独线程处理上报
309+
mainPostLoopTask = PostProcLoop();
286310
}
287311

288312
/// <summary>
@@ -303,7 +327,7 @@ public async Task WaitForShutdownAsync()
303327
if (mainLoopTask == null)
304328
throw new InvalidOperationException("Session is not started yet");
305329

306-
await mainLoopTask;
330+
await Task.WhenAll(mainLoopTask, mainPostLoopTask);
307331
}
308332

309333
/// <summary>

src/TestConsole/Program.cs

Lines changed: 7 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,56 +23,25 @@ internal class Program
2323
{
2424
public const int WebSocketPort = 5701;
2525

26+
static CqRHttpSession rHttpSession = new CqRHttpSession(new CqRHttpSessionOptions()
27+
{
28+
BaseUri = new Uri($"http://localhost:5701"),
29+
});
30+
2631
static CqWsSession session = new CqWsSession(new CqWsSessionOptions()
2732
{
2833
BaseUri = new Uri($"ws://127.0.0.1:{WebSocketPort}"),
29-
UseApiEndPoint = true,
30-
UseEventEndPoint = true,
3134
});
3235

3336
private static async Task Main(string[] args)
3437
{
35-
Console.Write("OpenAI API Key:\n> ");
36-
var apikey =
37-
Console.ReadLine()!;
38-
39-
session.UseMessageMatchPlugin(new MessageMatchPlugin1(session));
40-
session.UseMessageMatchPlugin(new OpenAiMatchPlugin(session, apikey));
41-
session.UseMessageMatchPlugin(new MessageMatchPlugin2(session));
42-
43-
session.UseGroupRequest(async context =>
44-
{
45-
await session.ApproveGroupRequestAsync(context.Flag, context.GroupRequestType);
46-
});
47-
4838
session.UseGroupMessage(async context =>
4939
{
50-
Console.WriteLine($"{context.Sender.Nickname}: {context.Message.Text}");
51-
52-
if (context.Message.Text.StartsWith("ocr ", StringComparison.OrdinalIgnoreCase))
40+
if (context.Message.Text.StartsWith("echo "))
5341
{
54-
var img = context.Message.FirstOrDefault(x => x is CqImageMsg);
55-
if (img is CqImageMsg imgmsg)
56-
{
57-
var ocrrst =
58-
await session.OcrImageAsync(imgmsg.File);
59-
60-
if (ocrrst == null)
61-
return;
62-
63-
StringBuilder sb = new StringBuilder();
64-
sb.AppendLine("OCR:");
65-
foreach (var txtdet in ocrrst.Texts)
66-
sb.AppendLine($"{txtdet.Text} Confidence:{txtdet.Confidence}");
67-
68-
await session.SendGroupMessageAsync(context.GroupId, new CqMessage(sb.ToString()));
69-
}
42+
await session.SendGroupMessageAsync(context.GroupId, new CqMessage(context.Message.Text.Substring(5)));
7043
}
7144

72-
if (context.Message.Text.EndsWith("..."))
73-
{
74-
await session.SendGroupMessageAsync(context.GroupId, context.Message);
75-
}
7645
});
7746

7847
Console.WriteLine("OK");

0 commit comments

Comments
 (0)