4545)
4646from .handoffs import Handoff , HandoffInputFilter , handoff
4747from .items import (
48+ HandoffCallItem ,
4849 ItemHelpers ,
4950 ModelResponse ,
5051 RunItem ,
6061from .models .multi_provider import MultiProvider
6162from .result import RunResult , RunResultStreaming
6263from .run_context import RunContextWrapper , TContext
63- from .stream_events import AgentUpdatedStreamEvent , RawResponsesStreamEvent , RunItemStreamEvent
64+ from .stream_events import (
65+ AgentUpdatedStreamEvent ,
66+ RawResponsesStreamEvent ,
67+ RunItemStreamEvent ,
68+ StreamEvent ,
69+ )
6470from .tool import Tool
6571from .tracing import Span , SpanError , agent_span , get_current_trace , trace
6672from .tracing .span_data import AgentSpanData
@@ -1095,14 +1101,19 @@ async def _run_single_turn_streamed(
10951101 context_wrapper = context_wrapper ,
10961102 run_config = run_config ,
10971103 tool_use_tracker = tool_use_tracker ,
1104+ event_queue = streamed_result ._event_queue ,
10981105 )
10991106
1100- if emitted_tool_call_ids :
1101- import dataclasses as _dc
1107+ import dataclasses as _dc
1108+
1109+ # Filter out items that have already been sent to avoid duplicates
1110+ items_to_filter = single_step_result .new_step_items
11021111
1103- filtered_items = [
1112+ if emitted_tool_call_ids :
1113+ # Filter out tool call items that were already emitted during streaming
1114+ items_to_filter = [
11041115 item
1105- for item in single_step_result . new_step_items
1116+ for item in items_to_filter
11061117 if not (
11071118 isinstance (item , ToolCallItem )
11081119 and (
@@ -1114,15 +1125,17 @@ async def _run_single_turn_streamed(
11141125 )
11151126 ]
11161127
1117- single_step_result_filtered = _dc .replace (
1118- single_step_result , new_step_items = filtered_items
1119- )
1128+ # Filter out HandoffCallItem to avoid duplicates (already sent earlier)
1129+ items_to_filter = [
1130+ item for item in items_to_filter
1131+ if not isinstance (item , HandoffCallItem )
1132+ ]
11201133
1121- RunImpl . stream_step_result_to_queue (
1122- single_step_result_filtered , streamed_result . _event_queue
1123- )
1124- else :
1125- RunImpl .stream_step_result_to_queue (single_step_result , streamed_result ._event_queue )
1134+ # Create filtered result and send to queue
1135+ filtered_result = _dc . replace (
1136+ single_step_result , new_step_items = items_to_filter
1137+ )
1138+ RunImpl .stream_step_result_to_queue (filtered_result , streamed_result ._event_queue )
11261139 return single_step_result
11271140
11281141 @classmethod
@@ -1207,6 +1220,7 @@ async def _get_single_step_result_from_response(
12071220 context_wrapper : RunContextWrapper [TContext ],
12081221 run_config : RunConfig ,
12091222 tool_use_tracker : AgentToolUseTracker ,
1223+ event_queue : asyncio .Queue [StreamEvent | QueueCompleteSentinel ] | None = None ,
12101224 ) -> SingleStepResult :
12111225 processed_response = RunImpl .process_model_response (
12121226 agent = agent ,
@@ -1218,6 +1232,15 @@ async def _get_single_step_result_from_response(
12181232
12191233 tool_use_tracker .add_tool_use (agent , processed_response .tools_used )
12201234
1235+ # Send handoff items immediately for streaming, but avoid duplicates
1236+ if event_queue is not None and processed_response .new_items :
1237+ handoff_items = [
1238+ item for item in processed_response .new_items
1239+ if isinstance (item , HandoffCallItem )
1240+ ]
1241+ if handoff_items :
1242+ RunImpl .stream_step_items_to_queue (cast (list [RunItem ], handoff_items ), event_queue )
1243+
12211244 return await RunImpl .execute_tools_and_side_effects (
12221245 agent = agent ,
12231246 original_input = original_input ,
0 commit comments