Skip to content

Commit 31d9547

Browse files
committed
Emit session-only streamed items
1 parent e1c4153 commit 31d9547

File tree

2 files changed

+18
-11
lines changed

2 files changed

+18
-11
lines changed

src/agents/run.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1592,14 +1592,21 @@ async def _run_single_turn_streamed(
15921592

15931593
import dataclasses as _dc
15941594

1595-
# Filter out items that have already been sent to avoid duplicates
1596-
items_to_filter = single_step_result.new_step_items
1595+
# Stream session items (unfiltered) when available for observability.
1596+
streaming_items = (
1597+
single_step_result.session_step_items
1598+
if single_step_result.session_step_items is not None
1599+
else single_step_result.new_step_items
1600+
)
1601+
1602+
# Filter out items that have already been sent to avoid duplicates.
1603+
items_to_stream = streaming_items
15971604

15981605
if emitted_tool_call_ids:
15991606
# Filter out tool call items that were already emitted during streaming
1600-
items_to_filter = [
1607+
items_to_stream = [
16011608
item
1602-
for item in items_to_filter
1609+
for item in items_to_stream
16031610
if not (
16041611
isinstance(item, ToolCallItem)
16051612
and (
@@ -1613,9 +1620,9 @@ async def _run_single_turn_streamed(
16131620

16141621
if emitted_reasoning_item_ids:
16151622
# Filter out reasoning items that were already emitted during streaming
1616-
items_to_filter = [
1623+
items_to_stream = [
16171624
item
1618-
for item in items_to_filter
1625+
for item in items_to_stream
16191626
if not (
16201627
isinstance(item, ReasoningItem)
16211628
and (reasoning_id := getattr(item.raw_item, "id", None))
@@ -1624,12 +1631,12 @@ async def _run_single_turn_streamed(
16241631
]
16251632

16261633
# Filter out HandoffCallItem to avoid duplicates (already sent earlier)
1627-
items_to_filter = [
1628-
item for item in items_to_filter if not isinstance(item, HandoffCallItem)
1634+
items_to_stream = [
1635+
item for item in items_to_stream if not isinstance(item, HandoffCallItem)
16291636
]
16301637

16311638
# Create filtered result and send to queue
1632-
filtered_result = _dc.replace(single_step_result, new_step_items=items_to_filter)
1639+
filtered_result = _dc.replace(single_step_result, new_step_items=items_to_stream)
16331640
RunImpl.stream_step_result_to_queue(filtered_result, streamed_result._event_queue)
16341641
return single_step_result
16351642

tests/test_agent_runner_streamed.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -801,8 +801,8 @@ async def test_streaming_events():
801801
"tool_call_output": 2,
802802
"message": 2, # get_text_message("a_message") + get_final_output_message(...)
803803
"handoff": 1, # get_handoff_tool_call(agent_1)
804-
# Handoff output is summarized in conversation history, not emitted as a streamed item.
805-
"handoff_output": 0,
804+
# Handoff output is now emitted as a streamed item for observability.
805+
"handoff_output": 1,
806806
}
807807

808808
total_expected_item_count = sum(expected_item_type_map.values())

0 commit comments

Comments
 (0)