From c6161a4e366a94ce1066fe6cbb92b490c0c14068 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 1 Oct 2025 13:56:26 -0700 Subject: [PATCH 1/3] fix: improve the compatibility for conversationId / previousResponseId + tool calls ref: https://github.com/openai/openai-agents-python/pull/1827 --- .changeset/heavy-foxes-sit.md | 7 + .../content/docs/guides/running-agents.mdx | 28 +++ .../docs/running-agents/conversationId.ts | 25 ++ .../docs/running-agents/previousResponseId.ts | 21 ++ packages/agents-core/src/run.ts | 166 +++++++++++-- packages/agents-core/test/run.stream.test.ts | 226 ++++++++++++++++++ packages/agents-core/test/run.test.ts | 204 ++++++++++++++++ 7 files changed, 662 insertions(+), 15 deletions(-) create mode 100644 .changeset/heavy-foxes-sit.md create mode 100644 examples/docs/running-agents/conversationId.ts create mode 100644 examples/docs/running-agents/previousResponseId.ts diff --git a/.changeset/heavy-foxes-sit.md b/.changeset/heavy-foxes-sit.md new file mode 100644 index 00000000..6eca3600 --- /dev/null +++ b/.changeset/heavy-foxes-sit.md @@ -0,0 +1,7 @@ +--- +'@openai/agents-core': patch +--- + +fix: improve the compatibility for conversationId / previousResponseId + tool calls + +ref: https://github.com/openai/openai-agents-python/pull/1827 diff --git a/docs/src/content/docs/guides/running-agents.mdx b/docs/src/content/docs/guides/running-agents.mdx index b4eb64f8..d890c4a6 100644 --- a/docs/src/content/docs/guides/running-agents.mdx +++ b/docs/src/content/docs/guides/running-agents.mdx @@ -8,6 +8,8 @@ import helloWorldWithRunnerExample from '../../../../../examples/docs/hello-worl import helloWorldExample from '../../../../../examples/docs/hello-world.ts?raw'; import runningAgentsExceptionExample from '../../../../../examples/docs/running-agents/exceptions1.ts?raw'; import chatLoopExample from '../../../../../examples/docs/running-agents/chatLoop.ts?raw'; +import conversationIdExample from '../../../../../examples/docs/running-agents/conversationId.ts?raw'; +import previousResponseIdExample from '../../../../../examples/docs/running-agents/previousResponseId.ts?raw'; Agents do nothing by themselves – you **run** them with the `Runner` class or the `run()` utility. @@ -95,6 +97,32 @@ Each call to `runner.run()` (or `run()` utility) represents one **turn** in your See [the chat example](https://github.com/openai/openai-agents-js/tree/main/examples/basic/chat.ts) for an interactive version. +### Server-managed conversations + +You can let the OpenAI Responses API persist conversation history for you instead of sending your entire local transcript on every turn. This is useful when you are coordinating long conversations or multiple services. See the [Conversation state guide](https://platform.openai.com/docs/guides/conversation-state?api-mode=responses) for details. + +OpenAI exposes two ways to reuse server-side state: + +#### 1. `conversationId` for an entire conversation + +You can create a conversation once using [Conversations API](https://platform.openai.com/docs/api-reference/conversations/create) and then reuse its ID for every turn. The SDK automatically includes only the newly generated items. + + + +#### 2. `previousResponseId` to continue from the last turn + +If you want to start only with Responses API anyway, you can chain each request using the ID returned from the previous response. This keeps the context alive across turns without creating a full conversation resource. + + + ## Exceptions The SDK throws a small set of errors you can catch: diff --git a/examples/docs/running-agents/conversationId.ts b/examples/docs/running-agents/conversationId.ts new file mode 100644 index 00000000..ebd72191 --- /dev/null +++ b/examples/docs/running-agents/conversationId.ts @@ -0,0 +1,25 @@ +import { Agent, run } from '@openai/agents'; +import { OpenAI } from 'openai'; + +const agent = new Agent({ + name: 'Assistant', + instructions: 'Reply very concisely.', +}); + +async function main() { + // Create a server-managed conversation: + const client = new OpenAI(); + const { id: conversationId } = await client.conversations.create({}); + + const first = await run(agent, 'What city is the Golden Gate Bridge in?', { + conversationId, + }); + console.log(first.finalOutput); + // -> "San Francisco" + + const second = await run(agent, 'What state is it in?', { conversationId }); + console.log(second.finalOutput); + // -> "California" +} + +main().catch(console.error); diff --git a/examples/docs/running-agents/previousResponseId.ts b/examples/docs/running-agents/previousResponseId.ts new file mode 100644 index 00000000..8ae549d6 --- /dev/null +++ b/examples/docs/running-agents/previousResponseId.ts @@ -0,0 +1,21 @@ +import { Agent, run } from '@openai/agents'; + +const agent = new Agent({ + name: 'Assistant', + instructions: 'Reply very concisely.', +}); + +async function main() { + const first = await run(agent, 'What city is the Golden Gate Bridge in?'); + console.log(first.finalOutput); + // -> "San Francisco" + + const previousResponseId = first.lastResponseId; + const second = await run(agent, 'What state is it in?', { + previousResponseId, + }); + console.log(second.finalOutput); + // -> "California" +} + +main().catch(console.error); diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 47471505..14bbdbbb 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -178,6 +178,106 @@ export function getTracing( return 'enabled_without_data'; } +function toAgentInputList( + originalInput: string | AgentInputItem[], +): AgentInputItem[] { + if (typeof originalInput === 'string') { + return [{ type: 'message', role: 'user', content: originalInput }]; + } + + return [...originalInput]; +} + +/** + * Internal module for tracking the items in turns and ensuring that we don't send duplicate items. + * This logic is vital for properly handling the items to send during multiple turns + * when you use either `conversationId` or `previousResponseId`. + * Both scenarios expect an agent loop to send only the new items for each Responses API cal. + * + * see also: https://platform.openai.com/docs/guides/conversation-state?api-mode=responses + */ +class ServerConversationTracker { + // Conversation ID: + // - https://platform.openai.com/docs/guides/conversation-state?api-mode=responses#using-the-conversations-api + // - https://platform.openai.com/docs/api-reference/conversations/create + public conversationId?: string; + + // Previous Response ID: + // https://platform.openai.com/docs/guides/conversation-state?api-mode=responses#passing-context-from-the-previous-response + public previousResponseId?: string; + + // Using this flag because WeakSet does not provides a way to check its size + private sentInitialInput = false; + // The items already sent to the model; using WeakSet for memory efficiency + private sentItems = new WeakSet(); + // The items received from the server; using WeakSet for memory efficiency + private serverItems = new WeakSet(); + + constructor({ + conversationId, + previousResponseId, + }: { + conversationId?: string; + previousResponseId?: string; + }) { + this.conversationId = conversationId ?? undefined; + this.previousResponseId = previousResponseId ?? undefined; + } + + trackServerItems(modelResponse: ModelResponse | undefined) { + if (!modelResponse) { + return; + } + for (const item of modelResponse.output) { + if (item && typeof item === 'object') { + this.serverItems.add(item); + } + } + if ( + !this.conversationId && + this.previousResponseId !== undefined && + modelResponse.responseId + ) { + this.previousResponseId = modelResponse.responseId; + } + } + + prepareInput( + originalInput: string | AgentInputItem[], + generatedItems: RunItem[], + ): AgentInputItem[] { + const inputItems: AgentInputItem[] = []; + + if (!this.sentInitialInput) { + const initialItems = toAgentInputList(originalInput); + for (const item of initialItems) { + inputItems.push(item); + if (item && typeof item === 'object') { + this.sentItems.add(item); + } + } + this.sentInitialInput = true; + } + + for (const item of generatedItems) { + if (item.type === 'tool_approval_item') { + continue; + } + const rawItem = item.rawItem; + if (!rawItem || typeof rawItem !== 'object') { + continue; + } + if (this.sentItems.has(rawItem) || this.serverItems.has(rawItem)) { + continue; + } + inputItems.push(rawItem as AgentInputItem); + this.sentItems.add(rawItem); + } + + return inputItems; + } +} + export function getTurnInput( originalInput: string | AgentInputItem[], generatedItems: RunItem[], @@ -185,12 +285,7 @@ export function getTurnInput( const rawItems = generatedItems .filter((item) => item.type !== 'tool_approval_item') // don't include approval items to avoid double function calls .map((item) => item.rawItem); - - if (typeof originalInput === 'string') { - originalInput = [{ type: 'message', role: 'user', content: originalInput }]; - } - - return [...originalInput, ...rawItems]; + return [...toAgentInputList(originalInput), ...rawItems]; } /** @@ -254,6 +349,14 @@ export class Runner extends RunHooks> { options.maxTurns ?? DEFAULT_MAX_TURNS, ); + const serverConversationTracker = + options.conversationId || options.previousResponseId + ? new ServerConversationTracker({ + conversationId: options.conversationId, + previousResponseId: options.previousResponseId, + }) + : undefined; + try { while (true) { const explictlyModelSet = @@ -355,10 +458,12 @@ export class Runner extends RunHooks> { await this.#runInputGuardrails(state); } - const turnInput = getTurnInput( - state._originalInput, - state._generatedItems, - ); + const turnInput = serverConversationTracker + ? serverConversationTracker.prepareInput( + state._originalInput, + state._generatedItems, + ) + : getTurnInput(state._originalInput, state._generatedItems); if (state._noActiveAgentRun) { state._currentAgent.emit( @@ -385,14 +490,21 @@ export class Runner extends RunHooks> { state._toolUseTracker, modelSettings, ); + const previousResponseId = + serverConversationTracker?.previousResponseId ?? + options.previousResponseId; + const conversationId = + serverConversationTracker?.conversationId ?? + options.conversationId; + state._lastTurnResponse = await model.getResponse({ systemInstructions: await state._currentAgent.getSystemPrompt( state._context, ), prompt: await state._currentAgent.getPrompt(state._context), input: turnInput, - previousResponseId: options.previousResponseId, - conversationId: options.conversationId, + previousResponseId, + conversationId, modelSettings, tools: serializedTools, outputType: convertAgentOutputTypeToSerializable( @@ -409,6 +521,10 @@ export class Runner extends RunHooks> { state._context.usage.add(state._lastTurnResponse.usage); state._noActiveAgentRun = false; + serverConversationTracker?.trackServerItems( + state._lastTurnResponse, + ); + const processedResponse = processModelResponse( state._lastTurnResponse, state._currentAgent, @@ -623,6 +739,14 @@ export class Runner extends RunHooks> { result: StreamedRunResult, options: StreamRunOptions, ): Promise { + const serverConversationTracker = + options.conversationId || options.previousResponseId + ? new ServerConversationTracker({ + conversationId: options.conversationId, + previousResponseId: options.previousResponseId, + }) + : undefined; + try { while (true) { const currentAgent = result.state._currentAgent; @@ -739,7 +863,12 @@ export class Runner extends RunHooks> { modelSettings, ); - const turnInput = getTurnInput(result.input, result.newItems); + const turnInput = serverConversationTracker + ? serverConversationTracker.prepareInput( + result.input, + result.newItems, + ) + : getTurnInput(result.input, result.newItems); if (result.state._noActiveAgentRun) { currentAgent.emit( @@ -752,14 +881,20 @@ export class Runner extends RunHooks> { let finalResponse: ModelResponse | undefined = undefined; + const previousResponseId = + serverConversationTracker?.previousResponseId ?? + options.previousResponseId; + const conversationId = + serverConversationTracker?.conversationId ?? options.conversationId; + for await (const event of model.getStreamedResponse({ systemInstructions: await currentAgent.getSystemPrompt( result.state._context, ), prompt: await currentAgent.getPrompt(result.state._context), input: turnInput, - previousResponseId: options.previousResponseId, - conversationId: options.conversationId, + previousResponseId, + conversationId, modelSettings, tools: serializedTools, handoffs: serializedHandoffs, @@ -798,6 +933,7 @@ export class Runner extends RunHooks> { } result.state._lastTurnResponse = finalResponse; + serverConversationTracker?.trackServerItems(finalResponse); result.state._modelResponses.push(result.state._lastTurnResponse); const processedResponse = processModelResponse( diff --git a/packages/agents-core/test/run.stream.test.ts b/packages/agents-core/test/run.stream.test.ts index 3b98987e..4cde2266 100644 --- a/packages/agents-core/test/run.stream.test.ts +++ b/packages/agents-core/test/run.stream.test.ts @@ -2,6 +2,7 @@ import { describe, it, expect, beforeAll, vi } from 'vitest'; import { z } from 'zod'; import { Agent, + AgentInputItem, run, Runner, setDefaultModelProvider, @@ -10,6 +11,7 @@ import { RunStreamEvent, RunAgentUpdatedStreamEvent, RunItemStreamEvent, + StreamedRunResult, handoff, Model, ModelRequest, @@ -19,6 +21,7 @@ import { tool, } from '../src'; import { FakeModel, FakeModelProvider, fakeModelMessage } from './stubs'; +import * as protocol from '../src/types/protocol'; // Test for unhandled rejection when stream loop throws @@ -394,4 +397,227 @@ describe('Runner.run (streaming)', () => { 'message_output_created', ]); }); + + describe('server-managed conversation state', () => { + type Turn = { output: protocol.ModelItem[]; responseId?: string }; + + class TrackingStreamingModel implements Model { + public requests: ModelRequest[] = []; + public firstRequest: ModelRequest | undefined; + public lastRequest: ModelRequest | undefined; + + constructor(private readonly turns: Turn[]) {} + + private recordRequest(request: ModelRequest) { + const clonedInput: string | AgentInputItem[] = + typeof request.input === 'string' + ? request.input + : (JSON.parse(JSON.stringify(request.input)) as AgentInputItem[]); + + const recorded: ModelRequest = { + ...request, + input: clonedInput, + }; + + this.requests.push(recorded); + this.lastRequest = recorded; + this.firstRequest ??= recorded; + } + + async getResponse(_request: ModelRequest): Promise { + throw new Error('Not implemented'); + } + + async *getStreamedResponse( + request: ModelRequest, + ): AsyncIterable { + this.recordRequest(request); + const turn = this.turns.shift(); + if (!turn) { + throw new Error('No response configured'); + } + + const responseId = turn.responseId ?? `resp-${this.requests.length}`; + yield { + type: 'response_done', + response: { + id: responseId, + usage: { + requests: 1, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + output: JSON.parse( + JSON.stringify(turn.output), + ) as protocol.ModelItem[], + }, + } as StreamEvent; + } + } + + const buildTurn = ( + items: protocol.ModelItem[], + responseId?: string, + ): Turn => ({ + output: JSON.parse(JSON.stringify(items)) as protocol.ModelItem[], + responseId, + }); + + const buildToolCall = (callId: string, arg: string): FunctionCallItem => ({ + id: callId, + type: 'function_call', + name: 'test', + callId, + status: 'completed', + arguments: JSON.stringify({ test: arg }), + }); + + const serverTool = tool({ + name: 'test', + description: 'test tool', + parameters: z.object({ test: z.string() }), + execute: async ({ test }) => `result:${test}`, + }); + + async function drain>( + result: StreamedRunResult, + ) { + for await (const _ of result.toStream()) { + // drain + } + await result.completed; + } + + it('only sends new items when using conversationId across turns', async () => { + const model = new TrackingStreamingModel([ + buildTurn( + [fakeModelMessage('a_message'), buildToolCall('call-1', 'foo')], + 'resp-1', + ), + buildTurn( + [fakeModelMessage('b_message'), buildToolCall('call-2', 'bar')], + 'resp-2', + ), + buildTurn([fakeModelMessage('done')], 'resp-3'), + ]); + + const agent = new Agent({ + name: 'StreamTest', + model, + tools: [serverTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'user_message', { + stream: true, + conversationId: 'conv-test-123', + }); + + await drain(result); + + expect(result.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(3); + expect(model.requests.map((req) => req.conversationId)).toEqual([ + 'conv-test-123', + 'conv-test-123', + 'conv-test-123', + ]); + + const firstInput = model.requests[0].input; + expect(Array.isArray(firstInput)).toBe(true); + expect(firstInput as AgentInputItem[]).toHaveLength(1); + const userMessage = (firstInput as AgentInputItem[])[0] as any; + expect(userMessage.role).toBe('user'); + expect(userMessage.content).toBe('user_message'); + + const secondItems = model.requests[1].input as AgentInputItem[]; + expect(secondItems).toHaveLength(1); + expect(secondItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-1', + }); + + const thirdItems = model.requests[2].input as AgentInputItem[]; + expect(thirdItems).toHaveLength(1); + expect(thirdItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-2', + }); + }); + + it('only sends new items and updates previousResponseId across turns', async () => { + const model = new TrackingStreamingModel([ + buildTurn( + [fakeModelMessage('a_message'), buildToolCall('call-1', 'foo')], + 'resp-789', + ), + buildTurn([fakeModelMessage('done')], 'resp-900'), + ]); + + const agent = new Agent({ + name: 'StreamPrev', + model, + tools: [serverTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'user_message', { + stream: true, + previousResponseId: 'initial-response-123', + }); + + await drain(result); + + expect(result.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + expect(model.requests[0].previousResponseId).toBe('initial-response-123'); + + const secondRequest = model.requests[1]; + expect(secondRequest.previousResponseId).toBe('resp-789'); + const secondItems = secondRequest.input as AgentInputItem[]; + expect(secondItems).toHaveLength(1); + expect(secondItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-1', + }); + }); + + it('sends full history when no server-managed state is provided', async () => { + const model = new TrackingStreamingModel([ + buildTurn( + [fakeModelMessage('a_message'), buildToolCall('call-1', 'foo')], + 'resp-789', + ), + buildTurn([fakeModelMessage('done')], 'resp-900'), + ]); + + const agent = new Agent({ + name: 'StreamDefault', + model, + tools: [serverTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'user_message', { stream: true }); + + await drain(result); + + expect(result.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + + const secondItems = model.requests[1].input as AgentInputItem[]; + expect(secondItems).toHaveLength(4); + expect(secondItems[0]).toMatchObject({ role: 'user' }); + expect(secondItems[1]).toMatchObject({ role: 'assistant' }); + expect(secondItems[2]).toMatchObject({ + type: 'function_call', + name: 'test', + }); + expect(secondItems[3]).toMatchObject({ + type: 'function_call_result', + callId: 'call-1', + }); + }); + }); }); diff --git a/packages/agents-core/test/run.test.ts b/packages/agents-core/test/run.test.ts index eeabb544..8205baf6 100644 --- a/packages/agents-core/test/run.test.ts +++ b/packages/agents-core/test/run.test.ts @@ -11,6 +11,7 @@ import { import { z } from 'zod'; import { Agent, + AgentInputItem, MaxTurnsExceededError, ModelResponse, OutputGuardrailTripwireTriggered, @@ -32,6 +33,7 @@ import { RunContext } from '../src/runContext'; import { RunState } from '../src/runState'; import * as protocol from '../src/types/protocol'; import { Usage } from '../src/usage'; +import { tool } from '../src/tool'; import { FakeModel, fakeModelMessage, @@ -661,6 +663,208 @@ describe('Runner.run', () => { }); }); + describe('server-managed conversation state', () => { + type TurnResponse = ModelResponse; + + class TrackingModel implements Model { + public requests: ModelRequest[] = []; + public firstRequest: ModelRequest | undefined; + public lastRequest: ModelRequest | undefined; + + constructor(private readonly responses: TurnResponse[]) {} + + private recordRequest(request: ModelRequest) { + const clonedInput: string | AgentInputItem[] = + typeof request.input === 'string' + ? request.input + : (JSON.parse(JSON.stringify(request.input)) as AgentInputItem[]); + + const recorded: ModelRequest = { + ...request, + input: clonedInput, + }; + + this.requests.push(recorded); + this.lastRequest = recorded; + this.firstRequest ??= recorded; + } + + async getResponse(request: ModelRequest): Promise { + this.recordRequest(request); + const response = this.responses.shift(); + if (!response) { + throw new Error('No response configured'); + } + return response; + } + + getStreamedResponse( + _request: ModelRequest, + ): AsyncIterable { + throw new Error('Not implemented'); + } + } + + const buildResponse = ( + items: protocol.ModelItem[], + responseId?: string, + ): ModelResponse => ({ + output: JSON.parse(JSON.stringify(items)) as protocol.ModelItem[], + usage: new Usage(), + responseId, + }); + + const buildToolCall = ( + callId: string, + arg: string, + ): protocol.FunctionCallItem => ({ + id: callId, + type: 'function_call', + name: 'test', + callId, + status: 'completed', + arguments: JSON.stringify({ test: arg }), + }); + + const serverTool = tool({ + name: 'test', + description: 'test tool', + parameters: z.object({ test: z.string() }), + execute: async ({ test }) => `result:${test}`, + }); + + it('only sends new items when using conversationId across turns', async () => { + const model = new TrackingModel([ + buildResponse( + [fakeModelMessage('a_message'), buildToolCall('call-1', 'foo')], + 'resp-1', + ), + buildResponse( + [fakeModelMessage('b_message'), buildToolCall('call-2', 'bar')], + 'resp-2', + ), + buildResponse([fakeModelMessage('done')], 'resp-3'), + ]); + + const agent = new Agent({ + name: 'Test', + model, + tools: [serverTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'user_message', { + conversationId: 'conv-test-123', + }); + + expect(result.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(3); + expect(model.requests.map((req) => req.conversationId)).toEqual([ + 'conv-test-123', + 'conv-test-123', + 'conv-test-123', + ]); + + const firstInput = model.requests[0].input; + expect(Array.isArray(firstInput)).toBe(true); + expect(firstInput as AgentInputItem[]).toHaveLength(1); + const userMessage = (firstInput as AgentInputItem[])[0] as any; + expect(userMessage.role).toBe('user'); + expect(userMessage.content).toBe('user_message'); + + const secondInput = model.requests[1].input; + expect(Array.isArray(secondInput)).toBe(true); + const secondItems = secondInput as AgentInputItem[]; + expect(secondItems).toHaveLength(1); + expect(secondItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-1', + }); + + const thirdInput = model.requests[2].input; + expect(Array.isArray(thirdInput)).toBe(true); + const thirdItems = thirdInput as AgentInputItem[]; + expect(thirdItems).toHaveLength(1); + expect(thirdItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-2', + }); + }); + + it('only sends new items and updates previousResponseId across turns', async () => { + const model = new TrackingModel([ + buildResponse( + [fakeModelMessage('a_message'), buildToolCall('call-1', 'foo')], + 'resp-789', + ), + buildResponse([fakeModelMessage('done')], 'resp-900'), + ]); + + const agent = new Agent({ + name: 'Test', + model, + tools: [serverTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'user_message', { + previousResponseId: 'initial-response-123', + }); + + expect(result.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + + expect(model.requests[0].previousResponseId).toBe('initial-response-123'); + + const secondRequest = model.requests[1]; + expect(secondRequest.previousResponseId).toBe('resp-789'); + expect(Array.isArray(secondRequest.input)).toBe(true); + const secondItems = secondRequest.input as AgentInputItem[]; + expect(secondItems).toHaveLength(1); + expect(secondItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-1', + }); + }); + + it('sends full history when no server-managed state is provided', async () => { + const model = new TrackingModel([ + buildResponse( + [fakeModelMessage('a_message'), buildToolCall('call-1', 'foo')], + 'resp-789', + ), + buildResponse([fakeModelMessage('done')], 'resp-900'), + ]); + + const agent = new Agent({ + name: 'Test', + model, + tools: [serverTool], + }); + + const runner = new Runner(); + const result = await runner.run(agent, 'user_message'); + + expect(result.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + + const secondInput = model.requests[1].input; + expect(Array.isArray(secondInput)).toBe(true); + const secondItems = secondInput as AgentInputItem[]; + expect(secondItems).toHaveLength(4); + expect(secondItems[0]).toMatchObject({ role: 'user' }); + expect(secondItems[1]).toMatchObject({ role: 'assistant' }); + expect(secondItems[2]).toMatchObject({ + type: 'function_call', + name: 'test', + }); + expect(secondItems[3]).toMatchObject({ + type: 'function_call_result', + callId: 'call-1', + }); + }); + }); + describe('selectModel', () => { const MODEL_A = 'gpt-4o'; const MODEL_B = 'gpt-4.1-mini'; From c1207afd043bff726d7f4fb023db63cd0c028fd0 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Thu, 2 Oct 2025 11:31:28 -0700 Subject: [PATCH 2/3] Apply suggestions from code review --- packages/agents-core/src/run.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 14bbdbbb..0f0508d9 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -192,7 +192,7 @@ function toAgentInputList( * Internal module for tracking the items in turns and ensuring that we don't send duplicate items. * This logic is vital for properly handling the items to send during multiple turns * when you use either `conversationId` or `previousResponseId`. - * Both scenarios expect an agent loop to send only the new items for each Responses API cal. + * Both scenarios expect an agent loop to send only new items for each Responses API call. * * see also: https://platform.openai.com/docs/guides/conversation-state?api-mode=responses */ @@ -206,7 +206,7 @@ class ServerConversationTracker { // https://platform.openai.com/docs/guides/conversation-state?api-mode=responses#passing-context-from-the-previous-response public previousResponseId?: string; - // Using this flag because WeakSet does not provides a way to check its size + // Using this flag because WeakSet does not provide a way to check its size private sentInitialInput = false; // The items already sent to the model; using WeakSet for memory efficiency private sentItems = new WeakSet(); From 9e6afaffb92534482a5f2e4d8e57441c945224cc Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Thu, 2 Oct 2025 23:00:34 -0700 Subject: [PATCH 3/3] add moe tests --- examples/basic/conversations.ts | 38 ++- packages/agents-core/src/run.ts | 111 ++++++-- packages/agents-core/test/run.stream.test.ts | 61 ++++ packages/agents-core/test/run.test.ts | 283 ++++++++++++++++++- 4 files changed, 463 insertions(+), 30 deletions(-) diff --git a/examples/basic/conversations.ts b/examples/basic/conversations.ts index 02806e4a..d0054b3b 100644 --- a/examples/basic/conversations.ts +++ b/examples/basic/conversations.ts @@ -1,5 +1,6 @@ -import { Agent, run } from '@openai/agents'; +import { Agent, run, tool } from '@openai/agents'; import OpenAI from 'openai'; +import z from 'zod'; async function main() { const client = new OpenAI(); @@ -9,22 +10,47 @@ async function main() { console.log(`New conversation: ${JSON.stringify(newConvo, null, 2)}`); const conversationId = newConvo.id; + const getWeatherTool = tool({ + name: 'get_weather', + description: 'Get the weather for a given city', + parameters: z.object({ city: z.string() }), + strict: true, + async execute({ city }) { + return `The weather in ${city} is sunny.`; + }, + }); + const agent = new Agent({ name: 'Assistant', instructions: 'You are a helpful assistant. be VERY concise.', + tools: [getWeatherTool], }); // Set the conversation ID for the runs console.log('\n### Agent runs:\n'); - const runOptions = { conversationId }; + const options = { conversationId }; let result = await run( agent, 'What is the largest country in South America?', - runOptions, + options, + ); + // First run: The largest country in South America is Brazil. + console.log(`First run: ${result.finalOutput}`); + result = await run(agent, 'What is the capital of that country?', options); + // Second run: The capital of Brazil is Brasília. + console.log(`Second run: ${result.finalOutput}`); + + result = await run(agent, 'What is the weather in the city today?', options); + // Thrid run: The weather in Brasília today is sunny. + console.log(`Thrid run: ${result.finalOutput}`); + + result = await run( + agent, + `Can you share the same information about the smallest country's capital in South America?`, + options, ); - console.log(`First run: ${result.finalOutput}`); // e.g., Brazil - result = await run(agent, 'What is the capital of that country?', runOptions); - console.log(`Second run: ${result.finalOutput}`); // e.g., Brasilia + // Fourth run: The smallest country in South America is Suriname. Its capital is Paramaribo. The weather in Paramaribo today is sunny. + console.log(`Fourth run: ${result.finalOutput}`); console.log('\n### Conversation items:\n'); const convo = await client.conversations.items.list(conversationId); diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 0f0508d9..9bd19cbc 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -224,6 +224,54 @@ class ServerConversationTracker { this.previousResponseId = previousResponseId ?? undefined; } + /** + * Pre-populates tracker caches from an existing RunState when resuming server-managed runs. + */ + primeFromState({ + originalInput, + generatedItems, + modelResponses, + }: { + originalInput: string | AgentInputItem[]; + generatedItems: RunItem[]; + modelResponses: ModelResponse[]; + }) { + if (this.sentInitialInput) { + return; + } + + for (const item of toAgentInputList(originalInput)) { + if (item && typeof item === 'object') { + this.sentItems.add(item); + } + } + + this.sentInitialInput = true; + + const latestResponse = modelResponses[modelResponses.length - 1]; + for (const response of modelResponses) { + for (const item of response.output) { + if (item && typeof item === 'object') { + this.serverItems.add(item); + } + } + } + + if (!this.conversationId && latestResponse?.responseId) { + this.previousResponseId = latestResponse.responseId; + } + + for (const item of generatedItems) { + const rawItem = item.rawItem; + if (!rawItem || typeof rawItem !== 'object') { + continue; + } + if (this.serverItems.has(rawItem)) { + this.sentItems.add(rawItem); + } + } + } + trackServerItems(modelResponse: ModelResponse | undefined) { if (!modelResponse) { return; @@ -337,17 +385,17 @@ export class Runner extends RunHooks> { ): Promise> { return withNewSpanContext(async () => { // if we have a saved state we use that one, otherwise we create a new one - const state = - input instanceof RunState - ? input - : new RunState( - options.context instanceof RunContext - ? options.context - : new RunContext(options.context), - input, - startingAgent, - options.maxTurns ?? DEFAULT_MAX_TURNS, - ); + const isResumedState = input instanceof RunState; + const state = isResumedState + ? input + : new RunState( + options.context instanceof RunContext + ? options.context + : new RunContext(options.context), + input, + startingAgent, + options.maxTurns ?? DEFAULT_MAX_TURNS, + ); const serverConversationTracker = options.conversationId || options.previousResponseId @@ -357,6 +405,14 @@ export class Runner extends RunHooks> { }) : undefined; + if (serverConversationTracker && isResumedState) { + serverConversationTracker.primeFromState({ + originalInput: state._originalInput, + generatedItems: state._generatedItems, + modelResponses: state._modelResponses, + }); + } + try { while (true) { const explictlyModelSet = @@ -738,6 +794,7 @@ export class Runner extends RunHooks> { >( result: StreamedRunResult, options: StreamRunOptions, + isResumedState: boolean, ): Promise { const serverConversationTracker = options.conversationId || options.previousResponseId @@ -747,6 +804,14 @@ export class Runner extends RunHooks> { }) : undefined; + if (serverConversationTracker && isResumedState) { + serverConversationTracker.primeFromState({ + originalInput: result.state._originalInput, + generatedItems: result.state._generatedItems, + modelResponses: result.state._modelResponses, + }); + } + try { while (true) { const currentAgent = result.state._currentAgent; @@ -1051,17 +1116,17 @@ export class Runner extends RunHooks> { options = options ?? ({} as StreamRunOptions); return withNewSpanContext(async () => { // Initialize or reuse existing state - const state: RunState = - input instanceof RunState - ? input - : new RunState( - options.context instanceof RunContext - ? options.context - : new RunContext(options.context), - input as string | AgentInputItem[], - agent, - options.maxTurns ?? DEFAULT_MAX_TURNS, - ); + const isResumedState = input instanceof RunState; + const state: RunState = isResumedState + ? input + : new RunState( + options.context instanceof RunContext + ? options.context + : new RunContext(options.context), + input as string | AgentInputItem[], + agent, + options.maxTurns ?? DEFAULT_MAX_TURNS, + ); // Initialize the streamed result with existing state const result = new StreamedRunResult({ @@ -1073,7 +1138,7 @@ export class Runner extends RunHooks> { result.maxTurns = options.maxTurns ?? state._maxTurns; // Continue the stream loop without blocking - this.#runStreamLoop(result, options).then( + this.#runStreamLoop(result, options, isResumedState).then( () => { result._done(); }, diff --git a/packages/agents-core/test/run.stream.test.ts b/packages/agents-core/test/run.stream.test.ts index 4cde2266..911e1170 100644 --- a/packages/agents-core/test/run.stream.test.ts +++ b/packages/agents-core/test/run.stream.test.ts @@ -583,6 +583,67 @@ describe('Runner.run (streaming)', () => { }); }); + it('does not resend prior items when resuming a streamed run with conversationId', async () => { + const approvalTool = tool({ + name: 'test', + description: 'approval tool', + parameters: z.object({ test: z.string() }), + needsApproval: async () => true, + execute: async ({ test }) => `result:${test}`, + }); + + const model = new TrackingStreamingModel([ + buildTurn([buildToolCall('call-stream', 'foo')], 'resp-stream-1'), + buildTurn([fakeModelMessage('done')], 'resp-stream-2'), + ]); + + const agent = new Agent({ + name: 'StreamApprovalAgent', + model, + tools: [approvalTool], + }); + + const runner = new Runner(); + const firstResult = await runner.run(agent, 'user_message', { + stream: true, + conversationId: 'conv-stream-approval', + }); + + await drain(firstResult); + + expect(firstResult.interruptions).toHaveLength(1); + const approvalItem = firstResult.interruptions[0]; + firstResult.state.approve(approvalItem); + + const secondResult = await runner.run(agent, firstResult.state, { + stream: true, + conversationId: 'conv-stream-approval', + }); + + await drain(secondResult); + + expect(secondResult.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + expect(model.requests.map((req) => req.conversationId)).toEqual([ + 'conv-stream-approval', + 'conv-stream-approval', + ]); + + const firstInput = model.requests[0].input as AgentInputItem[]; + expect(firstInput).toHaveLength(1); + expect(firstInput[0]).toMatchObject({ + role: 'user', + content: 'user_message', + }); + + const secondInput = model.requests[1].input as AgentInputItem[]; + expect(secondInput).toHaveLength(1); + expect(secondInput[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-stream', + }); + }); + it('sends full history when no server-managed state is provided', async () => { const model = new TrackingStreamingModel([ buildTurn( diff --git a/packages/agents-core/test/run.test.ts b/packages/agents-core/test/run.test.ts index 8205baf6..cec46347 100644 --- a/packages/agents-core/test/run.test.ts +++ b/packages/agents-core/test/run.test.ts @@ -33,7 +33,7 @@ import { RunContext } from '../src/runContext'; import { RunState } from '../src/runState'; import * as protocol from '../src/types/protocol'; import { Usage } from '../src/usage'; -import { tool } from '../src/tool'; +import { tool, hostedMcpTool } from '../src/tool'; import { FakeModel, fakeModelMessage, @@ -827,6 +827,287 @@ describe('Runner.run', () => { }); }); + it('does not resend prior items when resuming with conversationId', async () => { + const approvalTool = tool({ + name: 'test', + description: 'tool that requires approval', + parameters: z.object({ test: z.string() }), + needsApproval: async () => true, + execute: async ({ test }) => `result:${test}`, + }); + + const model = new TrackingModel([ + buildResponse([buildToolCall('call-approved', 'foo')], 'resp-1'), + buildResponse([fakeModelMessage('done')], 'resp-2'), + ]); + + const agent = new Agent({ + name: 'ApprovalAgent', + model, + tools: [approvalTool], + }); + + const runner = new Runner(); + const firstResult = await runner.run(agent, 'user_message', { + conversationId: 'conv-approval', + }); + + expect(firstResult.interruptions).toHaveLength(1); + const approvalItem = firstResult.interruptions[0]; + firstResult.state.approve(approvalItem); + + const secondResult = await runner.run(agent, firstResult.state, { + conversationId: 'conv-approval', + }); + + expect(secondResult.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + + const firstInput = model.requests[0].input; + expect(Array.isArray(firstInput)).toBe(true); + const firstItems = firstInput as AgentInputItem[]; + expect(firstItems).toHaveLength(1); + expect(firstItems[0]).toMatchObject({ + role: 'user', + content: 'user_message', + }); + + const secondRequest = model.requests[1]; + expect(secondRequest.conversationId).toBe('conv-approval'); + expect(Array.isArray(secondRequest.input)).toBe(true); + const secondItems = secondRequest.input as AgentInputItem[]; + expect(secondItems).toHaveLength(1); + expect(secondItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-approved', + }); + }); + + it('does not resend prior items when resuming with previousResponseId', async () => { + const approvalTool = tool({ + name: 'test', + description: 'tool that requires approval', + parameters: z.object({ test: z.string() }), + needsApproval: async () => true, + execute: async ({ test }) => `result:${test}`, + }); + + const model = new TrackingModel([ + buildResponse([buildToolCall('call-prev', 'foo')], 'resp-prev-1'), + buildResponse([fakeModelMessage('done')], 'resp-prev-2'), + ]); + + const agent = new Agent({ + name: 'ApprovalPrevAgent', + model, + tools: [approvalTool], + }); + + const runner = new Runner(); + const firstResult = await runner.run(agent, 'user_message', { + previousResponseId: 'initial-response', + }); + + expect(firstResult.interruptions).toHaveLength(1); + const approvalItem = firstResult.interruptions[0]; + firstResult.state.approve(approvalItem); + + const secondResult = await runner.run(agent, firstResult.state, { + previousResponseId: 'initial-response', + }); + + expect(secondResult.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + + expect(model.requests[0].previousResponseId).toBe('initial-response'); + + const secondRequest = model.requests[1]; + expect(secondRequest.previousResponseId).toBe('resp-prev-1'); + expect(Array.isArray(secondRequest.input)).toBe(true); + const secondItems = secondRequest.input as AgentInputItem[]; + expect(secondItems).toHaveLength(1); + expect(secondItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-prev', + }); + }); + + it('does not resend items when resuming multiple times without new approvals', async () => { + const approvalTool = tool({ + name: 'test', + description: 'approval tool', + parameters: z.object({ test: z.string() }), + needsApproval: async () => true, + execute: async ({ test }) => `result:${test}`, + }); + + const model = new TrackingModel([ + buildResponse([buildToolCall('call-repeat', 'foo')], 'resp-repeat-1'), + buildResponse([fakeModelMessage('done')], 'resp-repeat-2'), + ]); + + const agent = new Agent({ + name: 'RepeatAgent', + model, + tools: [approvalTool], + }); + + const runner = new Runner(); + const firstResult = await runner.run(agent, 'user_message', { + conversationId: 'conv-repeat', + }); + + expect(firstResult.interruptions).toHaveLength(1); + const approvalItem = firstResult.interruptions[0]; + firstResult.state.approve(approvalItem); + + const secondResult = await runner.run(agent, firstResult.state, { + conversationId: 'conv-repeat', + }); + + expect(secondResult.finalOutput).toBe('done'); + + const thirdResult = await runner.run(agent, secondResult.state, { + conversationId: 'conv-repeat', + }); + + expect(thirdResult.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + }); + + it('sends newly appended generated items when resuming', async () => { + const approvalTool = tool({ + name: 'test', + description: 'approval tool', + parameters: z.object({ test: z.string() }), + needsApproval: async () => true, + execute: async ({ test }) => `result:${test}`, + }); + + const model = new TrackingModel([ + buildResponse([buildToolCall('call-extra', 'foo')], 'resp-extra-1'), + buildResponse([fakeModelMessage('done')], 'resp-extra-2'), + ]); + + const agent = new Agent({ + name: 'ExtraAgent', + model, + tools: [approvalTool], + }); + + const runner = new Runner(); + const firstResult = await runner.run(agent, 'user_message', { + conversationId: 'conv-extra', + }); + + expect(firstResult.interruptions).toHaveLength(1); + const approvalItem = firstResult.interruptions[0]; + + const extraMessage = new MessageOutputItem( + fakeModelMessage('cached note'), + agent, + ); + firstResult.state._generatedItems.push(extraMessage); + + firstResult.state.approve(approvalItem); + + const secondResult = await runner.run(agent, firstResult.state, { + conversationId: 'conv-extra', + }); + + expect(secondResult.finalOutput).toBe('done'); + expect(model.requests).toHaveLength(2); + + const secondItems = model.requests[1].input as AgentInputItem[]; + expect(secondItems).toHaveLength(2); + expect(secondItems[0]).toMatchObject({ + type: 'message', + content: expect.arrayContaining([ + expect.objectContaining({ text: 'cached note' }), + ]), + }); + expect(secondItems[1]).toMatchObject({ + type: 'function_call_result', + callId: 'call-extra', + }); + }); + + it('sends only approved items when mixing function and MCP approvals', async () => { + const functionTool = tool({ + name: 'test', + description: 'function tool', + parameters: z.object({ test: z.string() }), + needsApproval: async () => true, + execute: async ({ test }) => `result:${test}`, + }); + + const mcpTool = hostedMcpTool({ + serverLabel: 'demo_server', + serverUrl: 'https://example.com', + requireApproval: { + always: { toolNames: ['demo_tool'] }, + }, + }); + + const mcpApprovalCall: protocol.HostedToolCallItem = { + type: 'hosted_tool_call', + id: 'approval-id', + name: 'mcp_approval_request', + status: 'completed', + providerData: { + type: 'mcp_approval_request', + server_label: 'demo_server', + name: 'demo_tool', + id: 'approval-id', + arguments: '{}', + }, + } as protocol.HostedToolCallItem; + + const model = new TrackingModel([ + buildResponse( + [mcpApprovalCall, buildToolCall('call-mixed', 'foo')], + 'resp-mixed-1', + ), + buildResponse([fakeModelMessage('still waiting')], 'resp-mixed-2'), + ]); + + const agent = new Agent({ + name: 'MixedAgent', + model, + tools: [functionTool, mcpTool], + }); + + const runner = new Runner(); + const firstResult = await runner.run(agent, 'user_message', { + conversationId: 'conv-mixed', + }); + + const functionApproval = firstResult.interruptions.find( + (item) => item.rawItem.type === 'function_call', + ); + const mcpApproval = firstResult.interruptions.find( + (item) => item.rawItem.type === 'hosted_tool_call', + ); + + expect(functionApproval).toBeDefined(); + expect(mcpApproval).toBeDefined(); + + firstResult.state.approve(functionApproval!); + + const secondResult = await runner.run(agent, firstResult.state, { + conversationId: 'conv-mixed', + }); + + expect(model.requests).toHaveLength(2); + const secondItems = model.requests[1].input as AgentInputItem[]; + expect(secondItems).toHaveLength(1); + expect(secondItems[0]).toMatchObject({ + type: 'function_call_result', + callId: 'call-mixed', + }); + expect(secondResult.finalOutput).toBe('still waiting'); + }); + it('sends full history when no server-managed state is provided', async () => { const model = new TrackingModel([ buildResponse(