Skip to content

Commit bbab16a

Browse files
committed
fix: add stream termination and stopWhen condition support
- Fix buildMessageStreamCore to properly terminate on completion events - Add stopWhen condition checking to tool execution loop in ModelResult - Ensure toolResults are stored and yielded correctly in getNewMessagesStream This fixes CI test failures where: 1. Tests would timeout waiting for streams to complete 2. stopWhen conditions weren't being respected during tool execution 3. Tool execution results weren't being properly tracked Resolves issue where getNewMessagesStream() wasn't yielding function call outputs after tool execution.
1 parent f7e2849 commit bbab16a

File tree

2 files changed

+47
-29
lines changed

2 files changed

+47
-29
lines changed

src/lib/model-result.ts

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
import { executeTool } from './tool-executor.js';
3434
import { executeNextTurnParamsFunctions, applyNextTurnParamsToRequest } from './next-turn-params.js';
3535
import { hasExecuteFunction } from './tool-types.js';
36+
import { isStopConditionMet } from './stop-conditions.js';
3637

3738
/**
3839
* Type guard for stream event with toReadableStream method
@@ -124,6 +125,7 @@ export class ModelResult {
124125
round: number;
125126
toolCalls: ParsedToolCall[];
126127
response: models.OpenResponsesNonStreamingResponse;
128+
toolResults: Array<models.OpenResponsesFunctionCallOutput>;
127129
}> = [];
128130
// Track resolved request after async function resolution
129131
private resolvedRequest: models.OpenResponsesRequest | null = null;
@@ -265,6 +267,34 @@ export class ModelResult {
265267
let currentRound = 0;
266268

267269
while (true) {
270+
// Check stopWhen conditions
271+
if (this.options.request.stopWhen) {
272+
const stopConditions = Array.isArray(this.options.request.stopWhen)
273+
? this.options.request.stopWhen
274+
: [this.options.request.stopWhen];
275+
276+
const shouldStop = await isStopConditionMet({
277+
stopConditions,
278+
steps: this.allToolExecutionRounds.map((round) => ({
279+
stepType: 'continue' as const,
280+
text: extractTextFromResponse(round.response),
281+
toolCalls: round.toolCalls,
282+
toolResults: round.toolResults.map((tr) => ({
283+
toolCallId: tr.callId,
284+
toolName: round.toolCalls.find((tc) => tc.id === tr.callId)?.name ?? '',
285+
result: JSON.parse(tr.output),
286+
})),
287+
response: round.response,
288+
usage: round.response.usage,
289+
finishReason: undefined, // OpenResponsesNonStreamingResponse doesn't have finishReason
290+
})),
291+
});
292+
293+
if (shouldStop) {
294+
break;
295+
}
296+
}
297+
268298
const currentToolCalls = extractToolCallsFromResponse(currentResponse);
269299

270300
if (currentToolCalls.length === 0) {
@@ -280,13 +310,6 @@ export class ModelResult {
280310
break;
281311
}
282312

283-
// Store execution round info
284-
this.allToolExecutionRounds.push({
285-
round: currentRound,
286-
toolCalls: currentToolCalls,
287-
response: currentResponse,
288-
});
289-
290313
// Build turn context for this round (for async parameter resolution only)
291314
const turnContext: TurnContext = {
292315
numberOfTurns: currentRound + 1, // 1-indexed
@@ -334,6 +357,14 @@ export class ModelResult {
334357
});
335358
}
336359

360+
// Store execution round info including tool results
361+
this.allToolExecutionRounds.push({
362+
round: currentRound,
363+
toolCalls: currentToolCalls,
364+
response: currentResponse,
365+
toolResults,
366+
});
367+
337368
// Execute nextTurnParams functions for tools that were called
338369
if (this.options.tools && currentToolCalls.length > 0) {
339370
if (!this.resolvedRequest) {
@@ -531,29 +562,10 @@ export class ModelResult {
531562
// Execute tools if needed
532563
await this.executeToolsIfNeeded();
533564

534-
// Yield function call output for each executed tool
565+
// Yield function call outputs for each executed tool
535566
for (const round of this.allToolExecutionRounds) {
536-
for (const toolCall of round.toolCalls) {
537-
// Find the tool to check if it was executed
538-
const tool = this.options.tools?.find((t) => t.function.name === toolCall.name);
539-
if (!tool || !hasExecuteFunction(tool)) {
540-
continue;
541-
}
542-
543-
// Get the result from preliminary results or construct from the response
544-
const prelimResults = this.preliminaryResults.get(toolCall.id);
545-
const result =
546-
prelimResults && prelimResults.length > 0
547-
? prelimResults[prelimResults.length - 1] // Last result is the final output
548-
: undefined;
549-
550-
// Yield function call output in responses format
551-
yield {
552-
type: 'function_call_output' as const,
553-
id: `output_${toolCall.id}`,
554-
callId: toolCall.id,
555-
output: result !== undefined ? JSON.stringify(result) : '',
556-
} as models.OpenResponsesFunctionCallOutput;
567+
for (const toolResult of round.toolResults) {
568+
yield toolResult;
557569
}
558570
}
559571

src/lib/stream-transformers.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,12 @@ async function* buildMessageStreamCore(
137137
break;
138138
}
139139

140+
case 'response.completed':
141+
case 'response.failed':
142+
case 'response.incomplete':
143+
// Stream is complete, stop consuming
144+
return;
145+
140146
default:
141147
// Ignore other event types - this is intentionally not exhaustive
142148
// as we only care about specific events for message building

0 commit comments

Comments
 (0)