Skip to content

Commit 7012fdb

Browse files
author
pokliu
committed
feat: Add streaming tool output support
1 parent c422030 commit 7012fdb

File tree

13 files changed

+704
-22
lines changed

13 files changed

+704
-22
lines changed

docs/ja/streaming.md

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,64 @@ async def main():
8888

8989
if __name__ == "__main__":
9090
asyncio.run(main())
91-
```
91+
```
92+
93+
## ツール出力ストリーミングイベント
94+
95+
[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent] を使用すると、ツールの実行中に増分出力を受け取ることができます。これは、長時間実行されるツールでユーザーにリアルタイムで進捗を表示したい場合に有用です。
96+
97+
ストリーミングツールを作成するには、文字列チャンクを yield する非同期ジェネレータ関数を定義します:
98+
99+
```python
100+
import asyncio
101+
from collections.abc import AsyncIterator
102+
from agents import Agent, Runner, ToolOutputStreamEvent, function_tool
103+
104+
@function_tool
105+
async def search_documents(query: str) -> AsyncIterator[str]:
106+
"""ドキュメントを検索し、見つかった結果をストリーミングします。"""
107+
documents = [
108+
f"ドキュメント 1 には {query} に関する情報が含まれています...\n",
109+
f"ドキュメント 2 には {query} に関する追加の詳細があります...\n",
110+
f"ドキュメント 3 は {query} の分析を提供します...\n",
111+
]
112+
113+
for doc in documents:
114+
# 処理時間をシミュレート
115+
await asyncio.sleep(0.5)
116+
# 増分結果を yield
117+
yield doc
118+
119+
120+
async def main():
121+
agent = Agent(
122+
name="Research Assistant",
123+
instructions="あなたはユーザーの情報検索を支援します。",
124+
tools=[search_documents],
125+
)
126+
127+
result = Runner.run_streamed(
128+
agent,
129+
input="AI に関する情報を検索してください",
130+
)
131+
132+
async for event in result.stream_events():
133+
# ツールストリーミングイベントを処理
134+
if event.type == "tool_output_stream_event":
135+
print(f"[{event.tool_name}] {event.delta}", end="", flush=True)
136+
# 最終ツール出力を処理
137+
elif event.type == "run_item_stream_event" and event.name == "tool_output":
138+
print(f"\n✓ ツール完了\n")
139+
140+
141+
if __name__ == "__main__":
142+
asyncio.run(main())
143+
```
144+
145+
ストリーミングツールに関する重要なポイント:
146+
147+
- ストリーミングツールは `AsyncIterator[str]`(文字列を yield する非同期ジェネレータ)を返す必要があります
148+
- yield された各チャンクは `ToolOutputStreamEvent` として発行されます
149+
- すべてのチャンクは自動的に蓄積され、最終的なツール出力として LLM に送信されます
150+
- 非ストリーミングツールはストリーミングツールと一緒に正常に動作します
151+
- 非ストリーミングモード(`Runner.run()`)では、ストリーミングツールは返す前にすべてのチャンクを自動的に収集します

docs/ko/streaming.md

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,64 @@ async def main():
8888

8989
if __name__ == "__main__":
9090
asyncio.run(main())
91-
```
91+
```
92+
93+
## 도구 출력 스트리밍 이벤트
94+
95+
[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent]를 사용하면 도구가 실행되는 동안 증분 출력을 받을 수 있습니다. 이는 장시간 실행되는 도구에서 사용자에게 실시간으로 진행 상황을 표시하려는 경우에 유용합니다.
96+
97+
스트리밍 도구를 만들려면 문자열 청크를 yield하는 비동기 제너레이터 함수를 정의하세요:
98+
99+
```python
100+
import asyncio
101+
from collections.abc import AsyncIterator
102+
from agents import Agent, Runner, ToolOutputStreamEvent, function_tool
103+
104+
@function_tool
105+
async def search_documents(query: str) -> AsyncIterator[str]:
106+
"""문서를 검색하고 발견된 결과를 스트리밍합니다."""
107+
documents = [
108+
f"문서 1에는 {query}에 대한 정보가 포함되어 있습니다...\n",
109+
f"문서 2에는 {query}에 대한 추가 세부정보가 있습니다...\n",
110+
f"문서 3은 {query}에 대한 분석을 제공합니다...\n",
111+
]
112+
113+
for doc in documents:
114+
# 처리 시간 시뮬레이션
115+
await asyncio.sleep(0.5)
116+
# 증분 결과 yield
117+
yield doc
118+
119+
120+
async def main():
121+
agent = Agent(
122+
name="Research Assistant",
123+
instructions="당신은 사용자가 정보를 검색하도록 돕습니다.",
124+
tools=[search_documents],
125+
)
126+
127+
result = Runner.run_streamed(
128+
agent,
129+
input="AI에 관한 정보를 검색하세요",
130+
)
131+
132+
async for event in result.stream_events():
133+
# 도구 스트리밍 이벤트 처리
134+
if event.type == "tool_output_stream_event":
135+
print(f"[{event.tool_name}] {event.delta}", end="", flush=True)
136+
# 최종 도구 출력 처리
137+
elif event.type == "run_item_stream_event" and event.name == "tool_output":
138+
print(f"\n✓ 도구 완료\n")
139+
140+
141+
if __name__ == "__main__":
142+
asyncio.run(main())
143+
```
144+
145+
스트리밍 도구에 대한 주요 사항:
146+
147+
- 스트리밍 도구는 `AsyncIterator[str]`(문자열을 yield하는 비동기 제너레이터)을 반환해야 합니다
148+
- yield된 각 청크는 `ToolOutputStreamEvent`로 발행됩니다
149+
- 모든 청크는 자동으로 누적되어 최종 도구 출력으로 LLM에 전송됩니다
150+
- 비스트리밍 도구는 스트리밍 도구와 함께 정상적으로 작동합니다
151+
- 비스트리밍 모드(`Runner.run()`)에서 스트리밍 도구는 반환하기 전에 모든 청크를 자동으로 수집합니다

docs/streaming.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,63 @@ async def main():
8585
if __name__ == "__main__":
8686
asyncio.run(main())
8787
```
88+
89+
## Tool output streaming events
90+
91+
[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent] allows you to receive incremental output from tools as they execute. This is useful for long-running tools where you want to show progress to the user in real-time.
92+
93+
To create a streaming tool, define an async generator function that yields string chunks:
94+
95+
```python
96+
import asyncio
97+
from collections.abc import AsyncIterator
98+
from agents import Agent, Runner, ToolOutputStreamEvent, function_tool
99+
100+
@function_tool
101+
async def search_documents(query: str) -> AsyncIterator[str]:
102+
"""Search through documents and stream results as they are found."""
103+
documents = [
104+
f"Document 1 contains information about {query}...\n",
105+
f"Document 2 has additional details on {query}...\n",
106+
f"Document 3 provides analysis of {query}...\n",
107+
]
108+
109+
for doc in documents:
110+
# Simulate processing time
111+
await asyncio.sleep(0.5)
112+
# Yield incremental results
113+
yield doc
114+
115+
116+
async def main():
117+
agent = Agent(
118+
name="Research Assistant",
119+
instructions="You help users search for information.",
120+
tools=[search_documents],
121+
)
122+
123+
result = Runner.run_streamed(
124+
agent,
125+
input="Search for information about AI",
126+
)
127+
128+
async for event in result.stream_events():
129+
# Handle tool streaming events
130+
if event.type == "tool_output_stream_event":
131+
print(f"[{event.tool_name}] {event.delta}", end="", flush=True)
132+
# Handle final tool output
133+
elif event.type == "run_item_stream_event" and event.name == "tool_output":
134+
print(f"\n✓ Tool completed\n")
135+
136+
137+
if __name__ == "__main__":
138+
asyncio.run(main())
139+
```
140+
141+
Key points about streaming tools:
142+
143+
- Streaming tools must return `AsyncIterator[str]` (an async generator that yields strings)
144+
- Each yielded chunk is emitted as a `ToolOutputStreamEvent`
145+
- All chunks are automatically accumulated and sent to the LLM as the final tool output
146+
- Non-streaming tools continue to work normally alongside streaming tools
147+
- In non-streaming mode (`Runner.run()`), streaming tools automatically collect all chunks before returning

docs/zh/streaming.md

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,64 @@ async def main():
8888

8989
if __name__ == "__main__":
9090
asyncio.run(main())
91-
```
91+
```
92+
93+
## 工具输出流式事件
94+
95+
[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent] 允许你在工具执行时接收增量输出。这对于长时间运行的工具非常有用,可以实时向用户显示进度。
96+
97+
要创建流式工具,请定义一个异步生成器函数,逐块 yield 字符串:
98+
99+
```python
100+
import asyncio
101+
from collections.abc import AsyncIterator
102+
from agents import Agent, Runner, ToolOutputStreamEvent, function_tool
103+
104+
@function_tool
105+
async def search_documents(query: str) -> AsyncIterator[str]:
106+
"""搜索文档并流式返回找到的结果。"""
107+
documents = [
108+
f"文档 1 包含关于 {query} 的信息...\n",
109+
f"文档 2 提供关于 {query} 的更多细节...\n",
110+
f"文档 3 分析了 {query}...\n",
111+
]
112+
113+
for doc in documents:
114+
# 模拟处理时间
115+
await asyncio.sleep(0.5)
116+
# yield 增量结果
117+
yield doc
118+
119+
120+
async def main():
121+
agent = Agent(
122+
name="Research Assistant",
123+
instructions="你帮助用户搜索信息。",
124+
tools=[search_documents],
125+
)
126+
127+
result = Runner.run_streamed(
128+
agent,
129+
input="搜索关于人工智能的信息",
130+
)
131+
132+
async for event in result.stream_events():
133+
# 处理工具流式事件
134+
if event.type == "tool_output_stream_event":
135+
print(f"[{event.tool_name}] {event.delta}", end="", flush=True)
136+
# 处理最终工具输出
137+
elif event.type == "run_item_stream_event" and event.name == "tool_output":
138+
print(f"\n✓ 工具完成\n")
139+
140+
141+
if __name__ == "__main__":
142+
asyncio.run(main())
143+
```
144+
145+
关于流式工具的要点:
146+
147+
- 流式工具必须返回 `AsyncIterator[str]`(一个 yield 字符串的异步生成器)
148+
- 每个 yield 的块都会作为 `ToolOutputStreamEvent` 发出
149+
- 所有块会自动累积并作为最终工具输出发送给 LLM
150+
- 非流式工具可以与流式工具正常共存
151+
- 在非流式模式(`Runner.run()`)中,流式工具会在返回前自动收集所有块
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""
2+
Example of using streaming tools with the Agents SDK.
3+
4+
This example demonstrates how to create a tool that yields incremental output,
5+
allowing you to stream tool execution results to the user in real-time.
6+
"""
7+
8+
import asyncio
9+
from collections.abc import AsyncIterator
10+
11+
from agents import Agent, Runner, ToolOutputStreamEvent, function_tool
12+
13+
14+
@function_tool
15+
async def search_documents(query: str) -> AsyncIterator[str]:
16+
"""Search through documents and stream results as they are found.
17+
18+
Args:
19+
query: The search query.
20+
21+
Yields:
22+
Incremental search results.
23+
"""
24+
# Simulate searching through multiple documents
25+
documents = [
26+
f"Document 1 contains information about {query}...\n",
27+
f"Document 2 has additional details on {query}...\n",
28+
f"Document 3 provides analysis of {query}...\n",
29+
]
30+
31+
for doc in documents:
32+
# Simulate processing time
33+
await asyncio.sleep(0.5)
34+
# Yield incremental results
35+
yield doc
36+
37+
38+
@function_tool
39+
async def generate_report(topic: str) -> AsyncIterator[str]:
40+
"""Generate a report on a topic, streaming the output as it's generated.
41+
42+
Args:
43+
topic: The topic to generate a report on.
44+
45+
Yields:
46+
Incremental report content.
47+
"""
48+
sections = [
49+
f"# Report on {topic}\n\n",
50+
f"## Introduction\n\nThis report covers {topic} in detail.\n\n",
51+
f"## Analysis\n\nOur analysis of {topic} shows several key points...\n\n",
52+
f"## Conclusion\n\nIn summary, {topic} is an important topic.\n\n",
53+
]
54+
55+
for section in sections:
56+
await asyncio.sleep(0.3)
57+
yield section
58+
59+
60+
async def main():
61+
# Create an agent with streaming tools
62+
agent = Agent(
63+
name="Research Assistant",
64+
instructions="You are a helpful research assistant that can search documents and generate reports.",
65+
tools=[search_documents, generate_report],
66+
)
67+
68+
# Run the agent in streaming mode
69+
result = Runner.run_streamed(
70+
agent,
71+
input="Search for information about artificial intelligence and generate a brief report.",
72+
)
73+
74+
print("Streaming agent output:\n")
75+
76+
# Stream events and display tool outputs in real-time
77+
async for event in result.stream_events():
78+
# Handle tool streaming events
79+
if event.type == "tool_output_stream_event":
80+
assert isinstance(event, ToolOutputStreamEvent)
81+
print(f"[{event.tool_name}] {event.delta}", end="", flush=True)
82+
83+
# Handle run item events (final outputs)
84+
elif event.type == "run_item_stream_event":
85+
if event.name == "tool_output":
86+
print(f"\n✓ Tool '{event.item.agent.name}' completed\n")
87+
elif event.name == "message_output_created":
88+
print(f"\n[Agent Response]: {event.item}\n")
89+
90+
# Get final result
91+
print("\n" + "=" * 60)
92+
print("Final output:", result.final_output)
93+
print("=" * 60)
94+
95+
96+
if __name__ == "__main__":
97+
asyncio.run(main())

src/agents/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
RawResponsesStreamEvent,
6666
RunItemStreamEvent,
6767
StreamEvent,
68+
ToolOutputStreamEvent,
6869
)
6970
from .tool import (
7071
CodeInterpreterTool,
@@ -263,6 +264,7 @@ def enable_verbose_stdout_logging():
263264
"RawResponsesStreamEvent",
264265
"RunItemStreamEvent",
265266
"AgentUpdatedStreamEvent",
267+
"ToolOutputStreamEvent",
266268
"StreamEvent",
267269
"FunctionTool",
268270
"FunctionToolResult",

0 commit comments

Comments
 (0)