Skip to content

Commit ec97958

Browse files
authored
Fix stdin stream queued messages and command output streaming (#11814)
Fix stdin stream queue handling and command output streaming
1 parent b75f484 commit ec97958

File tree

6 files changed

+389
-9
lines changed

6 files changed

+389
-9
lines changed

apps/cli/src/agent/__tests__/json-event-emitter-streaming.test.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,132 @@ describe("JsonEventEmitter streaming deltas", () => {
215215
expect(output[0]).toMatchObject({ content: "gh" })
216216
expect(output[1]).toMatchObject({ content: " pr" })
217217
})
218+
219+
it("streams say:command_output as deltas and correlates tool_result id to execute_command", () => {
220+
const { stdout, lines } = createMockStdout()
221+
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
222+
const commandId = 404
223+
const outputTs = 405
224+
225+
emitMessage(
226+
emitter,
227+
createAskMessage({
228+
ts: commandId,
229+
ask: "command",
230+
partial: false,
231+
text: "echo hello",
232+
}),
233+
)
234+
235+
emitMessage(emitter, {
236+
ts: outputTs,
237+
type: "say",
238+
say: "command_output",
239+
partial: true,
240+
text: "line1\n",
241+
} as ClineMessage)
242+
emitMessage(emitter, {
243+
ts: outputTs,
244+
type: "say",
245+
say: "command_output",
246+
partial: true,
247+
text: "line1\nline2\n",
248+
} as ClineMessage)
249+
emitMessage(emitter, {
250+
ts: outputTs,
251+
type: "say",
252+
say: "command_output",
253+
partial: false,
254+
text: "line1\nline2\n",
255+
} as ClineMessage)
256+
257+
const output = lines()
258+
expect(output).toHaveLength(4)
259+
expect(output[0]).toMatchObject({
260+
type: "tool_use",
261+
id: commandId,
262+
subtype: "command",
263+
tool_use: { name: "execute_command", input: { command: "echo hello" } },
264+
done: true,
265+
})
266+
expect(output[1]).toMatchObject({
267+
type: "tool_result",
268+
id: commandId,
269+
subtype: "command",
270+
tool_result: { name: "execute_command", output: "line1\n" },
271+
})
272+
expect(output[2]).toMatchObject({
273+
type: "tool_result",
274+
id: commandId,
275+
subtype: "command",
276+
tool_result: { name: "execute_command", output: "line2\n" },
277+
})
278+
expect(output[3]).toMatchObject({
279+
type: "tool_result",
280+
id: commandId,
281+
subtype: "command",
282+
tool_result: { name: "execute_command" },
283+
done: true,
284+
})
285+
expect(output[3]).not.toHaveProperty("tool_result.output")
286+
})
287+
288+
it("prefers status-driven command output streaming and suppresses duplicate say completion", () => {
289+
const { stdout, lines } = createMockStdout()
290+
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
291+
const commandId = 505
292+
293+
emitMessage(
294+
emitter,
295+
createAskMessage({
296+
ts: commandId,
297+
ask: "command",
298+
partial: false,
299+
text: "echo streamed",
300+
}),
301+
)
302+
303+
emitter.emitCommandOutputChunk("line1\n")
304+
emitter.emitCommandOutputChunk("line1\nline2\n")
305+
emitter.emitCommandOutputDone()
306+
307+
// This completion say is expected from the extension, but should be suppressed
308+
// because we already streamed and completed via commandExecutionStatus.
309+
emitMessage(emitter, {
310+
ts: 999,
311+
type: "say",
312+
say: "command_output",
313+
partial: false,
314+
text: "line1\nline2\n",
315+
} as ClineMessage)
316+
317+
const output = lines()
318+
expect(output).toHaveLength(4)
319+
expect(output[0]).toMatchObject({
320+
type: "tool_use",
321+
id: commandId,
322+
subtype: "command",
323+
tool_use: { name: "execute_command", input: { command: "echo streamed" } },
324+
done: true,
325+
})
326+
expect(output[1]).toMatchObject({
327+
type: "tool_result",
328+
id: commandId,
329+
subtype: "command",
330+
tool_result: { name: "execute_command", output: "line1\n" },
331+
})
332+
expect(output[2]).toMatchObject({
333+
type: "tool_result",
334+
id: commandId,
335+
subtype: "command",
336+
tool_result: { name: "execute_command", output: "line2\n" },
337+
})
338+
expect(output[3]).toMatchObject({
339+
type: "tool_result",
340+
id: commandId,
341+
subtype: "command",
342+
tool_result: { name: "execute_command" },
343+
done: true,
344+
})
345+
})
218346
})

apps/cli/src/agent/json-event-emitter.ts

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ export class JsonEventEmitter {
107107
private previousContent = new Map<number, string>()
108108
// Track previous tool-use content for structured (non-append-only) delta computation.
109109
private previousToolUseContent = new Map<number, string>()
110+
// Track the currently active execute_command tool_use id for command_output correlation.
111+
private activeCommandToolUseId: number | undefined
112+
// Track command output snapshots by command tool-use id for delta computation.
113+
private previousCommandOutputByToolUseId = new Map<number, string>()
114+
// Track command ids whose output is being streamed from commandExecutionStatus updates.
115+
private statusDrivenCommandOutputIds = new Set<number>()
116+
// Track command ids that already emitted a terminal command_output done event.
117+
private completedCommandOutputIds = new Set<number>()
118+
// Suppress the next say:command_output completion message after status-driven streaming.
119+
private suppressNextCommandOutputSay = false
110120
// Track the completion result content
111121
private completionResultContent: string | undefined
112122
// Track the latest assistant text as a fallback for result.content.
@@ -288,6 +298,90 @@ export class JsonEventEmitter {
288298
return this.mode === "stream-json" && content === null
289299
}
290300

301+
private computeCommandOutputDelta(commandId: number, fullOutput: string | undefined): string | null {
302+
const normalized = fullOutput ?? ""
303+
const previous = this.previousCommandOutputByToolUseId.get(commandId) || ""
304+
305+
if (normalized === previous) {
306+
return null
307+
}
308+
309+
this.previousCommandOutputByToolUseId.set(commandId, normalized)
310+
return normalized.startsWith(previous) ? normalized.slice(previous.length) : normalized
311+
}
312+
313+
private emitCommandOutputEvent(commandId: number, fullOutput: string | undefined, isDone: boolean): void {
314+
if (this.mode === "stream-json") {
315+
const outputDelta = this.computeCommandOutputDelta(commandId, fullOutput)
316+
const event: JsonEvent = {
317+
type: "tool_result",
318+
id: commandId,
319+
subtype: "command",
320+
tool_result: { name: "execute_command" },
321+
}
322+
323+
if (outputDelta !== null && outputDelta.length > 0) {
324+
event.tool_result = { name: "execute_command", output: outputDelta }
325+
}
326+
327+
if (isDone) {
328+
event.done = true
329+
this.previousCommandOutputByToolUseId.delete(commandId)
330+
this.statusDrivenCommandOutputIds.delete(commandId)
331+
this.completedCommandOutputIds.add(commandId)
332+
if (this.activeCommandToolUseId === commandId) {
333+
this.activeCommandToolUseId = undefined
334+
}
335+
}
336+
337+
// Suppress empty partial updates that carry no delta.
338+
if (!isDone && outputDelta === null) {
339+
return
340+
}
341+
342+
this.emitEvent(event)
343+
return
344+
}
345+
346+
this.emitEvent({
347+
type: "tool_result",
348+
id: commandId,
349+
subtype: "command",
350+
tool_result: { name: "execute_command", output: fullOutput },
351+
...(isDone ? { done: true } : {}),
352+
})
353+
354+
if (isDone) {
355+
this.previousCommandOutputByToolUseId.delete(commandId)
356+
this.statusDrivenCommandOutputIds.delete(commandId)
357+
this.completedCommandOutputIds.add(commandId)
358+
if (this.activeCommandToolUseId === commandId) {
359+
this.activeCommandToolUseId = undefined
360+
}
361+
}
362+
}
363+
364+
public emitCommandOutputChunk(outputSnapshot: string): void {
365+
const commandId = this.activeCommandToolUseId
366+
if (commandId === undefined) {
367+
return
368+
}
369+
370+
this.statusDrivenCommandOutputIds.add(commandId)
371+
this.emitCommandOutputEvent(commandId, outputSnapshot, false)
372+
}
373+
374+
public emitCommandOutputDone(): void {
375+
const commandId = this.activeCommandToolUseId
376+
if (commandId === undefined) {
377+
return
378+
}
379+
380+
this.statusDrivenCommandOutputIds.add(commandId)
381+
this.suppressNextCommandOutputSay = true
382+
this.emitCommandOutputEvent(commandId, undefined, true)
383+
}
384+
291385
/**
292386
* Get content to send for a message (delta for streaming, full for json mode).
293387
*/
@@ -392,10 +486,7 @@ export class JsonEventEmitter {
392486
break
393487

394488
case "command_output":
395-
this.emitEvent({
396-
type: "tool_result",
397-
tool_result: { name: "execute_command", output: msg.text },
398-
})
489+
this.handleCommandOutputMessage(msg, isDone)
399490
break
400491

401492
case "user_feedback":
@@ -517,6 +608,10 @@ export class JsonEventEmitter {
517608
const toolInfo = parseToolInfo(msg.text)
518609

519610
if (subtype === "command") {
611+
this.activeCommandToolUseId = msg.ts
612+
this.completedCommandOutputIds.delete(msg.ts)
613+
this.suppressNextCommandOutputSay = false
614+
520615
if (isStreamingPartial) {
521616
const commandDelta = this.computeStructuredDelta(msg.ts, msg.text)
522617
if (commandDelta === null) {
@@ -595,6 +690,21 @@ export class JsonEventEmitter {
595690
})
596691
}
597692

693+
private handleCommandOutputMessage(msg: ClineMessage, isDone: boolean): void {
694+
if (this.suppressNextCommandOutputSay) {
695+
if (isDone) {
696+
this.suppressNextCommandOutputSay = false
697+
}
698+
return
699+
}
700+
701+
const commandId = this.activeCommandToolUseId ?? msg.ts
702+
if (this.statusDrivenCommandOutputIds.has(commandId) || this.completedCommandOutputIds.has(commandId)) {
703+
return
704+
}
705+
this.emitCommandOutputEvent(commandId, msg.text, isDone)
706+
}
707+
598708
/**
599709
* Handle task completion and emit result event.
600710
*/
@@ -711,6 +821,11 @@ export class JsonEventEmitter {
711821
this.seenMessageIds.clear()
712822
this.previousContent.clear()
713823
this.previousToolUseContent.clear()
824+
this.activeCommandToolUseId = undefined
825+
this.previousCommandOutputByToolUseId.clear()
826+
this.statusDrivenCommandOutputIds.clear()
827+
this.completedCommandOutputIds.clear()
828+
this.suppressNextCommandOutputSay = false
714829
this.completionResultContent = undefined
715830
this.lastAssistantText = undefined
716831
this.expectPromptEchoAsUser = true

apps/cli/src/commands/cli/stdin-stream.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,12 +349,46 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
349349

350350
const onExtensionMessage = (message: {
351351
type?: string
352+
text?: unknown
352353
state?: {
353354
currentTaskId?: unknown
354355
currentTaskItem?: { id?: unknown }
355356
messageQueue?: unknown
356357
}
357358
}) => {
359+
if (message.type === "commandExecutionStatus") {
360+
if (typeof message.text !== "string") {
361+
return
362+
}
363+
364+
let parsedStatus: unknown
365+
try {
366+
parsedStatus = JSON.parse(message.text)
367+
} catch {
368+
return
369+
}
370+
371+
if (!isRecord(parsedStatus) || typeof parsedStatus.status !== "string") {
372+
return
373+
}
374+
375+
if (parsedStatus.status === "output" && typeof parsedStatus.output === "string") {
376+
jsonEmitter.emitCommandOutputChunk(parsedStatus.output)
377+
return
378+
}
379+
380+
if (
381+
parsedStatus.status === "exited" ||
382+
parsedStatus.status === "timeout" ||
383+
parsedStatus.status === "fallback"
384+
) {
385+
jsonEmitter.emitCommandOutputDone()
386+
return
387+
}
388+
389+
return
390+
}
391+
358392
if (message.type !== "state") {
359393
return
360394
}
@@ -463,7 +497,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
463497
}
464498

465499
switch (stdinCommand.command) {
466-
case "start":
500+
case "start": {
467501
// A task can emit completion events before runTask() finalizers run.
468502
// Wait for full settlement to avoid false "task_busy" on immediate next start.
469503
// Safe from races: `for await` processes stdin commands serially, so no
@@ -503,8 +537,16 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
503537
success: true,
504538
})
505539

540+
// In CLI stdin-stream mode, default to the execa terminal provider so
541+
// command output can be streamed deterministically. Explicit per-request
542+
// config still wins.
543+
const taskConfiguration = {
544+
terminalShellIntegrationDisabled: true,
545+
...(stdinCommand.configuration ?? {}),
546+
}
547+
506548
activeTaskPromise = host
507-
.runTask(stdinCommand.prompt, latestTaskId, stdinCommand.configuration)
549+
.runTask(stdinCommand.prompt, latestTaskId, taskConfiguration)
508550
.catch((error) => {
509551
const message = error instanceof Error ? error.message : String(error)
510552

@@ -559,6 +601,7 @@ export async function runStdinStreamMode({ host, jsonEmitter, setStreamRequestId
559601
})
560602

561603
break
604+
}
562605

563606
case "message": {
564607
// If cancel was requested, wait briefly for the task to be rehydrated

0 commit comments

Comments
 (0)