Skip to content

Commit 288b5f9

Browse files
committed
fix(streaming): reduce stream events with batching scheduler
Add StreamUpdateScheduler to batch and debounce streaming updates, reducing event frequency by batching updates over 200ms intervals for rendering and 600ms for database writes. Introduces stream_kind to distinguish init/delta/final events and filters events by conversation ID for performance.
1 parent 8fd50e4 commit 288b5f9

File tree

10 files changed

+853
-276
lines changed

10 files changed

+853
-276
lines changed

src/main/presenter/agentPresenter/index.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { ContentBufferHandler } from './streaming/contentBufferHandler'
2323
import { LLMEventHandler } from './streaming/llmEventHandler'
2424
import { StreamGenerationHandler } from './streaming/streamGenerationHandler'
2525
import type { GeneratingMessageState } from './streaming/types'
26+
import { StreamUpdateScheduler } from './streaming/streamUpdateScheduler'
2627
import { ToolCallHandler } from './loop/toolCallHandler'
2728
import { PermissionHandler } from './permission/permissionHandler'
2829
import { UtilityHandler } from './utility/utilityHandler'
@@ -57,6 +58,7 @@ export class AgentPresenter implements IAgentPresenter {
5758
private streamGenerationHandler: StreamGenerationHandler
5859
private permissionHandler: PermissionHandler
5960
private utilityHandler: UtilityHandler
61+
private streamUpdateScheduler: StreamUpdateScheduler
6062

6163
constructor(options: AgentPresenterDependencies) {
6264
this.sessionPresenter = options.sessionPresenter
@@ -69,6 +71,10 @@ export class AgentPresenter implements IAgentPresenter {
6971
this.messageManager = options.messageManager ?? new MessageManager(options.sqlitePresenter)
7072
this.commandPermissionService = options.commandPermissionService
7173

74+
this.streamUpdateScheduler = new StreamUpdateScheduler({
75+
messageManager: this.messageManager
76+
})
77+
7278
const handlerContext: ThreadHandlerContext = {
7379
sqlitePresenter: this.sqlitePresenter,
7480
messageManager: this.messageManager,
@@ -79,14 +85,14 @@ export class AgentPresenter implements IAgentPresenter {
7985

8086
this.contentBufferHandler = new ContentBufferHandler({
8187
generatingMessages: this.generatingMessages,
82-
messageManager: this.messageManager
88+
streamUpdateScheduler: this.streamUpdateScheduler
8389
})
8490

8591
this.toolCallHandler = new ToolCallHandler({
86-
messageManager: this.messageManager,
8792
sqlitePresenter: this.sqlitePresenter,
8893
searchingMessages: this.searchingMessages,
89-
commandPermissionHandler: this.commandPermissionService
94+
commandPermissionHandler: this.commandPermissionService,
95+
streamUpdateScheduler: this.streamUpdateScheduler
9096
})
9197

9298
this.llmEventHandler = new LLMEventHandler({
@@ -95,6 +101,7 @@ export class AgentPresenter implements IAgentPresenter {
95101
messageManager: this.messageManager,
96102
contentBufferHandler: this.contentBufferHandler,
97103
toolCallHandler: this.toolCallHandler,
104+
streamUpdateScheduler: this.streamUpdateScheduler,
98105
onConversationUpdated: (state) => this.handleConversationUpdates(state)
99106
})
100107

@@ -139,7 +146,7 @@ export class AgentPresenter implements IAgentPresenter {
139146
async sendMessage(
140147
agentId: string,
141148
content: string,
142-
_tabId?: number,
149+
tabId?: number,
143150
selectedVariantsMap?: Record<string, string>
144151
): Promise<AssistantMessage | null> {
145152
await this.logResolvedIfEnabled(agentId)
@@ -159,7 +166,7 @@ export class AgentPresenter implements IAgentPresenter {
159166
userMessage.id
160167
)
161168

162-
this.trackGeneratingMessage(assistantMessage, agentId)
169+
this.trackGeneratingMessage(assistantMessage, agentId, tabId)
163170
await this.updateConversationAfterUserMessage(agentId)
164171
await this.sessionManager.startLoop(agentId, assistantMessage.id)
165172

@@ -311,7 +318,11 @@ export class AgentPresenter implements IAgentPresenter {
311318
}
312319
}
313320

314-
private trackGeneratingMessage(message: AssistantMessage, conversationId: string): void {
321+
private trackGeneratingMessage(
322+
message: AssistantMessage,
323+
conversationId: string,
324+
tabId?: number
325+
): void {
315326
this.generatingMessages.set(message.id, {
316327
message,
317328
conversationId,
@@ -320,7 +331,8 @@ export class AgentPresenter implements IAgentPresenter {
320331
promptTokens: 0,
321332
reasoningStartTime: null,
322333
reasoningEndTime: null,
323-
lastReasoningTime: null
334+
lastReasoningTime: null,
335+
tabId
324336
})
325337
}
326338

src/main/presenter/agentPresenter/loop/toolCallHandler.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import type {
88
MCPResourceContent
99
} from '@shared/presenter'
1010
import { nanoid } from 'nanoid'
11-
import type { MessageManager } from '../../sessionPresenter/managers/messageManager'
1211
import type { GeneratingMessageState } from '../streaming/types'
12+
import type { StreamUpdateScheduler } from '../streaming/streamUpdateScheduler'
1313
import type { CommandPermissionService } from '../../permission/commandPermissionService'
1414

1515
interface PermissionRequestPayload {
@@ -42,21 +42,21 @@ export class ToolCallHandler {
4242
'application/vnd.mcp-ui.remote-dom'
4343
])
4444

45-
private readonly messageManager: MessageManager
4645
private readonly sqlitePresenter: ISQLitePresenter
4746
private readonly searchingMessages: Set<string>
4847
private readonly commandPermissionHandler?: CommandPermissionService
48+
private readonly streamUpdateScheduler: StreamUpdateScheduler
4949

5050
constructor(options: {
51-
messageManager: MessageManager
5251
sqlitePresenter: ISQLitePresenter
5352
searchingMessages: Set<string>
5453
commandPermissionHandler?: CommandPermissionService
54+
streamUpdateScheduler: StreamUpdateScheduler
5555
}) {
56-
this.messageManager = options.messageManager
5756
this.sqlitePresenter = options.sqlitePresenter
5857
this.searchingMessages = options.searchingMessages
5958
this.commandPermissionHandler = options.commandPermissionHandler
59+
this.streamUpdateScheduler = options.streamUpdateScheduler
6060
}
6161

6262
async processToolCallStart(
@@ -288,7 +288,15 @@ export class ToolCallHandler {
288288

289289
this.finalizeLastBlock(state)
290290
state.message.content.push(...uiBlocks)
291-
await this.messageManager.editMessage(event.eventId, JSON.stringify(state.message.content))
291+
this.streamUpdateScheduler.enqueueDelta(
292+
event.eventId,
293+
state.conversationId,
294+
state.message.parentId,
295+
Boolean(state.message.is_variant),
296+
state.tabId,
297+
{},
298+
state.message.content
299+
)
292300
return true
293301
} catch (error) {
294302
console.error('[ToolCallHandler] Error processing MCP UI resources from tool call:', error)
@@ -406,7 +414,15 @@ export class ToolCallHandler {
406414
)
407415
}
408416

409-
await this.messageManager.editMessage(event.eventId, JSON.stringify(state.message.content))
417+
this.streamUpdateScheduler.enqueueDelta(
418+
event.eventId,
419+
state.conversationId,
420+
state.message.parentId,
421+
Boolean(state.message.is_variant),
422+
state.tabId,
423+
{},
424+
state.message.content
425+
)
410426
return true
411427
} catch (error) {
412428
console.error('[ToolCallHandler] Error processing search results from tool call:', error)

src/main/presenter/agentPresenter/streaming/contentBufferHandler.ts

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
import { eventBus, SendTarget } from '@/eventbus'
2-
import { STREAM_EVENTS } from '@/events'
31
import { finalizeAssistantMessageBlocks } from '@shared/chat/messageBlocks'
4-
import type { MessageManager } from '../../sessionPresenter/managers/messageManager'
52
import type { GeneratingMessageState } from './types'
3+
import type { StreamUpdateScheduler } from './streamUpdateScheduler'
64

75
export class ContentBufferHandler {
86
private readonly generatingMessages: Map<string, GeneratingMessageState>
9-
private readonly messageManager: MessageManager
7+
private readonly streamUpdateScheduler: StreamUpdateScheduler
108

119
constructor(options: {
1210
generatingMessages: Map<string, GeneratingMessageState>
13-
messageManager: MessageManager
11+
streamUpdateScheduler: StreamUpdateScheduler
1412
}) {
1513
this.generatingMessages = options.generatingMessages
16-
this.messageManager = options.messageManager
14+
this.streamUpdateScheduler = options.streamUpdateScheduler
1715
}
1816

1917
async flushAdaptiveBuffer(eventId: string): Promise<void> {
@@ -106,20 +104,15 @@ export class ContentBufferHandler {
106104
const batchContent = batch.join('')
107105
contentBlock.content += batchContent
108106

109-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
110-
111-
const eventData: any = {
107+
this.streamUpdateScheduler.enqueueDelta(
112108
eventId,
113-
content: batchContent,
114-
chunkInfo: {
115-
current: batchEnd,
116-
total: totalChunks,
117-
isLargeContent: true,
118-
batchSize: batch.length
119-
}
120-
}
121-
122-
eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, eventData)
109+
state.conversationId,
110+
state.message.parentId,
111+
Boolean(state.message.is_variant),
112+
state.tabId,
113+
{ content: batchContent },
114+
state.message.content
115+
)
123116

124117
if (batchEnd < chunks.length) {
125118
await new Promise((resolve) => setImmediate(resolve))
@@ -154,7 +147,15 @@ export class ContentBufferHandler {
154147
})
155148
}
156149

157-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
150+
this.streamUpdateScheduler.enqueueDelta(
151+
eventId,
152+
state.conversationId,
153+
state.message.parentId,
154+
Boolean(state.message.is_variant),
155+
state.tabId,
156+
{ content },
157+
state.message.content
158+
)
158159
}
159160

160161
splitLargeContent(content: string): string[] {

src/main/presenter/agentPresenter/streaming/llmEventHandler.ts

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type { MessageManager } from '../../sessionPresenter/managers/messageMana
99
import type { GeneratingMessageState } from './types'
1010
import type { ContentBufferHandler } from './contentBufferHandler'
1111
import type { ToolCallHandler } from '../loop/toolCallHandler'
12+
import type { StreamUpdateScheduler } from './streamUpdateScheduler'
1213

1314
type ConversationUpdateHandler = (state: GeneratingMessageState) => Promise<void>
1415

@@ -18,6 +19,7 @@ export class LLMEventHandler {
1819
private readonly messageManager: MessageManager
1920
private readonly contentBufferHandler: ContentBufferHandler
2021
private readonly toolCallHandler: ToolCallHandler
22+
private readonly streamUpdateScheduler: StreamUpdateScheduler
2123
private readonly onConversationUpdated?: ConversationUpdateHandler
2224

2325
constructor(options: {
@@ -26,13 +28,15 @@ export class LLMEventHandler {
2628
messageManager: MessageManager
2729
contentBufferHandler: ContentBufferHandler
2830
toolCallHandler: ToolCallHandler
31+
streamUpdateScheduler: StreamUpdateScheduler
2932
onConversationUpdated?: ConversationUpdateHandler
3033
}) {
3134
this.generatingMessages = options.generatingMessages
3235
this.searchingMessages = options.searchingMessages
3336
this.messageManager = options.messageManager
3437
this.contentBufferHandler = options.contentBufferHandler
3538
this.toolCallHandler = options.toolCallHandler
39+
this.streamUpdateScheduler = options.streamUpdateScheduler
3640
this.onConversationUpdated = options.onConversationUpdated
3741
}
3842

@@ -90,7 +94,15 @@ export class LLMEventHandler {
9094
},
9195
extra: { needContinue: true }
9296
})
93-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
97+
this.streamUpdateScheduler.enqueueDelta(
98+
eventId,
99+
state.conversationId,
100+
state.message.parentId,
101+
Boolean(state.message.is_variant),
102+
state.tabId,
103+
{},
104+
state.message.content
105+
)
94106
return
95107
}
96108

@@ -233,8 +245,33 @@ export class LLMEventHandler {
233245
}
234246
}
235247

236-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
237-
eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, msg)
248+
const delta: Partial<LLMAgentEventData> = {}
249+
if (content) delta.content = content
250+
if (reasoning_content) delta.reasoning_content = reasoning_content
251+
if (image_data) delta.image_data = image_data
252+
if (totalUsage) delta.totalUsage = totalUsage
253+
254+
if (tool_call) {
255+
delta.tool_call = tool_call
256+
delta.tool_call_id = tool_call_id
257+
delta.tool_call_name = tool_call_name
258+
delta.tool_call_params = tool_call_params
259+
delta.tool_call_response = msg.tool_call_response
260+
delta.tool_call_server_name = tool_call_server_name
261+
delta.tool_call_server_icons = tool_call_server_icons
262+
delta.tool_call_server_description = tool_call_server_description
263+
delta.tool_call_response_raw = tool_call_response_raw
264+
}
265+
266+
this.streamUpdateScheduler.enqueueDelta(
267+
eventId,
268+
state.conversationId,
269+
state.message.parentId,
270+
Boolean(state.message.is_variant),
271+
state.tabId,
272+
delta,
273+
state.message.content
274+
)
238275
}
239276

240277
async handleLLMAgentError(msg: LLMAgentEventData): Promise<void> {
@@ -254,6 +291,7 @@ export class LLMEventHandler {
254291
presenter.sessionManager.clearPendingPermission(state.conversationId)
255292
}
256293

294+
await this.streamUpdateScheduler.flushAll(eventId, 'final')
257295
this.searchingMessages.delete(eventId)
258296
eventBus.sendToRenderer(STREAM_EVENTS.ERROR, SendTarget.ALL_WINDOWS, msg)
259297
}
@@ -287,7 +325,15 @@ export class LLMEventHandler {
287325
}
288326
}
289327
})
290-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
328+
this.streamUpdateScheduler.enqueueDelta(
329+
eventId,
330+
state.conversationId,
331+
state.message.parentId,
332+
Boolean(state.message.is_variant),
333+
state.tabId,
334+
{},
335+
state.message.content
336+
)
291337
this.searchingMessages.delete(eventId)
292338
presenter.sessionManager.setStatus(state.conversationId, 'waiting_permission')
293339
return
@@ -298,6 +344,7 @@ export class LLMEventHandler {
298344
presenter.sessionManager.clearPendingPermission(state.conversationId)
299345
}
300346

347+
await this.streamUpdateScheduler.flushAll(eventId, 'final')
301348
this.searchingMessages.delete(eventId)
302349
eventBus.sendToRenderer(STREAM_EVENTS.END, SendTarget.ALL_WINDOWS, msg)
303350
}

0 commit comments

Comments
 (0)