Skip to content

Commit be68a2d

Browse files
committed
feat: enable real-time streaming of generator tool preliminary results
This change streams generator tool yields to consumers as they happen during execution, instead of batching them until the tool has completed.

Changes:
- Add ToolEventBroadcaster for push-based, multi-consumer event streaming
- Wire an onPreliminaryResult callback into executeToolRound to broadcast events
- Update getToolStream() and getFullResponsesStream() to consume the broadcast
- Fix the Zod v4 toJSONSchema target to 'draft-7' ('openapi-3.0' was invalid)
- Add comprehensive unit tests for ToolEventBroadcaster
- Fix test files to use the 'draft-7' target
1 parent a96ad34 commit be68a2d

File tree

6 files changed

+470
-41
lines changed

6 files changed

+470
-41
lines changed

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,6 @@ export {
109109
} from './lib/tool-types.js';
110110
// Turn context helpers
111111
export { buildTurnContext, normalizeInputToArray } from './lib/turn-context.js';
112+
// Real-time tool event broadcasting
113+
export { ToolEventBroadcaster } from './lib/tool-event-broadcaster.js';
112114
export * from './sdk/sdk.js';

src/lib/model-result.ts

Lines changed: 57 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type {
1212
ToolStreamEvent,
1313
TurnContext,
1414
} from './tool-types.js';
15+
import { ToolEventBroadcaster } from './tool-event-broadcaster.js';
1516

1617
import { betaResponsesSend } from '../funcs/betaResponsesSend.js';
1718
import {
@@ -106,7 +107,11 @@ export class ModelResult<TTools extends readonly Tool[]> {
106107
private initPromise: Promise<void> | null = null;
107108
private toolExecutionPromise: Promise<void> | null = null;
108109
private finalResponse: models.OpenResponsesNonStreamingResponse | null = null;
109-
private preliminaryResults: Map<string, unknown[]> = new Map();
110+
private toolEventBroadcaster: ToolEventBroadcaster<{
111+
type: 'preliminary_result';
112+
toolCallId: string;
113+
result: InferToolEventsUnion<TTools>;
114+
}> | null = null;
110115
private allToolExecutionRounds: Array<{
111116
round: number;
112117
toolCalls: ParsedToolCall<Tool>[];
@@ -163,8 +168,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
163168
// Already resolved, extract non-function fields
164169
// Since request is CallModelInput, we need to filter out stopWhen
165170
// Note: tools are already in API format at this point (converted in callModel())
166-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
167-
const { stopWhen, ...rest } = this.options.request;
171+
const { stopWhen: _, ...rest } = this.options.request;
168172
// Cast to ResolvedCallModelInput - we know it's resolved if hasAsyncFunctions returned false
169173
baseRequest = rest as ResolvedCallModelInput;
170174
}
@@ -325,12 +329,18 @@ export class ModelResult<TTools extends readonly Tool[]> {
325329
continue;
326330
}
327331

328-
const result = await executeTool(tool, toolCall, turnContext);
332+
// Create callback for real-time preliminary results
333+
const onPreliminaryResult = this.toolEventBroadcaster
334+
? (callId: string, resultValue: unknown) => {
335+
this.toolEventBroadcaster!.push({
336+
type: 'preliminary_result' as const,
337+
toolCallId: callId,
338+
result: resultValue as InferToolEventsUnion<TTools>,
339+
});
340+
}
341+
: undefined;
329342

330-
// Store preliminary results
331-
if (result.preliminaryResults && result.preliminaryResults.length > 0) {
332-
this.preliminaryResults.set(toolCall.id, result.preliminaryResults);
333-
}
343+
const result = await executeTool(tool, toolCall, turnContext, onPreliminaryResult);
334344

335345
toolResults.push({
336346
type: 'function_call_output' as const,
@@ -480,7 +490,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
480490
/**
481491
* Stream all response events as they arrive.
482492
* Multiple consumers can iterate over this stream concurrently.
483-
* Includes preliminary tool result events after tool execution.
493+
* Preliminary tool results are streamed in REAL-TIME as generator tools yield.
484494
*/
485495
getFullResponsesStream(): AsyncIterableIterator<ResponseStreamEvent<InferToolEventsUnion<TTools>>> {
486496
return async function* (this: ModelResult<TTools>) {
@@ -489,27 +499,34 @@ export class ModelResult<TTools extends readonly Tool[]> {
489499
throw new Error('Stream not initialized');
490500
}
491501

502+
// Create broadcaster for real-time tool events
503+
this.toolEventBroadcaster = new ToolEventBroadcaster();
504+
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();
505+
506+
// Start tool execution in background (completes broadcaster when done)
507+
const executionPromise = this.executeToolsIfNeeded().finally(() => {
508+
this.toolEventBroadcaster?.complete();
509+
});
510+
492511
const consumer = this.reusableStream.createConsumer();
493512

494-
// Yield original events directly
513+
// Yield original API events
495514
for await (const event of consumer) {
496515
yield event;
497516
}
498517

499-
// After stream completes, check if tools were executed and emit preliminary results
500-
await this.executeToolsIfNeeded();
501-
502-
// Emit all preliminary results as new event types
503-
for (const [toolCallId, results] of this.preliminaryResults) {
504-
for (const result of results) {
505-
yield {
506-
type: 'tool.preliminary_result' as const,
507-
toolCallId,
508-
result: result as InferToolEventsUnion<TTools>,
509-
timestamp: Date.now(),
510-
};
511-
}
518+
// Yield tool preliminary results as they arrive (real-time!)
519+
for await (const event of toolEventConsumer) {
520+
yield {
521+
type: 'tool.preliminary_result' as const,
522+
toolCallId: event.toolCallId,
523+
result: event.result,
524+
timestamp: Date.now(),
525+
};
512526
}
527+
528+
// Ensure execution completed (handles errors)
529+
await executionPromise;
513530
}.call(this);
514531
}
515532

@@ -587,7 +604,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
587604

588605
/**
589606
* Stream tool call argument deltas and preliminary results.
590-
* This filters the full event stream to yield:
607+
* Preliminary results are streamed in REAL-TIME as generator tools yield.
591608
* - Tool call argument deltas as { type: "delta", content: string }
592609
* - Preliminary results as { type: "preliminary_result", toolCallId, result }
593610
*/
@@ -598,27 +615,30 @@ export class ModelResult<TTools extends readonly Tool[]> {
598615
throw new Error('Stream not initialized');
599616
}
600617

601-
// Yield tool deltas as structured events
618+
// Create broadcaster for real-time tool events
619+
this.toolEventBroadcaster = new ToolEventBroadcaster();
620+
const toolEventConsumer = this.toolEventBroadcaster.createConsumer();
621+
622+
// Start tool execution in background (completes broadcaster when done)
623+
const executionPromise = this.executeToolsIfNeeded().finally(() => {
624+
this.toolEventBroadcaster?.complete();
625+
});
626+
627+
// Yield tool deltas from API stream
602628
for await (const delta of extractToolDeltas(this.reusableStream)) {
603629
yield {
604630
type: 'delta' as const,
605631
content: delta,
606632
};
607633
}
608634

609-
// After stream completes, check if tools were executed and emit preliminary results
610-
await this.executeToolsIfNeeded();
611-
612-
// Emit all preliminary results
613-
for (const [toolCallId, results] of this.preliminaryResults) {
614-
for (const result of results) {
615-
yield {
616-
type: 'preliminary_result' as const,
617-
toolCallId,
618-
result: result as InferToolEventsUnion<TTools>,
619-
};
620-
}
635+
// Yield tool events as they arrive (real-time!)
636+
for await (const event of toolEventConsumer) {
637+
yield event;
621638
}
639+
640+
// Ensure execution completed (handles errors)
641+
await executionPromise;
622642
}.call(this);
623643
}
624644

src/lib/tool-event-broadcaster.ts

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/**
 * A push-based event broadcaster that supports multiple concurrent consumers.
 * Similar to ReusableReadableStream but for push-based events from tool execution.
 *
 * Every event is appended to an internal buffer that is kept for the lifetime
 * of the broadcaster, and every consumer tracks its own read position in that
 * buffer. A consumer therefore always starts at position 0 and replays the
 * full history, no matter when it was created.
 *
 * @template T - The event type being broadcast
 */
export class ToolEventBroadcaster<T> {
  private readonly buffer: T[] = [];
  private readonly consumers = new Map<number, ConsumerState>();
  private nextConsumerId = 0;
  private isComplete = false;
  private completionError: Error | null = null;

  /**
   * Push a new event to all consumers.
   * Events are buffered so late-joining consumers can catch up.
   * Events pushed after `complete()` are silently dropped.
   */
  push(event: T): void {
    if (this.isComplete) return;
    this.buffer.push(event);
    this.notifyWaitingConsumers();
  }

  /**
   * Mark the broadcaster as complete - no more events will be pushed.
   * Optionally pass an error to signal failure to all consumers; each consumer
   * drains the buffered events first and then rejects with that error.
   *
   * Only the first call has any effect. Later calls are ignored so that a
   * recorded error cannot be cleared (or introduced) after consumers may
   * already have observed the final state.
   */
  complete(error?: Error): void {
    if (this.isComplete) return;
    this.isComplete = true;
    this.completionError = error ?? null;
    this.notifyWaitingConsumers();
  }

  /**
   * Create a new consumer that can independently iterate over events.
   * Consumers can join at any time and will receive events from position 0.
   * Multiple consumers can be created and will all receive the same events.
   *
   * @returns An async iterator. `next()` resolves with the next buffered
   *   event, waits when the consumer has caught up, resolves `done` once the
   *   broadcaster has completed, and rejects once with the completion error
   *   if one was supplied. `return()` / `throw()` detach the consumer and
   *   settle any `next()` call that is still waiting.
   */
  createConsumer(): AsyncIterableIterator<T> {
    const consumerId = this.nextConsumerId++;
    const state: ConsumerState = {
      position: 0,
      waiters: [],
      closed: false,
    };
    this.consumers.set(consumerId, state);

    // Detach the consumer and release every pending next() call. Without the
    // wake-up, a next() that is parked waiting for data would never settle,
    // because notifications only reach consumers still present in the map.
    const detach = (): void => {
      state.closed = true;
      this.consumers.delete(consumerId);
      this.wake(state);
    };

    // Written as a loop over closed-over state (instead of recursing through
    // `this.next()`) so it keeps working when `next` is detached from the
    // iterator object.
    const next = async (): Promise<IteratorResult<T>> => {
      for (;;) {
        if (state.closed) {
          return { done: true, value: undefined };
        }

        // Return buffered event if available
        if (state.position < this.buffer.length) {
          const value = this.buffer[state.position] as T;
          state.position++;
          return { done: false, value };
        }

        // Complete and caught up: finish this consumer. The error, if any, is
        // surfaced exactly once; later next() calls simply report done.
        if (this.isComplete) {
          state.closed = true;
          this.consumers.delete(consumerId);
          if (this.completionError) {
            throw this.completionError;
          }
          return { done: true, value: undefined };
        }

        // Park until push(), complete() or detach() wakes us, then re-check.
        // Waiters are queued so that overlapping next() calls on the same
        // consumer are all woken rather than overwriting one another.
        await new Promise<void>((resolve) => {
          state.waiters.push(resolve);
        });
      }
    };

    const iterator: AsyncIterableIterator<T> = {
      next,

      async return(): Promise<IteratorResult<T>> {
        detach();
        return { done: true, value: undefined };
      },

      async throw(e?: unknown): Promise<IteratorResult<T>> {
        detach();
        throw e;
      },

      [Symbol.asyncIterator]() {
        return iterator;
      },
    };

    return iterator;
  }

  /**
   * Notify all waiting consumers that new data is available or stream completed.
   * Woken consumers re-inspect the broadcaster state themselves, so the same
   * wake-up covers new events, normal completion and completion with an error.
   */
  private notifyWaitingConsumers(): void {
    for (const consumer of this.consumers.values()) {
      this.wake(consumer);
    }
  }

  /**
   * Resume every next() call currently parked on the given consumer.
   */
  private wake(consumer: ConsumerState): void {
    // Take the queue first so a waiter that parks again is not resumed twice.
    const pending = consumer.waiters.splice(0);
    for (const resume of pending) {
      resume();
    }
  }
}

/**
 * Per-consumer bookkeeping held by ToolEventBroadcaster.
 */
interface ConsumerState {
  /** Index into the broadcaster buffer of the next event to deliver. */
  position: number;
  /** Resolve callbacks of next() calls that are waiting for more data. */
  waiters: Array<() => void>;
  /** True once the consumer was cancelled or has observed completion. */
  closed: boolean;
}

src/lib/tool-executor.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ import { hasExecuteFunction, isGeneratorTool, isRegularExecuteTool } from './too
1414
* Convert a Zod schema to JSON Schema using Zod v4's toJSONSchema function
1515
*/
1616
export function convertZodToJsonSchema(zodSchema: ZodType): Record<string, unknown> {
17+
// Use draft-7 as it's closest to OpenAPI 3.0's JSON Schema variant
1718
const jsonSchema = toJSONSchema(zodSchema, {
18-
target: 'openapi-3.0',
19+
target: 'draft-7',
1920
});
2021
return jsonSchema;
2122
}

tests/e2e/call-model-tools.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ describe('Enhanced Tool Support for callModel', () => {
2626
});
2727

2828
const jsonSchema = toJSONSchema(schema, {
29-
target: 'openapi-3.0',
29+
target: 'draft-7',
3030
});
3131

3232
expect(jsonSchema).toHaveProperty('type', 'object');
@@ -48,7 +48,7 @@ describe('Enhanced Tool Support for callModel', () => {
4848
});
4949

5050
const jsonSchema = toJSONSchema(schema, {
51-
target: 'openapi-3.0',
51+
target: 'draft-7',
5252
});
5353

5454
expect(jsonSchema.properties?.user).toBeDefined();
@@ -61,7 +61,7 @@ describe('Enhanced Tool Support for callModel', () => {
6161
});
6262

6363
const jsonSchema = toJSONSchema(schema, {
64-
target: 'openapi-3.0',
64+
target: 'draft-7',
6565
});
6666

6767
expect(jsonSchema.properties?.location?.['description']).toBe(

0 commit comments

Comments
 (0)