diff --git a/.changeset/public-wings-dress.md b/.changeset/public-wings-dress.md new file mode 100644 index 00000000..213ba2d1 --- /dev/null +++ b/.changeset/public-wings-dress.md @@ -0,0 +1,5 @@ +--- +'@openai/agents-core': patch +--- + +fix: #526 separate tool_call_item and tool_call_output_item in stream events diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index ba37341a..47471505 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -39,6 +39,7 @@ import { maybeResetToolChoice, ProcessedResponse, processModelResponse, + streamStepItemsToRunResult, } from './runImplementation'; import { RunItem } from './items'; import { @@ -807,6 +808,14 @@ export class Runner extends RunHooks> { ); result.state._lastProcessedResponse = processedResponse; + + // Record the items emitted directly from the model response so we do not + // stream them again after tools and other side effects finish. + const preToolItems = new Set(processedResponse.newItems); + if (preToolItems.size > 0) { + streamStepItemsToRunResult(result, processedResponse.newItems); + } + const turnResult = await executeToolsAndSideEffects( currentAgent, result.state._originalInput, @@ -817,7 +826,9 @@ export class Runner extends RunHooks> { result.state, ); - addStepToRunResult(result, turnResult); + addStepToRunResult(result, turnResult, { + skipItems: preToolItems, + }); result.state._toolUseTracker.addToolUse( currentAgent, diff --git a/packages/agents-core/src/runImplementation.ts b/packages/agents-core/src/runImplementation.ts index 04af8933..b20ec3b1 100644 --- a/packages/agents-core/src/runImplementation.ts +++ b/packages/agents-core/src/runImplementation.ts @@ -1176,32 +1176,69 @@ export async function checkForFinalOutputFromTools< throw new UserError(`Invalid toolUseBehavior: ${toolUseBehavior}`, state); } +function getRunItemStreamEventName( + item: RunItem, +): RunItemStreamEventName | undefined { + if (item instanceof RunMessageOutputItem) { + return 'message_output_created'; + } + if (item instanceof RunHandoffCallItem) { + return 'handoff_requested'; + } + if (item instanceof RunHandoffOutputItem) { + return 'handoff_occurred'; + } + if (item instanceof RunToolCallItem) { + return 'tool_called'; + } + if (item instanceof RunToolCallOutputItem) { + return 'tool_output'; + } + if (item instanceof RunReasoningItem) { + return 'reasoning_item_created'; + } + if (item instanceof RunToolApprovalItem) { + return 'tool_approval_requested'; + } + return undefined; +} + +function enqueueRunItemStreamEvent( + result: StreamedRunResult, + item: RunItem, +): void { + const itemName = getRunItemStreamEventName(item); + if (!itemName) { + logger.warn('Unknown item type: ', item); + return; + } + result._addItem(new RunItemStreamEvent(itemName, item)); +} + +export function streamStepItemsToRunResult( + result: StreamedRunResult, + items: RunItem[], +): void { + // Preserve the order in which items were generated by enqueueing each one + // immediately on the streamed result. + for (const item of items) { + enqueueRunItemStreamEvent(result, item); + } +} + export function addStepToRunResult( result: StreamedRunResult, step: SingleStepResult, + options?: { skipItems?: Set }, ): void { + // skipItems contains run items that were already streamed so we avoid + // enqueueing duplicate events for the same instance. + const skippedItems = options?.skipItems; for (const item of step.newStepItems) { - let itemName: RunItemStreamEventName; - if (item instanceof RunMessageOutputItem) { - itemName = 'message_output_created'; - } else if (item instanceof RunHandoffCallItem) { - itemName = 'handoff_requested'; - } else if (item instanceof RunHandoffOutputItem) { - itemName = 'handoff_occurred'; - } else if (item instanceof RunToolCallItem) { - itemName = 'tool_called'; - } else if (item instanceof RunToolCallOutputItem) { - itemName = 'tool_output'; - } else if (item instanceof RunReasoningItem) { - itemName = 'reasoning_item_created'; - } else if (item instanceof RunToolApprovalItem) { - itemName = 'tool_approval_requested'; - } else { - logger.warn('Unknown item type: ', item); + if (skippedItems?.has(item)) { continue; } - - result._addItem(new RunItemStreamEvent(itemName, item)); + enqueueRunItemStreamEvent(result, item); } } diff --git a/packages/agents-core/test/run.stream.test.ts b/packages/agents-core/test/run.stream.test.ts index 94d0186b..3b98987e 100644 --- a/packages/agents-core/test/run.stream.test.ts +++ b/packages/agents-core/test/run.stream.test.ts @@ -1,4 +1,5 @@ -import { describe, it, expect, beforeAll } from 'vitest'; +import { describe, it, expect, beforeAll, vi } from 'vitest'; +import { z } from 'zod'; import { Agent, run, @@ -8,12 +9,14 @@ import { Usage, RunStreamEvent, RunAgentUpdatedStreamEvent, + RunItemStreamEvent, handoff, Model, ModelRequest, ModelResponse, StreamEvent, FunctionCallItem, + tool, } from '../src'; import { FakeModel, FakeModelProvider, fakeModelMessage } from './stubs'; @@ -150,7 +153,8 @@ describe('Runner.run (streaming)', () => { // Track agent_end events on both the agent and runner const agentEndEvents: Array<{ context: any; output: string }> = []; - const runnerEndEvents: Array<{ context: any; agent: any; output: string }> = []; + const runnerEndEvents: Array<{ context: any; agent: any; output: string }> = + []; agent.on('agent_end', (context, output) => { agentEndEvents.push({ context, output }); @@ -174,9 +178,220 @@ describe('Runner.run (streaming)', () => { // Verify agent_end was called on both agent and runner expect(agentEndEvents).toHaveLength(1); expect(agentEndEvents[0].output).toBe('Final output'); - + expect(runnerEndEvents).toHaveLength(1); expect(runnerEndEvents[0].agent).toBe(agent); expect(runnerEndEvents[0].output).toBe('Final output'); }); + + it('streams tool_called before the tool finishes executing', async () => { + let releaseTool: (() => void) | undefined; + const toolExecuted = vi.fn(); + + const blockingTool = tool({ + name: 'blocker', + description: 'blocks until released', + parameters: z.object({ value: z.string() }), + execute: async ({ value }) => { + toolExecuted(value); + await new Promise((resolve) => { + releaseTool = resolve; + }); + return `result:${value}`; + }, + }); + + const functionCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: blockingTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ value: 'test' }), + }; + + const toolResponse: ModelResponse = { + output: [functionCall], + usage: new Usage(), + }; + + const finalMessageResponse: ModelResponse = { + output: [fakeModelMessage('done')], + usage: new Usage(), + }; + + class BlockingStreamModel implements Model { + #callCount = 0; + + async getResponse(_req: ModelRequest): Promise { + return this.#callCount === 0 ? toolResponse : finalMessageResponse; + } + + async *getStreamedResponse( + _req: ModelRequest, + ): AsyncIterable { + const currentCall = this.#callCount++; + const response = + currentCall === 0 ? toolResponse : finalMessageResponse; + yield { + type: 'response_done', + response: { + id: `resp-${currentCall}`, + usage: { + requests: 1, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + output: response.output, + }, + } as any; + } + } + + const agent = new Agent({ + name: 'BlockingAgent', + model: new BlockingStreamModel(), + tools: [blockingTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'hello', { stream: true }); + const iterator = result.toStream()[Symbol.asyncIterator](); + + const collected: RunStreamEvent[] = []; + const firstRunItemPromise: Promise = (async () => { + while (true) { + const next = await iterator.next(); + if (next.done) { + throw new Error('Stream ended before emitting a run item event'); + } + collected.push(next.value); + if (next.value.type === 'run_item_stream_event') { + return next.value; + } + } + })(); + + let firstRunItemResolved = false; + void firstRunItemPromise.then(() => { + firstRunItemResolved = true; + }); + + // Allow the tool execution to start. + await new Promise((resolve) => setImmediate(resolve)); + + expect(toolExecuted).toHaveBeenCalledWith('test'); + expect(releaseTool).toBeDefined(); + expect(firstRunItemResolved).toBe(true); + + const firstRunItem = await firstRunItemPromise; + expect(firstRunItem.name).toBe('tool_called'); + + releaseTool?.(); + + while (true) { + const next = await iterator.next(); + if (next.done) { + break; + } + collected.push(next.value); + } + + await result.completed; + + const toolCalledIndex = collected.findIndex( + (event) => + event.type === 'run_item_stream_event' && event.name === 'tool_called', + ); + const toolOutputIndex = collected.findIndex( + (event) => + event.type === 'run_item_stream_event' && event.name === 'tool_output', + ); + + expect(toolCalledIndex).toBeGreaterThan(-1); + expect(toolOutputIndex).toBeGreaterThan(-1); + expect(toolCalledIndex).toBeLessThan(toolOutputIndex); + }); + + it('emits run item events in the order items are generated', async () => { + const sequenceTool = tool({ + name: 'report', + description: 'Generate a report', + parameters: z.object({}), + execute: async () => 'report ready', + }); + + const functionCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: sequenceTool.name, + callId: 'c1', + status: 'completed', + arguments: '{}', + }; + + const firstTurnResponse: ModelResponse = { + output: [fakeModelMessage('Starting work'), functionCall], + usage: new Usage(), + }; + + const secondTurnResponse: ModelResponse = { + output: [fakeModelMessage('All done')], + usage: new Usage(), + }; + + class SequencedStreamModel implements Model { + #turn = 0; + + async getResponse(_req: ModelRequest): Promise { + return this.#turn === 0 ? firstTurnResponse : secondTurnResponse; + } + + async *getStreamedResponse( + _req: ModelRequest, + ): AsyncIterable { + const response = + this.#turn === 0 ? firstTurnResponse : secondTurnResponse; + this.#turn += 1; + yield { + type: 'response_done', + response: { + id: `resp-${this.#turn}`, + usage: { + requests: 1, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + output: response.output, + }, + } as any; + } + } + + const agent = new Agent({ + name: 'SequencedAgent', + model: new SequencedStreamModel(), + tools: [sequenceTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'begin', { stream: true }); + + const itemEventNames: string[] = []; + for await (const event of result.toStream()) { + if (event.type === 'run_item_stream_event') { + itemEventNames.push(event.name); + } + } + await result.completed; + + expect(itemEventNames).toEqual([ + 'message_output_created', + 'tool_called', + 'tool_output', + 'message_output_created', + ]); + }); }); diff --git a/packages/agents-core/test/runImplementation.test.ts b/packages/agents-core/test/runImplementation.test.ts index 1d935574..e5e8e96b 100644 --- a/packages/agents-core/test/runImplementation.test.ts +++ b/packages/agents-core/test/runImplementation.test.ts @@ -27,6 +27,7 @@ import { executeComputerActions, executeHandoffCalls, executeToolsAndSideEffects, + streamStepItemsToRunResult, } from '../src/runImplementation'; import { FunctionTool, @@ -312,6 +313,63 @@ describe('addStepToRunResult', () => { 'reasoning_item_created', ]); }); + + it('does not re-emit items that were already streamed', () => { + const agent = new Agent({ name: 'StreamOnce' }); + + const toolCallItem = new ToolCallItem(TEST_MODEL_FUNCTION_CALL, agent); + const toolOutputItem = new ToolCallOutputItem( + getToolCallOutputItem(TEST_MODEL_FUNCTION_CALL, 'ok'), + agent, + 'ok', + ); + + const step: any = { + newStepItems: [toolCallItem, toolOutputItem], + }; + + const streamedResult = new StreamedRunResult(); + const captured: string[] = []; + (streamedResult as any)._addItem = (evt: any) => captured.push(evt.name); + + const alreadyStreamed = new Set([toolCallItem]); + streamStepItemsToRunResult(streamedResult, [toolCallItem]); + addStepToRunResult(streamedResult, step, { skipItems: alreadyStreamed }); + + expect(captured).toEqual(['tool_called', 'tool_output']); + }); + + it('maintains event order when mixing pre-streamed and step items', () => { + const agent = new Agent({ name: 'OrderedStream' }); + + const messageItem = new MessageOutputItem(TEST_MODEL_MESSAGE, agent); + const toolCallItem = new ToolCallItem(TEST_MODEL_FUNCTION_CALL, agent); + const toolOutputItem = new ToolCallOutputItem( + getToolCallOutputItem(TEST_MODEL_FUNCTION_CALL, 'done'), + agent, + 'done', + ); + + const step: any = { + newStepItems: [messageItem, toolCallItem, toolOutputItem], + }; + + const streamedResult = new StreamedRunResult(); + const captured: string[] = []; + (streamedResult as any)._addItem = (evt: any) => captured.push(evt.name); + + const preStreamed = new Set([messageItem, toolCallItem]); + // Simulate the streaming loop emitting early items and then the step emitter + // flushing the remainder without duplicating the first two events. + streamStepItemsToRunResult(streamedResult, [messageItem, toolCallItem]); + addStepToRunResult(streamedResult, step, { skipItems: preStreamed }); + + expect(captured).toEqual([ + 'message_output_created', + 'tool_called', + 'tool_output', + ]); + }); }); // Additional tests for AgentToolUseTracker and executeComputerActions