45
45
)
46
46
from .handoffs import Handoff , HandoffInputFilter , handoff
47
47
from .items import (
48
+ HandoffCallItem ,
48
49
ItemHelpers ,
49
50
ModelResponse ,
50
51
RunItem ,
60
61
from .models .multi_provider import MultiProvider
61
62
from .result import RunResult , RunResultStreaming
62
63
from .run_context import RunContextWrapper , TContext
63
- from .stream_events import AgentUpdatedStreamEvent , RawResponsesStreamEvent , RunItemStreamEvent
64
+ from .stream_events import (
65
+ AgentUpdatedStreamEvent ,
66
+ RawResponsesStreamEvent ,
67
+ RunItemStreamEvent ,
68
+ StreamEvent ,
69
+ )
64
70
from .tool import Tool
65
71
from .tracing import Span , SpanError , agent_span , get_current_trace , trace
66
72
from .tracing .span_data import AgentSpanData
@@ -1095,14 +1101,19 @@ async def _run_single_turn_streamed(
1095
1101
context_wrapper = context_wrapper ,
1096
1102
run_config = run_config ,
1097
1103
tool_use_tracker = tool_use_tracker ,
1104
+ event_queue = streamed_result ._event_queue ,
1098
1105
)
1099
1106
1100
- if emitted_tool_call_ids :
1101
- import dataclasses as _dc
1107
+ import dataclasses as _dc
1108
+
1109
+ # Filter out items that have already been sent to avoid duplicates
1110
+ items_to_filter = single_step_result .new_step_items
1102
1111
1103
- filtered_items = [
1112
+ if emitted_tool_call_ids :
1113
+ # Filter out tool call items that were already emitted during streaming
1114
+ items_to_filter = [
1104
1115
item
1105
- for item in single_step_result . new_step_items
1116
+ for item in items_to_filter
1106
1117
if not (
1107
1118
isinstance (item , ToolCallItem )
1108
1119
and (
@@ -1114,15 +1125,17 @@ async def _run_single_turn_streamed(
1114
1125
)
1115
1126
]
1116
1127
1117
- single_step_result_filtered = _dc .replace (
1118
- single_step_result , new_step_items = filtered_items
1119
- )
1128
+ # Filter out HandoffCallItem to avoid duplicates (already sent earlier)
1129
+ items_to_filter = [
1130
+ item for item in items_to_filter
1131
+ if not isinstance (item , HandoffCallItem )
1132
+ ]
1120
1133
1121
- RunImpl . stream_step_result_to_queue (
1122
- single_step_result_filtered , streamed_result . _event_queue
1123
- )
1124
- else :
1125
- RunImpl .stream_step_result_to_queue (single_step_result , streamed_result ._event_queue )
1134
+ # Create filtered result and send to queue
1135
+ filtered_result = _dc . replace (
1136
+ single_step_result , new_step_items = items_to_filter
1137
+ )
1138
+ RunImpl .stream_step_result_to_queue (filtered_result , streamed_result ._event_queue )
1126
1139
return single_step_result
1127
1140
1128
1141
@classmethod
@@ -1207,6 +1220,7 @@ async def _get_single_step_result_from_response(
1207
1220
context_wrapper : RunContextWrapper [TContext ],
1208
1221
run_config : RunConfig ,
1209
1222
tool_use_tracker : AgentToolUseTracker ,
1223
+ event_queue : asyncio .Queue [StreamEvent | QueueCompleteSentinel ] | None = None ,
1210
1224
) -> SingleStepResult :
1211
1225
processed_response = RunImpl .process_model_response (
1212
1226
agent = agent ,
@@ -1218,6 +1232,15 @@ async def _get_single_step_result_from_response(
1218
1232
1219
1233
tool_use_tracker .add_tool_use (agent , processed_response .tools_used )
1220
1234
1235
+ # Send handoff items immediately for streaming, but avoid duplicates
1236
+ if event_queue is not None and processed_response .new_items :
1237
+ handoff_items = [
1238
+ item for item in processed_response .new_items
1239
+ if isinstance (item , HandoffCallItem )
1240
+ ]
1241
+ if handoff_items :
1242
+ RunImpl .stream_step_items_to_queue (cast (list [RunItem ], handoff_items ), event_queue )
1243
+
1221
1244
return await RunImpl .execute_tools_and_side_effects (
1222
1245
agent = agent ,
1223
1246
original_input = original_input ,
0 commit comments