Skip to content

Conversation

desertaxle
Copy link

@desertaxle desertaxle commented Oct 2, 2025

Summary

This PR introduces an integration with Prefect to enable durable, fault-tolerant execution of Pydantic AI agents.

Examples

Run a pydantic-ai agent as a Prefect flow

from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent

# Existing agent
agent = Agent('gpt-4o', name='my_agent')

# Enable durable execution
prefect_agent = PrefectAgent(agent)

# Use as normal - now with automatic persistence and recovery
result = await prefect_agent.run('What is the capital of Mexico?')

print(result.output) # The capital of Mexico is Mexico City

The PrefectAgent class wraps an Agent and instruments it so that .run calls are executed as a Prefect flow, and all tool and model calls are executed as Prefect tasks.

Customize underlying task behavior

prefect_agent = PrefectAgent(
    agent,
    model_task_config=TaskConfig(
        retries=3,
        retry_delay_seconds=[1.0, 2.0, 4.0],
        timeout_seconds=30.0,
    ),
)

Create an agent service to enable scheduling and remote execution

# Deploy agent with scheduled execution
prefect_agent.serve(
    name='daily-report-deployment',
    cron='0 9 * * *',  # Run daily at 9am
    parameters={'user_prompt': 'Generate daily report'},
    tags=['production'],
)

Implementation Details

The integration is implemented through:

  • PrefectAgent: Wrapper class that extends WrapperAgent to intercept and wrap agent operations
  • PrefectModel: Model wrapper that executes requests as Prefect tasks with event streaming support
  • PrefectFunctionToolset: Wraps tool calls as tasks while preserving type safety
  • A custom Prefect cache policy to ensure resumability of agent runs

Concerns

Complex Serialization Requirements

The .serve() method requires the Agent to be picklable because each run is executed in a separate process. I've added __getstate__ and __setstate__ implementations for PrefectAgent, but they are complex and may rely on too much internal pydantic-ai functionality.

Copy link
Contributor

@zzstoatzz zzstoatzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple small thoughts


### Agent Run Context and Dependencies

Prefect persists task results using [Pydantic's serialization](https://docs.pydantic.dev/latest/concepts/serialization/). This means the [dependencies](../dependencies.md) object provided to [`PrefectAgent.run()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run] or [`PrefectAgent.run_sync()`][pydantic_ai.durable_exec.prefect.PrefectAgent.run_sync], and tool outputs should be serializable using Pydantic's `TypeAdapter`. You may also want to keep the inputs and outputs reasonably sized for optimal performance.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could mention the escape hatches you have, ie impl a serializer on the model or field type

Comment on lines +20 to +37
def _strip_timestamps(
obj: Any | dict[str, Any] | list[Any] | tuple[Any, ...],
) -> Any:
"""Recursively convert dataclasses to dicts, excluding timestamp fields."""
if is_dataclass(obj) and not isinstance(obj, type):
result: dict[str, Any] = {}
for f in fields(obj):
if f.name != 'timestamp':
value = getattr(obj, f.name)
result[f.name] = _strip_timestamps(value)
return result
elif _is_dict(obj):
return {k: _strip_timestamps(v) for k, v in obj.items() if k != 'timestamp'}
elif _is_list(obj):
return [_strip_timestamps(item) for item in obj]
elif _is_tuple(obj):
return tuple(_strip_timestamps(item) for item in obj)
return obj
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we don't need it, but looks like a special case of visit_collection

retries: int
"""Maximum number of retries for the task."""

retry_delay_seconds: float | list[float]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefect.types.TaskRetryDelaySeconds

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants