Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions examples/agent-patterns/stream-agent-tools.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Agent, Runner, tool } from '@openai/agents';
import { z } from 'zod';

const getWeatherTool = tool({
name: 'get_weather',
description: 'Get the weather for a given city',
parameters: z.object({
city: z.string(),
}),
execute: async (input) => {
return `The weather in ${input.city} is sunny`;
},
});

const weatherAgent = new Agent({
name: 'Weather Agent',
tools: [getWeatherTool],
});

const getLocalNewsTool = tool({
name: 'get_local_news',
description: 'Get the local news for today',
parameters: z.object({
city: z.string(),
}),
execute: async (input) => {
return `Big news in ${input.city} today: famous local cat won Guinness World Record for loudest purr!`;
},
});

const newsAgent = new Agent({
name: 'News Agent',
instructions: 'You are a news agent that can tell the news for a given city.',
tools: [getLocalNewsTool],
});

const personalAgent = new Agent({
name: 'Personal Agent',
instructions:
'You are a personal agent that prepares a user for the day. You can use the news agent to get the news for the day, and the weather agent to get the weather for the day.',
tools: [
newsAgent.asTool({
toolName: 'news_agent',
toolDescription: 'Get the local news for today',
}),
weatherAgent.asTool({
toolName: 'weather_agent',
toolDescription: 'Get the weather for today',
}),
],
});

const runner = new Runner({
model: 'gpt-4.1-mini',
tracingDisabled: true,
});

async function main() {
const streamedRunResult = await runner.run(
personalAgent,
"What's up in Beijing today?",
{
stream: true,
// enable streaming of agent as tool events in the context scope stream
streamAgentTools: true,
},
);

for await (const event of streamedRunResult) {
console.log(JSON.stringify(event));
}
}

main().catch((error) => {
console.error(error);
process.exit(1);
});
31 changes: 22 additions & 9 deletions packages/agents-core/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export type AgentConfigWithHandoffs<
>;

// The parameter type fo needApproval function for the tool created by Agent.asTool() method
const AgentAsToolNeedApprovalSchame = z.object({ input: z.string() });
const AgentAsToolNeedApprovalSchema = z.object({ input: z.string() });

/**
* The class representing an AI agent configured with instructions, tools, guardrails, handoffs and more.
Expand Down Expand Up @@ -519,7 +519,7 @@ export class Agent<
*/
needsApproval?:
| boolean
| ToolApprovalFunction<typeof AgentAsToolNeedApprovalSchame>;
| ToolApprovalFunction<typeof AgentAsToolNeedApprovalSchema>;
/**
* Run configuration for initializing the internal agent runner.
*/
Expand All @@ -538,7 +538,7 @@ export class Agent<
agent: Agent<TContext, TOutput>;
}) => boolean | Promise<boolean>);
},
): FunctionTool<TContext, typeof AgentAsToolNeedApprovalSchame> {
): FunctionTool<TContext, typeof AgentAsToolNeedApprovalSchema> {
const {
toolName,
toolDescription,
Expand All @@ -551,7 +551,7 @@ export class Agent<
return tool({
name: toolName ?? toFunctionToolName(this.name),
description: toolDescription ?? '',
parameters: AgentAsToolNeedApprovalSchame,
parameters: AgentAsToolNeedApprovalSchema,
strict: true,
needsApproval,
isEnabled,
Expand All @@ -560,11 +560,24 @@ export class Agent<
throw new ModelBehaviorError('Agent tool called with invalid input');
}
const runner = new Runner(runConfig ?? {});
const result = await runner.run(this, data.input, {
context,
...(runOptions ?? {}),
});
const completedResult = result as CompletedRunResult<TContext, TAgent>;

let completedResult: CompletedRunResult<TContext, TAgent>;
if (context?._copyToContextScopeStream) {
const result = await runner.run(this, data.input, {
context,
...(runOptions ?? {}),
stream: true,
streamAgentTools: false, // already set in top level runner; no need to set it here again
});
await result.completed;
completedResult = result as CompletedRunResult<TContext, TAgent>;
} else {
const result = await runner.run(this, data.input, {
context,
...(runOptions ?? {}),
});
completedResult = result as CompletedRunResult<TContext, TAgent>;
}

const usesStopAtToolNames =
typeof this.toolUseBehavior === 'object' &&
Expand Down
5 changes: 4 additions & 1 deletion packages/agents-core/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ export class RunRawModelStreamEvent {
/**
* @param data The raw responses stream events from the LLM.
*/
constructor(public data: ResponseStreamEvent) {}
constructor(
public data: ResponseStreamEvent,
public agentName: string,
) {}
}

/**
Expand Down
111 changes: 108 additions & 3 deletions packages/agents-core/src/result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,13 @@ export class StreamedRunResult<
#completedPromiseReject: ((err: unknown) => void) | undefined;
#cancelled: boolean = false;
#streamLoopPromise: Promise<void> | undefined;
#contextScopeStreamOwner: boolean | undefined;

constructor(
result: {
state: RunState<TContext, TAgent>;
signal?: AbortSignal;
streamAgentTools?: boolean;
} = {} as any,
) {
super(result.state);
Expand All @@ -269,6 +271,27 @@ export class StreamedRunResult<
this.#completedPromiseReject = reject;
});

if (result.streamAgentTools) {
if (this.state._context._copyToContextScopeStream) {
logger.warn(
'A context scope stream has already been created by another runner instance. Reusing it.',
);
} else {
logger.debug(
'Creating new context scope stream; all subsequent events will be copied to it.',
);
this.state._context._contextScopeStream =
new _ReadableStream<RunStreamEvent>({
start: (controller) => {
this.state._context._contextScopeStreamController = controller;
this.#contextScopeStreamOwner = true;
this.state._context._copyToContextScopeStream = true;
},
cancel: () => {},
});
Comment on lines 287 to 293

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Propagate cancellation for context scope stream

When streamAgentTools is enabled, the async iterator returned from StreamedRunResult is the context-scope stream created here, but its cancel handler is a no-op and never marks the result as cancelled. If a caller stops consuming the stream (e.g., breaks out of for await), the context stream is cancelled but #cancelled stays false, so _addItem continues to enqueue into a cancelled controller and the run loop keeps executing nested agents instead of short-circuiting. This regresses the previous cancellation semantics (line 264) and can throw once the controller is closed or waste work for abandoned streams. The cancel handler should set the same cancellation flag/cleanup as the primary stream.

Useful? React with 👍 / 👎.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
}

if (this.#signal) {
const handleAbort = () => {
if (this.#cancelled) {
Expand Down Expand Up @@ -296,6 +319,39 @@ export class StreamedRunResult<
});
}

if (this.#contextScopeStreamOwner) {
const controller = this.state._context._contextScopeStreamController;
this.state._context._contextScopeStreamController = undefined;

if (this.state._context._contextScopeStream?.locked) {
if (controller) {
try {
controller.close();
} catch (err) {
logger.debug(
`Failed to close readable stream on abort: ${err}`,
);
}
}
} else {
if (this.state._context._contextScopeStream) {
void this.state._context._contextScopeStream
.cancel(this.#signal?.reason)
.catch((err) => {
logger.debug(
`Failed to cancel context scope stream on abort: ${err}`,
);
});
}
}
// Clean up context scope stream state on abort
// Clear the stream since it's been canceled and can't be read
this.state._context._contextScopeStream = undefined;
this.state._context._contextScopeStreamController = undefined;
this.state._context._copyToContextScopeStream = false;
this.#contextScopeStreamOwner = undefined;
}

this.#completedPromiseResolve?.();
};

Expand All @@ -314,6 +370,9 @@ export class StreamedRunResult<
_addItem(item: RunStreamEvent) {
if (!this.cancelled) {
this.#readableController?.enqueue(item);
if (this.state._context._copyToContextScopeStream) {
this.state._context._contextScopeStreamController?.enqueue(item);
}
}
}

Expand All @@ -325,6 +384,18 @@ export class StreamedRunResult<
if (!this.cancelled && this.#readableController) {
this.#readableController.close();
this.#readableController = undefined;
if (
this.state._context._contextScopeStreamController &&
this.#contextScopeStreamOwner
) {
this.state._context._contextScopeStreamController.close();
this.state._context._contextScopeStreamController = undefined;
// Clean up context scope stream state on completion
// Don't clear _contextScopeStream - it may still be consumed by toStream() or async iteration
// The stream will be garbage collected when no longer referenced
this.state._context._copyToContextScopeStream = false;
this.#contextScopeStreamOwner = undefined;
}
this.#completedPromiseResolve?.();
}
}
Expand All @@ -338,6 +409,19 @@ export class StreamedRunResult<
this.#readableController.error(err);
this.#readableController = undefined;
}
if (
!this.cancelled &&
this.state._context._contextScopeStreamController &&
this.#contextScopeStreamOwner
) {
this.state._context._contextScopeStreamController.error(err);
this.state._context._contextScopeStreamController = undefined;
// Clean up context scope stream state on error
// Clear the stream since it's been errored and can't be read
this.state._context._contextScopeStream = undefined;
this.state._context._copyToContextScopeStream = false;
this.#contextScopeStreamOwner = undefined;
}
this.#error = err;
this.#completedPromiseReject?.(err);
this.#completedPromise.catch((e) => {
Expand All @@ -357,6 +441,13 @@ export class StreamedRunResult<
* @returns A readable stream of the agent run.
*/
toStream(): ReadableStream<RunStreamEvent> {
if (
this.state._context._contextScopeStream &&
this.#contextScopeStreamOwner
) {
return this.state._context
._contextScopeStream as ReadableStream<RunStreamEvent>;
}
return this.#readableStream as ReadableStream<RunStreamEvent>;
}

Expand Down Expand Up @@ -390,7 +481,15 @@ export class StreamedRunResult<
toTextStream(
options: { compatibleWithNodeStreams?: boolean } = {},
): Readable | ReadableStream<string> {
const stream = this.#readableStream.pipeThrough(
let stream = this.#readableStream;
if (
this.state._context._contextScopeStream &&
this.#contextScopeStreamOwner
) {
stream = this.state._context._contextScopeStream;
}

const textStream = stream.pipeThrough(
new TransformStream<RunStreamEvent, string>({
transform(event, controller) {
if (
Expand All @@ -405,13 +504,19 @@ export class StreamedRunResult<
);

if (options.compatibleWithNodeStreams) {
return Readable.fromWeb(stream);
return Readable.fromWeb(textStream);
}

return stream as ReadableStream<string>;
return textStream as ReadableStream<string>;
}

[Symbol.asyncIterator](): AsyncIterator<RunStreamEvent> {
if (
this.state._context._contextScopeStream &&
this.#contextScopeStreamOwner
) {
return this.state._context._contextScopeStream[Symbol.asyncIterator]();
}
return this.#readableStream[Symbol.asyncIterator]();
}

Expand Down
11 changes: 9 additions & 2 deletions packages/agents-core/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ export type StreamRunOptions<TContext = undefined> =
* Whether to stream the run. If true, the run will emit events as the model responds.
*/
stream: true;
/**
* Whether to enable recursive streaming for agent as tool calls. If true, all events from agents invoked as tools will appear in the top-level stream result.
*/
streamAgentTools?: boolean;
};

/**
Expand Down Expand Up @@ -1147,7 +1151,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
// this loop to prevent internal false errors and unnecessary processing
return;
}
result._addItem(new RunRawModelStreamEvent(event));
result._addItem(
new RunRawModelStreamEvent(event, currentAgent.name),
);
}

if (parallelGuardrailPromise) {
Expand Down Expand Up @@ -1320,8 +1326,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {

// Initialize the streamed result with existing state
const result = new StreamedRunResult<TContext, TAgent>({
signal: options.signal,
state,
signal: options.signal,
streamAgentTools: options.streamAgentTools,
});

// Setup defaults
Expand Down
9 changes: 9 additions & 0 deletions packages/agents-core/src/runContext.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { RunStreamEvent } from './events';
import { RunToolApprovalItem } from './items';
import logger from './logger';
import { UnknownContext } from './types';
import { Usage } from './usage';
import {
ReadableStreamController,
ReadableStream as _ReadableStream,
} from '@openai/agents-core/_shims';

type ApprovalRecord = {
approved: boolean | string[];
Expand All @@ -28,6 +33,10 @@ export class RunContext<TContext = UnknownContext> {
*/
#approvals: Map<string, ApprovalRecord>;

_copyToContextScopeStream?: boolean;
_contextScopeStream?: _ReadableStream<RunStreamEvent>;
_contextScopeStreamController?: ReadableStreamController<RunStreamEvent>;

constructor(context: TContext = {} as TContext) {
this.context = context;
this.usage = new Usage();
Expand Down
Loading