-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Add support for durable execution with Prefect #3074
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
desertaxle
wants to merge
23
commits into
pydantic:main
Choose a base branch
from
desertaxle:feat/prefect-durable-execution
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
9ca20f6
Initial implementation of durable execution with Prefect
desertaxle e958798
Add funtion toolset wrapping and logging
desertaxle 3676489
Add ability to serve a `PrefectAgent`
desertaxle 02640df
Get durable execution working through caching
desertaxle 780ae0d
Flesh out test suite for Prefect functionality
desertaxle ac40ad7
Clean up dependency declaration
desertaxle d155cc7
Clean up documentation
desertaxle e6e400f
Fix import outside of guard
desertaxle 57e7b4e
Regenerate `uv.lock`
desertaxle e27add9
Fix `pre-commit` errors
desertaxle a2cb45b
Fix docs tests
desertaxle 85440cd
Update diagram in docs
desertaxle b524d6c
Merge branch 'main' into feat/prefect-durable-execution
desertaxle 62f8f7d
Fix typing error
desertaxle 681b734
Add `prefect` optional depedency to root `pyproject.toml`
desertaxle 1ee8e00
Increase `prefect_test_harness` startup timeout
desertaxle 1fa3a75
Remove serialization logic from `PrefectAgent`
desertaxle 162c0ff
Remove `.serve` from `PrefectAgent`
desertaxle ae486ff
Address CI failures and review comments
desertaxle 5747c1a
Revert typing change
desertaxle 98e70b1
Merge branch 'main' into feat/prefect-durable-execution
desertaxle 50ef64e
Fix linting and typing errors
desertaxle 1f589bc
Remove run context serialization logic
desertaxle File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
# Durable Execution with Prefect | ||
|
||
[Prefect](https://www.prefect.io/) is a workflow orchestration framework for building resilient data pipelines in Python, natively integrated with Pydantic AI. | ||
|
||
## Durable Execution | ||
|
||
Prefect 3.0 brings [transactional semantics](https://www.prefect.io/blog/transactional-ml-pipelines-with-prefect-3-0) to your Python workflows, allowing you to group tasks into atomic units and define failure modes. If any part of a transaction fails, the entire transaction can be rolled back to a clean state. | ||
|
||
* **Flows** are the top-level entry points for your workflow. They can contain tasks and other flows. | ||
* **Tasks** are individual units of work that can be retried, cached, and monitored independently. | ||
|
||
Prefect 3.0's approach to transactional orchestration makes your workflows automatically **idempotent**: rerunnable without duplication or inconsistency across any environment. Every task is executed within a transaction that governs when and where the task's result record is persisted. If the task runs again under an identical context, it will not re-execute but instead load its previous result. | ||
|
||
The diagram below shows the overall architecture of an agentic application with Prefect. | ||
Prefect uses client-side task orchestration by default, with optional server connectivity for advanced features like scheduling and monitoring. | ||
|
||
```text | ||
+---------------------+ | ||
| Prefect Server | (Monitoring, | ||
| or Cloud | scheduling, UI, | ||
+---------------------+ orchestration) | ||
^ | ||
| | ||
Flow state, | Schedule flows, | ||
metadata, | track execution | ||
logs | | ||
| | ||
+------------------------------------------------------+ | ||
| Application Process | | ||
| +----------------------------------------------+ | | ||
| | Flow (Agent.run) | | | ||
| +----------------------------------------------+ | | ||
| | | | | | ||
| v v v | | ||
| +-----------+ +------------+ +-------------+ | | ||
| | Task | | Task | | Task | | | ||
| | (Tool) | | (MCP Tool) | | (Model API) | | | ||
| +-----------+ +------------+ +-------------+ | | ||
| | | | | | ||
| Cache & Cache & Cache & | | ||
| persist persist persist | | ||
| to to to | | ||
| v v v | | ||
| +----------------------------------------------+ | | ||
| | Result Storage (Local FS, S3, etc.) | | | ||
| +----------------------------------------------+ | | ||
+------------------------------------------------------+ | ||
| | | | ||
v v v | ||
[External APIs, services, databases, etc.] | ||
``` | ||
|
||
See the [Prefect documentation](https://docs.prefect.io/) for more information. | ||
|
||
## Durable Agent | ||
|
||
Any agent can be wrapped in a [`PrefectAgent`][pydantic_ai.durable_exec.prefect.PrefectAgent] to get durable execution. `PrefectAgent` automatically: | ||
|
||
* Wraps [`Agent.run`][pydantic_ai.Agent.run] and [`Agent.run_sync`][pydantic_ai.Agent.run_sync] as Prefect flows. | ||
* Wraps [model requests](../models/overview.md) as Prefect tasks. | ||
* Wraps [tool calls](../tools.md) as Prefect tasks (configurable per-tool). | ||
* Wraps [MCP communication](../mcp/client.md) as Prefect tasks. | ||
|
||
Event stream handlers are **not automatically wrapped** by Prefect. If they involve I/O or non-deterministic behavior, you can explicitly decorate them with `@task` from Prefect. | ||
|
||
The original agent, model, and MCP server can still be used as normal outside the Prefect flow. | ||
|
||
Here is a simple but complete example of wrapping an agent for durable execution. All it requires is to install Pydantic AI with Prefect: | ||
|
||
```bash | ||
pip/uv-add pydantic-ai[prefect] | ||
``` | ||
|
||
Or if you're using the slim package, you can install it with the `prefect` optional group: | ||
|
||
```bash | ||
pip/uv-add pydantic-ai-slim[prefect] | ||
``` | ||
|
||
```python {title="prefect_agent.py" test="skip"} | ||
from pydantic_ai import Agent | ||
from pydantic_ai.durable_exec.prefect import PrefectAgent | ||
|
||
agent = Agent( | ||
'gpt-4o', | ||
instructions="You're an expert in geography.", | ||
name='geography', # (1)! | ||
) | ||
|
||
prefect_agent = PrefectAgent(agent) # (2)! | ||
|
||
async def main(): | ||
result = await prefect_agent.run('What is the capital of Mexico?') # (3)! | ||
print(result.output) | ||
#> Mexico City (Ciudad de México, CDMX) | ||
``` | ||
|
||
1. The agent's `name` is used to uniquely identify its flows and tasks. | ||
2. Wrapping the agent with `PrefectAgent` enables durable execution for all agent runs. | ||
3. [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run] works like [`Agent.run()`][pydantic_ai.Agent.run], but runs as a Prefect flow and executes model requests, decorated tool calls, and MCP communication as Prefect tasks. | ||
|
||
_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_ | ||
|
||
For more information on how to use Prefect in Python applications, see their [Python documentation](https://docs.prefect.io/v3/develop/write-flows). | ||
|
||
## Prefect Integration Considerations | ||
|
||
When using Prefect with Pydantic AI agents, there are a few important considerations to ensure workflows behave correctly. | ||
|
||
### Agent Requirements | ||
|
||
Each agent instance must have a unique `name` so Prefect can correctly identify and track its flows and tasks. | ||
|
||
### Tool Wrapping | ||
|
||
Agent tools are automatically wrapped as Prefect tasks, which means they benefit from: | ||
|
||
* **Retry logic**: Failed tool calls can be retried automatically | ||
* **Caching**: Tool results are cached based on their inputs | ||
* **Observability**: Tool execution is tracked in the Prefect UI | ||
|
||
You can customize tool task behavior using `tool_task_config` (applies to all tools) or `tool_task_config_by_name` (per-tool configuration): | ||
|
||
```python {title="prefect_agent_config.py" test="skip"} | ||
from pydantic_ai import Agent | ||
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig | ||
|
||
agent = Agent('gpt-4o', name='my_agent') | ||
|
||
@agent.tool_plain | ||
def fetch_data(url: str) -> str: | ||
# This tool will be wrapped as a Prefect task | ||
... | ||
|
||
prefect_agent = PrefectAgent( | ||
agent, | ||
tool_task_config=TaskConfig(retries=3), # Default for all tools | ||
tool_task_config_by_name={ | ||
'fetch_data': TaskConfig(timeout_seconds=10.0), # Specific to fetch_data | ||
'simple_tool': None, # Disable task wrapping for simple_tool | ||
}, | ||
) | ||
``` | ||
|
||
Set a tool's config to `None` in `tool_task_config_by_name` to disable task wrapping for that specific tool. | ||
|
||
### Agent Run Context and Dependencies | ||
|
||
Prefect persists task results using [Pydantic's serialization](https://docs.pydantic.dev/latest/concepts/serialization/). This means the [dependencies](../dependencies.md) object provided to [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run] or [`PrefectAgent.run_sync()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run_sync], and tool outputs should be serializable using Pydantic's `TypeAdapter`. You may also want to keep the inputs and outputs reasonably sized for optimal performance. | ||
|
||
### Streaming | ||
|
||
When running inside a Prefect flow, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] works but doesn't provide real-time streaming because Prefect tasks consume their entire execution before returning results. The method will execute fully and return the complete result at once. | ||
|
||
For real-time streaming behavior inside Prefect flows, you can set an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `PrefectAgent` instance and use [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run]. | ||
|
||
**Note**: Event stream handlers behave differently when running inside a Prefect flow versus outside: | ||
- **Outside a flow**: The handler receives events as they stream from the model | ||
- **Inside a flow**: Each event is wrapped as a Prefect task for durability, which may affect timing but ensures reliability | ||
|
||
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events). | ||
|
||
## Task Configuration | ||
|
||
You can customize Prefect task behavior, such as retries and timeouts, by passing [`TaskConfig`][pydantic_ai.durable_exec.prefect.TaskConfig] objects to the `PrefectAgent` constructor: | ||
|
||
- `mcp_task_config`: Configuration for MCP server communication tasks | ||
- `model_task_config`: Configuration for model request tasks | ||
- `tool_task_config`: Default configuration for all tool calls | ||
- `tool_task_config_by_name`: Per-tool task configuration (overrides `tool_task_config`) | ||
|
||
Available `TaskConfig` options: | ||
|
||
- `retries`: Maximum number of retries for the task (default: `0`) | ||
- `retry_delay_seconds`: Delay between retries in seconds (can be a single value or list for exponential backoff, default: `1.0`) | ||
- `timeout_seconds`: Maximum time in seconds for the task to complete | ||
- `cache_policy`: Custom Prefect cache policy for the task | ||
- `persist_result`: Whether to persist the task result | ||
- `result_storage`: Prefect result storage for the task (e.g., `'s3-bucket/my-storage'` or a `WritableFileSystem` block) | ||
- `log_prints`: Whether to log print statements from the task (default: `False`) | ||
|
||
Example: | ||
|
||
```python {title="prefect_agent_config.py" test="skip"} | ||
from pydantic_ai import Agent | ||
from pydantic_ai.durable_exec.prefect import PrefectAgent, TaskConfig | ||
|
||
agent = Agent( | ||
'gpt-4o', | ||
instructions="You're an expert in geography.", | ||
name='geography', | ||
) | ||
|
||
prefect_agent = PrefectAgent( | ||
agent, | ||
model_task_config=TaskConfig( | ||
retries=3, | ||
retry_delay_seconds=[1.0, 2.0, 4.0], # Exponential backoff | ||
timeout_seconds=30.0, | ||
), | ||
) | ||
|
||
async def main(): | ||
result = await prefect_agent.run('What is the capital of France?') | ||
print(result.output) | ||
#> Paris | ||
``` | ||
|
||
_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_ | ||
|
||
### Retry Considerations | ||
|
||
Pydantic AI and provider API clients have their own retry logic. When using Prefect, you may want to: | ||
|
||
* Disable [HTTP Request Retries](../retries.md) in Pydantic AI | ||
* Turn off your provider API client's retry logic (e.g., `max_retries=0` on a [custom OpenAI client](../models/openai.md#custom-openai-client)) | ||
* Rely on Prefect's task-level retry configuration for consistency | ||
|
||
This prevents requests from being retried multiple times at different layers. | ||
|
||
## Caching and Idempotency | ||
|
||
Prefect 3.0 provides built-in caching and transactional semantics. Tasks with identical inputs will not re-execute if their results are already cached, making workflows naturally idempotent and resilient to failures. | ||
|
||
* **Task inputs**: Messages, settings, parameters, tool arguments, and serializable dependencies | ||
|
||
**Note**: For user dependencies to be included in cache keys, they must be serializable (e.g., Pydantic models or basic Python types). Non-serializable dependencies are automatically excluded from cache computation. | ||
|
||
## Observability with Prefect and Logfire | ||
|
||
Prefect provides a built-in UI for monitoring flow runs, task executions, and failures. You can: | ||
|
||
* View real-time flow run status | ||
* Inspect task execution history and outputs | ||
* Debug failures with full stack traces | ||
* Set up alerts and notifications | ||
|
||
To access the Prefect UI, you can either: | ||
|
||
1. Use [Prefect Cloud](https://www.prefect.io/cloud) (managed service) | ||
2. Run a local [Prefect server](https://docs.prefect.io/v3/manage/self-host) with `prefect server start` | ||
|
||
You can also use [Pydantic Logfire](../logfire.md) for detailed observability. When using both Prefect and Logfire, you'll get complementary views: | ||
|
||
* **Prefect**: Workflow-level orchestration, task status, and retry history | ||
* **Logfire**: Fine-grained tracing of agent runs, model requests, and tool invocations | ||
|
||
For more information about Prefect monitoring, see the [Prefect documentation](https://docs.prefect.io/). | ||
|
||
## Deployments and Scheduling | ||
|
||
`PrefectAgent` provides a [`serve()`][pydantic_ai.durable_exec.prefect.PrefectAgent.serve] method that creates a Prefect deployment and starts a long-running process to monitor for scheduled work: | ||
|
||
```python {title="serve_agent.py" test="skip"} | ||
from pydantic_ai import Agent | ||
from pydantic_ai.durable_exec.prefect import PrefectAgent | ||
|
||
agent = Agent( | ||
'openai:gpt-4o', | ||
name='daily_report_agent', | ||
instructions='Generate a daily summary report.', | ||
) | ||
|
||
prefect_agent = PrefectAgent(agent) | ||
|
||
# Serve the agent with a daily schedule | ||
prefect_agent.serve( | ||
name='daily-report-deployment', | ||
cron='0 9 * * *', # Run daily at 9am | ||
parameters={'user_prompt': "Generate today's report"}, | ||
tags=['production', 'reports'], | ||
) | ||
``` | ||
|
||
This method accepts scheduling options: | ||
|
||
- **`cron`**: Cron schedule string (e.g., `'0 9 * * *'` for daily at 9am) | ||
- **`interval`**: Schedule interval in seconds or as a timedelta | ||
- **`rrule`**: iCalendar RRule schedule string | ||
|
||
For more advanced deployment patterns, see the [Prefect deployment documentation](https://docs.prefect.io/v3/deploy/infrastructure-examples/docker). |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
16 changes: 16 additions & 0 deletions
16
pydantic_ai_slim/pydantic_ai/durable_exec/prefect/__init__.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from ._agent import PrefectAgent | ||
from ._cache_policies import DEFAULT_PYDANTIC_AI_CACHE_POLICY, InputsWithoutTimestamps | ||
from ._function_toolset import PrefectFunctionToolset | ||
from ._mcp_server import PrefectMCPServer | ||
from ._model import PrefectModel | ||
from ._types import TaskConfig | ||
|
||
__all__ = [ | ||
'PrefectAgent', | ||
'PrefectModel', | ||
'PrefectMCPServer', | ||
'PrefectFunctionToolset', | ||
'TaskConfig', | ||
'InputsWithoutTimestamps', | ||
'DEFAULT_PYDANTIC_AI_CACHE_POLICY', | ||
] |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.