Skip to content

Commit f5b4f75

Browse files
committed
feat: add real-time streaming for generator tool preliminary results
Generator tools can now stream preliminary results in real-time as they yield, rather than batch-emitting them after all tool execution completes. Changes: - Add ToolEventBroadcaster class for push-based multi-consumer event delivery - Update executeToolRound to pass onPreliminaryResult callback to executeTool - Add executeToolsWithBroadcast method for real-time streaming flows - Update getToolStream and getFullResponsesStream for real-time delivery - Export ToolEventBroadcaster for advanced use cases This enables streaming UI updates during long-running tool operations, similar to Vercel AI SDK's streaming preliminary tool outputs feature.
1 parent 4f14e68 commit f5b4f75

File tree

5 files changed

+687
-36
lines changed

5 files changed

+687
-36
lines changed

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ export {
109109
} from './lib/stream-transformers.js';
110110
// Tool creation helpers
111111
export { tool } from './lib/tool.js';
112+
// Real-time tool event broadcasting
113+
export { ToolEventBroadcaster } from './lib/tool-event-broadcaster.js';
112114
export {
113115
hasApprovalRequiredTools,
114116
hasExecuteFunction,

src/lib/model-result.ts

Lines changed: 180 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import type {
1515
TurnContext,
1616
UnsentToolResult,
1717
} from './tool-types.js';
18+
import { ToolEventBroadcaster } from './tool-event-broadcaster.js';
1819

1920
import { betaResponsesSend } from '../funcs/betaResponsesSend.js';
2021
import {
@@ -134,7 +135,11 @@ export class ModelResult<TTools extends readonly Tool[]> {
134135
private initPromise: Promise<void> | null = null;
135136
private toolExecutionPromise: Promise<void> | null = null;
136137
private finalResponse: models.OpenResponsesNonStreamingResponse | null = null;
137-
private preliminaryResults: Map<string, unknown[]> = new Map();
138+
private toolEventBroadcaster: ToolEventBroadcaster<{
139+
type: 'preliminary_result';
140+
toolCallId: string;
141+
result: InferToolEventsUnion<TTools>;
142+
}> | null = null;
138143
private allToolExecutionRounds: Array<{
139144
round: number;
140145
toolCalls: ParsedToolCall<Tool>[];
@@ -402,23 +407,35 @@ export class ModelResult<TTools extends readonly Tool[]> {
402407
/**
403408
* Execute all tools in a single round
404409
* Returns the tool results for API submission
410+
* @param broadcaster - Optional broadcaster for real-time preliminary result streaming
405411
*/
406412
private async executeToolRound(
407413
toolCalls: ParsedToolCall<Tool>[],
408-
turnContext: TurnContext
414+
turnContext: TurnContext,
415+
broadcaster?: ToolEventBroadcaster<{
416+
type: 'preliminary_result';
417+
toolCallId: string;
418+
result: InferToolEventsUnion<TTools>;
419+
}>
409420
): Promise<models.OpenResponsesFunctionCallOutput[]> {
410421
const toolResults: models.OpenResponsesFunctionCallOutput[] = [];
411422

412423
for (const toolCall of toolCalls) {
413424
const tool = this.options.tools?.find((t) => t.function.name === toolCall.name);
414425
if (!tool || !hasExecuteFunction(tool)) continue;
415426

416-
const result = await executeTool(tool, toolCall, turnContext);
427+
// Create callback for real-time preliminary results
428+
const onPreliminaryResult = broadcaster
429+
? (callId: string, result: unknown) => {
430+
broadcaster.push({
431+
type: 'preliminary_result' as const,
432+
toolCallId: callId,
433+
result: result as InferToolEventsUnion<TTools>,
434+
});
435+
}
436+
: undefined;
417437

418-
// Store preliminary results for streaming
419-
if (result.preliminaryResults && result.preliminaryResults.length > 0) {
420-
this.preliminaryResults.set(toolCall.id, result.preliminaryResults);
421-
}
438+
const result = await executeTool(tool, toolCall, turnContext, onPreliminaryResult);
422439

423440
toolResults.push({
424441
type: 'function_call_output' as const,
@@ -914,6 +931,127 @@ export class ModelResult<TTools extends readonly Tool[]> {
914931
return this.toolExecutionPromise;
915932
}
916933

934+
/**
935+
* Execute tools with real-time broadcasting of preliminary results.
936+
* This is used by streaming methods that want real-time tool events.
937+
* Unlike executeToolsIfNeeded, this creates a new broadcaster and passes it through.
938+
*/
939+
private async executeToolsWithBroadcast(
940+
broadcaster: ToolEventBroadcaster<{
941+
type: 'preliminary_result';
942+
toolCallId: string;
943+
result: InferToolEventsUnion<TTools>;
944+
}>
945+
): Promise<void> {
946+
try {
947+
await this.initStream();
948+
949+
// If resuming from approval and still pending, don't continue
950+
if (this.isResumingFromApproval && this.currentState?.status === 'awaiting_approval') {
951+
return;
952+
}
953+
954+
// Get initial response
955+
let currentResponse = await this.getInitialResponse();
956+
957+
// Save initial response to state
958+
await this.saveResponseToState(currentResponse);
959+
960+
// Check if tools should be executed
961+
const hasToolCalls = currentResponse.output.some(
962+
(item) => hasTypeProperty(item) && item.type === 'function_call'
963+
);
964+
965+
if (!this.options.tools?.length || !hasToolCalls) {
966+
this.finalResponse = currentResponse;
967+
await this.markStateComplete();
968+
return;
969+
}
970+
971+
// Extract and check tool calls
972+
const toolCalls = extractToolCallsFromResponse(currentResponse);
973+
974+
// Check for approval requirements
975+
if (await this.handleApprovalCheck(toolCalls, 0, currentResponse)) {
976+
return; // Paused for approval
977+
}
978+
979+
if (!this.hasExecutableToolCalls(toolCalls)) {
980+
this.finalResponse = currentResponse;
981+
await this.markStateComplete();
982+
return;
983+
}
984+
985+
// Main execution loop
986+
let currentRound = 0;
987+
988+
while (true) {
989+
// Check for external interruption
990+
if (await this.checkForInterruption(currentResponse)) {
991+
return;
992+
}
993+
994+
// Check stop conditions
995+
if (await this.shouldStopExecution()) {
996+
break;
997+
}
998+
999+
const currentToolCalls = extractToolCallsFromResponse(currentResponse);
1000+
if (currentToolCalls.length === 0) {
1001+
break;
1002+
}
1003+
1004+
// Check for approval requirements
1005+
if (await this.handleApprovalCheck(currentToolCalls, currentRound + 1, currentResponse)) {
1006+
return;
1007+
}
1008+
1009+
if (!this.hasExecutableToolCalls(currentToolCalls)) {
1010+
break;
1011+
}
1012+
1013+
// Build turn context
1014+
const turnContext: TurnContext = { numberOfTurns: currentRound + 1 };
1015+
1016+
// Resolve async functions for this turn
1017+
await this.resolveAsyncFunctionsForTurn(turnContext);
1018+
1019+
// Execute tools WITH broadcaster for real-time events
1020+
const toolResults = await this.executeToolRound(currentToolCalls, turnContext, broadcaster);
1021+
1022+
// Track execution round
1023+
this.allToolExecutionRounds.push({
1024+
round: currentRound,
1025+
toolCalls: currentToolCalls,
1026+
response: currentResponse,
1027+
toolResults,
1028+
});
1029+
1030+
// Save tool results to state
1031+
await this.saveToolResultsToState(toolResults);
1032+
1033+
// Apply nextTurnParams
1034+
await this.applyNextTurnParams(currentToolCalls);
1035+
1036+
// Make follow-up request
1037+
currentResponse = await this.makeFollowupRequest(currentResponse, toolResults);
1038+
1039+
// Save new response to state
1040+
await this.saveResponseToState(currentResponse);
1041+
1042+
currentRound++;
1043+
}
1044+
1045+
// Validate and finalize
1046+
this.validateFinalResponse(currentResponse);
1047+
this.finalResponse = currentResponse;
1048+
await this.markStateComplete();
1049+
} finally {
1050+
// Always complete the broadcaster when done
1051+
broadcaster.complete();
1052+
}
1053+
}
1054+
9171055
/**
9181056
* Internal helper to get the text after tool execution
9191057
*/
@@ -958,7 +1096,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
9581096
/**
9591097
* Stream all response events as they arrive.
9601098
* Multiple consumers can iterate over this stream concurrently.
961-
* Includes preliminary tool result events after tool execution.
1099+
* Preliminary tool results are streamed in REAL-TIME as generator tools yield.
9621100
*/
9631101
getFullResponsesStream(): AsyncIterableIterator<ResponseStreamEvent<InferToolEventsUnion<TTools>>> {
9641102
return async function* (this: ModelResult<TTools>) {
@@ -967,27 +1105,32 @@ export class ModelResult<TTools extends readonly Tool[]> {
9671105
throw new Error('Stream not initialized');
9681106
}
9691107

1108+
// Create broadcaster for real-time tool events
1109+
this.toolEventBroadcaster = new ToolEventBroadcaster();
1110+
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();
1111+
1112+
// Start tool execution in background (doesn't block)
1113+
const executionPromise = this.executeToolsWithBroadcast(this.toolEventBroadcaster);
1114+
9701115
const consumer = this.reusableStream.createConsumer();
9711116

972-
// Yield original events directly
1117+
// Yield original API events
9731118
for await (const event of consumer) {
9741119
yield event;
9751120
}
9761121

977-
// After stream completes, check if tools were executed and emit preliminary results
978-
await this.executeToolsIfNeeded();
979-
980-
// Emit all preliminary results as new event types
981-
for (const [toolCallId, results] of this.preliminaryResults) {
982-
for (const result of results) {
983-
yield {
984-
type: 'tool.preliminary_result' as const,
985-
toolCallId,
986-
result: result as InferToolEventsUnion<TTools>,
987-
timestamp: Date.now(),
988-
};
989-
}
1122+
// Yield tool preliminary results as they arrive (real-time!)
1123+
for await (const event of toolEventConsumer) {
1124+
yield {
1125+
type: 'tool.preliminary_result' as const,
1126+
toolCallId: event.toolCallId,
1127+
result: event.result,
1128+
timestamp: Date.now(),
1129+
};
9901130
}
1131+
1132+
// Ensure execution completed (handles errors)
1133+
await executionPromise;
9911134
}.call(this);
9921135
}
9931136

@@ -1065,7 +1208,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
10651208

10661209
/**
10671210
* Stream tool call argument deltas and preliminary results.
1068-
* This filters the full event stream to yield:
1211+
* Preliminary results are streamed in REAL-TIME as generator tools yield.
10691212
* - Tool call argument deltas as { type: "delta", content: string }
10701213
* - Preliminary results as { type: "preliminary_result", toolCallId, result }
10711214
*/
@@ -1076,27 +1219,28 @@ export class ModelResult<TTools extends readonly Tool[]> {
10761219
throw new Error('Stream not initialized');
10771220
}
10781221

1079-
// Yield tool deltas as structured events
1222+
// Create broadcaster for real-time tool events
1223+
this.toolEventBroadcaster = new ToolEventBroadcaster();
1224+
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();
1225+
1226+
// Start tool execution in background (doesn't block)
1227+
const executionPromise = this.executeToolsWithBroadcast(this.toolEventBroadcaster);
1228+
1229+
// Yield tool deltas from API stream
10801230
for await (const delta of extractToolDeltas(this.reusableStream)) {
10811231
yield {
10821232
type: 'delta' as const,
10831233
content: delta,
10841234
};
10851235
}
10861236

1087-
// After stream completes, check if tools were executed and emit preliminary results
1088-
await this.executeToolsIfNeeded();
1089-
1090-
// Emit all preliminary results
1091-
for (const [toolCallId, results] of this.preliminaryResults) {
1092-
for (const result of results) {
1093-
yield {
1094-
type: 'preliminary_result' as const,
1095-
toolCallId,
1096-
result: result as InferToolEventsUnion<TTools>,
1097-
};
1098-
}
1237+
// Yield tool events as they arrive (real-time!)
1238+
for await (const event of toolEventConsumer) {
1239+
yield event;
10991240
}
1241+
1242+
// Ensure execution completed (handles errors)
1243+
await executionPromise;
11001244
}.call(this);
11011245
}
11021246

0 commit comments

Comments
 (0)