diff --git a/src/agents/_run_impl.py b/src/agents/_run_impl.py index 0368ffa197..7815b0f23a 100644 --- a/src/agents/_run_impl.py +++ b/src/agents/_run_impl.py @@ -244,7 +244,8 @@ class SingleStepResult: """Items generated before the current step.""" new_step_items: list[RunItem] - """Items generated during this current step.""" + """Items generated during this current step. May be filtered during handoffs to avoid + duplication in model input.""" next_step: NextStepHandoff | NextStepFinalOutput | NextStepRunAgain """The next step to take.""" @@ -255,11 +256,18 @@ class SingleStepResult: tool_output_guardrail_results: list[ToolOutputGuardrailResult] """Tool output guardrail results from this step.""" + session_step_items: list[RunItem] | None = None + """Full unfiltered items for session history. When set, these are used instead of + new_step_items for session saving and generated_items property.""" + @property def generated_items(self) -> list[RunItem]: """Items generated during the agent run (i.e. everything generated after - `original_input`).""" - return self.pre_step_items + self.new_step_items + `original_input`). Uses session_step_items when available for full observability.""" + items = ( + self.session_step_items if self.session_step_items is not None else self.new_step_items + ) + return self.pre_step_items + items def get_model_tracing_impl( @@ -1286,6 +1294,12 @@ async def execute_handoffs( ) pre_step_items = list(filtered.pre_handoff_items) new_step_items = list(filtered.new_items) + # For custom input filters, use input_items if available, otherwise new_items + if filtered.input_items is not None: + session_step_items = list(filtered.new_items) + new_step_items = list(filtered.input_items) + else: + session_step_items = None elif should_nest_history and handoff_input_data is not None: nested = nest_handoff_history( handoff_input_data, @@ -1297,7 +1311,16 @@ async def execute_handoffs( else list(nested.input_history) ) pre_step_items = list(nested.pre_handoff_items) - new_step_items = list(nested.new_items) + # Keep full new_items for session history. + session_step_items = list(nested.new_items) + # Use input_items (filtered) for model input if available. + if nested.input_items is not None: + new_step_items = list(nested.input_items) + else: + new_step_items = session_step_items + else: + # No filtering or nesting - session_step_items not needed + session_step_items = None return SingleStepResult( original_input=original_input, @@ -1307,6 +1330,7 @@ async def execute_handoffs( next_step=NextStepHandoff(new_agent), tool_input_guardrail_results=[], tool_output_guardrail_results=[], + session_step_items=session_step_items, ) @classmethod diff --git a/src/agents/handoffs/__init__.py b/src/agents/handoffs/__init__.py index 0876bfa581..11372dde0e 100644 --- a/src/agents/handoffs/__init__.py +++ b/src/agents/handoffs/__init__.py @@ -62,6 +62,13 @@ class HandoffInputData: later on, it is optional for backwards compatibility. """ + input_items: tuple[RunItem, ...] | None = None + """ + Items to include in the next agent's input. When set, these items are used instead of + new_items for building the input to the next agent. This allows filtering duplicates + from agent input while preserving all items in new_items for session history. + """ + def clone(self, **kwargs: Any) -> HandoffInputData: """ Make a copy of the handoff input data, with the given arguments changed. For example, you diff --git a/src/agents/handoffs/history.py b/src/agents/handoffs/history.py index e2623c471f..9dd164cac5 100644 --- a/src/agents/handoffs/history.py +++ b/src/agents/handoffs/history.py @@ -26,6 +26,13 @@ _conversation_history_start = _DEFAULT_CONVERSATION_HISTORY_START _conversation_history_end = _DEFAULT_CONVERSATION_HISTORY_END +# Item types that are summarized in the conversation history. +# They should not be forwarded verbatim to the next agent to avoid duplication. +_SUMMARY_ONLY_INPUT_TYPES = { + "function_call", + "function_call_output", +} + def set_conversation_history_wrappers( *, @@ -67,23 +74,34 @@ def nest_handoff_history( normalized_history = _normalize_input_history(handoff_input_data.input_history) flattened_history = _flatten_nested_history_messages(normalized_history) - pre_items_as_inputs = [ - _run_item_to_plain_input(item) for item in handoff_input_data.pre_handoff_items - ] - new_items_as_inputs = [_run_item_to_plain_input(item) for item in handoff_input_data.new_items] + + # Convert items to plain inputs for the transcript summary. + pre_items_as_inputs: list[TResponseInputItem] = [] + filtered_pre_items: list[RunItem] = [] + for run_item in handoff_input_data.pre_handoff_items: + plain_input = _run_item_to_plain_input(run_item) + pre_items_as_inputs.append(plain_input) + if _should_forward_pre_item(plain_input): + filtered_pre_items.append(run_item) + + new_items_as_inputs: list[TResponseInputItem] = [] + filtered_input_items: list[RunItem] = [] + for run_item in handoff_input_data.new_items: + plain_input = _run_item_to_plain_input(run_item) + new_items_as_inputs.append(plain_input) + if _should_forward_new_item(plain_input): + filtered_input_items.append(run_item) + transcript = flattened_history + pre_items_as_inputs + new_items_as_inputs mapper = history_mapper or default_handoff_history_mapper history_items = mapper(transcript) - filtered_pre_items = tuple( - item - for item in handoff_input_data.pre_handoff_items - if _get_run_item_role(item) != "assistant" - ) return handoff_input_data.clone( input_history=tuple(deepcopy(item) for item in history_items), - pre_handoff_items=filtered_pre_items, + pre_handoff_items=tuple(filtered_pre_items), + # new_items stays unchanged for session history. + input_items=tuple(filtered_input_items), ) @@ -231,6 +249,20 @@ def _split_role_and_name(role_text: str) -> tuple[str, str | None]: return (role_text or "developer", None) -def _get_run_item_role(run_item: RunItem) -> str | None: - role_candidate = run_item.to_input_item().get("role") - return role_candidate if isinstance(role_candidate, str) else None +def _should_forward_pre_item(input_item: TResponseInputItem) -> bool: + """Return False when the previous transcript item is represented in the summary.""" + role_candidate = input_item.get("role") + if isinstance(role_candidate, str) and role_candidate == "assistant": + return False + type_candidate = input_item.get("type") + return not (isinstance(type_candidate, str) and type_candidate in _SUMMARY_ONLY_INPUT_TYPES) + + +def _should_forward_new_item(input_item: TResponseInputItem) -> bool: + """Return False for tool or side-effect items that the summary already covers.""" + # Items with a role should always be forwarded. + role_candidate = input_item.get("role") + if isinstance(role_candidate, str) and role_candidate: + return True + type_candidate = input_item.get("type") + return not (isinstance(type_candidate, str) and type_candidate in _SUMMARY_ONLY_INPUT_TYPES) diff --git a/src/agents/run.py b/src/agents/run.py index 145169f6e9..839eea52e3 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -583,7 +583,8 @@ async def run( ): current_turn = 0 original_input: str | list[TResponseInputItem] = _copy_str_or_list(prepared_input) - generated_items: list[RunItem] = [] + generated_items: list[RunItem] = [] # For model input (may be filtered on handoffs) + session_items: list[RunItem] = [] # For observability (always unfiltered) model_responses: list[ModelResponse] = [] context_wrapper: RunContextWrapper[TContext] = RunContextWrapper( @@ -705,7 +706,15 @@ async def run( model_responses.append(turn_result.model_response) original_input = turn_result.original_input - generated_items = turn_result.generated_items + # For model input, use new_step_items (filtered on handoffs) + generated_items = turn_result.pre_step_items + turn_result.new_step_items + # Accumulate unfiltered items for observability + session_items_for_turn = ( + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items + ) + session_items.extend(session_items_for_turn) if server_conversation_tracker is not None: server_conversation_tracker.track_server_items(turn_result.model_response) @@ -725,7 +734,7 @@ async def run( ) result = RunResult( input=original_input, - new_items=generated_items, + new_items=session_items, # Use unfiltered items for observability raw_responses=model_responses, final_output=turn_result.next_step.output, _last_agent=current_agent, @@ -740,7 +749,11 @@ async def run( for guardrail_result in input_guardrail_results ): await self._save_result_to_session( - session, [], turn_result.new_step_items + session, + [], + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items, ) return result @@ -752,7 +765,11 @@ async def run( for guardrail_result in input_guardrail_results ): await self._save_result_to_session( - session, [], turn_result.new_step_items + session, + [], + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items, ) current_agent = cast(Agent[TContext], turn_result.next_step.new_agent) current_span.finish(reset_current=True) @@ -764,7 +781,11 @@ async def run( for guardrail_result in input_guardrail_results ): await self._save_result_to_session( - session, [], turn_result.new_step_items + session, + [], + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items, ) else: raise AgentsException( @@ -780,7 +801,7 @@ async def run( except AgentsException as exc: exc.run_data = RunErrorDetails( input=original_input, - new_items=generated_items, + new_items=session_items, # Use unfiltered items for observability raw_responses=model_responses, last_agent=current_agent, context_wrapper=context_wrapper, @@ -1218,7 +1239,13 @@ async def _start_streaming( turn_result.model_response ] streamed_result.input = turn_result.original_input - streamed_result.new_items = turn_result.generated_items + # Accumulate unfiltered items for observability + session_items_for_turn = ( + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items + ) + streamed_result.new_items.extend(session_items_for_turn) if server_conversation_tracker is not None: server_conversation_tracker.track_server_items(turn_result.model_response) @@ -1234,7 +1261,11 @@ async def _start_streaming( ) if should_skip_session_save is False: await AgentRunner._save_result_to_session( - session, [], turn_result.new_step_items + session, + [], + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items, ) current_agent = turn_result.next_step.new_agent @@ -1280,7 +1311,11 @@ async def _start_streaming( ) if should_skip_session_save is False: await AgentRunner._save_result_to_session( - session, [], turn_result.new_step_items + session, + [], + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items, ) streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) @@ -1293,7 +1328,11 @@ async def _start_streaming( ) if should_skip_session_save is False: await AgentRunner._save_result_to_session( - session, [], turn_result.new_step_items + session, + [], + turn_result.session_step_items + if turn_result.session_step_items is not None + else turn_result.new_step_items, ) # Check for soft cancel after turn completion @@ -1745,10 +1784,15 @@ async def _get_single_step_result_from_streamed_response( context_wrapper=context_wrapper, run_config=run_config, ) + # Use session_step_items (unfiltered) if available for streaming observability, + # otherwise fall back to new_step_items. + streaming_items = ( + single_step_result.session_step_items + if single_step_result.session_step_items is not None + else single_step_result.new_step_items + ) new_step_items = [ - item - for item in single_step_result.new_step_items - if item not in new_items_processed_response + item for item in streaming_items if item not in new_items_processed_response ] RunImpl.stream_step_items_to_queue(new_step_items, event_queue) diff --git a/tests/test_agent_runner_streamed.py b/tests/test_agent_runner_streamed.py index 222afda78c..a520140a7c 100644 --- a/tests/test_agent_runner_streamed.py +++ b/tests/test_agent_runner_streamed.py @@ -737,7 +737,8 @@ async def test_streaming_events(): "tool_call_output": 2, "message": 2, # get_text_message("a_message") + get_final_output_message(...) "handoff": 1, # get_handoff_tool_call(agent_1) - "handoff_output": 1, # handoff_output_item + # handoff_output is summarized in conversation history, not duplicated as raw item + "handoff_output": 0, } total_expected_item_count = sum(expected_item_type_map.values()) diff --git a/tests/test_handoff_history_duplication.py b/tests/test_handoff_history_duplication.py new file mode 100644 index 0000000000..617c7ef710 --- /dev/null +++ b/tests/test_handoff_history_duplication.py @@ -0,0 +1,276 @@ +"""Tests for handoff history duplication fix (Issue #2171). + +These tests verify that when nest_handoff_history is enabled, +function_call and function_call_output items are NOT duplicated +in the input sent to the next agent. +""" + +from typing import Any, cast + +from openai.types.responses import ( + ResponseFunctionToolCall, + ResponseOutputMessage, + ResponseOutputText, +) + +from agents import Agent +from agents.handoffs import HandoffInputData, nest_handoff_history +from agents.items import ( + HandoffCallItem, + HandoffOutputItem, + MessageOutputItem, + ToolCallItem, + ToolCallOutputItem, +) + + +def _create_mock_agent() -> Agent: + """Create a mock agent for testing.""" + return Agent(name="test_agent") + + +def _create_tool_call_item(agent: Agent) -> ToolCallItem: + """Create a mock ToolCallItem.""" + raw_item = ResponseFunctionToolCall( + id="call_tool_123", + call_id="call_tool_123", + name="get_weather", + arguments='{"city": "London"}', + type="function_call", + ) + return ToolCallItem(agent=agent, raw_item=raw_item, type="tool_call_item") + + +def _create_tool_output_item(agent: Agent) -> ToolCallOutputItem: + """Create a mock ToolCallOutputItem.""" + raw_item = { + "type": "function_call_output", + "call_id": "call_tool_123", + "output": "Sunny, 22°C", + } + return ToolCallOutputItem( + agent=agent, + raw_item=raw_item, + output="Sunny, 22°C", + type="tool_call_output_item", + ) + + +def _create_handoff_call_item(agent: Agent) -> HandoffCallItem: + """Create a mock HandoffCallItem.""" + raw_item = ResponseFunctionToolCall( + id="call_handoff_456", + call_id="call_handoff_456", + name="transfer_to_agent_b", + arguments="{}", + type="function_call", + ) + return HandoffCallItem(agent=agent, raw_item=raw_item, type="handoff_call_item") + + +def _create_handoff_output_item(agent: Agent[Any]) -> HandoffOutputItem: + """Create a mock HandoffOutputItem.""" + raw_item: dict[str, str] = { + "type": "function_call_output", + "call_id": "call_handoff_456", + "output": '{"assistant": "agent_b"}', + } + return HandoffOutputItem( + agent=agent, + raw_item=cast(Any, raw_item), + source_agent=agent, + target_agent=agent, + type="handoff_output_item", + ) + + +def _create_message_item(agent: Agent) -> MessageOutputItem: + """Create a mock MessageOutputItem.""" + raw_item = ResponseOutputMessage( + id="msg_123", + content=[ResponseOutputText(text="Hello!", type="output_text", annotations=[])], + role="assistant", + status="completed", + type="message", + ) + return MessageOutputItem(agent=agent, raw_item=raw_item, type="message_output_item") + + +class TestHandoffHistoryDuplicationFix: + """Tests for Issue #2171: nest_handoff_history duplication fix.""" + + def test_pre_handoff_tool_items_are_filtered(self): + """Verify ToolCallItem and ToolCallOutputItem in pre_handoff_items are filtered. + + These items should NOT appear in the filtered output because they are + already included in the summary message. + """ + agent = _create_mock_agent() + + handoff_data = HandoffInputData( + input_history=({"role": "user", "content": "Hello"},), + pre_handoff_items=( + _create_tool_call_item(agent), + _create_tool_output_item(agent), + ), + new_items=(), + ) + + nested = nest_handoff_history(handoff_data) + + # pre_handoff_items should be empty (tool items filtered) + assert len(nested.pre_handoff_items) == 0, ( + "ToolCallItem and ToolCallOutputItem should be filtered from pre_handoff_items" + ) + + # Summary should contain the conversation + assert len(nested.input_history) == 1 + first_item = nested.input_history[0] + assert isinstance(first_item, dict) + assert "" in str(first_item.get("content", "")) + + def test_new_items_handoff_output_is_filtered_for_input(self): + """Verify HandoffOutputItem in new_items is filtered from input_items. + + The HandoffOutputItem is a function_call_output which would be duplicated. + It should be filtered from input_items but preserved in new_items. + """ + agent = _create_mock_agent() + + handoff_data = HandoffInputData( + input_history=({"role": "user", "content": "Hello"},), + pre_handoff_items=(), + new_items=( + _create_handoff_call_item(agent), + _create_handoff_output_item(agent), + ), + ) + + nested = nest_handoff_history(handoff_data) + + # new_items should still have both items (for session history) + assert len(nested.new_items) == 2, "new_items should preserve all items for session history" + + # input_items should be populated and filtered + assert nested.input_items is not None, "input_items should be populated" + + # input_items should NOT contain HandoffOutputItem (it's function_call_output) + has_handoff_output = any(isinstance(item, HandoffOutputItem) for item in nested.input_items) + assert not has_handoff_output, "HandoffOutputItem should be filtered from input_items" + + def test_message_items_are_preserved_in_new_items(self): + """Verify MessageOutputItem in new_items is preserved. + + Message items have a 'role' and should NOT be filtered from input_items. + Note: pre_handoff_items are converted to summary text regardless of type. + """ + agent = _create_mock_agent() + + handoff_data = HandoffInputData( + input_history=({"role": "user", "content": "Hello"},), + pre_handoff_items=(), # pre_handoff items go into summary + new_items=(_create_message_item(agent),), + ) + + nested = nest_handoff_history(handoff_data) + + # Message items should be preserved in new_items + assert len(nested.new_items) == 1, "MessageOutputItem should be preserved in new_items" + # And in input_items (since it has a role) + assert nested.input_items is not None + assert len(nested.input_items) == 1, "MessageOutputItem should be preserved in input_items" + assert isinstance(nested.input_items[0], MessageOutputItem) + + def test_summary_contains_filtered_items_as_text(self): + """Verify the summary message contains the filtered tool items as text. + + This ensures observability - the items are not lost, just converted to text. + """ + agent = _create_mock_agent() + + handoff_data = HandoffInputData( + input_history=({"role": "user", "content": "Hello"},), + pre_handoff_items=( + _create_tool_call_item(agent), + _create_tool_output_item(agent), + ), + new_items=(), + ) + + nested = nest_handoff_history(handoff_data) + + first_item = nested.input_history[0] + assert isinstance(first_item, dict) + summary = str(first_item.get("content", "")) + + # Summary should contain function_call reference + assert "function_call" in summary or "get_weather" in summary, ( + "Summary should contain the tool call that was filtered" + ) + + def test_input_items_field_exists_after_nesting(self): + """Verify the input_items field is populated after nest_handoff_history. + + This is the key field that separates model input from session history. + """ + agent = _create_mock_agent() + + handoff_data = HandoffInputData( + input_history=({"role": "user", "content": "Hello"},), + pre_handoff_items=(), + new_items=(_create_handoff_call_item(agent),), + ) + + nested = nest_handoff_history(handoff_data) + + assert nested.input_items is not None, ( + "input_items should be populated after nest_handoff_history" + ) + + def test_full_handoff_scenario_no_duplication(self): + """Full end-to-end test of the handoff scenario from Issue #2171. + + Simulates: User -> Agent does tool call -> Agent hands off to next agent + Verifies: Next agent receives summary only, no duplicate raw items. + """ + agent = _create_mock_agent() + + # Full scenario: tool call in pre_handoff, handoff in new_items + handoff_data = HandoffInputData( + input_history=({"role": "user", "content": "What's the weather?"},), + pre_handoff_items=( + _create_tool_call_item(agent), # function_call + _create_tool_output_item(agent), # function_call_output + ), + new_items=( + _create_message_item(agent), # assistant message + _create_handoff_call_item(agent), # function_call (handoff) + _create_handoff_output_item(agent), # function_call_output (handoff) + ), + ) + + nested = nest_handoff_history(handoff_data) + + # Count what would be sent to the model + total_model_items = ( + len(nested.input_history) # Summary + + len(nested.pre_handoff_items) # Filtered pre-handoff + + len(nested.input_items or []) # Filtered new items + ) + + # Before fix: would have 6+ items (summary + raw tool items) + # After fix: should have ~2 items (summary + message) + assert total_model_items <= 3, ( + f"Model should receive at most 3 items (summary + messages), got {total_model_items}" + ) + + # Verify no raw function_call_output items in model input + all_input_items = list(nested.pre_handoff_items) + list(nested.input_items or []) + function_call_outputs = [ + item + for item in all_input_items + if isinstance(item, (ToolCallOutputItem, HandoffOutputItem)) + ] + assert len(function_call_outputs) == 0, ( + "No function_call_output items should be in model input" + ) diff --git a/tests/test_soft_cancel.py b/tests/test_soft_cancel.py index 395f2fb6f3..ddb51f8f17 100644 --- a/tests/test_soft_cancel.py +++ b/tests/test_soft_cancel.py @@ -421,7 +421,7 @@ async def on_invoke_handoff(context, data): handoff_seen = False async for event in result.stream_events(): - if event.type == "run_item_stream_event" and event.name == "handoff_occured": + if event.type == "run_item_stream_event" and event.name == "handoff_requested": handoff_seen = True # Cancel right after handoff result.cancel(mode="after_turn")