Skip to content

Commit 3a041f1

Browse files
authored
Add Agent.run_stream_events() convenience method wrapping run(event_stream_handler=...) (#3084)
1 parent a04d25f commit 3a041f1

File tree

18 files changed

+693
-60
lines changed

18 files changed

+693
-60
lines changed

docs/agents.md

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,18 @@ print(result.output)
6161

6262
## Running Agents
6363

64-
There are four ways to run an agent:
64+
There are five ways to run an agent:
6565

6666
1. [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] — an async function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response.
6767
2. [`agent.run_sync()`][pydantic_ai.agent.AbstractAgent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`).
6868
3. [`agent.run_stream()`][pydantic_ai.agent.AbstractAgent.run_stream] — an async context manager which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream text and structured output as an async iterable.
69-
4. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async-iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph].
69+
4. [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] — a function which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result.
70+
5. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph].
7071

71-
Here's a simple example demonstrating the first three:
72+
Here's a simple example demonstrating the first four:
7273

7374
```python {title="run_agent.py"}
74-
from pydantic_ai import Agent
75+
from pydantic_ai import Agent, AgentRunResultEvent, AgentStreamEvent
7576

7677
agent = Agent('openai:gpt-4o')
7778

@@ -91,6 +92,22 @@ async def main():
9192
#> The capital of
9293
#> The capital of the UK is
9394
#> The capital of the UK is London.
95+
96+
events: list[AgentStreamEvent | AgentRunResultEvent] = []
97+
async for event in agent.run_stream_events('What is the capital of Mexico?'):
98+
events.append(event)
99+
print(events)
100+
"""
101+
[
102+
PartStartEvent(index=0, part=TextPart(content='The capital of ')),
103+
FinalResultEvent(tool_name=None, tool_call_id=None),
104+
PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='Mexico is Mexico ')),
105+
PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='City.')),
106+
AgentRunResultEvent(
107+
result=AgentRunResult(output='The capital of Mexico is Mexico City.')
108+
),
109+
]
110+
"""
94111
```
95112

96113
_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_
@@ -105,13 +122,13 @@ It also takes an optional `event_stream_handler` argument that you can use to ga
105122
The example below shows how to stream events and text output. You can also [stream structured output](output.md#streaming-structured-output).
106123

107124
!!! note
108-
As the `run_stream()` method will consider the first output matching the `output_type` to be the final output,
125+
As the `run_stream()` method will consider the first output matching the [output type](output.md#structured-output) to be the final output,
109126
it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.
110127

111128
If you want to always run the agent graph to completion and stream all events from the model's streaming response and the agent's execution of tools,
112-
use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead, as described in the following sections.
129+
use [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead, as described in the following sections.
113130

114-
```python {title="run_stream_events.py"}
131+
```python {title="run_stream_event_stream_handler.py"}
115132
import asyncio
116133
from collections.abc import AsyncIterable
117134
from datetime import date
@@ -147,30 +164,32 @@ async def weather_forecast(
147164

148165
output_messages: list[str] = []
149166

167+
async def handle_event(event: AgentStreamEvent):
168+
if isinstance(event, PartStartEvent):
169+
output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}')
170+
elif isinstance(event, PartDeltaEvent):
171+
if isinstance(event.delta, TextPartDelta):
172+
output_messages.append(f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}')
173+
elif isinstance(event.delta, ThinkingPartDelta):
174+
output_messages.append(f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}')
175+
elif isinstance(event.delta, ToolCallPartDelta):
176+
output_messages.append(f'[Request] Part {event.index} args delta: {event.delta.args_delta}')
177+
elif isinstance(event, FunctionToolCallEvent):
178+
output_messages.append(
179+
f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
180+
)
181+
elif isinstance(event, FunctionToolResultEvent):
182+
output_messages.append(f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}')
183+
elif isinstance(event, FinalResultEvent):
184+
output_messages.append(f'[Result] The model starting producing a final result (tool_name={event.tool_name})')
185+
150186

151187
async def event_stream_handler(
152188
ctx: RunContext,
153189
event_stream: AsyncIterable[AgentStreamEvent],
154190
):
155191
async for event in event_stream:
156-
if isinstance(event, PartStartEvent):
157-
output_messages.append(f'[Request] Starting part {event.index}: {event.part!r}')
158-
elif isinstance(event, PartDeltaEvent):
159-
if isinstance(event.delta, TextPartDelta):
160-
output_messages.append(f'[Request] Part {event.index} text delta: {event.delta.content_delta!r}')
161-
elif isinstance(event.delta, ThinkingPartDelta):
162-
output_messages.append(f'[Request] Part {event.index} thinking delta: {event.delta.content_delta!r}')
163-
elif isinstance(event.delta, ToolCallPartDelta):
164-
output_messages.append(f'[Request] Part {event.index} args delta: {event.delta.args_delta}')
165-
elif isinstance(event, FunctionToolCallEvent):
166-
output_messages.append(
167-
f'[Tools] The LLM calls tool={event.part.tool_name!r} with args={event.part.args} (tool_call_id={event.part.tool_call_id!r})'
168-
)
169-
elif isinstance(event, FunctionToolResultEvent):
170-
output_messages.append(f'[Tools] Tool call {event.tool_call_id!r} returned => {event.result.content}')
171-
elif isinstance(event, FinalResultEvent):
172-
output_messages.append(f'[Result] The model starting producing a final result (tool_name={event.tool_name})')
173-
192+
await handle_event(event)
174193

175194
async def main():
176195
user_prompt = 'What will the weather be like in Paris on Tuesday?'
@@ -209,24 +228,29 @@ Like `agent.run_stream()`, [`agent.run()`][pydantic_ai.agent.AbstractAgent.run_s
209228
argument that lets you stream all events from the model's streaming response and the agent's execution of tools.
210229
Unlike `run_stream()`, it always runs the agent graph to completion even if text was received ahead of tool calls that looked like it could've been the final result.
211230

231+
For convenience, a [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] method is also available as a wrapper around `run(event_stream_handler=...)`, which returns an async iterable of [`AgentStreamEvent`s][pydantic_ai.messages.AgentStreamEvent] and a [`AgentRunResultEvent`][pydantic_ai.run.AgentRunResultEvent] containing the final run result.
232+
212233
!!! note
213-
When used with an `event_stream_handler`, the `run()` method currently requires you to piece together the streamed text yourself from the `PartStartEvent` and subsequent `PartDeltaEvent`s instead of providing a `stream_text()` convenience method.
234+
As they return raw events as they come in, the `run_stream_events()` and `run(event_stream_handler=...)` methods require you to piece together the streamed text and structured output yourself from the `PartStartEvent` and subsequent `PartDeltaEvent`s.
214235

215236
To get the best of both worlds, at the expense of some additional complexity, you can use [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] as described in the next section, which lets you [iterate over the agent graph](#iterating-over-an-agents-graph) and [stream both events and output](#streaming-all-events-and-output) at every step.
216237

217-
```python {title="run_events.py" requires="run_stream_events.py"}
238+
```python {title="run_events.py" requires="run_stream_event_stream_handler.py"}
218239
import asyncio
219240

220-
from run_stream_events import event_stream_handler, output_messages, weather_agent
241+
from pydantic_ai import AgentRunResultEvent
242+
243+
from run_stream_event_stream_handler import handle_event, output_messages, weather_agent
221244

222245

223246
async def main():
224247
user_prompt = 'What will the weather be like in Paris on Tuesday?'
225248

226-
run = await weather_agent.run(user_prompt, event_stream_handler=event_stream_handler)
227-
228-
output_messages.append(f'[Final Output] {run.output}')
229-
249+
async for event in weather_agent.run_stream_events(user_prompt):
250+
if isinstance(event, AgentRunResultEvent):
251+
output_messages.append(f'[Final Output] {event.result.output}')
252+
else:
253+
await handle_event(event)
230254

231255
if __name__ == '__main__':
232256
asyncio.run(main())

docs/api/run.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# `pydantic_ai.run`
2+
3+
::: pydantic_ai.run

docs/durable_execution/dbos.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ DBOS checkpoints workflow inputs/outputs and step outputs into a database using
127127

128128
### Streaming
129129

130-
Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] is not supported when running inside of a DBOS workflow.
130+
Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.run_stream_events()`][pydantic_ai.Agent.run_stream_events] are not supported when running inside of a DBOS workflow.
131131

132132
Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `DBOSAgent` instance and using [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run].
133133
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).

docs/durable_execution/temporal.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ If you need one or more of these attributes to be available inside activities, y
177177

178178
### Streaming
179179

180-
Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported.
180+
Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream], [`Agent.run_stream_events()`][pydantic_ai.Agent.run_stream_events], and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported.
181181

182182
Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `TemporalAgent` instance and using [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] inside the workflow.
183183
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).

docs/output.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ There two main challenges with streamed results:
520520
it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.
521521

522522
If you want to always run the agent graph to completion and stream all events from the model's streaming response and the agent's execution of tools,
523-
use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` ([docs](agents.md#streaming-all-events)) or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] ([docs](agents.md#streaming-all-events-and-output)) instead.
523+
use [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] ([docs](agents.md#streaming-all-events)) or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] ([docs](agents.md#streaming-all-events-and-output)) instead.
524524

525525
### Streaming Text
526526

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ nav:
131131
- api/profiles.md
132132
- api/providers.md
133133
- api/retries.md
134+
- api/run.md
134135
- pydantic_evals:
135136
- api/pydantic_evals/dataset.md
136137
- api/pydantic_evals/evaluators.md

pydantic_ai_slim/pydantic_ai/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
ModelProfile,
8989
ModelProfileSpec,
9090
)
91+
from .run import AgentRun, AgentRunResult, AgentRunResultEvent
9192
from .settings import ModelSettings
9293
from .tools import DeferredToolRequests, DeferredToolResults, RunContext, Tool, ToolApproved, ToolDefinition, ToolDenied
9394
from .toolsets import (
@@ -224,5 +225,9 @@
224225
'RunUsage',
225226
'RequestUsage',
226227
'UsageLimits',
228+
# run
229+
'AgentRun',
230+
'AgentRunResult',
231+
'AgentRunResultEvent',
227232
)
228233
__version__ = _metadata_version('pydantic_ai_slim')

pydantic_ai_slim/pydantic_ai/_agent_graph.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@
8787
class GraphAgentState:
8888
"""State kept across the execution of the agent graph."""
8989

90-
message_history: list[_messages.ModelMessage]
91-
usage: _usage.RunUsage
92-
retries: int
93-
run_step: int
90+
message_history: list[_messages.ModelMessage] = dataclasses.field(default_factory=list)
91+
usage: _usage.RunUsage = dataclasses.field(default_factory=_usage.RunUsage)
92+
retries: int = 0
93+
run_step: int = 0
9494

9595
def increment_retries(self, max_result_retries: int, error: BaseException | None = None) -> None:
9696
self.retries += 1

0 commit comments

Comments
 (0)