Skip to content

Commit 019c953

Browse files
committed
Feat: separate tool_call_item and tool_call_output_item in stream events
1 parent 4cb07d5 commit 019c953

File tree

4 files changed

+1943
-1835
lines changed

4 files changed

+1943
-1835
lines changed

src/agents/_run_impl.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -911,12 +911,12 @@ async def run_single_output_guardrail(
911911
return result
912912

913913
@classmethod
914-
def stream_step_result_to_queue(
914+
def stream_step_items_to_queue(
915915
cls,
916-
step_result: SingleStepResult,
916+
new_step_items: list[RunItem],
917917
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
918918
):
919-
for item in step_result.new_step_items:
919+
for item in new_step_items:
920920
if isinstance(item, MessageOutputItem):
921921
event = RunItemStreamEvent(item=item, name="message_output_created")
922922
elif isinstance(item, HandoffCallItem):
@@ -941,6 +941,14 @@ def stream_step_result_to_queue(
941941
if event:
942942
queue.put_nowait(event)
943943

944+
@classmethod
945+
def stream_step_result_to_queue(
946+
cls,
947+
step_result: SingleStepResult,
948+
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
949+
):
950+
cls.stream_step_items_to_queue(step_result.new_step_items, queue)
951+
944952
@classmethod
945953
async def _check_for_final_output_from_tools(
946954
cls,

src/agents/run.py

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -904,10 +904,9 @@ async def _run_single_turn_streamed(
904904
raise ModelBehaviorError("Model did not produce a final response!")
905905

906906
# 3. Now, we can process the turn as we do in the non-streaming case
907-
single_step_result = await cls._get_single_step_result_from_response(
907+
return await cls._get_single_step_result_from_streamed_response(
908908
agent=agent,
909-
original_input=streamed_result.input,
910-
pre_step_items=streamed_result.new_items,
909+
streamed_result=streamed_result,
911910
new_response=final_response,
912911
output_schema=output_schema,
913912
all_tools=all_tools,
@@ -918,9 +917,6 @@ async def _run_single_turn_streamed(
918917
tool_use_tracker=tool_use_tracker,
919918
)
920919

921-
RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
922-
return single_step_result
923-
924920
@classmethod
925921
async def _run_single_turn(
926922
cls,
@@ -1023,6 +1019,57 @@ async def _get_single_step_result_from_response(
10231019
run_config=run_config,
10241020
)
10251021

1022+
@classmethod
1023+
async def _get_single_step_result_from_streamed_response(
1024+
cls,
1025+
*,
1026+
agent: Agent[TContext],
1027+
all_tools: list[Tool],
1028+
streamed_result: RunResultStreaming,
1029+
new_response: ModelResponse,
1030+
output_schema: AgentOutputSchemaBase | None,
1031+
handoffs: list[Handoff],
1032+
hooks: RunHooks[TContext],
1033+
context_wrapper: RunContextWrapper[TContext],
1034+
run_config: RunConfig,
1035+
tool_use_tracker: AgentToolUseTracker,
1036+
) -> SingleStepResult:
1037+
1038+
original_input = streamed_result.input
1039+
pre_step_items = streamed_result.new_items
1040+
event_queue = streamed_result._event_queue
1041+
1042+
processed_response = RunImpl.process_model_response(
1043+
agent=agent,
1044+
all_tools=all_tools,
1045+
response=new_response,
1046+
output_schema=output_schema,
1047+
handoffs=handoffs,
1048+
)
1049+
new_items_processed_response = processed_response.new_items
1050+
tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
1051+
RunImpl.stream_step_items_to_queue(new_items_processed_response, event_queue)
1052+
1053+
single_step_result = await RunImpl.execute_tools_and_side_effects(
1054+
agent=agent,
1055+
original_input=original_input,
1056+
pre_step_items=pre_step_items,
1057+
new_response=new_response,
1058+
processed_response=processed_response,
1059+
output_schema=output_schema,
1060+
hooks=hooks,
1061+
context_wrapper=context_wrapper,
1062+
run_config=run_config,
1063+
)
1064+
new_step_items = [
1065+
item
1066+
for item in single_step_result.new_step_items
1067+
if item not in new_items_processed_response
1068+
]
1069+
RunImpl.stream_step_items_to_queue(new_step_items, event_queue)
1070+
1071+
return single_step_result
1072+
10261073
@classmethod
10271074
async def _run_input_guardrails(
10281075
cls,

tests/test_stream_events.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import asyncio
2+
import time
3+
4+
import pytest
5+
6+
from agents import Agent, Runner, function_tool
7+
8+
from .fake_model import FakeModel
9+
from .test_responses import get_function_tool_call, get_text_message
10+
11+
12+
@function_tool
13+
async def foo() -> str:
14+
await asyncio.sleep(3)
15+
return "success!"
16+
17+
@pytest.mark.asyncio
18+
async def test_stream_events_main():
19+
model = FakeModel()
20+
agent = Agent(
21+
name="Joker",
22+
model=model,
23+
tools=[foo],
24+
)
25+
26+
model.add_multiple_turn_outputs(
27+
[
28+
# First turn: a message and tool call
29+
[
30+
get_text_message("a_message"),
31+
get_function_tool_call("foo", ""),
32+
],
33+
# Second turn: text message
34+
[get_text_message("done")],
35+
]
36+
)
37+
38+
result = Runner.run_streamed(
39+
agent,
40+
input="Hello",
41+
)
42+
tool_call_start_time = -1
43+
tool_call_end_time = -1
44+
async for event in result.stream_events():
45+
if event.type == "run_item_stream_event":
46+
if event.item.type == "tool_call_item":
47+
tool_call_start_time = time.time_ns()
48+
elif event.item.type == "tool_call_output_item":
49+
tool_call_end_time = time.time_ns()
50+
51+
assert tool_call_start_time > 0, "tool_call_item was not observed"
52+
assert tool_call_end_time > 0, "tool_call_output_item was not observed"
53+
assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"

0 commit comments

Comments
 (0)