Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions examples/agent-patterns/stream-agent-tools.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Agent, Runner, tool } from '@openai/agents';
import { z } from 'zod';

const getWeatherTool = tool({
name: 'get_weather',
description: 'Get the weather for a given city',
parameters: z.object({
city: z.string(),
}),
execute: async (input) => {
return `The weather in ${input.city} is sunny`;
},
});

const weatherAgent = new Agent({
name: 'Weather Agent',
tools: [getWeatherTool],
});

const getLocalNewsTool = tool({
name: 'get_local_news',
description: 'Get the local news for today',
parameters: z.object({
city: z.string(),
}),
execute: async (input) => {
return `Big news in ${input.city} today: famous local cat won Guinness World Record for loudest purr!`;
},
});

const newsAgent = new Agent({
name: 'News Agent',
instructions: 'You are a news agent that can tell the news for a given city.',
tools: [getLocalNewsTool],
});

const personalAgent = new Agent({
name: 'Personal Agent',
instructions:
'You are a personal agent that prepares a user for the day. You can use the news agent to get the news for the day, and the weather agent to get the weather for the day.',
tools: [
newsAgent.asTool({
toolName: 'news_agent',
toolDescription: 'Get the local news for today',
}),
weatherAgent.asTool({
toolName: 'weather_agent',
toolDescription: 'Get the weather for today',
}),
],
});

const runner = new Runner({
model: 'gpt-4.1-mini',
tracingDisabled: true,
});

async function main() {
const streamedRunResult = await runner.run(
personalAgent,
"What's up in Beijing today?",
{
stream: true,
// enable streaming of agent as tool events in the context scope stream
streamAgentTools: true,
},
);

for await (const event of streamedRunResult) {
console.log(JSON.stringify(event));
}
}

main().catch((error) => {
console.error(error);
process.exit(1);
});
31 changes: 22 additions & 9 deletions packages/agents-core/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export type AgentConfigWithHandoffs<
>;

// The parameter type fo needApproval function for the tool created by Agent.asTool() method
const AgentAsToolNeedApprovalSchame = z.object({ input: z.string() });
const AgentAsToolNeedApprovalSchema = z.object({ input: z.string() });

/**
* The class representing an AI agent configured with instructions, tools, guardrails, handoffs and more.
Expand Down Expand Up @@ -519,7 +519,7 @@ export class Agent<
*/
needsApproval?:
| boolean
| ToolApprovalFunction<typeof AgentAsToolNeedApprovalSchame>;
| ToolApprovalFunction<typeof AgentAsToolNeedApprovalSchema>;
/**
* Run configuration for initializing the internal agent runner.
*/
Expand All @@ -538,7 +538,7 @@ export class Agent<
agent: Agent<TContext, TOutput>;
}) => boolean | Promise<boolean>);
},
): FunctionTool<TContext, typeof AgentAsToolNeedApprovalSchame> {
): FunctionTool<TContext, typeof AgentAsToolNeedApprovalSchema> {
const {
toolName,
toolDescription,
Expand All @@ -551,7 +551,7 @@ export class Agent<
return tool({
name: toolName ?? toFunctionToolName(this.name),
description: toolDescription ?? '',
parameters: AgentAsToolNeedApprovalSchame,
parameters: AgentAsToolNeedApprovalSchema,
strict: true,
needsApproval,
isEnabled,
Expand All @@ -560,11 +560,24 @@ export class Agent<
throw new ModelBehaviorError('Agent tool called with invalid input');
}
const runner = new Runner(runConfig ?? {});
const result = await runner.run(this, data.input, {
context,
...(runOptions ?? {}),
});
const completedResult = result as CompletedRunResult<TContext, TAgent>;

let completedResult: CompletedRunResult<TContext, TAgent>;
if (context?._copyToContextScopeStream) {
const result = await runner.run(this, data.input, {
context,
...(runOptions ?? {}),
stream: true,
streamAgentTools: false, // already set in top level runner; no need to set it here again
});
await result.completed;
completedResult = result as CompletedRunResult<TContext, TAgent>;
} else {
const result = await runner.run(this, data.input, {
context,
...(runOptions ?? {}),
});
completedResult = result as CompletedRunResult<TContext, TAgent>;
}

const usesStopAtToolNames =
typeof this.toolUseBehavior === 'object' &&
Expand Down
5 changes: 4 additions & 1 deletion packages/agents-core/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ export class RunRawModelStreamEvent {
/**
* @param data The raw responses stream events from the LLM.
*/
constructor(public data: ResponseStreamEvent) {}
constructor(
public data: ResponseStreamEvent,
public agentName: string,
) {}
}

/**
Expand Down
113 changes: 110 additions & 3 deletions packages/agents-core/src/result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,13 @@ export class StreamedRunResult<
#completedPromiseReject: ((err: unknown) => void) | undefined;
#cancelled: boolean = false;
#streamLoopPromise: Promise<void> | undefined;
#contextScopeStreamOwner: boolean | undefined;

constructor(
result: {
state: RunState<TContext, TAgent>;
signal?: AbortSignal;
streamAgentTools?: boolean;
} = {} as any,
) {
super(result.state);
Expand All @@ -269,6 +271,29 @@ export class StreamedRunResult<
this.#completedPromiseReject = reject;
});

if (result.streamAgentTools) {
if (this.state._context._copyToContextScopeStream) {
logger.warn(
'A context scope stream has already been created by another runner instance. Reusing it.',
);
} else {
logger.debug(
'Creating new context scope stream; all subsequent events will be copied to it.',
);
this.state._context._contextScopeStream =
new _ReadableStream<RunStreamEvent>({
start: (controller) => {
this.state._context._contextScopeStreamController = controller;
this.#contextScopeStreamOwner = true;
this.state._context._copyToContextScopeStream = true;
},
cancel: () => {
this.#cancelled = true;
},
});
}
}

if (this.#signal) {
const handleAbort = () => {
if (this.#cancelled) {
Expand Down Expand Up @@ -296,6 +321,39 @@ export class StreamedRunResult<
});
}

if (this.#contextScopeStreamOwner) {
const controller = this.state._context._contextScopeStreamController;
this.state._context._contextScopeStreamController = undefined;

if (this.state._context._contextScopeStream?.locked) {
if (controller) {
try {
controller.close();
} catch (err) {
logger.debug(
`Failed to close readable stream on abort: ${err}`,
);
}
}
} else {
if (this.state._context._contextScopeStream) {
void this.state._context._contextScopeStream
.cancel(this.#signal?.reason)
.catch((err) => {
logger.debug(
`Failed to cancel context scope stream on abort: ${err}`,
);
});
}
}
// Clean up context scope stream state on abort
// Clear the stream since it's been canceled and can't be read
this.state._context._contextScopeStream = undefined;
this.state._context._contextScopeStreamController = undefined;
this.state._context._copyToContextScopeStream = false;
this.#contextScopeStreamOwner = undefined;
}

this.#completedPromiseResolve?.();
};

Expand All @@ -314,6 +372,9 @@ export class StreamedRunResult<
_addItem(item: RunStreamEvent) {
if (!this.cancelled) {
this.#readableController?.enqueue(item);
if (this.state._context._copyToContextScopeStream) {
this.state._context._contextScopeStreamController?.enqueue(item);
}
}
}

Expand All @@ -325,6 +386,18 @@ export class StreamedRunResult<
if (!this.cancelled && this.#readableController) {
this.#readableController.close();
this.#readableController = undefined;
if (
this.state._context._contextScopeStreamController &&
this.#contextScopeStreamOwner
) {
this.state._context._contextScopeStreamController.close();
this.state._context._contextScopeStreamController = undefined;
// Clean up context scope stream state on completion
// Don't clear _contextScopeStream - it may still be consumed by toStream() or async iteration
// The stream will be garbage collected when no longer referenced
this.state._context._copyToContextScopeStream = false;
this.#contextScopeStreamOwner = undefined;
}
this.#completedPromiseResolve?.();
}
}
Expand All @@ -338,6 +411,19 @@ export class StreamedRunResult<
this.#readableController.error(err);
this.#readableController = undefined;
}
if (
!this.cancelled &&
this.state._context._contextScopeStreamController &&
this.#contextScopeStreamOwner
) {
this.state._context._contextScopeStreamController.error(err);
this.state._context._contextScopeStreamController = undefined;
// Clean up context scope stream state on error
// Clear the stream since it's been errored and can't be read
this.state._context._contextScopeStream = undefined;
this.state._context._copyToContextScopeStream = false;
this.#contextScopeStreamOwner = undefined;
}
this.#error = err;
this.#completedPromiseReject?.(err);
this.#completedPromise.catch((e) => {
Expand All @@ -357,6 +443,13 @@ export class StreamedRunResult<
* @returns A readable stream of the agent run.
*/
toStream(): ReadableStream<RunStreamEvent> {
if (
this.state._context._contextScopeStream &&
this.#contextScopeStreamOwner
) {
return this.state._context
._contextScopeStream as ReadableStream<RunStreamEvent>;
}
return this.#readableStream as ReadableStream<RunStreamEvent>;
}

Expand Down Expand Up @@ -390,7 +483,15 @@ export class StreamedRunResult<
toTextStream(
options: { compatibleWithNodeStreams?: boolean } = {},
): Readable | ReadableStream<string> {
const stream = this.#readableStream.pipeThrough(
let stream = this.#readableStream;
if (
this.state._context._contextScopeStream &&
this.#contextScopeStreamOwner
) {
stream = this.state._context._contextScopeStream;
}

const textStream = stream.pipeThrough(
new TransformStream<RunStreamEvent, string>({
transform(event, controller) {
if (
Expand All @@ -405,13 +506,19 @@ export class StreamedRunResult<
);

if (options.compatibleWithNodeStreams) {
return Readable.fromWeb(stream);
return Readable.fromWeb(textStream);
}

return stream as ReadableStream<string>;
return textStream as ReadableStream<string>;
}

[Symbol.asyncIterator](): AsyncIterator<RunStreamEvent> {
if (
this.state._context._contextScopeStream &&
this.#contextScopeStreamOwner
) {
return this.state._context._contextScopeStream[Symbol.asyncIterator]();
}
return this.#readableStream[Symbol.asyncIterator]();
}

Expand Down
11 changes: 9 additions & 2 deletions packages/agents-core/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ export type StreamRunOptions<TContext = undefined> =
* Whether to stream the run. If true, the run will emit events as the model responds.
*/
stream: true;
/**
* Whether to enable recursive streaming for agent as tool calls. If true, all events from agents invoked as tools will appear in the top-level stream result.
*/
streamAgentTools?: boolean;
};

/**
Expand Down Expand Up @@ -1147,7 +1151,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
// this loop to prevent internal false errors and unnecessary processing
return;
}
result._addItem(new RunRawModelStreamEvent(event));
result._addItem(
new RunRawModelStreamEvent(event, currentAgent.name),
);
}

if (parallelGuardrailPromise) {
Expand Down Expand Up @@ -1320,8 +1326,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {

// Initialize the streamed result with existing state
const result = new StreamedRunResult<TContext, TAgent>({
signal: options.signal,
state,
signal: options.signal,
streamAgentTools: options.streamAgentTools,
});

// Setup defaults
Expand Down
9 changes: 9 additions & 0 deletions packages/agents-core/src/runContext.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { RunStreamEvent } from './events';
import { RunToolApprovalItem } from './items';
import logger from './logger';
import { UnknownContext } from './types';
import { Usage } from './usage';
import {
ReadableStreamController,
ReadableStream as _ReadableStream,
} from '@openai/agents-core/_shims';

type ApprovalRecord = {
approved: boolean | string[];
Expand All @@ -28,6 +33,10 @@ export class RunContext<TContext = UnknownContext> {
*/
#approvals: Map<string, ApprovalRecord>;

_copyToContextScopeStream?: boolean;
_contextScopeStream?: _ReadableStream<RunStreamEvent>;
_contextScopeStreamController?: ReadableStreamController<RunStreamEvent>;

constructor(context: TContext = {} as TContext) {
this.context = context;
this.usage = new Usage();
Expand Down
Loading