Skip to content

Conversation

desertaxle
Copy link

@desertaxle desertaxle commented Oct 2, 2025

Summary

This PR introduces an integration with Prefect to enable durable, fault-tolerant execution of Pydantic AI agents.

Examples

Run a pydantic-ai agent as a Prefect flow

from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent

# Existing agent
agent = Agent('gpt-4o', name='my_agent')

# Enable durable execution
prefect_agent = PrefectAgent(agent)

# Use as normal, now run as a Prefect flow with model call and tool call tasks
result = await prefect_agent.run('What is the capital of Mexico?')

print(result.output) # The capital of Mexico is Mexico City

The PrefectAgent class wraps an Agent and instruments it so that .run calls are executed as a Prefect flow, and all tool and model calls are executed as Prefect tasks.

Customize underlying task behavior

prefect_agent = PrefectAgent(
    agent,
    model_task_config=TaskConfig(
        retries=3,
        retry_delay_seconds=[1.0, 2.0, 4.0],
        timeout_seconds=30.0,
    ),
)

Create an agent service to enable scheduling and remote execution

# Deploy agent with scheduled execution
from prefect import flow
from prefect.deployments import run_deployment
from pydantic_ai import Agent
from pydantic_ai.durable_exec.prefect import PrefectAgent

agent = Agent(
    'openai:gpt-4o',
    name='daily_report_agent',
    instructions='Generate a daily summary report.',
)

prefect_agent = PrefectAgent(agent)

@flow
async def daily_report_flow(user_prompt: str):
    """Generate a daily report using the agent."""
    result = await prefect_agent.run(user_prompt)
    return result.output

# Serve the flow with a daily schedule
if __name__ == '__main__':
    daily_report_flow.serve(
        name='daily-report-deployment',
        cron='0 9 * * *',  # Run daily at 9am
        parameters={'user_prompt': "Generate today's report"},
        tags=['production', 'reports'],
    )

Implementation Details

The integration is implemented through:

  • PrefectAgent: Wrapper class that extends WrapperAgent to intercept and wrap agent operations
  • PrefectModel: Model wrapper that executes requests as Prefect tasks with event streaming support
  • PrefectFunctionToolset: Wraps tool calls as tasks while preserving type safety
  • A custom Prefect cache policy to ensure resumability of agent runs

Copy link
Contributor

@zzstoatzz zzstoatzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple small thoughts

@desertaxle desertaxle marked this pull request as ready for review October 7, 2025 19:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants