-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathflush-gate.js
More file actions
103 lines (86 loc) · 3.01 KB
/
flush-gate.js
File metadata and controls
103 lines (86 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
 * FlushGate: per-chat message coalescing (modelled on Claude Code's bridge/flushGate.ts).
 *
 * While a chat is idle, messages are collected for a `batchDelayMs` window and then
 * handed to the processing function as one combined prompt. Messages that arrive
 * while a batch is being processed are buffered (up to `maxBufferSize`) and flushed
 * automatically as soon as the current batch finishes.
 *
 * @param {object} [options]
 * @param {number} [options.batchDelayMs=800] - Length of the idle batching window in ms.
 * @param {number} [options.maxBufferSize=5] - Max messages buffered while a batch is
 *   being processed; further messages are dropped with a warning.
 * @param {?function} [options.onBuffered=null] - Called as `(chatId, ctx)` when a message
 *   is buffered during processing (e.g. to send a "received" notice). May be sync or
 *   async; failures are logged and otherwise ignored.
 * @returns {{ enqueue: function, isProcessing: function, getPendingCount: function, clearBuffer: function }}
 */
export function createFlushGate(options = {}) {
  const {
    batchDelayMs = 800,
    maxBufferSize = 5,
    onBuffered = null,
  } = options;

  // Per-chat state: chatId -> { processing, buffer[], batchTimer, processFn }
  const gates = new Map();

  // Returns the state for chatId, creating it on first use.
  function getGate(chatId) {
    let gate = gates.get(chatId);
    if (!gate) {
      gate = { processing: false, buffer: [], batchTimer: null, processFn: null };
      gates.set(chatId, gate);
    }
    return gate;
  }

  // Drops the state of a chat that has nothing running, queued or scheduled, so the
  // map does not grow with every chatId ever seen.
  function releaseIfIdle(chatId, gate) {
    const isIdle = !gate.processing && gate.batchTimer === null && gate.buffer.length === 0;
    if (isIdle && gates.get(chatId) === gate) {
      gates.delete(chatId);
    }
  }

  /**
   * Enqueue one message.
   * @param {number} chatId
   * @param {{ ctx, prompt: string }} message
   * @param {function} processFn - async (ctx, combinedPrompt) => void
   * @returns {Promise<void>}
   */
  async function enqueue(chatId, message, processFn) {
    const gate = getGate(chatId);
    gate.processFn = processFn; // always use the most recent processFn

    if (gate.processing) {
      // A batch is in flight: buffer the message, or drop it when the buffer is full.
      if (gate.buffer.length >= maxBufferSize) {
        console.warn(`[FlushGate] chatId=${chatId} buffer full (${maxBufferSize}), dropping message`);
        return;
      }
      gate.buffer.push(message);
      if (typeof onBuffered === "function") {
        // Best-effort notification. try/await (rather than chaining .catch on the
        // return value) also supports callbacks that are synchronous.
        try {
          await onBuffered(chatId, message.ctx);
        } catch (err) {
          console.warn(`[FlushGate] chatId=${chatId} onBuffered callback failed:`, err);
        }
      }
      return;
    }

    // Idle: collect messages for a short window before processing.
    gate.buffer.push(message);
    if (gate.batchTimer) return; // a window is already open

    gate.batchTimer = setTimeout(() => {
      gate.batchTimer = null;
      // Nothing awaits the timer callback, so a rejection must be handled here.
      flush(chatId).catch((err) => {
        console.error(`[FlushGate] chatId=${chatId} flush failed:`, err);
      });
    }, batchDelayMs);
  }

  // Processes everything buffered for chatId, batch by batch, until the buffer is
  // empty. A failing batch is logged and does not prevent later batches from running.
  async function flush(chatId) {
    const gate = gates.get(chatId);
    if (!gate || gate.processing) return;

    gate.processing = true;
    try {
      while (gate.buffer.length > 0) {
        // Take every message buffered so far as one batch.
        const batch = gate.buffer.splice(0);
        const combinedPrompt = batch.length === 1
          ? batch[0].prompt
          : batch.map((m, i) => `[消息 ${i + 1}]\n${m.prompt}`).join("\n\n");
        const latestCtx = batch[batch.length - 1].ctx; // reply using the last message's ctx

        try {
          await gate.processFn(latestCtx, combinedPrompt);
        } catch (err) {
          console.error(`[FlushGate] chatId=${chatId} processFn failed:`, err);
        }
      }
    } finally {
      gate.processing = false;
      releaseIfIdle(chatId, gate);
    }
  }

  /**
   * @param {number} chatId
   * @returns {boolean} Whether a batch for this chat is currently being processed.
   */
  function isProcessing(chatId) {
    return gates.get(chatId)?.processing || false;
  }

  /**
   * @param {number} chatId
   * @returns {number} Number of messages waiting in this chat's buffer.
   */
  function getPendingCount(chatId) {
    return gates.get(chatId)?.buffer.length || 0;
  }

  /**
   * Discard queued messages (does not affect the batch currently being processed).
   * @param {number} chatId
   * @returns {number} Number of messages removed.
   */
  function clearBuffer(chatId) {
    const gate = gates.get(chatId);
    if (!gate) return 0;
    const count = gate.buffer.length;
    gate.buffer.splice(0);
    return count;
  }

  return { enqueue, isProcessing, getPendingCount, clearBuffer };
}