Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/public-wings-dress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openai/agents-core': patch
---

fix: #526 separate tool_call_item and tool_call_output_item in stream events
13 changes: 12 additions & 1 deletion packages/agents-core/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
maybeResetToolChoice,
ProcessedResponse,
processModelResponse,
streamStepItemsToRunResult,
} from './runImplementation';
import { RunItem } from './items';
import {
Expand Down Expand Up @@ -807,6 +808,14 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
);

result.state._lastProcessedResponse = processedResponse;

// Record the items emitted directly from the model response so we do not
// stream them again after tools and other side effects finish.
const preToolItems = new Set(processedResponse.newItems);
if (preToolItems.size > 0) {
streamStepItemsToRunResult(result, processedResponse.newItems);
}

const turnResult = await executeToolsAndSideEffects<TContext>(
currentAgent,
result.state._originalInput,
Expand All @@ -817,7 +826,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
result.state,
);

addStepToRunResult(result, turnResult);
addStepToRunResult(result, turnResult, {
skipItems: preToolItems,
});

result.state._toolUseTracker.addToolUse(
currentAgent,
Expand Down
75 changes: 56 additions & 19 deletions packages/agents-core/src/runImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1176,32 +1176,69 @@ export async function checkForFinalOutputFromTools<
throw new UserError(`Invalid toolUseBehavior: ${toolUseBehavior}`, state);
}

function getRunItemStreamEventName(
item: RunItem,
): RunItemStreamEventName | undefined {
if (item instanceof RunMessageOutputItem) {
return 'message_output_created';
}
if (item instanceof RunHandoffCallItem) {
return 'handoff_requested';
}
if (item instanceof RunHandoffOutputItem) {
return 'handoff_occurred';
}
if (item instanceof RunToolCallItem) {
return 'tool_called';
}
if (item instanceof RunToolCallOutputItem) {
return 'tool_output';
}
if (item instanceof RunReasoningItem) {
return 'reasoning_item_created';
}
if (item instanceof RunToolApprovalItem) {
return 'tool_approval_requested';
}
return undefined;
}

function enqueueRunItemStreamEvent(
result: StreamedRunResult<any, any>,
item: RunItem,
): void {
const itemName = getRunItemStreamEventName(item);
if (!itemName) {
logger.warn('Unknown item type: ', item);
return;
}
result._addItem(new RunItemStreamEvent(itemName, item));
}

export function streamStepItemsToRunResult(
result: StreamedRunResult<any, any>,
items: RunItem[],
): void {
// Preserve the order in which items were generated by enqueueing each one
// immediately on the streamed result.
for (const item of items) {
enqueueRunItemStreamEvent(result, item);
}
}

export function addStepToRunResult(
result: StreamedRunResult<any, any>,
step: SingleStepResult,
options?: { skipItems?: Set<RunItem> },
): void {
// skipItems contains run items that were already streamed so we avoid
// enqueueing duplicate events for the same instance.
const skippedItems = options?.skipItems;
for (const item of step.newStepItems) {
let itemName: RunItemStreamEventName;
if (item instanceof RunMessageOutputItem) {
itemName = 'message_output_created';
} else if (item instanceof RunHandoffCallItem) {
itemName = 'handoff_requested';
} else if (item instanceof RunHandoffOutputItem) {
itemName = 'handoff_occurred';
} else if (item instanceof RunToolCallItem) {
itemName = 'tool_called';
} else if (item instanceof RunToolCallOutputItem) {
itemName = 'tool_output';
} else if (item instanceof RunReasoningItem) {
itemName = 'reasoning_item_created';
} else if (item instanceof RunToolApprovalItem) {
itemName = 'tool_approval_requested';
} else {
logger.warn('Unknown item type: ', item);
if (skippedItems?.has(item)) {
continue;
}

result._addItem(new RunItemStreamEvent(itemName, item));
enqueueRunItemStreamEvent(result, item);
}
}

Expand Down
221 changes: 218 additions & 3 deletions packages/agents-core/test/run.stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { describe, it, expect, beforeAll } from 'vitest';
import { describe, it, expect, beforeAll, vi } from 'vitest';
import { z } from 'zod';
import {
Agent,
run,
Expand All @@ -8,12 +9,14 @@ import {
Usage,
RunStreamEvent,
RunAgentUpdatedStreamEvent,
RunItemStreamEvent,
handoff,
Model,
ModelRequest,
ModelResponse,
StreamEvent,
FunctionCallItem,
tool,
} from '../src';
import { FakeModel, FakeModelProvider, fakeModelMessage } from './stubs';

Expand Down Expand Up @@ -150,7 +153,8 @@ describe('Runner.run (streaming)', () => {

// Track agent_end events on both the agent and runner
const agentEndEvents: Array<{ context: any; output: string }> = [];
const runnerEndEvents: Array<{ context: any; agent: any; output: string }> = [];
const runnerEndEvents: Array<{ context: any; agent: any; output: string }> =
[];

agent.on('agent_end', (context, output) => {
agentEndEvents.push({ context, output });
Expand All @@ -174,9 +178,220 @@ describe('Runner.run (streaming)', () => {
// Verify agent_end was called on both agent and runner
expect(agentEndEvents).toHaveLength(1);
expect(agentEndEvents[0].output).toBe('Final output');

expect(runnerEndEvents).toHaveLength(1);
expect(runnerEndEvents[0].agent).toBe(agent);
expect(runnerEndEvents[0].output).toBe('Final output');
});

it('streams tool_called before the tool finishes executing', async () => {
let releaseTool: (() => void) | undefined;
const toolExecuted = vi.fn();

const blockingTool = tool({
name: 'blocker',
description: 'blocks until released',
parameters: z.object({ value: z.string() }),
execute: async ({ value }) => {
toolExecuted(value);
await new Promise<void>((resolve) => {
releaseTool = resolve;
});
return `result:${value}`;
},
});

const functionCall: FunctionCallItem = {
id: 'call-1',
type: 'function_call',
name: blockingTool.name,
callId: 'c1',
status: 'completed',
arguments: JSON.stringify({ value: 'test' }),
};

const toolResponse: ModelResponse = {
output: [functionCall],
usage: new Usage(),
};

const finalMessageResponse: ModelResponse = {
output: [fakeModelMessage('done')],
usage: new Usage(),
};

class BlockingStreamModel implements Model {
#callCount = 0;

async getResponse(_req: ModelRequest): Promise<ModelResponse> {
return this.#callCount === 0 ? toolResponse : finalMessageResponse;
}

async *getStreamedResponse(
_req: ModelRequest,
): AsyncIterable<StreamEvent> {
const currentCall = this.#callCount++;
const response =
currentCall === 0 ? toolResponse : finalMessageResponse;
yield {
type: 'response_done',
response: {
id: `resp-${currentCall}`,
usage: {
requests: 1,
inputTokens: 0,
outputTokens: 0,
totalTokens: 0,
},
output: response.output,
},
} as any;
}
}

const agent = new Agent({
name: 'BlockingAgent',
model: new BlockingStreamModel(),
tools: [blockingTool],
});

const runner = new Runner();
const result = await runner.run(agent, 'hello', { stream: true });
const iterator = result.toStream()[Symbol.asyncIterator]();

const collected: RunStreamEvent[] = [];
const firstRunItemPromise: Promise<RunItemStreamEvent> = (async () => {
while (true) {
const next = await iterator.next();
if (next.done) {
throw new Error('Stream ended before emitting a run item event');
}
collected.push(next.value);
if (next.value.type === 'run_item_stream_event') {
return next.value;
}
}
})();

let firstRunItemResolved = false;
void firstRunItemPromise.then(() => {
firstRunItemResolved = true;
});

// Allow the tool execution to start.
await new Promise((resolve) => setImmediate(resolve));

expect(toolExecuted).toHaveBeenCalledWith('test');
expect(releaseTool).toBeDefined();
expect(firstRunItemResolved).toBe(true);

const firstRunItem = await firstRunItemPromise;
expect(firstRunItem.name).toBe('tool_called');

releaseTool?.();

while (true) {
const next = await iterator.next();
if (next.done) {
break;
}
collected.push(next.value);
}

await result.completed;

const toolCalledIndex = collected.findIndex(
(event) =>
event.type === 'run_item_stream_event' && event.name === 'tool_called',
);
const toolOutputIndex = collected.findIndex(
(event) =>
event.type === 'run_item_stream_event' && event.name === 'tool_output',
);

expect(toolCalledIndex).toBeGreaterThan(-1);
expect(toolOutputIndex).toBeGreaterThan(-1);
expect(toolCalledIndex).toBeLessThan(toolOutputIndex);
});

it('emits run item events in the order items are generated', async () => {
const sequenceTool = tool({
name: 'report',
description: 'Generate a report',
parameters: z.object({}),
execute: async () => 'report ready',
});

const functionCall: FunctionCallItem = {
id: 'call-1',
type: 'function_call',
name: sequenceTool.name,
callId: 'c1',
status: 'completed',
arguments: '{}',
};

const firstTurnResponse: ModelResponse = {
output: [fakeModelMessage('Starting work'), functionCall],
usage: new Usage(),
};

const secondTurnResponse: ModelResponse = {
output: [fakeModelMessage('All done')],
usage: new Usage(),
};

class SequencedStreamModel implements Model {
#turn = 0;

async getResponse(_req: ModelRequest): Promise<ModelResponse> {
return this.#turn === 0 ? firstTurnResponse : secondTurnResponse;
}

async *getStreamedResponse(
_req: ModelRequest,
): AsyncIterable<StreamEvent> {
const response =
this.#turn === 0 ? firstTurnResponse : secondTurnResponse;
this.#turn += 1;
yield {
type: 'response_done',
response: {
id: `resp-${this.#turn}`,
usage: {
requests: 1,
inputTokens: 0,
outputTokens: 0,
totalTokens: 0,
},
output: response.output,
},
} as any;
}
}

const agent = new Agent({
name: 'SequencedAgent',
model: new SequencedStreamModel(),
tools: [sequenceTool],
});

const runner = new Runner();
const result = await runner.run(agent, 'begin', { stream: true });

const itemEventNames: string[] = [];
for await (const event of result.toStream()) {
if (event.type === 'run_item_stream_event') {
itemEventNames.push(event.name);
}
}
await result.completed;

expect(itemEventNames).toEqual([
'message_output_created',
'tool_called',
'tool_output',
'message_output_created',
]);
});
});
Loading