From 9ad2949661e4c5767aaab5908408dd014ccf385e Mon Sep 17 00:00:00 2001 From: Seth Gilchrist Date: Tue, 9 Sep 2025 00:00:00 -0700 Subject: [PATCH 1/2] Save session on turn rather than at final response (#1550) --- src/agents/run.py | 32 +++++-------- tests/test_agent_runner.py | 97 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 19 deletions(-) diff --git a/src/agents/run.py b/src/agents/run.py index 4575edb3f..e92e1c2ef 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -438,6 +438,9 @@ async def run( current_agent = starting_agent should_run_agent_start_hooks = True + # save the original input to the session if enabled + await self._save_result_to_session(session, original_input, []) + try: while True: all_tools = await AgentRunner._get_all_tools(current_agent, context_wrapper) @@ -537,9 +540,7 @@ async def run( output_guardrail_results=output_guardrail_results, context_wrapper=context_wrapper, ) - - # Save the conversation to session if enabled - await self._save_result_to_session(session, input, result) + await self._save_result_to_session(session, [], turn_result.new_step_items) return result elif isinstance(turn_result.next_step, NextStepHandoff): @@ -548,7 +549,7 @@ async def run( current_span = None should_run_agent_start_hooks = True elif isinstance(turn_result.next_step, NextStepRunAgain): - pass + await self._save_result_to_session(session, [], turn_result.new_step_items) else: raise AgentsException( f"Unknown next step type: {type(turn_result.next_step)}" @@ -784,6 +785,8 @@ async def _start_streaming( # Update the streamed result with the prepared input streamed_result.input = prepared_input + await AgentRunner._save_result_to_session(session, starting_input, []) + while True: if streamed_result.is_complete: break @@ -887,24 +890,15 @@ async def _start_streaming( streamed_result.is_complete = True # Save the conversation to session if enabled - # Create a temporary RunResult for session saving - temp_result = RunResult( - input=streamed_result.input, - new_items=streamed_result.new_items, - raw_responses=streamed_result.raw_responses, - final_output=streamed_result.final_output, - _last_agent=current_agent, - input_guardrail_results=streamed_result.input_guardrail_results, - output_guardrail_results=streamed_result.output_guardrail_results, - context_wrapper=context_wrapper, - ) await AgentRunner._save_result_to_session( - session, starting_input, temp_result + session, [], turn_result.new_step_items ) streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) elif isinstance(turn_result.next_step, NextStepRunAgain): - pass + await AgentRunner._save_result_to_session( + session, [], turn_result.new_step_items + ) except AgentsException as exc: streamed_result.is_complete = True streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) @@ -1510,7 +1504,7 @@ async def _save_result_to_session( cls, session: Session | None, original_input: str | list[TResponseInputItem], - result: RunResult, + new_items: list[RunItem], ) -> None: """Save the conversation turn to session.""" if session is None: @@ -1520,7 +1514,7 @@ async def _save_result_to_session( input_list = ItemHelpers.input_to_new_input_list(original_input) # Convert new items to input format - new_items_as_input = [item.to_input_item() for item in result.new_items] + new_items_as_input = [item.to_input_item() for item in new_items] # Save all items from this turn items_to_save = input_list + new_items_as_input diff --git a/tests/test_agent_runner.py b/tests/test_agent_runner.py index c8ae5b5f2..887defa5b 100644 --- a/tests/test_agent_runner.py +++ b/tests/test_agent_runner.py @@ -1,7 +1,10 @@ from __future__ import annotations import json +import tempfile +from pathlib import Path from typing import Any +from unittest.mock import patch import pytest from typing_extensions import TypedDict @@ -20,6 +23,7 @@ RunConfig, RunContextWrapper, Runner, + SQLiteSession, UserError, handoff, ) @@ -780,3 +784,96 @@ async def add_tool() -> str: assert executed["called"] is True assert result.final_output == "done" + + +@pytest.mark.asyncio +async def test_session_add_items_called_multiple_times_for_multi_turn_completion(): + """Test that SQLiteSession.add_items is called multiple times + during a multi-turn agent completion. + + """ + with tempfile.TemporaryDirectory() as temp_dir: + db_path = Path(temp_dir) / "test_agent_runner_session_multi_turn_calls.db" + session_id = "runner_session_multi_turn_calls" + session = SQLiteSession(session_id, db_path) + + # Define a tool that will be called by the orchestrator agent + @function_tool + async def echo_tool(text: str) -> str: + return f"Echo: {text}" + + # Orchestrator agent that calls the tool multiple times in one completion + orchestrator_agent = Agent( + name="orchestrator_agent", + instructions=( + "Call echo_tool twice with inputs of 'foo' and 'bar', then return a summary." + ), + tools=[echo_tool], + ) + + # Patch the model to simulate two tool calls and a final message + model = FakeModel() + orchestrator_agent.model = model + model.add_multiple_turn_outputs( + [ + # First turn: tool call + [get_function_tool_call("echo_tool", json.dumps({"text": "foo"}), call_id="1")], + # Second turn: tool call + [get_function_tool_call("echo_tool", json.dumps({"text": "bar"}), call_id="2")], + # Third turn: final output + [get_final_output_message("Summary: Echoed foo and bar")], + ] + ) + + # Patch add_items to count calls + with patch.object(SQLiteSession, "add_items", wraps=session.add_items) as mock_add_items: + result = await Runner.run(orchestrator_agent, input="foo and bar", session=session) + + expected_items = [ + {"content": "foo and bar", "role": "user"}, + { + "arguments": '{"text": "foo"}', + "call_id": "1", + "name": "echo_tool", + "type": "function_call", + "id": "1", + }, + {"call_id": "1", "output": "Echo: foo", "type": "function_call_output"}, + { + "arguments": '{"text": "bar"}', + "call_id": "2", + "name": "echo_tool", + "type": "function_call", + "id": "1", + }, + {"call_id": "2", "output": "Echo: bar", "type": "function_call_output"}, + { + "id": "1", + "content": [ + { + "annotations": [], + "text": "Summary: Echoed foo and bar", + "type": "output_text", + } + ], + "role": "assistant", + "status": "completed", + "type": "message", + }, + ] + + expected_calls = [ + # First call is the initial input + (([expected_items[0]],),), + # Second call is the first tool call and its result + (([expected_items[1], expected_items[2]],),), + # Third call is the second tool call and its result + (([expected_items[3], expected_items[4]],),), + # Fourth call is the final output + (([expected_items[5]],),), + ] + assert mock_add_items.call_args_list == expected_calls + assert result.final_output == "Summary: Echoed foo and bar" + assert (await session.get_items()) == expected_items + + session.close() From 5a9cab876b4d4b37d6d8d9ef57a6a9040aabf8e9 Mon Sep 17 00:00:00 2001 From: Wen-Tien Chang Date: Tue, 9 Sep 2025 15:00:38 +0800 Subject: [PATCH 2/2] Prevent preamble messages from being treated as final output when tool calls are pending (#1689) Co-authored-by: Kazuhiro Sera --- src/agents/_run_impl.py | 71 ++++++++++++++--------------- tests/test_agent_runner.py | 12 +++-- tests/test_agent_runner_streamed.py | 23 +++++----- 3 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/agents/_run_impl.py b/src/agents/_run_impl.py index 56784004c..a2d872bf1 100644 --- a/src/agents/_run_impl.py +++ b/src/agents/_run_impl.py @@ -330,43 +330,40 @@ async def execute_tools_and_side_effects( ItemHelpers.extract_last_text(message_items[-1].raw_item) if message_items else None ) - # There are two possibilities that lead to a final output: - # 1. Structured output schema => always leads to a final output - # 2. Plain text output schema => only leads to a final output if there are no tool calls - if output_schema and not output_schema.is_plain_text() and potential_final_output_text: - final_output = output_schema.validate_json(potential_final_output_text) - return await cls.execute_final_output( - agent=agent, - original_input=original_input, - new_response=new_response, - pre_step_items=pre_step_items, - new_step_items=new_step_items, - final_output=final_output, - hooks=hooks, - context_wrapper=context_wrapper, - ) - elif ( - not output_schema or output_schema.is_plain_text() - ) and not processed_response.has_tools_or_approvals_to_run(): - return await cls.execute_final_output( - agent=agent, - original_input=original_input, - new_response=new_response, - pre_step_items=pre_step_items, - new_step_items=new_step_items, - final_output=potential_final_output_text or "", - hooks=hooks, - context_wrapper=context_wrapper, - ) - else: - # If there's no final output, we can just run again - return SingleStepResult( - original_input=original_input, - model_response=new_response, - pre_step_items=pre_step_items, - new_step_items=new_step_items, - next_step=NextStepRunAgain(), - ) + # Generate final output only when there are no pending tool calls or approval requests. + if not processed_response.has_tools_or_approvals_to_run(): + if output_schema and not output_schema.is_plain_text() and potential_final_output_text: + final_output = output_schema.validate_json(potential_final_output_text) + return await cls.execute_final_output( + agent=agent, + original_input=original_input, + new_response=new_response, + pre_step_items=pre_step_items, + new_step_items=new_step_items, + final_output=final_output, + hooks=hooks, + context_wrapper=context_wrapper, + ) + elif not output_schema or output_schema.is_plain_text(): + return await cls.execute_final_output( + agent=agent, + original_input=original_input, + new_response=new_response, + pre_step_items=pre_step_items, + new_step_items=new_step_items, + final_output=potential_final_output_text or "", + hooks=hooks, + context_wrapper=context_wrapper, + ) + + # If there's no final output, we can just run again + return SingleStepResult( + original_input=original_input, + model_response=new_response, + pre_step_items=pre_step_items, + new_step_items=new_step_items, + next_step=NextStepRunAgain(), + ) @classmethod def maybe_reset_tool_choice( diff --git a/tests/test_agent_runner.py b/tests/test_agent_runner.py index 887defa5b..661afd6ef 100644 --- a/tests/test_agent_runner.py +++ b/tests/test_agent_runner.py @@ -196,11 +196,13 @@ async def test_structured_output(): [get_function_tool_call("foo", json.dumps({"bar": "baz"}))], # Second turn: a message and a handoff [get_text_message("a_message"), get_handoff_tool_call(agent_1)], - # Third turn: tool call and structured output + # Third turn: tool call with preamble message [ + get_text_message(json.dumps(Foo(bar="preamble"))), get_function_tool_call("bar", json.dumps({"bar": "baz"})), - get_final_output_message(json.dumps(Foo(bar="baz"))), ], + # Fourth turn: structured output + [get_final_output_message(json.dumps(Foo(bar="baz")))], ] ) @@ -213,10 +215,10 @@ async def test_structured_output(): ) assert result.final_output == Foo(bar="baz") - assert len(result.raw_responses) == 3, "should have three model responses" - assert len(result.to_input_list()) == 10, ( + assert len(result.raw_responses) == 4, "should have four model responses" + assert len(result.to_input_list()) == 11, ( "should have input: 2 orig inputs, function call, function call result, message, handoff, " - "handoff output, tool call, tool call result, final output message" + "handoff output, preamble message, tool call, tool call result, final output" ) assert result.last_agent == agent_1, "should have handed off to agent_1" diff --git a/tests/test_agent_runner_streamed.py b/tests/test_agent_runner_streamed.py index d4afbd2e0..ff807ca96 100644 --- a/tests/test_agent_runner_streamed.py +++ b/tests/test_agent_runner_streamed.py @@ -207,11 +207,13 @@ async def test_structured_output(): [get_function_tool_call("foo", json.dumps({"bar": "baz"}))], # Second turn: a message and a handoff [get_text_message("a_message"), get_handoff_tool_call(agent_1)], - # Third turn: tool call and structured output + # Third turn: tool call with preamble message [ + get_text_message(json.dumps(Foo(bar="preamble"))), get_function_tool_call("bar", json.dumps({"bar": "baz"})), - get_final_output_message(json.dumps(Foo(bar="baz"))), ], + # Fourth turn: structured output + [get_final_output_message(json.dumps(Foo(bar="baz")))], ] ) @@ -226,10 +228,10 @@ async def test_structured_output(): pass assert result.final_output == Foo(bar="baz") - assert len(result.raw_responses) == 3, "should have three model responses" - assert len(result.to_input_list()) == 10, ( + assert len(result.raw_responses) == 4, "should have four model responses" + assert len(result.to_input_list()) == 11, ( "should have input: 2 orig inputs, function call, function call result, message, handoff, " - "handoff output, tool call, tool call result, final output" + "handoff output, preamble message, tool call, tool call result, final output" ) assert result.last_agent == agent_1, "should have handed off to agent_1" @@ -624,11 +626,10 @@ async def test_streaming_events(): [get_function_tool_call("foo", json.dumps({"bar": "baz"}))], # Second turn: a message and a handoff [get_text_message("a_message"), get_handoff_tool_call(agent_1)], - # Third turn: tool call and structured output - [ - get_function_tool_call("bar", json.dumps({"bar": "baz"})), - get_final_output_message(json.dumps(Foo(bar="baz"))), - ], + # Third turn: tool call + [get_function_tool_call("bar", json.dumps({"bar": "baz"}))], + # Fourth turn: structured output + [get_final_output_message(json.dumps(Foo(bar="baz")))], ] ) @@ -652,7 +653,7 @@ async def test_streaming_events(): agent_data.append(event) assert result.final_output == Foo(bar="baz") - assert len(result.raw_responses) == 3, "should have three model responses" + assert len(result.raw_responses) == 4, "should have four model responses" assert len(result.to_input_list()) == 10, ( "should have input: 2 orig inputs, function call, function call result, message, handoff, " "handoff output, tool call, tool call result, final output"