32 changes: 28 additions & 4 deletions src/agents/_run_impl.py
@@ -245,7 +245,8 @@ class SingleStepResult:
"""Items generated before the current step."""

new_step_items: list[RunItem]
"""Items generated during this current step."""
"""Items generated during this current step. May be filtered during handoffs to avoid
duplication in model input."""

next_step: NextStepHandoff | NextStepFinalOutput | NextStepRunAgain
"""The next step to take."""
@@ -256,11 +257,18 @@ class SingleStepResult:
tool_output_guardrail_results: list[ToolOutputGuardrailResult]
"""Tool output guardrail results from this step."""

session_step_items: list[RunItem] | None = None
"""Full unfiltered items for session history. When set, these are used instead of
new_step_items for session saving and the generated_items property."""

@property
def generated_items(self) -> list[RunItem]:
"""Items generated during the agent run (i.e. everything generated after
`original_input`)."""
return self.pre_step_items + self.new_step_items
`original_input`). Uses session_step_items when available for full observability."""
items = (
self.session_step_items if self.session_step_items is not None else self.new_step_items
)
return self.pre_step_items + items


def get_model_tracing_impl(
@@ -1290,6 +1298,12 @@ async def execute_handoffs(
)
pre_step_items = list(filtered.pre_handoff_items)
new_step_items = list(filtered.new_items)
# For custom input filters, route input_items to the model when provided; keep full new_items for session history
if filtered.input_items is not None:
session_step_items = list(filtered.new_items)
new_step_items = list(filtered.input_items)
else:
session_step_items = None
elif should_nest_history and handoff_input_data is not None:
nested = nest_handoff_history(
handoff_input_data,
@@ -1301,7 +1315,16 @@ async def execute_handoffs(
else list(nested.input_history)
)
pre_step_items = list(nested.pre_handoff_items)
new_step_items = list(nested.new_items)
# Keep full new_items for session history.
session_step_items = list(nested.new_items)
# Use input_items (filtered) for model input if available.
if nested.input_items is not None:
new_step_items = list(nested.input_items)
else:
new_step_items = session_step_items
else:
# No filtering or nesting - session_step_items not needed
session_step_items = None

return SingleStepResult(
original_input=original_input,
@@ -1311,6 +1334,7 @@ async def execute_handoffs(
next_step=NextStepHandoff(new_agent),
tool_input_guardrail_results=[],
tool_output_guardrail_results=[],
session_step_items=session_step_items,
)

@classmethod
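A minimal sketch of the rule the updated generated_items property encodes, assuming only the fields shown above (RunItem is assumed to be in scope; resolve_generated_items is an illustrative name, not part of the PR):

def resolve_generated_items(
    pre_step_items: list[RunItem],
    new_step_items: list[RunItem],
    session_step_items: list[RunItem] | None,
) -> list[RunItem]:
    # The full, unfiltered record wins when a handoff produced one; otherwise
    # new_step_items (possibly filtered for model input) is the whole story.
    items = session_step_items if session_step_items is not None else new_step_items
    return pre_step_items + items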
16 changes: 12 additions & 4 deletions src/agents/handoffs/__init__.py
@@ -62,6 +62,13 @@ class HandoffInputData:
later on, it is optional for backwards compatibility.
"""

input_items: tuple[RunItem, ...] | None = None
"""
Items to include in the next agent's input. When set, these items are used instead of
new_items for building the input to the next agent. This allows filtering duplicates
from agent input while preserving all items in new_items for session history.
"""

def clone(self, **kwargs: Any) -> HandoffInputData:
"""
Make a copy of the handoff input data, with the given arguments changed. For example, you
@@ -117,10 +124,11 @@ class Handoff(Generic[TContext, TAgent]):
filter inputs (for example, to remove older inputs or remove tools from existing inputs). The
function receives the entire conversation history so far, including the input item that
triggered the handoff and a tool call output item representing the handoff tool's output. You
are free to modify the input history or new items as you see fit. The next agent that runs will
receive ``handoff_input_data.all_items``. IMPORTANT: in streaming mode, we will not stream
anything as a result of this function. The items generated before will already have been
streamed.
are free to modify the input history or new items as you see fit. The next agent receives the
input history plus ``input_items`` when provided; otherwise it receives ``new_items``. Use
``input_items`` to filter model input while keeping ``new_items`` intact for session history.
IMPORTANT: in streaming mode, we will not stream anything as a result of this function. The
items generated before will already have been streamed.
"""

nest_handoff_history: bool | None = None
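As a sketch of how the new field composes with input_filter: a custom filter can hide tool chatter from the next agent's model input while leaving new_items intact for session history. The function name and predicate below are illustrative; clone is shown above, and to_input_item is the RunItem conversion used elsewhere in this PR.

def drop_tool_chatter(data: HandoffInputData) -> HandoffInputData:
    # Keep role-bearing items; drop tool records the next agent need not re-read.
    visible = tuple(
        item
        for item in data.new_items
        if item.to_input_item().get("type")
        not in ("function_call", "function_call_output")
    )
    # new_items stays untouched, so sessions still record every item.
    return data.clone(input_items=visible)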
58 changes: 45 additions & 13 deletions src/agents/handoffs/history.py
@@ -26,6 +26,13 @@
_conversation_history_start = _DEFAULT_CONVERSATION_HISTORY_START
_conversation_history_end = _DEFAULT_CONVERSATION_HISTORY_END

# Item types that are summarized in the conversation history.
# They should not be forwarded verbatim to the next agent to avoid duplication.
_SUMMARY_ONLY_INPUT_TYPES = {
"function_call",
"function_call_output",
}


def set_conversation_history_wrappers(
*,
@@ -67,23 +74,34 @@ def nest_handoff_history(

normalized_history = _normalize_input_history(handoff_input_data.input_history)
flattened_history = _flatten_nested_history_messages(normalized_history)
pre_items_as_inputs = [
_run_item_to_plain_input(item) for item in handoff_input_data.pre_handoff_items
]
new_items_as_inputs = [_run_item_to_plain_input(item) for item in handoff_input_data.new_items]

# Convert items to plain inputs for the transcript summary.
pre_items_as_inputs: list[TResponseInputItem] = []
filtered_pre_items: list[RunItem] = []
for run_item in handoff_input_data.pre_handoff_items:
plain_input = _run_item_to_plain_input(run_item)
pre_items_as_inputs.append(plain_input)
if _should_forward_pre_item(plain_input):
filtered_pre_items.append(run_item)

new_items_as_inputs: list[TResponseInputItem] = []
filtered_input_items: list[RunItem] = []
for run_item in handoff_input_data.new_items:
plain_input = _run_item_to_plain_input(run_item)
new_items_as_inputs.append(plain_input)
if _should_forward_new_item(plain_input):
filtered_input_items.append(run_item)

transcript = flattened_history + pre_items_as_inputs + new_items_as_inputs

mapper = history_mapper or default_handoff_history_mapper
history_items = mapper(transcript)
filtered_pre_items = tuple(
item
for item in handoff_input_data.pre_handoff_items
if _get_run_item_role(item) != "assistant"
)

return handoff_input_data.clone(
input_history=tuple(deepcopy(item) for item in history_items),
pre_handoff_items=filtered_pre_items,
pre_handoff_items=tuple(filtered_pre_items),
# new_items stays unchanged for session history.
input_items=tuple(filtered_input_items),
)


@@ -231,6 +249,20 @@ def _split_role_and_name(role_text: str) -> tuple[str, str | None]:
return (role_text or "developer", None)


def _get_run_item_role(run_item: RunItem) -> str | None:
role_candidate = run_item.to_input_item().get("role")
return role_candidate if isinstance(role_candidate, str) else None
def _should_forward_pre_item(input_item: TResponseInputItem) -> bool:
"""Return False when the previous transcript item is represented in the summary."""
role_candidate = input_item.get("role")
if isinstance(role_candidate, str) and role_candidate == "assistant":
return False
type_candidate = input_item.get("type")
return not (isinstance(type_candidate, str) and type_candidate in _SUMMARY_ONLY_INPUT_TYPES)


def _should_forward_new_item(input_item: TResponseInputItem) -> bool:
"""Return False for tool or side-effect items that the summary already covers."""
# Items with a role should always be forwarded.
role_candidate = input_item.get("role")
if isinstance(role_candidate, str) and role_candidate:
return True
type_candidate = input_item.get("type")
return not (isinstance(type_candidate, str) and type_candidate in _SUMMARY_ONLY_INPUT_TYPES)
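
A quick illustration of what the two predicates admit, with plain dicts standing in for TResponseInputItem payloads:

# Role-bearing items are forwarded, except assistant turns among the
# pre-handoff items; summary-only tool records are dropped in both cases.
assert _should_forward_new_item({"role": "user", "content": "hi"})
assert not _should_forward_new_item({"type": "function_call", "name": "lookup"})
assert not _should_forward_pre_item({"role": "assistant", "content": "done"})
assert not _should_forward_pre_item({"type": "function_call_output"})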
3 changes: 3 additions & 0 deletions src/agents/result.py
@@ -209,6 +209,9 @@ class RunResultStreaming(RunResultBase):
default=None,
)

_model_input_items: list[RunItem] = field(default_factory=list, repr=False)
"""Filtered items used to build model input between streaming turns."""

# Queues that the background run_loop writes to
_event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = field(
default_factory=asyncio.Queue, repr=False
72 changes: 54 additions & 18 deletions src/agents/run.py
@@ -583,7 +583,8 @@ async def run(
):
current_turn = 0
original_input: str | list[TResponseInputItem] = _copy_str_or_list(prepared_input)
generated_items: list[RunItem] = []
generated_items: list[RunItem] = [] # For model input (may be filtered on handoffs)
session_items: list[RunItem] = [] # For observability (always unfiltered)
model_responses: list[ModelResponse] = []

context_wrapper: RunContextWrapper[TContext] = RunContextWrapper(
@@ -705,7 +706,15 @@ async def run(

model_responses.append(turn_result.model_response)
original_input = turn_result.original_input
generated_items = turn_result.generated_items
# For model input, use new_step_items (filtered on handoffs)
generated_items = turn_result.pre_step_items + turn_result.new_step_items
# Accumulate unfiltered items for observability
session_items_for_turn = (
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items
)
session_items.extend(session_items_for_turn)

if server_conversation_tracker is not None:
server_conversation_tracker.track_server_items(turn_result.model_response)
@@ -725,7 +734,7 @@
)
result = RunResult(
input=original_input,
new_items=generated_items,
new_items=session_items, # Use unfiltered items for observability
raw_responses=model_responses,
final_output=turn_result.next_step.output,
_last_agent=current_agent,
@@ -742,7 +751,9 @@
await self._save_result_to_session(
session,
[],
turn_result.new_step_items,
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
turn_result.model_response.response_id,
)

@@ -757,7 +768,9 @@
await self._save_result_to_session(
session,
[],
turn_result.new_step_items,
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
turn_result.model_response.response_id,
)
current_agent = cast(Agent[TContext], turn_result.next_step.new_agent)
@@ -772,7 +785,9 @@
await self._save_result_to_session(
session,
[],
turn_result.new_step_items,
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
turn_result.model_response.response_id,
)
else:
@@ -789,7 +804,7 @@
except AgentsException as exc:
exc.run_data = RunErrorDetails(
input=original_input,
new_items=generated_items,
new_items=session_items, # Use unfiltered items for observability
raw_responses=model_responses,
last_agent=current_agent,
context_wrapper=context_wrapper,
@@ -1227,7 +1242,17 @@ async def _start_streaming(
turn_result.model_response
]
streamed_result.input = turn_result.original_input
streamed_result.new_items = turn_result.generated_items
# Keep filtered items for building model input on the next turn.
streamed_result._model_input_items = (
turn_result.pre_step_items + turn_result.new_step_items
)
# Accumulate unfiltered items for observability
session_items_for_turn = (
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items
)
streamed_result.new_items.extend(session_items_for_turn)

if server_conversation_tracker is not None:
server_conversation_tracker.track_server_items(turn_result.model_response)
@@ -1245,7 +1270,9 @@
await AgentRunner._save_result_to_session(
session,
[],
turn_result.new_step_items,
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
turn_result.model_response.response_id,
)

@@ -1294,7 +1321,9 @@ async def _start_streaming(
await AgentRunner._save_result_to_session(
session,
[],
turn_result.new_step_items,
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
turn_result.model_response.response_id,
)

@@ -1310,7 +1339,9 @@ async def _start_streaming(
await AgentRunner._save_result_to_session(
session,
[],
turn_result.new_step_items,
turn_result.session_step_items
if turn_result.session_step_items is not None
else turn_result.new_step_items,
turn_result.model_response.response_id,
)

@@ -1423,11 +1454,11 @@ async def _run_single_turn_streamed(

if server_conversation_tracker is not None:
input = server_conversation_tracker.prepare_input(
streamed_result.input, streamed_result.new_items
streamed_result.input, streamed_result._model_input_items
)
else:
input = ItemHelpers.input_to_new_input_list(streamed_result.input)
input.extend([item.to_input_item() for item in streamed_result.new_items])
input.extend([item.to_input_item() for item in streamed_result._model_input_items])

filtered = await cls._maybe_filter_model_input(
@@ -1547,7 +1578,7 @@ async def _run_single_turn_streamed(
single_step_result = await cls._get_single_step_result_from_response(
agent=agent,
original_input=streamed_result.input,
pre_step_items=streamed_result.new_items,
pre_step_items=streamed_result._model_input_items,
new_response=final_response,
output_schema=output_schema,
all_tools=all_tools,
@@ -1738,7 +1769,7 @@ async def _get_single_step_result_from_streamed_response(
tool_use_tracker: AgentToolUseTracker,
) -> SingleStepResult:
original_input = streamed_result.input
pre_step_items = streamed_result.new_items
pre_step_items = streamed_result._model_input_items
event_queue = streamed_result._event_queue

processed_response = RunImpl.process_model_response(
Expand All @@ -1763,10 +1794,15 @@ async def _get_single_step_result_from_streamed_response(
context_wrapper=context_wrapper,
run_config=run_config,
)
# Use session_step_items (unfiltered) if available for streaming observability,
# otherwise fall back to new_step_items.
streaming_items = (
single_step_result.session_step_items
if single_step_result.session_step_items is not None
else single_step_result.new_step_items
)
new_step_items = [
item
for item in single_step_result.new_step_items
if item not in new_items_processed_response
item for item in streaming_items if item not in new_items_processed_response
]
RunImpl.stream_step_items_to_queue(new_step_items, event_queue)

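The run.py changes repeat one fallback at every session-save site; condensed into a single rule it reads as below (items_for_session is a hypothetical helper name, not part of the PR):

def items_for_session(turn_result: SingleStepResult) -> list[RunItem]:
    # Prefer the full unfiltered record when a handoff produced one; otherwise
    # the step's new items already are the complete record.
    if turn_result.session_step_items is not None:
        return list(turn_result.session_step_items)
    return list(turn_result.new_step_items)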