Skip to content

Commit a6810ba

Browse files
authored
Merge branch 'pydantic:main' into fix-makefile
2 parents 1f30684 + f2b9e9f commit a6810ba

35 files changed

+7774
-9
lines changed

docs/api/durable_exec.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@
33
::: pydantic_ai.durable_exec.temporal
44

55
::: pydantic_ai.durable_exec.dbos
6+
7+
::: pydantic_ai.durable_exec.prefect

docs/durable_execution/overview.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](../agents.md#streaming-all-events) and [MCP](../mcp/client.md), with the added benefit of fault tolerance.
44

5-
Pydantic AI natively supports two durable execution solutions:
5+
Pydantic AI natively supports three durable execution solutions:
66

77
- [Temporal](./temporal.md)
88
- [DBOS](./dbos.md)
9+
- [Prefect](./prefect.md)
910

10-
These integrations only uses Pydantic AI's public interface, so they also serve as a reference for integrating with other durable systems.
11+
These integrations only use Pydantic AI's public interface, so they also serve as a reference for integrating with other durable systems.

docs/durable_execution/prefect.md

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
# Durable Execution with Prefect
2+
3+
[Prefect](https://www.prefect.io/) is a workflow orchestration framework for building resilient data pipelines in Python, natively integrated with Pydantic AI.
4+
5+
## Durable Execution
6+
7+
Prefect 3.0 brings [transactional semantics](https://www.prefect.io/blog/transactional-ml-pipelines-with-prefect-3-0) to your Python workflows, allowing you to group tasks into atomic units and define failure modes. If any part of a transaction fails, the entire transaction can be rolled back to a clean state.
8+
9+
* **Flows** are the top-level entry points for your workflow. They can contain tasks and other flows.
10+
* **Tasks** are individual units of work that can be retried, cached, and monitored independently.
11+
12+
Prefect 3.0's approach to transactional orchestration makes your workflows automatically **idempotent**: rerunnable without duplication or inconsistency across any environment. Every task is executed within a transaction that governs when and where the task's result record is persisted. If the task runs again under an identical context, it will not re-execute but instead load its previous result.
13+
14+
The diagram below shows the overall architecture of an agentic application with Prefect.
15+
Prefect uses client-side task orchestration by default, with optional server connectivity for advanced features like scheduling and monitoring.
16+
17+
```text
18+
+---------------------+
19+
| Prefect Server | (Monitoring,
20+
| or Cloud | scheduling, UI,
21+
+---------------------+ orchestration)
22+
^
23+
|
24+
Flow state, | Schedule flows,
25+
metadata, | track execution
26+
logs |
27+
|
28+
+------------------------------------------------------+
29+
| Application Process |
30+
| +----------------------------------------------+ |
31+
| | Flow (Agent.run) | |
32+
| +----------------------------------------------+ |
33+
| | | | |
34+
| v v v |
35+
| +-----------+ +------------+ +-------------+ |
36+
| | Task | | Task | | Task | |
37+
| | (Tool) | | (MCP Tool) | | (Model API) | |
38+
| +-----------+ +------------+ +-------------+ |
39+
| | | | |
40+
| Cache & Cache & Cache & |
41+
| persist persist persist |
42+
| to to to |
43+
| v v v |
44+
| +----------------------------------------------+ |
45+
| | Result Storage (Local FS, S3, etc.) | |
46+
| +----------------------------------------------+ |
47+
+------------------------------------------------------+
48+
| | |
49+
v v v
50+
[External APIs, services, databases, etc.]
51+
```
52+
53+
See the [Prefect documentation](https://docs.prefect.io/) for more information.
54+
55+
## Durable Agent
56+
57+
Any agent can be wrapped in a [`PrefectAgent`][pydantic_ai.durable_exec.prefect.PrefectAgent] to get durable execution. `PrefectAgent` automatically:
58+
59+
* Wraps [`Agent.run`][pydantic_ai.Agent.run] and [`Agent.run_sync`][pydantic_ai.Agent.run_sync] as Prefect flows.
60+
* Wraps [model requests](../models/overview.md) as Prefect tasks.
61+
* Wraps [tool calls](../tools.md) as Prefect tasks (configurable per-tool).
62+
* Wraps [MCP communication](../mcp/client.md) as Prefect tasks.
63+
64+
Event stream handlers are **automatically wrapped** by Prefect when running inside a Prefect flow. Each event from the stream is processed in a separate Prefect task for durability. You can customize the task behavior using the `event_stream_handler_task_config` parameter when creating the `PrefectAgent`. Do **not** manually decorate event stream handlers with `@task`. For examples, see the [streaming docs](../agents.md#streaming-all-events)
65+
66+
The original agent, model, and MCP server can still be used as normal outside the Prefect flow.
67+
68+
Here is a simple but complete example of wrapping an agent for durable execution. All it requires is to install Pydantic AI with Prefect:
69+
70+
```bash
71+
pip/uv-add pydantic-ai[prefect]
72+
```
73+
74+
Or if you're using the slim package, you can install it with the `prefect` optional group:
75+
76+
```bash
77+
pip/uv-add pydantic-ai-slim[prefect]
78+
```
79+
80+
```python {title="prefect_agent.py" test="skip"}
81+
from pydantic_ai import Agent
82+
from pydantic_ai.durable_exec.prefect import PrefectAgent
83+
84+
agent = Agent(
85+
'gpt-4o',
86+
instructions="You're an expert in geography.",
87+
name='geography', # (1)!
88+
)
89+
90+
prefect_agent = PrefectAgent(agent) # (2)!
91+
92+
async def main():
93+
result = await prefect_agent.run('What is the capital of Mexico?') # (3)!
94+
print(result.output)
95+
#> Mexico City (Ciudad de México, CDMX)
96+
```
97+
98+
1. The agent's `name` is used to uniquely identify its flows and tasks.
99+
2. Wrapping the agent with `PrefectAgent` enables durable execution for all agent runs.
100+
3. [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run] works like [`Agent.run()`][pydantic_ai.Agent.run], but runs as a Prefect flow and executes model requests, decorated tool calls, and MCP communication as Prefect tasks.
101+
102+
_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_
103+
104+
For more information on how to use Prefect in Python applications, see their [Python documentation](https://docs.prefect.io/v3/how-to-guides/workflows/write-and-run).
105+
106+
## Prefect Integration Considerations
107+
108+
When using Prefect with Pydantic AI agents, there are a few important considerations to ensure workflows behave correctly.
109+
110+
### Agent Requirements
111+
112+
Each agent instance must have a unique `name` so Prefect can correctly identify and track its flows and tasks.
113+
114+
### Tool Wrapping
115+
116+
Agent tools are automatically wrapped as Prefect tasks, which means they benefit from:
117+
118+
* **Retry logic**: Failed tool calls can be retried automatically
119+
* **Caching**: Tool results are cached based on their inputs
120+
* **Observability**: Tool execution is tracked in the Prefect UI
121+
122+
You can customize tool task behavior using `tool_task_config` (applies to all tools) or `tool_task_config_by_name` (per-tool configuration):
123+
124+
```python {title="prefect_agent_config.py" test="skip"}
125+
from pydantic_ai import Agent
126+
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig
127+
128+
agent = Agent('gpt-4o', name='my_agent')
129+
130+
@agent.tool_plain
131+
def fetch_data(url: str) -> str:
132+
# This tool will be wrapped as a Prefect task
133+
...
134+
135+
prefect_agent = PrefectAgent(
136+
agent,
137+
tool_task_config=TaskConfig(retries=3), # Default for all tools
138+
tool_task_config_by_name={
139+
'fetch_data': TaskConfig(timeout_seconds=10.0), # Specific to fetch_data
140+
'simple_tool': None, # Disable task wrapping for simple_tool
141+
},
142+
)
143+
```
144+
145+
Set a tool's config to `None` in `tool_task_config_by_name` to disable task wrapping for that specific tool.
146+
147+
### Streaming
148+
149+
When running inside a Prefect flow, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] works but doesn't provide real-time streaming because Prefect tasks consume their entire execution before returning results. The method will execute fully and return the complete result at once.
150+
151+
For real-time streaming behavior inside Prefect flows, you can set an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `PrefectAgent` instance and use [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run].
152+
153+
**Note**: Event stream handlers behave differently when running inside a Prefect flow versus outside:
154+
- **Outside a flow**: The handler receives events as they stream from the model
155+
- **Inside a flow**: Each event is wrapped as a Prefect task for durability, which may affect timing but ensures reliability
156+
157+
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).
158+
159+
## Task Configuration
160+
161+
You can customize Prefect task behavior, such as retries and timeouts, by passing [`TaskConfig`][pydantic_ai.durable_exec.prefect.TaskConfig] objects to the `PrefectAgent` constructor:
162+
163+
- `mcp_task_config`: Configuration for MCP server communication tasks
164+
- `model_task_config`: Configuration for model request tasks
165+
- `tool_task_config`: Default configuration for all tool calls
166+
- `tool_task_config_by_name`: Per-tool task configuration (overrides `tool_task_config`)
167+
- `event_stream_handler_task_config`: Configuration for event stream handler tasks (applies when running inside a Prefect flow)
168+
169+
Available `TaskConfig` options:
170+
171+
- `retries`: Maximum number of retries for the task (default: `0`)
172+
- `retry_delay_seconds`: Delay between retries in seconds (can be a single value or list for exponential backoff, default: `1.0`)
173+
- `timeout_seconds`: Maximum time in seconds for the task to complete
174+
- `cache_policy`: Custom Prefect cache policy for the task
175+
- `persist_result`: Whether to persist the task result
176+
- `result_storage`: Prefect result storage for the task (e.g., `'s3-bucket/my-storage'` or a `WritableFileSystem` block)
177+
- `log_prints`: Whether to log print statements from the task (default: `False`)
178+
179+
Example:
180+
181+
```python {title="prefect_agent_config.py" test="skip"}
182+
from pydantic_ai import Agent
183+
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig
184+
185+
agent = Agent(
186+
'gpt-4o',
187+
instructions="You're an expert in geography.",
188+
name='geography',
189+
)
190+
191+
prefect_agent = PrefectAgent(
192+
agent,
193+
model_task_config=TaskConfig(
194+
retries=3,
195+
retry_delay_seconds=[1.0, 2.0, 4.0], # Exponential backoff
196+
timeout_seconds=30.0,
197+
),
198+
)
199+
200+
async def main():
201+
result = await prefect_agent.run('What is the capital of France?')
202+
print(result.output)
203+
#> Paris
204+
```
205+
206+
_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_
207+
208+
### Retry Considerations
209+
210+
Pydantic AI and provider API clients have their own retry logic. When using Prefect, you may want to:
211+
212+
* Disable [HTTP Request Retries](../retries.md) in Pydantic AI
213+
* Turn off your provider API client's retry logic (e.g., `max_retries=0` on a [custom OpenAI client](../models/openai.md#custom-openai-client))
214+
* Rely on Prefect's task-level retry configuration for consistency
215+
216+
This prevents requests from being retried multiple times at different layers.
217+
218+
## Caching and Idempotency
219+
220+
Prefect 3.0 provides built-in caching and transactional semantics. Tasks with identical inputs will not re-execute if their results are already cached, making workflows naturally idempotent and resilient to failures.
221+
222+
* **Task inputs**: Messages, settings, parameters, tool arguments, and serializable dependencies
223+
224+
**Note**: For user dependencies to be included in cache keys, they must be serializable (e.g., Pydantic models or basic Python types). Non-serializable dependencies are automatically excluded from cache computation.
225+
226+
## Observability with Prefect and Logfire
227+
228+
Prefect provides a built-in UI for monitoring flow runs, task executions, and failures. You can:
229+
230+
* View real-time flow run status
231+
* Debug failures with full stack traces
232+
* Set up alerts and notifications
233+
234+
To access the Prefect UI, you can either:
235+
236+
1. Use [Prefect Cloud](https://www.prefect.io/cloud) (managed service)
237+
2. Run a local [Prefect server](https://docs.prefect.io/v3/how-to-guides/self-hosted/server-cli) with `prefect server start`
238+
239+
You can also use [Pydantic Logfire](../logfire.md) for detailed observability. When using both Prefect and Logfire, you'll get complementary views:
240+
241+
* **Prefect**: Workflow-level orchestration, task status, and retry history
242+
* **Logfire**: Fine-grained tracing of agent runs, model requests, and tool invocations
243+
244+
When using Logfire with Prefect, you can enable distributed tracing to see spans for your Prefect runs included with your agent runs, model requests, and tool invocations.
245+
246+
For more information about Prefect monitoring, see the [Prefect documentation](https://docs.prefect.io/).
247+
248+
## Deployments and Scheduling
249+
250+
To deploy and schedule a `PrefectAgent`, wrap it in a Prefect flow and use the flow's [`serve()`](https://docs.prefect.io/v3/how-to-guides/deployments/create-deployments#create-a-deployment-with-serve) or [`deploy()`](https://docs.prefect.io/v3/how-to-guides/deployments/deploy-via-python) methods:
251+
252+
```python {title="serve_agent.py" test="skip"}
253+
from prefect import flow
254+
255+
from pydantic_ai import Agent
256+
from pydantic_ai.durable_exec.prefect import PrefectAgent
257+
258+
agent = Agent(
259+
'openai:gpt-4o',
260+
name='daily_report_agent',
261+
instructions='Generate a daily summary report.',
262+
)
263+
264+
prefect_agent = PrefectAgent(agent)
265+
266+
@flow
267+
async def daily_report_flow(user_prompt: str):
268+
"""Generate a daily report using the agent."""
269+
result = await prefect_agent.run(user_prompt)
270+
return result.output
271+
272+
# Serve the flow with a daily schedule
273+
if __name__ == '__main__':
274+
daily_report_flow.serve(
275+
name='daily-report-deployment',
276+
cron='0 9 * * *', # Run daily at 9am
277+
parameters={'user_prompt': "Generate today's report"},
278+
tags=['production', 'reports'],
279+
)
280+
```
281+
282+
The `serve()` method accepts scheduling options:
283+
284+
- **`cron`**: Cron schedule string (e.g., `'0 9 * * *'` for daily at 9am)
285+
- **`interval`**: Schedule interval in seconds or as a timedelta
286+
- **`rrule`**: iCalendar RRule schedule string
287+
288+
For production deployments with Docker, Kubernetes, or other infrastructure, use the flow's [`deploy()`](https://docs.prefect.io/v3/how-to-guides/deployments/deploy-via-python) method. See the [Prefect deployment documentation](https://docs.prefect.io/v3/how-to-guides/deployments/create-deploymentsy) for more information.

docs/install.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pip/uv-add "pydantic-ai-slim[openai]"
5858
* `a2a` - installs `fasta2a` [PyPI ↗](https://pypi.org/project/fasta2a){:target="_blank"}
5959
* `ag-ui` - installs `ag-ui-protocol` [PyPI ↗](https://pypi.org/project/ag-ui-protocol){:target="_blank"} and `starlette` [PyPI ↗](https://pypi.org/project/starlette){:target="_blank"}
6060
* `dbos` - installs [`dbos`](durable_execution/dbos.md) [PyPI ↗](https://pypi.org/project/dbos){:target="_blank"}
61+
* `prefect` - installs [`prefect`](durable_execution/prefect.md) [PyPI ↗](https://pypi.org/project/prefect){:target="_blank"}
6162

6263
See the [models](models/overview.md) documentation for information on which optional dependencies are required for each model.
6364

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ nav:
6565
- Overview: durable_execution/overview.md
6666
- Temporal: durable_execution/temporal.md
6767
- DBOS: durable_execution/dbos.md
68+
- Prefect: durable_execution/prefect.md
6869
- Agent-User Interaction (AG-UI): ag-ui.md
6970
- Agent2Agent (A2A): a2a.md
7071

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from ._agent import PrefectAgent
2+
from ._cache_policies import DEFAULT_PYDANTIC_AI_CACHE_POLICY
3+
from ._function_toolset import PrefectFunctionToolset
4+
from ._mcp_server import PrefectMCPServer
5+
from ._model import PrefectModel
6+
from ._types import TaskConfig
7+
8+
__all__ = [
9+
'PrefectAgent',
10+
'PrefectModel',
11+
'PrefectMCPServer',
12+
'PrefectFunctionToolset',
13+
'TaskConfig',
14+
'DEFAULT_PYDANTIC_AI_CACHE_POLICY',
15+
]

0 commit comments

Comments
 (0)