diff --git a/examples/agent-patterns/stream-agent-tools.ts b/examples/agent-patterns/stream-agent-tools.ts new file mode 100644 index 00000000..423a5aeb --- /dev/null +++ b/examples/agent-patterns/stream-agent-tools.ts @@ -0,0 +1,77 @@ +import { Agent, Runner, tool } from '@openai/agents'; +import { z } from 'zod'; + +const getWeatherTool = tool({ + name: 'get_weather', + description: 'Get the weather for a given city', + parameters: z.object({ + city: z.string(), + }), + execute: async (input) => { + return `The weather in ${input.city} is sunny`; + }, +}); + +const weatherAgent = new Agent({ + name: 'Weather Agent', + tools: [getWeatherTool], +}); + +const getLocalNewsTool = tool({ + name: 'get_local_news', + description: 'Get the local news for today', + parameters: z.object({ + city: z.string(), + }), + execute: async (input) => { + return `Big news in ${input.city} today: famous local cat won Guinness World Record for loudest purr!`; + }, +}); + +const newsAgent = new Agent({ + name: 'News Agent', + instructions: 'You are a news agent that can tell the news for a given city.', + tools: [getLocalNewsTool], +}); + +const personalAgent = new Agent({ + name: 'Personal Agent', + instructions: + 'You are a personal agent that prepares a user for the day. You can use the news agent to get the news for the day, and the weather agent to get the weather for the day.', + tools: [ + newsAgent.asTool({ + toolName: 'news_agent', + toolDescription: 'Get the local news for today', + }), + weatherAgent.asTool({ + toolName: 'weather_agent', + toolDescription: 'Get the weather for today', + }), + ], +}); + +const runner = new Runner({ + model: 'gpt-4.1-mini', + tracingDisabled: true, +}); + +async function main() { + const streamedRunResult = await runner.run( + personalAgent, + "What's up in Beijing today?", + { + stream: true, + // enable streaming of agent as tool events in the context scope stream + streamAgentTools: true, + }, + ); + + for await (const event of streamedRunResult) { + console.log(JSON.stringify(event)); + } +} + +main().catch((error) => { + console.error(error); + process.exit(1); +}); diff --git a/packages/agents-core/src/agent.ts b/packages/agents-core/src/agent.ts index fbd73608..de7efd0d 100644 --- a/packages/agents-core/src/agent.ts +++ b/packages/agents-core/src/agent.ts @@ -321,7 +321,7 @@ export type AgentConfigWithHandoffs< >; // The parameter type fo needApproval function for the tool created by Agent.asTool() method -const AgentAsToolNeedApprovalSchame = z.object({ input: z.string() }); +const AgentAsToolNeedApprovalSchema = z.object({ input: z.string() }); /** * The class representing an AI agent configured with instructions, tools, guardrails, handoffs and more. @@ -519,7 +519,7 @@ export class Agent< */ needsApproval?: | boolean - | ToolApprovalFunction; + | ToolApprovalFunction; /** * Run configuration for initializing the internal agent runner. */ @@ -538,7 +538,7 @@ export class Agent< agent: Agent; }) => boolean | Promise); }, - ): FunctionTool { + ): FunctionTool { const { toolName, toolDescription, @@ -551,7 +551,7 @@ export class Agent< return tool({ name: toolName ?? toFunctionToolName(this.name), description: toolDescription ?? '', - parameters: AgentAsToolNeedApprovalSchame, + parameters: AgentAsToolNeedApprovalSchema, strict: true, needsApproval, isEnabled, @@ -560,11 +560,24 @@ export class Agent< throw new ModelBehaviorError('Agent tool called with invalid input'); } const runner = new Runner(runConfig ?? {}); - const result = await runner.run(this, data.input, { - context, - ...(runOptions ?? {}), - }); - const completedResult = result as CompletedRunResult; + + let completedResult: CompletedRunResult; + if (context?._copyToContextScopeStream) { + const result = await runner.run(this, data.input, { + context, + ...(runOptions ?? {}), + stream: true, + streamAgentTools: false, // already set in top level runner; no need to set it here again + }); + await result.completed; + completedResult = result as CompletedRunResult; + } else { + const result = await runner.run(this, data.input, { + context, + ...(runOptions ?? {}), + }); + completedResult = result as CompletedRunResult; + } const usesStopAtToolNames = typeof this.toolUseBehavior === 'object' && diff --git a/packages/agents-core/src/events.ts b/packages/agents-core/src/events.ts index 238219d7..c6d45ecc 100644 --- a/packages/agents-core/src/events.ts +++ b/packages/agents-core/src/events.ts @@ -15,7 +15,10 @@ export class RunRawModelStreamEvent { /** * @param data The raw responses stream events from the LLM. */ - constructor(public data: ResponseStreamEvent) {} + constructor( + public data: ResponseStreamEvent, + public agentName: string, + ) {} } /** diff --git a/packages/agents-core/src/result.ts b/packages/agents-core/src/result.ts index b06b1df6..21b8e17d 100644 --- a/packages/agents-core/src/result.ts +++ b/packages/agents-core/src/result.ts @@ -244,11 +244,13 @@ export class StreamedRunResult< #completedPromiseReject: ((err: unknown) => void) | undefined; #cancelled: boolean = false; #streamLoopPromise: Promise | undefined; + #contextScopeStreamOwner: boolean | undefined; constructor( result: { state: RunState; signal?: AbortSignal; + streamAgentTools?: boolean; } = {} as any, ) { super(result.state); @@ -269,6 +271,29 @@ export class StreamedRunResult< this.#completedPromiseReject = reject; }); + if (result.streamAgentTools) { + if (this.state._context._copyToContextScopeStream) { + logger.warn( + 'A context scope stream has already been created by another runner instance. Reusing it.', + ); + } else { + logger.debug( + 'Creating new context scope stream; all subsequent events will be copied to it.', + ); + this.state._context._contextScopeStream = + new _ReadableStream({ + start: (controller) => { + this.state._context._contextScopeStreamController = controller; + this.#contextScopeStreamOwner = true; + this.state._context._copyToContextScopeStream = true; + }, + cancel: () => { + this.#cancelled = true; + }, + }); + } + } + if (this.#signal) { const handleAbort = () => { if (this.#cancelled) { @@ -296,6 +321,39 @@ export class StreamedRunResult< }); } + if (this.#contextScopeStreamOwner) { + const controller = this.state._context._contextScopeStreamController; + this.state._context._contextScopeStreamController = undefined; + + if (this.state._context._contextScopeStream?.locked) { + if (controller) { + try { + controller.close(); + } catch (err) { + logger.debug( + `Failed to close readable stream on abort: ${err}`, + ); + } + } + } else { + if (this.state._context._contextScopeStream) { + void this.state._context._contextScopeStream + .cancel(this.#signal?.reason) + .catch((err) => { + logger.debug( + `Failed to cancel context scope stream on abort: ${err}`, + ); + }); + } + } + // Clean up context scope stream state on abort + // Clear the stream since it's been canceled and can't be read + this.state._context._contextScopeStream = undefined; + this.state._context._contextScopeStreamController = undefined; + this.state._context._copyToContextScopeStream = false; + this.#contextScopeStreamOwner = undefined; + } + this.#completedPromiseResolve?.(); }; @@ -314,6 +372,9 @@ export class StreamedRunResult< _addItem(item: RunStreamEvent) { if (!this.cancelled) { this.#readableController?.enqueue(item); + if (this.state._context._copyToContextScopeStream) { + this.state._context._contextScopeStreamController?.enqueue(item); + } } } @@ -325,6 +386,18 @@ export class StreamedRunResult< if (!this.cancelled && this.#readableController) { this.#readableController.close(); this.#readableController = undefined; + if ( + this.state._context._contextScopeStreamController && + this.#contextScopeStreamOwner + ) { + this.state._context._contextScopeStreamController.close(); + this.state._context._contextScopeStreamController = undefined; + // Clean up context scope stream state on completion + // Don't clear _contextScopeStream - it may still be consumed by toStream() or async iteration + // The stream will be garbage collected when no longer referenced + this.state._context._copyToContextScopeStream = false; + this.#contextScopeStreamOwner = undefined; + } this.#completedPromiseResolve?.(); } } @@ -338,6 +411,19 @@ export class StreamedRunResult< this.#readableController.error(err); this.#readableController = undefined; } + if ( + !this.cancelled && + this.state._context._contextScopeStreamController && + this.#contextScopeStreamOwner + ) { + this.state._context._contextScopeStreamController.error(err); + this.state._context._contextScopeStreamController = undefined; + // Clean up context scope stream state on error + // Clear the stream since it's been errored and can't be read + this.state._context._contextScopeStream = undefined; + this.state._context._copyToContextScopeStream = false; + this.#contextScopeStreamOwner = undefined; + } this.#error = err; this.#completedPromiseReject?.(err); this.#completedPromise.catch((e) => { @@ -357,6 +443,13 @@ export class StreamedRunResult< * @returns A readable stream of the agent run. */ toStream(): ReadableStream { + if ( + this.state._context._contextScopeStream && + this.#contextScopeStreamOwner + ) { + return this.state._context + ._contextScopeStream as ReadableStream; + } return this.#readableStream as ReadableStream; } @@ -390,7 +483,15 @@ export class StreamedRunResult< toTextStream( options: { compatibleWithNodeStreams?: boolean } = {}, ): Readable | ReadableStream { - const stream = this.#readableStream.pipeThrough( + let stream = this.#readableStream; + if ( + this.state._context._contextScopeStream && + this.#contextScopeStreamOwner + ) { + stream = this.state._context._contextScopeStream; + } + + const textStream = stream.pipeThrough( new TransformStream({ transform(event, controller) { if ( @@ -405,13 +506,19 @@ export class StreamedRunResult< ); if (options.compatibleWithNodeStreams) { - return Readable.fromWeb(stream); + return Readable.fromWeb(textStream); } - return stream as ReadableStream; + return textStream as ReadableStream; } [Symbol.asyncIterator](): AsyncIterator { + if ( + this.state._context._contextScopeStream && + this.#contextScopeStreamOwner + ) { + return this.state._context._contextScopeStream[Symbol.asyncIterator](); + } return this.#readableStream[Symbol.asyncIterator](); } diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 91cc25b3..a1b37f38 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -186,6 +186,10 @@ export type StreamRunOptions = * Whether to stream the run. If true, the run will emit events as the model responds. */ stream: true; + /** + * Whether to enable recursive streaming for agent as tool calls. If true, all events from agents invoked as tools will appear in the top-level stream result. + */ + streamAgentTools?: boolean; }; /** @@ -1147,7 +1151,9 @@ export class Runner extends RunHooks> { // this loop to prevent internal false errors and unnecessary processing return; } - result._addItem(new RunRawModelStreamEvent(event)); + result._addItem( + new RunRawModelStreamEvent(event, currentAgent.name), + ); } if (parallelGuardrailPromise) { @@ -1320,8 +1326,9 @@ export class Runner extends RunHooks> { // Initialize the streamed result with existing state const result = new StreamedRunResult({ - signal: options.signal, state, + signal: options.signal, + streamAgentTools: options.streamAgentTools, }); // Setup defaults diff --git a/packages/agents-core/src/runContext.ts b/packages/agents-core/src/runContext.ts index 166ff5b5..cf4412f5 100644 --- a/packages/agents-core/src/runContext.ts +++ b/packages/agents-core/src/runContext.ts @@ -1,7 +1,12 @@ +import { RunStreamEvent } from './events'; import { RunToolApprovalItem } from './items'; import logger from './logger'; import { UnknownContext } from './types'; import { Usage } from './usage'; +import { + ReadableStreamController, + ReadableStream as _ReadableStream, +} from '@openai/agents-core/_shims'; type ApprovalRecord = { approved: boolean | string[]; @@ -28,6 +33,10 @@ export class RunContext { */ #approvals: Map; + _copyToContextScopeStream?: boolean; + _contextScopeStream?: _ReadableStream; + _contextScopeStreamController?: ReadableStreamController; + constructor(context: TContext = {} as TContext) { this.context = context; this.usage = new Usage(); diff --git a/packages/agents-core/src/utils/tools.ts b/packages/agents-core/src/utils/tools.ts index b56ce971..88b6d70e 100644 --- a/packages/agents-core/src/utils/tools.ts +++ b/packages/agents-core/src/utils/tools.ts @@ -63,15 +63,11 @@ function buildJsonSchemaFromZod( } /** - * Convert a string to a function tool name by replacing spaces with underscores and - * non-alphanumeric characters with underscores. + * Convert a string to a function tool name by replacing non-alphanumeric characters with underscores. * @param name - The name of the tool. * @returns The function tool name. */ export function toFunctionToolName(name: string): FunctionToolName { - // Replace spaces with underscores - name = name.replace(/\s/g, '_'); - // Replace non-alphanumeric characters with underscores name = name.replace(/[^a-zA-Z0-9]/g, '_'); diff --git a/packages/agents-core/test/agent.test.ts b/packages/agents-core/test/agent.test.ts index 08318e08..a424756a 100644 --- a/packages/agents-core/test/agent.test.ts +++ b/packages/agents-core/test/agent.test.ts @@ -290,6 +290,68 @@ describe('Agent', () => { ); }); + it('agent as tool runs in streaming mode when context scope stream is active', async () => { + const agent = new Agent({ + name: 'Streaming Agent', + instructions: 'You stream intermediate results.', + }); + const mockResult = {} as any; + const runSpy = vi + .spyOn(Runner.prototype, 'run') + .mockImplementation(async () => mockResult); + + const tool = agent.asTool({ + toolDescription: 'streaming tool', + }); + + const context = new RunContext(); + context._copyToContextScopeStream = true; + + await tool.invoke( + context, + JSON.stringify({ input: 'run agent as tool stream mode' }), + ); + + expect(runSpy).toHaveBeenCalledTimes(1); + const [calledAgent, calledInput, calledOptions] = runSpy.mock.calls[0]; + expect(calledAgent).toBe(agent); + expect(calledInput).toBe('run agent as tool stream mode'); + expect(calledOptions).toMatchObject({ + context, + stream: true, + streamAgentTools: false, + }); + }); + + it('agent as tool runs in non-streaming mode when context scope stream is not active', async () => { + const agent = new Agent({ + name: 'NonStreaming Agent', + instructions: 'You return the final result.', + }); + const mockResult = {} as any; + const runSpy = vi + .spyOn(Runner.prototype, 'run') + .mockImplementation(async () => mockResult); + + const tool = agent.asTool({ + toolDescription: 'standard tool', + }); + const context = new RunContext(); + + await tool.invoke( + context, + JSON.stringify({ input: 'run agent as tool non-streaming mode' }), + ); + + expect(runSpy).toHaveBeenCalledTimes(1); + const [calledAgent, calledInput, calledOptions] = runSpy.mock.calls[0]; + expect(calledAgent).toBe(agent); + expect(calledInput).toBe('run agent as tool non-streaming mode'); + expect(calledOptions).toMatchObject({ + context, + }); + }); + it('filters tools using isEnabled predicates', async () => { const conditionalTool = tool({ name: 'conditional', diff --git a/packages/agents-core/test/result.test.ts b/packages/agents-core/test/result.test.ts index 8f213147..b1945c73 100644 --- a/packages/agents-core/test/result.test.ts +++ b/packages/agents-core/test/result.test.ts @@ -5,6 +5,7 @@ import { Agent } from '../src/agent'; import { RunContext } from '../src/runContext'; import { RunRawModelStreamEvent } from '../src/events'; import logger from '../src/logger'; +import type { StreamEventTextStream } from '../src/types/protocol'; const agent = new Agent({ name: 'A' }); @@ -34,11 +35,18 @@ describe('StreamedRunResult', () => { it('collects streamed text', async () => { const state = createState(); const sr = new StreamedRunResult({ state }); + const agentName = 'test'; sr._addItem( - new RunRawModelStreamEvent({ type: 'output_text_delta', delta: 'he' }), + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'he' }, + agentName, + ), ); sr._addItem( - new RunRawModelStreamEvent({ type: 'output_text_delta', delta: 'llo' }), + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'llo' }, + agentName, + ), ); sr._done(); @@ -82,4 +90,142 @@ describe('StreamedRunResult', () => { expect(sr.cancelled).toBe(true); expect(sr.error).toBe(null); }); + + it('routes toStream through the context scope stream when streamAgentTools is enabled', async () => { + const state = createState(); + const result = new StreamedRunResult({ + state: state, + streamAgentTools: true, + }); + const contextScopeStream = state._context._contextScopeStream; + expect(contextScopeStream).toBeDefined(); + const vs = result.toStream().values(); + result._addItem( + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'hel' }, + 'Agent', + ), + ); + result._addItem( + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'lo' }, + 'Agent', + ), + ); + result._done(); + + let text = ''; + for await (const value of vs) { + expect(value.type).toBe('raw_model_stream_event'); + const rawModelEvent = value as RunRawModelStreamEvent; + expect(rawModelEvent.data.type).toBe('output_text_delta'); + const data = rawModelEvent.data as StreamEventTextStream; + text += data.delta ?? ''; + } + await result.completed; + expect(text).toBe('hello'); + }); + + it('routes toTextStream through the context scope stream when streamAgentTools is enabled', async () => { + const state = createState(); + const result = new StreamedRunResult({ + state: state, + streamAgentTools: true, + }); + const contextScopeStream = state._context._contextScopeStream; + const textStream = result.toTextStream(); + expect(contextScopeStream).toBeDefined(); + result._addItem( + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'hel' }, + 'Agent', + ), + ); + result._addItem( + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'lo' }, + 'Agent', + ), + ); + result._done(); + + const textIterator = textStream.values(); + let text = ''; + for await (const value of textIterator) { + text += value; + } + await result.completed; + expect(text).toBe('hello'); + }); + + it('routes async iteration through the context scope stream when streamAgentTools is enabled', async () => { + const state = createState(); + const result = new StreamedRunResult({ + state: state, + streamAgentTools: true, + }); + const contextScopeStream = state._context._contextScopeStream; + expect(contextScopeStream).toBeDefined(); + + result._addItem( + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'hel' }, + 'Agent', + ), + ); + result._addItem( + new RunRawModelStreamEvent( + { type: 'output_text_delta', delta: 'lo' }, + 'Agent', + ), + ); + result._done(); + + let text = ''; + for await (const event of result) { + expect(event.type).toBe('raw_model_stream_event'); + const rawModelEvent = event as RunRawModelStreamEvent; + const data = rawModelEvent.data as StreamEventTextStream; + text += data.delta ?? ''; + } + await result.completed; + expect(text).toBe('hello'); + }); + + it('clears context scope stream metadata on completion', async () => { + const state = createState(); + const sr = new StreamedRunResult({ state, streamAgentTools: true }); + expect(state._context._copyToContextScopeStream).toBe(true); + sr._done(); + await sr.completed; + expect(state._context._copyToContextScopeStream).toBe(false); + expect(state._context._contextScopeStream).toBeDefined(); + }); + + it('clears context scope stream metadata on error', async () => { + const errorState = createState(); + const srError = new StreamedRunResult({ + state: errorState, + streamAgentTools: true, + }); + const err = new Error('agg-error'); + srError._raiseError(err); + await expect(srError.completed).rejects.toBe(err); + expect(errorState._context._copyToContextScopeStream).toBe(false); + expect(errorState._context._contextScopeStream).toBeUndefined(); + }); + + it('clears context scope stream metadata on abort', async () => { + const abortState = createState(); + const controller = new AbortController(); + const srAbort = new StreamedRunResult({ + state: abortState, + streamAgentTools: true, + signal: controller.signal, + }); + controller.abort(); + await srAbort.completed; + expect(abortState._context._copyToContextScopeStream).toBe(false); + expect(abortState._context._contextScopeStream).toBeUndefined(); + }); }); diff --git a/packages/agents-core/test/run.stream.test.ts b/packages/agents-core/test/run.stream.test.ts index ad2d9c0e..f7cbdc6d 100644 --- a/packages/agents-core/test/run.stream.test.ts +++ b/packages/agents-core/test/run.stream.test.ts @@ -1034,6 +1034,757 @@ describe('Runner.run (streaming)', () => { expect(result.inputGuardrailResults).toHaveLength(1); expect(guardrail.execute).toHaveBeenCalledTimes(1); }); + + describe('streamAgentTools', () => { + class MultiTurnStreamingModel implements Model { + #callCount = 0; + constructor(private responses: ModelResponse[]) {} + async getResponse(_req: ModelRequest): Promise { + const response = this.responses[this.#callCount]; + if (!response) { + throw new Error('No response configured'); + } + return response; + } + async *getStreamedResponse( + _req: ModelRequest, + ): AsyncIterable { + const response = this.responses[this.#callCount++]; + if (!response) { + throw new Error('No response configured'); + } + yield { + type: 'response_done', + response: { + id: `resp-${this.#callCount}`, + usage: { + requests: 1, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + output: response.output, + }, + } as any; + } + } + + it('by default only streams main agent events', async () => { + const agentB = new Agent({ + name: 'AgentB', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('B output')], + usage: new Usage(), + }), + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: false, + }); + + const events: RunStreamEvent[] = []; + for await (const event of result) { + events.push(event); + } + await result.completed; + + // Should only see AgentA events, not AgentB events + const agentNames = new Set(); + events.forEach((event) => { + if (event.type === 'raw_model_stream_event') { + if (event.agentName) { + agentNames.add(event.agentName); + } + } else if (event.type === 'agent_updated_stream_event') { + if (event.agent?.name) { + agentNames.add(event.agent.name); + } + } else if (event.type === 'run_item_stream_event') { + const item = event.item; + if ('agent' in item && item.agent?.name) { + agentNames.add(item.agent.name); + } else if ('sourceAgent' in item && item.sourceAgent?.name) { + agentNames.add(item.sourceAgent.name); + } else if ('targetAgent' in item && item.targetAgent?.name) { + agentNames.add(item.targetAgent.name); + } + } + }); + + expect(agentNames.has('AgentA')).toBe(true); + expect(agentNames.has('AgentB')).toBe(false); + }); + + it('streams agent-as-tool events when streamAgentTools is enabled', async () => { + const agentB = new Agent({ + name: 'AgentB', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('B output')], + usage: new Usage(), + }), + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + const events: RunStreamEvent[] = []; + for await (const event of result) { + events.push(event); + } + await result.completed; + + // Should see both AgentA and AgentB events + const agentNames = new Set(); + events.forEach((event) => { + if (event.type === 'raw_model_stream_event') { + if (event.agentName) { + agentNames.add(event.agentName); + } + } else if (event.type === 'agent_updated_stream_event') { + if (event.agent?.name) { + agentNames.add(event.agent.name); + } + } else if (event.type === 'run_item_stream_event') { + const item = event.item; + if ('agent' in item && item.agent?.name) { + agentNames.add(item.agent.name); + } else if ('sourceAgent' in item && item.sourceAgent?.name) { + agentNames.add(item.sourceAgent.name); + } else if ('targetAgent' in item && item.targetAgent?.name) { + agentNames.add(item.targetAgent.name); + } + } + }); + + expect(agentNames.has('AgentA')).toBe(true); + expect(agentNames.has('AgentB')).toBe(true); + }); + + it('streams handoff agent events when streamAgentTools is enabled', async () => { + const agentC = new Agent({ + name: 'AgentC', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('C output')], + usage: new Usage(), + }), + }); + + const agentBHandoffCall: FunctionCallItem = { + id: 'handoff-1', + type: 'function_call', + name: handoff(agentC).toolName, + callId: 'h1', + status: 'completed', + arguments: '{}', + }; + + const agentB = new Agent({ + name: 'AgentB', + model: new ImmediateStreamingModel({ + output: [agentBHandoffCall], + usage: new Usage(), + }), + handoffs: [handoff(agentC)], + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + const events: RunStreamEvent[] = []; + for await (const event of result) { + events.push(event); + } + await result.completed; + + // Should see AgentA, AgentB, and AgentC events + const agentNames = new Set(); + events.forEach((event) => { + if (event.type === 'raw_model_stream_event') { + if (event.agentName) { + agentNames.add(event.agentName); + } + } else if (event.type === 'agent_updated_stream_event') { + if (event.agent?.name) { + agentNames.add(event.agent.name); + } + } else if (event.type === 'run_item_stream_event') { + const item = event.item; + if ('agent' in item && item.agent?.name) { + agentNames.add(item.agent.name); + } else if ('sourceAgent' in item && item.sourceAgent?.name) { + agentNames.add(item.sourceAgent.name); + } else if ('targetAgent' in item && item.targetAgent?.name) { + agentNames.add(item.targetAgent.name); + } + } + }); + + expect(agentNames.has('AgentA')).toBe(true); + expect(agentNames.has('AgentB')).toBe(true); + expect(agentNames.has('AgentC')).toBe(true); + }); + + it('works with nested agent-as-tool calls', async () => { + const agentC = new Agent({ + name: 'AgentC', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('C output')], + usage: new Usage(), + }), + }); + + const agentCAsTool = agentC.asTool({ toolDescription: 'Agent C tool' }); + const agentBToolCall: FunctionCallItem = { + id: 'call-2', + type: 'function_call', + name: agentCAsTool.name, + callId: 'c2', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentB = new Agent({ + name: 'AgentB', + model: new MultiTurnStreamingModel([ + { + output: [agentBToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('B output')], + usage: new Usage(), + }, + ]), + tools: [agentCAsTool], + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + const events: RunStreamEvent[] = []; + for await (const event of result) { + events.push(event); + } + await result.completed; + + const agentNames = new Set(); + events.forEach((event) => { + if (event.type === 'raw_model_stream_event') { + if (event.agentName) { + agentNames.add(event.agentName); + } + } else if (event.type === 'agent_updated_stream_event') { + if (event.agent?.name) { + agentNames.add(event.agent.name); + } + } else if (event.type === 'run_item_stream_event') { + const item = event.item; + if ('agent' in item && item.agent?.name) { + agentNames.add(item.agent.name); + } else if ('sourceAgent' in item && item.sourceAgent?.name) { + agentNames.add(item.sourceAgent.name); + } else if ('targetAgent' in item && item.targetAgent?.name) { + agentNames.add(item.targetAgent.name); + } + } + }); + + expect(agentNames.has('AgentA')).toBe(true); + expect(agentNames.has('AgentB')).toBe(true); + expect(agentNames.has('AgentC')).toBe(true); + }); + + it('works with parallel agent-as-tool calls', async () => { + const agentB1 = new Agent({ + name: 'AgentB1', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('B1 output')], + usage: new Usage(), + }), + }); + + const agentB2 = new Agent({ + name: 'AgentB2', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('B2 output')], + usage: new Usage(), + }), + }); + + const agentB1AsTool = agentB1.asTool({ + toolDescription: 'Agent B1 tool', + }); + const agentB2AsTool = agentB2.asTool({ + toolDescription: 'Agent B2 tool', + }); + const agentAToolCall1: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentB1AsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test1' }), + }; + + const agentAToolCall2: FunctionCallItem = { + id: 'call-2', + type: 'function_call', + name: agentB2AsTool.name, + callId: 'c2', + status: 'completed', + arguments: JSON.stringify({ input: 'test2' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall1, agentAToolCall2], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentB1AsTool, agentB2AsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + const events: RunStreamEvent[] = []; + for await (const event of result) { + events.push(event); + } + await result.completed; + + const agentNames = new Set(); + events.forEach((event) => { + if (event.type === 'raw_model_stream_event') { + if (event.agentName) { + agentNames.add(event.agentName); + } + } else if (event.type === 'agent_updated_stream_event') { + if (event.agent?.name) { + agentNames.add(event.agent.name); + } + } else if (event.type === 'run_item_stream_event') { + const item = event.item; + if ('agent' in item && item.agent?.name) { + agentNames.add(item.agent.name); + } else if ('sourceAgent' in item && item.sourceAgent?.name) { + agentNames.add(item.sourceAgent.name); + } else if ('targetAgent' in item && item.targetAgent?.name) { + agentNames.add(item.targetAgent.name); + } + } + }); + + expect(agentNames.has('AgentA')).toBe(true); + expect(agentNames.has('AgentB1')).toBe(true); + expect(agentNames.has('AgentB2')).toBe(true); + }); + + it('includes all event types (raw, agent_updated, run_item)', async () => { + const agentB = new Agent({ + name: 'AgentB', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('B output')], + usage: new Usage(), + }), + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + const eventTypes = new Set(); + const events: RunStreamEvent[] = []; + for await (const event of result) { + events.push(event); + eventTypes.add(event.type); + } + await result.completed; + + // Should include raw_model_stream_event, run_item_stream_event + // agent_updated_stream_event may or may not appear depending on handoffs + expect(eventTypes.has('raw_model_stream_event')).toBe(true); + expect(eventTypes.has('run_item_stream_event')).toBe(true); + expect(events.length).toBeGreaterThan(0); + }); + + it('works with toStream() method', async () => { + const agentB = new Agent({ + name: 'AgentB', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('B output')], + usage: new Usage(), + }), + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + const events: RunStreamEvent[] = []; + for await (const event of result.toStream()) { + events.push(event); + } + await result.completed; + + const agentNames = new Set(); + events.forEach((event) => { + if (event.type === 'raw_model_stream_event') { + agentNames.add(event.agentName); + } + }); + + expect(agentNames.has('AgentA')).toBe(true); + expect(agentNames.has('AgentB')).toBe(true); + }); + + it('works with toTextStream() method', async () => { + const agentB = new Agent({ + name: 'AgentB', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('B output')], + usage: new Usage(), + }), + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + // Consume the text stream - it should complete without errors + // Note: toTextStream() only extracts text from output_text_delta events, + // so with our ImmediateStreamingModel that emits response_done events, + // the stream will be empty but should still complete successfully + const chunks: string[] = []; + for await (const chunk of result.toTextStream()) { + chunks.push(chunk); + } + await result.completed; + + // Verify the stream completed and the result is valid + expect(result.finalOutput).toBeDefined(); + expect(result.error).toBe(null); + }); + + it('handles complex scenario with handoff and nested agent-as-tool', async () => { + const agentD = new Agent({ + name: 'AgentD', + model: new ImmediateStreamingModel({ + output: [fakeModelMessage('D output')], + usage: new Usage(), + }), + }); + + const agentDAsTool = agentD.asTool({ toolDescription: 'Agent D tool' }); + const agentCToolCall: FunctionCallItem = { + id: 'call-3', + type: 'function_call', + name: agentDAsTool.name, + callId: 'c3', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentC = new Agent({ + name: 'AgentC', + model: new MultiTurnStreamingModel([ + { + output: [agentCToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('C output')], + usage: new Usage(), + }, + ]), + tools: [agentDAsTool], + }); + + const agentBHandoffCall: FunctionCallItem = { + id: 'handoff-1', + type: 'function_call', + name: handoff(agentC).toolName, + callId: 'h1', + status: 'completed', + arguments: '{}', + }; + + const agentB = new Agent({ + name: 'AgentB', + model: new ImmediateStreamingModel({ + output: [agentBHandoffCall], + usage: new Usage(), + }), + handoffs: [handoff(agentC)], + }); + + const agentBAsTool = agentB.asTool({ toolDescription: 'Agent B tool' }); + const agentAToolCall: FunctionCallItem = { + id: 'call-1', + type: 'function_call', + name: agentBAsTool.name, + callId: 'c1', + status: 'completed', + arguments: JSON.stringify({ input: 'test' }), + }; + + const agentA = new Agent({ + name: 'AgentA', + model: new MultiTurnStreamingModel([ + { + output: [agentAToolCall], + usage: new Usage(), + }, + { + output: [fakeModelMessage('A output')], + usage: new Usage(), + }, + ]), + tools: [agentBAsTool], + }); + + const runner = new Runner(); + const result = await runner.run(agentA, 'test', { + stream: true, + streamAgentTools: true, + }); + + const events: RunStreamEvent[] = []; + for await (const event of result) { + events.push(event); + } + await result.completed; + + const agentNames = new Set(); + events.forEach((event) => { + if (event.type === 'raw_model_stream_event') { + if (event.agentName) { + agentNames.add(event.agentName); + } + } else if (event.type === 'agent_updated_stream_event') { + if (event.agent?.name) { + agentNames.add(event.agent.name); + } + } else if (event.type === 'run_item_stream_event') { + const item = event.item; + if ('agent' in item && item.agent?.name) { + agentNames.add(item.agent.name); + } else if ('sourceAgent' in item && item.sourceAgent?.name) { + agentNames.add(item.sourceAgent.name); + } else if ('targetAgent' in item && item.targetAgent?.name) { + agentNames.add(item.targetAgent.name); + } + } + }); + + // Should see all agents: A -> B (handoff) -> C -> D (nested tool) + expect(agentNames.has('AgentA')).toBe(true); + expect(agentNames.has('AgentB')).toBe(true); + expect(agentNames.has('AgentC')).toBe(true); + expect(agentNames.has('AgentD')).toBe(true); + }); + }); }); class ImmediateStreamingModel implements Model {