Skip to content

Commit d4c6a49

Browse files
authored
fix(streaming): reduce stream events with batching scheduler (#1271)
Add StreamUpdateScheduler to batch and debounce streaming updates, reducing event frequency by batching updates over 200ms intervals for rendering and 600ms for database writes. Introduces stream_kind to distinguish init/delta/final events and filters events by conversation ID for performance.
1 parent 8fd50e4 commit d4c6a49

File tree

10 files changed

+853
-276
lines changed

10 files changed

+853
-276
lines changed

src/main/presenter/agentPresenter/index.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { ContentBufferHandler } from './streaming/contentBufferHandler'
2323
import { LLMEventHandler } from './streaming/llmEventHandler'
2424
import { StreamGenerationHandler } from './streaming/streamGenerationHandler'
2525
import type { GeneratingMessageState } from './streaming/types'
26+
import { StreamUpdateScheduler } from './streaming/streamUpdateScheduler'
2627
import { ToolCallHandler } from './loop/toolCallHandler'
2728
import { PermissionHandler } from './permission/permissionHandler'
2829
import { UtilityHandler } from './utility/utilityHandler'
@@ -57,6 +58,7 @@ export class AgentPresenter implements IAgentPresenter {
5758
private streamGenerationHandler: StreamGenerationHandler
5859
private permissionHandler: PermissionHandler
5960
private utilityHandler: UtilityHandler
61+
private streamUpdateScheduler: StreamUpdateScheduler
6062

6163
constructor(options: AgentPresenterDependencies) {
6264
this.sessionPresenter = options.sessionPresenter
@@ -69,6 +71,10 @@ export class AgentPresenter implements IAgentPresenter {
6971
this.messageManager = options.messageManager ?? new MessageManager(options.sqlitePresenter)
7072
this.commandPermissionService = options.commandPermissionService
7173

74+
this.streamUpdateScheduler = new StreamUpdateScheduler({
75+
messageManager: this.messageManager
76+
})
77+
7278
const handlerContext: ThreadHandlerContext = {
7379
sqlitePresenter: this.sqlitePresenter,
7480
messageManager: this.messageManager,
@@ -79,14 +85,14 @@ export class AgentPresenter implements IAgentPresenter {
7985

8086
this.contentBufferHandler = new ContentBufferHandler({
8187
generatingMessages: this.generatingMessages,
82-
messageManager: this.messageManager
88+
streamUpdateScheduler: this.streamUpdateScheduler
8389
})
8490

8591
this.toolCallHandler = new ToolCallHandler({
86-
messageManager: this.messageManager,
8792
sqlitePresenter: this.sqlitePresenter,
8893
searchingMessages: this.searchingMessages,
89-
commandPermissionHandler: this.commandPermissionService
94+
commandPermissionHandler: this.commandPermissionService,
95+
streamUpdateScheduler: this.streamUpdateScheduler
9096
})
9197

9298
this.llmEventHandler = new LLMEventHandler({
@@ -95,6 +101,7 @@ export class AgentPresenter implements IAgentPresenter {
95101
messageManager: this.messageManager,
96102
contentBufferHandler: this.contentBufferHandler,
97103
toolCallHandler: this.toolCallHandler,
104+
streamUpdateScheduler: this.streamUpdateScheduler,
98105
onConversationUpdated: (state) => this.handleConversationUpdates(state)
99106
})
100107

@@ -139,7 +146,7 @@ export class AgentPresenter implements IAgentPresenter {
139146
async sendMessage(
140147
agentId: string,
141148
content: string,
142-
_tabId?: number,
149+
tabId?: number,
143150
selectedVariantsMap?: Record<string, string>
144151
): Promise<AssistantMessage | null> {
145152
await this.logResolvedIfEnabled(agentId)
@@ -159,7 +166,7 @@ export class AgentPresenter implements IAgentPresenter {
159166
userMessage.id
160167
)
161168

162-
this.trackGeneratingMessage(assistantMessage, agentId)
169+
this.trackGeneratingMessage(assistantMessage, agentId, tabId)
163170
await this.updateConversationAfterUserMessage(agentId)
164171
await this.sessionManager.startLoop(agentId, assistantMessage.id)
165172

@@ -311,7 +318,11 @@ export class AgentPresenter implements IAgentPresenter {
311318
}
312319
}
313320

314-
private trackGeneratingMessage(message: AssistantMessage, conversationId: string): void {
321+
private trackGeneratingMessage(
322+
message: AssistantMessage,
323+
conversationId: string,
324+
tabId?: number
325+
): void {
315326
this.generatingMessages.set(message.id, {
316327
message,
317328
conversationId,
@@ -320,7 +331,8 @@ export class AgentPresenter implements IAgentPresenter {
320331
promptTokens: 0,
321332
reasoningStartTime: null,
322333
reasoningEndTime: null,
323-
lastReasoningTime: null
334+
lastReasoningTime: null,
335+
tabId
324336
})
325337
}
326338

src/main/presenter/agentPresenter/loop/toolCallHandler.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import type {
88
MCPResourceContent
99
} from '@shared/presenter'
1010
import { nanoid } from 'nanoid'
11-
import type { MessageManager } from '../../sessionPresenter/managers/messageManager'
1211
import type { GeneratingMessageState } from '../streaming/types'
12+
import type { StreamUpdateScheduler } from '../streaming/streamUpdateScheduler'
1313
import type { CommandPermissionService } from '../../permission/commandPermissionService'
1414

1515
interface PermissionRequestPayload {
@@ -42,21 +42,21 @@ export class ToolCallHandler {
4242
'application/vnd.mcp-ui.remote-dom'
4343
])
4444

45-
private readonly messageManager: MessageManager
4645
private readonly sqlitePresenter: ISQLitePresenter
4746
private readonly searchingMessages: Set<string>
4847
private readonly commandPermissionHandler?: CommandPermissionService
48+
private readonly streamUpdateScheduler: StreamUpdateScheduler
4949

5050
constructor(options: {
51-
messageManager: MessageManager
5251
sqlitePresenter: ISQLitePresenter
5352
searchingMessages: Set<string>
5453
commandPermissionHandler?: CommandPermissionService
54+
streamUpdateScheduler: StreamUpdateScheduler
5555
}) {
56-
this.messageManager = options.messageManager
5756
this.sqlitePresenter = options.sqlitePresenter
5857
this.searchingMessages = options.searchingMessages
5958
this.commandPermissionHandler = options.commandPermissionHandler
59+
this.streamUpdateScheduler = options.streamUpdateScheduler
6060
}
6161

6262
async processToolCallStart(
@@ -288,7 +288,15 @@ export class ToolCallHandler {
288288

289289
this.finalizeLastBlock(state)
290290
state.message.content.push(...uiBlocks)
291-
await this.messageManager.editMessage(event.eventId, JSON.stringify(state.message.content))
291+
this.streamUpdateScheduler.enqueueDelta(
292+
event.eventId,
293+
state.conversationId,
294+
state.message.parentId,
295+
Boolean(state.message.is_variant),
296+
state.tabId,
297+
{},
298+
state.message.content
299+
)
292300
return true
293301
} catch (error) {
294302
console.error('[ToolCallHandler] Error processing MCP UI resources from tool call:', error)
@@ -406,7 +414,15 @@ export class ToolCallHandler {
406414
)
407415
}
408416

409-
await this.messageManager.editMessage(event.eventId, JSON.stringify(state.message.content))
417+
this.streamUpdateScheduler.enqueueDelta(
418+
event.eventId,
419+
state.conversationId,
420+
state.message.parentId,
421+
Boolean(state.message.is_variant),
422+
state.tabId,
423+
{},
424+
state.message.content
425+
)
410426
return true
411427
} catch (error) {
412428
console.error('[ToolCallHandler] Error processing search results from tool call:', error)

src/main/presenter/agentPresenter/streaming/contentBufferHandler.ts

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
import { eventBus, SendTarget } from '@/eventbus'
2-
import { STREAM_EVENTS } from '@/events'
31
import { finalizeAssistantMessageBlocks } from '@shared/chat/messageBlocks'
4-
import type { MessageManager } from '../../sessionPresenter/managers/messageManager'
52
import type { GeneratingMessageState } from './types'
3+
import type { StreamUpdateScheduler } from './streamUpdateScheduler'
64

75
export class ContentBufferHandler {
86
private readonly generatingMessages: Map<string, GeneratingMessageState>
9-
private readonly messageManager: MessageManager
7+
private readonly streamUpdateScheduler: StreamUpdateScheduler
108

119
constructor(options: {
1210
generatingMessages: Map<string, GeneratingMessageState>
13-
messageManager: MessageManager
11+
streamUpdateScheduler: StreamUpdateScheduler
1412
}) {
1513
this.generatingMessages = options.generatingMessages
16-
this.messageManager = options.messageManager
14+
this.streamUpdateScheduler = options.streamUpdateScheduler
1715
}
1816

1917
async flushAdaptiveBuffer(eventId: string): Promise<void> {
@@ -106,20 +104,15 @@ export class ContentBufferHandler {
106104
const batchContent = batch.join('')
107105
contentBlock.content += batchContent
108106

109-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
110-
111-
const eventData: any = {
107+
this.streamUpdateScheduler.enqueueDelta(
112108
eventId,
113-
content: batchContent,
114-
chunkInfo: {
115-
current: batchEnd,
116-
total: totalChunks,
117-
isLargeContent: true,
118-
batchSize: batch.length
119-
}
120-
}
121-
122-
eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, eventData)
109+
state.conversationId,
110+
state.message.parentId,
111+
Boolean(state.message.is_variant),
112+
state.tabId,
113+
{ content: batchContent },
114+
state.message.content
115+
)
123116

124117
if (batchEnd < chunks.length) {
125118
await new Promise((resolve) => setImmediate(resolve))
@@ -154,7 +147,15 @@ export class ContentBufferHandler {
154147
})
155148
}
156149

157-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
150+
this.streamUpdateScheduler.enqueueDelta(
151+
eventId,
152+
state.conversationId,
153+
state.message.parentId,
154+
Boolean(state.message.is_variant),
155+
state.tabId,
156+
{ content },
157+
state.message.content
158+
)
158159
}
159160

160161
splitLargeContent(content: string): string[] {

src/main/presenter/agentPresenter/streaming/llmEventHandler.ts

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type { MessageManager } from '../../sessionPresenter/managers/messageMana
99
import type { GeneratingMessageState } from './types'
1010
import type { ContentBufferHandler } from './contentBufferHandler'
1111
import type { ToolCallHandler } from '../loop/toolCallHandler'
12+
import type { StreamUpdateScheduler } from './streamUpdateScheduler'
1213

1314
type ConversationUpdateHandler = (state: GeneratingMessageState) => Promise<void>
1415

@@ -18,6 +19,7 @@ export class LLMEventHandler {
1819
private readonly messageManager: MessageManager
1920
private readonly contentBufferHandler: ContentBufferHandler
2021
private readonly toolCallHandler: ToolCallHandler
22+
private readonly streamUpdateScheduler: StreamUpdateScheduler
2123
private readonly onConversationUpdated?: ConversationUpdateHandler
2224

2325
constructor(options: {
@@ -26,13 +28,15 @@ export class LLMEventHandler {
2628
messageManager: MessageManager
2729
contentBufferHandler: ContentBufferHandler
2830
toolCallHandler: ToolCallHandler
31+
streamUpdateScheduler: StreamUpdateScheduler
2932
onConversationUpdated?: ConversationUpdateHandler
3033
}) {
3134
this.generatingMessages = options.generatingMessages
3235
this.searchingMessages = options.searchingMessages
3336
this.messageManager = options.messageManager
3437
this.contentBufferHandler = options.contentBufferHandler
3538
this.toolCallHandler = options.toolCallHandler
39+
this.streamUpdateScheduler = options.streamUpdateScheduler
3640
this.onConversationUpdated = options.onConversationUpdated
3741
}
3842

@@ -90,7 +94,15 @@ export class LLMEventHandler {
9094
},
9195
extra: { needContinue: true }
9296
})
93-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
97+
this.streamUpdateScheduler.enqueueDelta(
98+
eventId,
99+
state.conversationId,
100+
state.message.parentId,
101+
Boolean(state.message.is_variant),
102+
state.tabId,
103+
{},
104+
state.message.content
105+
)
94106
return
95107
}
96108

@@ -233,8 +245,33 @@ export class LLMEventHandler {
233245
}
234246
}
235247

236-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
237-
eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, msg)
248+
const delta: Partial<LLMAgentEventData> = {}
249+
if (content) delta.content = content
250+
if (reasoning_content) delta.reasoning_content = reasoning_content
251+
if (image_data) delta.image_data = image_data
252+
if (totalUsage) delta.totalUsage = totalUsage
253+
254+
if (tool_call) {
255+
delta.tool_call = tool_call
256+
delta.tool_call_id = tool_call_id
257+
delta.tool_call_name = tool_call_name
258+
delta.tool_call_params = tool_call_params
259+
delta.tool_call_response = msg.tool_call_response
260+
delta.tool_call_server_name = tool_call_server_name
261+
delta.tool_call_server_icons = tool_call_server_icons
262+
delta.tool_call_server_description = tool_call_server_description
263+
delta.tool_call_response_raw = tool_call_response_raw
264+
}
265+
266+
this.streamUpdateScheduler.enqueueDelta(
267+
eventId,
268+
state.conversationId,
269+
state.message.parentId,
270+
Boolean(state.message.is_variant),
271+
state.tabId,
272+
delta,
273+
state.message.content
274+
)
238275
}
239276

240277
async handleLLMAgentError(msg: LLMAgentEventData): Promise<void> {
@@ -254,6 +291,7 @@ export class LLMEventHandler {
254291
presenter.sessionManager.clearPendingPermission(state.conversationId)
255292
}
256293

294+
await this.streamUpdateScheduler.flushAll(eventId, 'final')
257295
this.searchingMessages.delete(eventId)
258296
eventBus.sendToRenderer(STREAM_EVENTS.ERROR, SendTarget.ALL_WINDOWS, msg)
259297
}
@@ -287,7 +325,15 @@ export class LLMEventHandler {
287325
}
288326
}
289327
})
290-
await this.messageManager.editMessage(eventId, JSON.stringify(state.message.content))
328+
this.streamUpdateScheduler.enqueueDelta(
329+
eventId,
330+
state.conversationId,
331+
state.message.parentId,
332+
Boolean(state.message.is_variant),
333+
state.tabId,
334+
{},
335+
state.message.content
336+
)
291337
this.searchingMessages.delete(eventId)
292338
presenter.sessionManager.setStatus(state.conversationId, 'waiting_permission')
293339
return
@@ -298,6 +344,7 @@ export class LLMEventHandler {
298344
presenter.sessionManager.clearPendingPermission(state.conversationId)
299345
}
300346

347+
await this.streamUpdateScheduler.flushAll(eventId, 'final')
301348
this.searchingMessages.delete(eventId)
302349
eventBus.sendToRenderer(STREAM_EVENTS.END, SendTarget.ALL_WINDOWS, msg)
303350
}

0 commit comments

Comments
 (0)