Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,6 @@ export {
} from './lib/tool-types.js';
// Turn context helpers
export { buildTurnContext, normalizeInputToArray } from './lib/turn-context.js';
// Real-time tool event broadcasting
export { ToolEventBroadcaster } from './lib/tool-event-broadcaster.js';
export * from './sdk/sdk.js';
109 changes: 72 additions & 37 deletions src/lib/model-result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
ToolStreamEvent,
TurnContext,
} from './tool-types.js';
import { ToolEventBroadcaster } from './tool-event-broadcaster.js';

import { betaResponsesSend } from '../funcs/betaResponsesSend.js';
import {
Expand Down Expand Up @@ -106,7 +107,11 @@ export class ModelResult<TTools extends readonly Tool[]> {
private initPromise: Promise<void> | null = null;
private toolExecutionPromise: Promise<void> | null = null;
private finalResponse: models.OpenResponsesNonStreamingResponse | null = null;
private preliminaryResults: Map<string, unknown[]> = new Map();
private toolEventBroadcaster: ToolEventBroadcaster<{
type: 'preliminary_result';
toolCallId: string;
result: InferToolEventsUnion<TTools>;
}> | null = null;
private allToolExecutionRounds: Array<{
round: number;
toolCalls: ParsedToolCall<Tool>[];
Expand All @@ -120,6 +125,21 @@ export class ModelResult<TTools extends readonly Tool[]> {
this.options = options;
}

/**
* Get or create the tool event broadcaster (lazy initialization).
* Ensures only one broadcaster exists for the lifetime of this ModelResult.
*/
private ensureBroadcaster(): ToolEventBroadcaster<{
type: 'preliminary_result';
toolCallId: string;
result: InferToolEventsUnion<TTools>;
}> {
if (!this.toolEventBroadcaster) {
this.toolEventBroadcaster = new ToolEventBroadcaster();
}
return this.toolEventBroadcaster;
}

/**
* Type guard to check if a value is a non-streaming response
*/
Expand Down Expand Up @@ -163,8 +183,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
// Already resolved, extract non-function fields
// Since request is CallModelInput, we need to filter out stopWhen
// Note: tools are already in API format at this point (converted in callModel())
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { stopWhen, ...rest } = this.options.request;
const { stopWhen: _, ...rest } = this.options.request;
// Cast to ResolvedCallModelInput - we know it's resolved if hasAsyncFunctions returned false
baseRequest = rest as ResolvedCallModelInput;
}
Expand Down Expand Up @@ -325,12 +344,18 @@ export class ModelResult<TTools extends readonly Tool[]> {
continue;
}

const result = await executeTool(tool, toolCall, turnContext);
// Create callback for real-time preliminary results
const onPreliminaryResult = this.toolEventBroadcaster
? (callId: string, resultValue: unknown) => {
this.toolEventBroadcaster?.push({
type: 'preliminary_result' as const,
toolCallId: callId,
result: resultValue as InferToolEventsUnion<TTools>,
});
}
: undefined;

// Store preliminary results
if (result.preliminaryResults && result.preliminaryResults.length > 0) {
this.preliminaryResults.set(toolCall.id, result.preliminaryResults);
}
const result = await executeTool(tool, toolCall, turnContext, onPreliminaryResult);

toolResults.push({
type: 'function_call_output' as const,
Expand Down Expand Up @@ -480,7 +505,7 @@ export class ModelResult<TTools extends readonly Tool[]> {
/**
* Stream all response events as they arrive.
* Multiple consumers can iterate over this stream concurrently.
* Includes preliminary tool result events after tool execution.
* Preliminary tool results are streamed in REAL-TIME as generator tools yield.
*/
getFullResponsesStream(): AsyncIterableIterator<ResponseStreamEvent<InferToolEventsUnion<TTools>>> {
return async function* (this: ModelResult<TTools>) {
Expand All @@ -489,27 +514,34 @@ export class ModelResult<TTools extends readonly Tool[]> {
throw new Error('Stream not initialized');
}

// Get or create broadcaster for real-time tool events (lazy init prevents race conditions)
const broadcaster = this.ensureBroadcaster();
const toolEventConsumer = broadcaster.createConsumer();

// Start tool execution in background (completes broadcaster when done)
const executionPromise = this.executeToolsIfNeeded().finally(() => {
broadcaster.complete();
});

const consumer = this.reusableStream.createConsumer();

// Yield original events directly
// Yield original API events
for await (const event of consumer) {
yield event;
}

// After stream completes, check if tools were executed and emit preliminary results
await this.executeToolsIfNeeded();

// Emit all preliminary results as new event types
for (const [toolCallId, results] of this.preliminaryResults) {
for (const result of results) {
yield {
type: 'tool.preliminary_result' as const,
toolCallId,
result: result as InferToolEventsUnion<TTools>,
timestamp: Date.now(),
};
}
// Yield tool preliminary results as they arrive (real-time!)
for await (const event of toolEventConsumer) {
yield {
type: 'tool.preliminary_result' as const,
toolCallId: event.toolCallId,
result: event.result,
timestamp: Date.now(),
};
}

// Ensure execution completed (handles errors)
await executionPromise;
}.call(this);
}

Expand Down Expand Up @@ -587,7 +619,7 @@ export class ModelResult<TTools extends readonly Tool[]> {

/**
* Stream tool call argument deltas and preliminary results.
* This filters the full event stream to yield:
* Preliminary results are streamed in REAL-TIME as generator tools yield.
* - Tool call argument deltas as { type: "delta", content: string }
* - Preliminary results as { type: "preliminary_result", toolCallId, result }
*/
Expand All @@ -598,27 +630,30 @@ export class ModelResult<TTools extends readonly Tool[]> {
throw new Error('Stream not initialized');
}

// Yield tool deltas as structured events
// Get or create broadcaster for real-time tool events (lazy init prevents race conditions)
const broadcaster = this.ensureBroadcaster();
const toolEventConsumer = broadcaster.createConsumer();

// Start tool execution in background (completes broadcaster when done)
const executionPromise = this.executeToolsIfNeeded().finally(() => {
broadcaster.complete();
});

// Yield tool deltas from API stream
for await (const delta of extractToolDeltas(this.reusableStream)) {
yield {
type: 'delta' as const,
content: delta,
};
}

// After stream completes, check if tools were executed and emit preliminary results
await this.executeToolsIfNeeded();

// Emit all preliminary results
for (const [toolCallId, results] of this.preliminaryResults) {
for (const result of results) {
yield {
type: 'preliminary_result' as const,
toolCallId,
result: result as InferToolEventsUnion<TTools>,
};
}
// Yield tool events as they arrive (real-time!)
for await (const event of toolEventConsumer) {
yield event;
}

// Ensure execution completed (handles errors)
await executionPromise;
}.call(this);
}

Expand Down
170 changes: 170 additions & 0 deletions src/lib/tool-event-broadcaster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/**
* A push-based event broadcaster that supports multiple concurrent consumers.
* Similar to ReusableReadableStream but for push-based events from tool execution.
*
* Each consumer gets their own position in the buffer and receives all events
* from their join point onward. This enables real-time streaming of generator
* tool preliminary results to multiple consumers simultaneously.
*
* @template T - The event type being broadcast
*/
export class ToolEventBroadcaster<T> {
private buffer: T[] = [];
private consumers = new Map<number, ConsumerState>();
private nextConsumerId = 0;
private isComplete = false;
private completionError: Error | null = null;

/**
* Push a new event to all consumers.
* Events are buffered so late-joining consumers can catch up.
*/
push(event: T): void {
if (this.isComplete) {
return;
}
this.buffer.push(event);
this.notifyWaitingConsumers();
}

/**
* Mark the broadcaster as complete - no more events will be pushed.
* Optionally pass an error to signal failure to all consumers.
* Cleans up buffer and consumers after completion.
*/
complete(error?: Error): void {
this.isComplete = true;
this.completionError = error ?? null;
this.notifyWaitingConsumers();
// Schedule cleanup after consumers have processed completion
queueMicrotask(() => this.cleanup());
}

/**
* Clean up resources after all consumers have finished.
* Called automatically after complete(), but can be called manually.
*/
private cleanup(): void {
// Only cleanup if complete and all consumers are done
if (this.isComplete && this.consumers.size === 0) {
this.buffer = [];
}
}

/**
* Create a new consumer that can independently iterate over events.
* Consumers can join at any time and will receive events from position 0.
* Multiple consumers can be created and will all receive the same events.
*/
createConsumer(): AsyncIterableIterator<T> {
const consumerId = this.nextConsumerId++;
const state: ConsumerState = {
position: 0,
waitingPromise: null,
cancelled: false,
};
this.consumers.set(consumerId, state);

// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;

return {
async next(): Promise<IteratorResult<T>> {
const consumer = self.consumers.get(consumerId);
if (!consumer) {
return { done: true, value: undefined };
}

if (consumer.cancelled) {
return { done: true, value: undefined };
}

// Return buffered event if available
if (consumer.position < self.buffer.length) {
const value = self.buffer[consumer.position]!;
consumer.position++;
return { done: false, value };
}

// If complete and caught up, we're done
if (self.isComplete) {
self.consumers.delete(consumerId);
self.cleanup();
if (self.completionError) {
throw self.completionError;
}
return { done: true, value: undefined };
}

// Set up waiting promise FIRST to avoid race condition
const waitPromise = new Promise<void>((resolve, reject) => {
consumer.waitingPromise = { resolve, reject };

// Immediately check if we should resolve after setting up promise
if (
self.isComplete ||
self.completionError ||
consumer.position < self.buffer.length
) {
resolve();
}
});

await waitPromise;
consumer.waitingPromise = null;

// Recursively try again after waking up
return this.next();
},

async return(): Promise<IteratorResult<T>> {
const consumer = self.consumers.get(consumerId);
if (consumer) {
consumer.cancelled = true;
self.consumers.delete(consumerId);
self.cleanup();
}
return { done: true, value: undefined };
},

async throw(e?: unknown): Promise<IteratorResult<T>> {
const consumer = self.consumers.get(consumerId);
if (consumer) {
consumer.cancelled = true;
self.consumers.delete(consumerId);
self.cleanup();
}
throw e;
},

[Symbol.asyncIterator]() {
return this;
},
};
}

/**
* Notify all waiting consumers that new data is available or stream completed
*/
private notifyWaitingConsumers(): void {
for (const consumer of this.consumers.values()) {
if (consumer.waitingPromise) {
if (this.completionError) {
consumer.waitingPromise.reject(this.completionError);
} else {
consumer.waitingPromise.resolve();
}
consumer.waitingPromise = null;
}
}
}
}

interface ConsumerState {
position: number;
waitingPromise: {
resolve: () => void;
reject: (error: Error) => void;
} | null;
cancelled: boolean;
}
Loading