
Commit 6497f63

Merge branch 'pydantic:main' into sync_stream
2 parents bb5c7fe + 92289ce commit 6497f63

File tree

71 files changed: +9677 −212 lines

README.md

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ We built Pydantic AI with one simple aim: to bring that FastAPI feeling to GenAI
 [Pydantic Validation](https://docs.pydantic.dev/latest/) is the validation layer of the OpenAI SDK, the Google ADK, the Anthropic SDK, LangChain, LlamaIndex, AutoGPT, Transformers, CrewAI, Instructor and many more. _Why use the derivative when you can go straight to the source?_ :smiley:
 
 2. **Model-agnostic**:
-Supports virtually every [model](https://ai.pydantic.dev/models/overview) and provider: OpenAI, Anthropic, Gemini, DeepSeek, Grok, Cohere, Mistral, and Perplexity; Azure AI Foundry, Amazon Bedrock, Google Vertex AI, Ollama, LiteLLM, Groq, OpenRouter, Together AI, Fireworks AI, Cerebras, Hugging Face, GitHub, Heroku, Vercel. If your favorite model or provider is not listed, you can easily implement a [custom model](https://ai.pydantic.dev/models/overview#custom-models).
+Supports virtually every [model](https://ai.pydantic.dev/models/overview) and provider: OpenAI, Anthropic, Gemini, DeepSeek, Grok, Cohere, Mistral, and Perplexity; Azure AI Foundry, Amazon Bedrock, Google Vertex AI, Ollama, LiteLLM, Groq, OpenRouter, Together AI, Fireworks AI, Cerebras, Hugging Face, GitHub, Heroku, Vercel, Nebius. If your favorite model or provider is not listed, you can easily implement a [custom model](https://ai.pydantic.dev/models/overview#custom-models).
 
 3. **Seamless Observability**:
 Tightly [integrates](https://ai.pydantic.dev/logfire) with [Pydantic Logfire](https://pydantic.dev/logfire), our general-purpose OpenTelemetry observability platform, for real-time debugging, evals-based performance monitoring, and behavior, tracing, and cost tracking. If you already have an observability platform that supports OTel, you can [use that too](https://ai.pydantic.dev/logfire#alternative-observability-backends).

docs/api/durable_exec.md

Lines changed: 2 additions & 0 deletions
@@ -3,3 +3,5 @@
 ::: pydantic_ai.durable_exec.temporal
 
 ::: pydantic_ai.durable_exec.dbos
+
+::: pydantic_ai.durable_exec.prefect

docs/api/providers.md

Lines changed: 2 additions & 0 deletions
@@ -41,3 +41,5 @@
 ::: pydantic_ai.providers.ollama.OllamaProvider
 
 ::: pydantic_ai.providers.litellm.LiteLLMProvider
+
+::: pydantic_ai.providers.nebius.NebiusProvider

docs/builtin-tools.md

Lines changed: 2 additions & 2 deletions
@@ -199,7 +199,7 @@ The [`ImageGenerationTool`][pydantic_ai.builtin_tools.ImageGenerationTool] enabl
 | Provider | Supported | Notes |
 |----------|-----------|-------|
 | OpenAI Responses | ✅ | Full feature support. Only supported by models newer than `gpt-4o`. Metadata about the generated image, like the [`revised_prompt`](https://platform.openai.com/docs/guides/tools-image-generation#revised-prompt) sent to the underlying image model, is available on the [`BuiltinToolReturnPart`][pydantic_ai.messages.BuiltinToolReturnPart] that's available via [`ModelResponse.builtin_tool_calls`][pydantic_ai.messages.ModelResponse.builtin_tool_calls]. |
-| Google | ✅ | No parameter support. Only supported by [image generation models](https://ai.google.dev/gemini-api/docs/image-generation) like `gemini-2.5-flash-image-preview`. These models do not support [structured output](output.md) or [function tools](tools.md). These models will always generate images, even if this built-in tool is not explicitly specified. |
+| Google | ✅ | No parameter support. Only supported by [image generation models](https://ai.google.dev/gemini-api/docs/image-generation) like `gemini-2.5-flash-image`. These models do not support [structured output](output.md) or [function tools](tools.md). These models will always generate images, even if this built-in tool is not explicitly specified. |
 | Anthropic | ❌ | |
 | Groq | ❌ | |
 | Bedrock | ❌ | |
@@ -232,7 +232,7 @@ Image generation with Google [image generation models](https://ai.google.dev/gem
 ```py {title="image_generation_google.py"}
 from pydantic_ai import Agent, BinaryImage
 
-agent = Agent('google-gla:gemini-2.5-flash-image-preview')
+agent = Agent('google-gla:gemini-2.5-flash-image')
 
 result = agent.run_sync('Tell me a two-sentence story about an axolotl with an illustration.')
 print(result.output)

docs/durable_execution/overview.md

Lines changed: 3 additions & 2 deletions
@@ -2,9 +2,10 @@
 
 Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](../agents.md#streaming-all-events) and [MCP](../mcp/client.md), with the added benefit of fault tolerance.
 
-Pydantic AI natively supports two durable execution solutions:
+Pydantic AI natively supports three durable execution solutions:
 
 - [Temporal](./temporal.md)
 - [DBOS](./dbos.md)
+- [Prefect](./prefect.md)
 
-These integrations only uses Pydantic AI's public interface, so they also serve as a reference for integrating with other durable systems.
+These integrations only use Pydantic AI's public interface, so they also serve as a reference for integrating with other durable systems.

docs/durable_execution/prefect.md

Lines changed: 288 additions & 0 deletions
@@ -0,0 +1,288 @@
# Durable Execution with Prefect

[Prefect](https://www.prefect.io/) is a workflow orchestration framework for building resilient data pipelines in Python, natively integrated with Pydantic AI.

## Durable Execution

Prefect 3.0 brings [transactional semantics](https://www.prefect.io/blog/transactional-ml-pipelines-with-prefect-3-0) to your Python workflows, allowing you to group tasks into atomic units and define failure modes. If any part of a transaction fails, the entire transaction can be rolled back to a clean state.

* **Flows** are the top-level entry points for your workflow. They can contain tasks and other flows.
* **Tasks** are individual units of work that can be retried, cached, and monitored independently.

Prefect 3.0's approach to transactional orchestration makes your workflows automatically **idempotent**: rerunnable without duplication or inconsistency across any environment. Every task is executed within a transaction that governs when and where the task's result record is persisted. If the task runs again under an identical context, it will not re-execute but instead load its previous result.

The diagram below shows the overall architecture of an agentic application with Prefect. Prefect uses client-side task orchestration by default, with optional server connectivity for advanced features like scheduling and monitoring.

```text
              +---------------------+
              |   Prefect Server    |  (Monitoring,
              |      or Cloud       |   scheduling, UI,
              +---------------------+   orchestration)
                         ^
                         |
           Flow state,   |   Schedule flows,
            metadata,    |   track execution
              logs       |
                         |
+------------------------------------------------------+
|                 Application Process                  |
|   +----------------------------------------------+   |
|   |               Flow (Agent.run)               |   |
|   +----------------------------------------------+   |
|        |                |                 |          |
|        v                v                 v          |
|   +-----------+  +------------+  +-------------+     |
|   |   Task    |  |    Task    |  |    Task     |     |
|   |  (Tool)   |  | (MCP Tool) |  | (Model API) |     |
|   +-----------+  +------------+  +-------------+     |
|        |                |                 |          |
|     Cache &          Cache &           Cache &       |
|     persist          persist           persist       |
|       to               to                to          |
|        v                v                 v          |
|   +----------------------------------------------+   |
|   |     Result Storage (Local FS, S3, etc.)      |   |
|   +----------------------------------------------+   |
+------------------------------------------------------+
         |                |                 |
         v                v                 v
       [External APIs, services, databases, etc.]
```

See the [Prefect documentation](https://docs.prefect.io/) for more information.

## Durable Agent

Any agent can be wrapped in a [`PrefectAgent`][pydantic_ai.durable_exec.prefect.PrefectAgent] to get durable execution. `PrefectAgent` automatically:

* Wraps [`Agent.run`][pydantic_ai.Agent.run] and [`Agent.run_sync`][pydantic_ai.Agent.run_sync] as Prefect flows.
* Wraps [model requests](../models/overview.md) as Prefect tasks.
* Wraps [tool calls](../tools.md) as Prefect tasks (configurable per-tool).
* Wraps [MCP communication](../mcp/client.md) as Prefect tasks.

Event stream handlers are **automatically wrapped** by Prefect when running inside a Prefect flow. Each event from the stream is processed in a separate Prefect task for durability. You can customize the task behavior using the `event_stream_handler_task_config` parameter when creating the `PrefectAgent`. Do **not** manually decorate event stream handlers with `@task`. For examples, see the [streaming docs](../agents.md#streaming-all-events).

The original agent, model, and MCP server can still be used as normal outside the Prefect flow.

Here is a simple but complete example of wrapping an agent for durable execution. All it requires is to install Pydantic AI with Prefect:

```bash
pip/uv-add pydantic-ai[prefect]
```

Or if you're using the slim package, you can install it with the `prefect` optional group:

```bash
pip/uv-add pydantic-ai-slim[prefect]
```

```python {title="prefect_agent.py" test="skip"}
from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent

agent = Agent(
    'gpt-4o',
    instructions="You're an expert in geography.",
    name='geography',  # (1)!
)

prefect_agent = PrefectAgent(agent)  # (2)!

async def main():
    result = await prefect_agent.run('What is the capital of Mexico?')  # (3)!
    print(result.output)
    #> Mexico City (Ciudad de México, CDMX)
```

1. The agent's `name` is used to uniquely identify its flows and tasks.
2. Wrapping the agent with `PrefectAgent` enables durable execution for all agent runs.
3. [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run] works like [`Agent.run()`][pydantic_ai.Agent.run], but runs as a Prefect flow and executes model requests, decorated tool calls, and MCP communication as Prefect tasks.

_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_

For more information on how to use Prefect in Python applications, see their [Python documentation](https://docs.prefect.io/v3/how-to-guides/workflows/write-and-run).

## Prefect Integration Considerations

When using Prefect with Pydantic AI agents, there are a few important considerations to ensure workflows behave correctly.

### Agent Requirements

Each agent instance must have a unique `name` so Prefect can correctly identify and track its flows and tasks.

### Tool Wrapping

Agent tools are automatically wrapped as Prefect tasks, which means they benefit from:

* **Retry logic**: Failed tool calls can be retried automatically
* **Caching**: Tool results are cached based on their inputs
* **Observability**: Tool execution is tracked in the Prefect UI

You can customize tool task behavior using `tool_task_config` (applies to all tools) or `tool_task_config_by_name` (per-tool configuration):

```python {title="prefect_agent_config.py" test="skip"}
from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig

agent = Agent('gpt-4o', name='my_agent')

@agent.tool_plain
def fetch_data(url: str) -> str:
    # This tool will be wrapped as a Prefect task
    ...

prefect_agent = PrefectAgent(
    agent,
    tool_task_config=TaskConfig(retries=3),  # Default for all tools
    tool_task_config_by_name={
        'fetch_data': TaskConfig(timeout_seconds=10.0),  # Specific to fetch_data
        'simple_tool': None,  # Disable task wrapping for simple_tool
    },
)
```

Set a tool's config to `None` in `tool_task_config_by_name` to disable task wrapping for that specific tool.

### Streaming

When running inside a Prefect flow, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] works but doesn't provide real-time streaming, because Prefect tasks consume their entire execution before returning results. The method will execute fully and return the complete result at once.

For real-time streaming behavior inside Prefect flows, you can set an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `PrefectAgent` instance and use [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run].

**Note**: Event stream handlers behave differently when running inside a Prefect flow versus outside:

- **Outside a flow**: The handler receives events as they stream from the model
- **Inside a flow**: Each event is wrapped as a Prefect task for durability, which may affect timing but ensures reliability

The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).
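The handler shape described above can be illustrated with a self-contained toy. All names here are hypothetical stand-ins (a real handler receives a `RunContext` and Pydantic AI event objects, not strings), so treat this only as a sketch of the call pattern:

```python
import asyncio
from collections.abc import AsyncIterable

async def event_stream_handler(ctx: object, events: AsyncIterable[str]) -> list[str]:
    seen: list[str] = []
    async for event in events:  # inside a flow, each event would be processed in a Prefect task
        seen.append(event)
    return seen

async def fake_events() -> AsyncIterable[str]:
    # Hypothetical event names, standing in for model/tool stream events.
    for name in ['part_start', 'text_delta', 'part_end']:
        yield name

received = asyncio.run(event_stream_handler(None, fake_events()))
print(received)
#> ['part_start', 'text_delta', 'part_end']
```

The handler simply consumes the async iterable as events arrive; durability comes from the wrapping Prefect performs around each iteration, not from anything the handler itself does.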
## Task Configuration

You can customize Prefect task behavior, such as retries and timeouts, by passing [`TaskConfig`][pydantic_ai.durable_exec.prefect.TaskConfig] objects to the `PrefectAgent` constructor:

- `mcp_task_config`: Configuration for MCP server communication tasks
- `model_task_config`: Configuration for model request tasks
- `tool_task_config`: Default configuration for all tool calls
- `tool_task_config_by_name`: Per-tool task configuration (overrides `tool_task_config`)
- `event_stream_handler_task_config`: Configuration for event stream handler tasks (applies when running inside a Prefect flow)

Available `TaskConfig` options:

- `retries`: Maximum number of retries for the task (default: `0`)
- `retry_delay_seconds`: Delay between retries in seconds (can be a single value or list for exponential backoff, default: `1.0`)
- `timeout_seconds`: Maximum time in seconds for the task to complete
- `cache_policy`: Custom Prefect cache policy for the task
- `persist_result`: Whether to persist the task result
- `result_storage`: Prefect result storage for the task (e.g., `'s3-bucket/my-storage'` or a `WritableFileSystem` block)
- `log_prints`: Whether to log print statements from the task (default: `False`)

Example:

```python {title="prefect_agent_config.py" test="skip"}
from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig

agent = Agent(
    'gpt-4o',
    instructions="You're an expert in geography.",
    name='geography',
)

prefect_agent = PrefectAgent(
    agent,
    model_task_config=TaskConfig(
        retries=3,
        retry_delay_seconds=[1.0, 2.0, 4.0],  # Exponential backoff
        timeout_seconds=30.0,
    ),
)

async def main():
    result = await prefect_agent.run('What is the capital of France?')
    print(result.output)
    #> Paris
```

_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_

### Retry Considerations

Pydantic AI and provider API clients have their own retry logic. When using Prefect, you may want to:

* Disable [HTTP Request Retries](../retries.md) in Pydantic AI
* Turn off your provider API client's retry logic (e.g., `max_retries=0` on a [custom OpenAI client](../models/openai.md#custom-openai-client))
* Rely on Prefect's task-level retry configuration for consistency

This prevents requests from being retried multiple times at different layers.
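A toy illustration of why layered retries matter (generic Python, not Prefect or client code): when two layers retry independently, a persistently failing call is attempted `(outer_retries + 1) * (inner_retries + 1)` times.

```python
calls = 0

def flaky() -> None:
    global calls
    calls += 1
    raise RuntimeError('always fails')

def with_retries(fn, retries: int):
    # Minimal retry wrapper: re-invoke fn up to `retries` extra times.
    def wrapped():
        for attempt in range(retries + 1):
            try:
                return fn()
            except RuntimeError:
                if attempt == retries:
                    raise
    return wrapped

inner = with_retries(flaky, retries=2)   # e.g. an HTTP client's own retries
outer = with_retries(inner, retries=3)   # e.g. Prefect task-level retries

try:
    outer()
except RuntimeError:
    pass

print(calls)  # (3 + 1) outer attempts x (2 + 1) inner attempts
#> 12
```

Disabling retries at all but one layer keeps the attempt count, and therefore latency and API cost on failure, predictable.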

## Caching and Idempotency

Prefect 3.0 provides built-in caching and transactional semantics. Tasks with identical inputs will not re-execute if their results are already cached, making workflows naturally idempotent and resilient to failures. Cache keys are computed from the task's inputs: messages, settings, parameters, tool arguments, and serializable dependencies.

**Note**: For user dependencies to be included in cache keys, they must be serializable (e.g., Pydantic models or basic Python types). Non-serializable dependencies are automatically excluded from cache computation.
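The serialization rule can be sketched as follows. The `cache_key` helper is hypothetical, not Prefect's actual cache-policy code; it only demonstrates how non-serializable values drop out of the key, so two runs that differ only in such values hash identically.

```python
import hashlib
import json

def cache_key(task_name: str, inputs: dict) -> str:
    serializable = {}
    for name, value in inputs.items():
        try:
            json.dumps(value)
        except TypeError:
            continue  # non-serializable deps are excluded from the key
        serializable[name] = value
    raw = task_name + json.dumps(serializable, sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()

class Client:  # stand-in for a non-serializable dependency (e.g. an HTTP client)
    pass

k1 = cache_key('tool', {'x': 1, 'client': Client()})
k2 = cache_key('tool', {'x': 1, 'client': Client()})
print(k1 == k2)  # the Client instances differ, but neither reaches the key
#> True
```

The practical consequence: if behavior depends on a non-serializable dependency, a cached result may be reused even when that dependency changes.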
## Observability with Prefect and Logfire

Prefect provides a built-in UI for monitoring flow runs, task executions, and failures. You can:

* View real-time flow run status
* Debug failures with full stack traces
* Set up alerts and notifications

To access the Prefect UI, you can either:

1. Use [Prefect Cloud](https://www.prefect.io/cloud) (managed service)
2. Run a local [Prefect server](https://docs.prefect.io/v3/how-to-guides/self-hosted/server-cli) with `prefect server start`

You can also use [Pydantic Logfire](../logfire.md) for detailed observability. When using both Prefect and Logfire, you'll get complementary views:

* **Prefect**: Workflow-level orchestration, task status, and retry history
* **Logfire**: Fine-grained tracing of agent runs, model requests, and tool invocations

When using Logfire with Prefect, you can enable distributed tracing to see spans for your Prefect runs included with your agent runs, model requests, and tool invocations.

For more information about Prefect monitoring, see the [Prefect documentation](https://docs.prefect.io/).

## Deployments and Scheduling

To deploy and schedule a `PrefectAgent`, wrap it in a Prefect flow and use the flow's [`serve()`](https://docs.prefect.io/v3/how-to-guides/deployments/create-deployments#create-a-deployment-with-serve) or [`deploy()`](https://docs.prefect.io/v3/how-to-guides/deployments/deploy-via-python) methods:

```python {title="serve_agent.py" test="skip"}
from prefect import flow

from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent

agent = Agent(
    'openai:gpt-4o',
    name='daily_report_agent',
    instructions='Generate a daily summary report.',
)

prefect_agent = PrefectAgent(agent)

@flow
async def daily_report_flow(user_prompt: str):
    """Generate a daily report using the agent."""
    result = await prefect_agent.run(user_prompt)
    return result.output

# Serve the flow with a daily schedule
if __name__ == '__main__':
    daily_report_flow.serve(
        name='daily-report-deployment',
        cron='0 9 * * *',  # Run daily at 9am
        parameters={'user_prompt': "Generate today's report"},
        tags=['production', 'reports'],
    )
```

The `serve()` method accepts scheduling options:

- **`cron`**: Cron schedule string (e.g., `'0 9 * * *'` for daily at 9am)
- **`interval`**: Schedule interval in seconds or as a timedelta
- **`rrule`**: iCalendar RRule schedule string

For production deployments with Docker, Kubernetes, or other infrastructure, use the flow's [`deploy()`](https://docs.prefect.io/v3/how-to-guides/deployments/deploy-via-python) method. See the [Prefect deployment documentation](https://docs.prefect.io/v3/how-to-guides/deployments/create-deployments) for more information.
