You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: docs/durable_execution/dbos.md
+7-9Lines changed: 7 additions & 9 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -1,10 +1,8 @@
1
1
# Durable Execution with DBOS
2
2
3
-
Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](agents.md#streaming-all-events) and [MCP](mcp/client.md), with the added benefit of fault tolerance.
3
+
[DBOS](https://www.dbos.dev/) is a lightweight [durable execution](https://docs.dbos.dev/architecture) library natively integrated with Pydantic AI.
4
4
5
-
[DBOS](https://www.dbos.dev/) is a lightweight [durable execution](https://docs.dbos.dev/architecture) library natively integrated with Pydantic AI. The integration only uses Pydantic AI's public interface, so it also serves as a reference for integrating with other durable systems.
6
-
7
-
### Durable Execution
5
+
## Durable Execution
8
6
9
7
DBOS workflows make your program **durable** by checkpointing its state in a database. If your program ever fails, when it restarts all your workflows will automatically resume from the last completed step.
10
8
@@ -50,7 +48,7 @@ See the [DBOS documentation](https://docs.dbos.dev/architecture) for more inform
50
48
Any agent can be wrapped in a [`DBOSAgent`][pydantic_ai.durable_exec.dbos.DBOSAgent] to get durable execution. `DBOSAgent` automatically:,
51
49
52
50
* Wraps `Agent.run` and `Agent.run_sync` as DBOS workflows.
53
-
* Wraps [model requests](models/index.md) and [MCP communication](mcp/client.md) as DBOS steps.
51
+
* Wraps [model requests](../models/overview.md) and [MCP communication](../mcp/client.md) as DBOS steps.
54
52
55
53
Custom tool functions are not wrapped automatically. You can decorate them with `@DBOS.workflow` or `@DBOS.step` as needed.
56
54
@@ -112,14 +110,14 @@ Other than that, any agent and toolset will just work!
112
110
113
111
### Agent Run Context and Dependencies
114
112
115
-
DBOS checkpoints workflow inputs/outputs and step outputs into a database using `jsonpickle`. This means you need to make sure [dependencies](dependencies.md) object provided to [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] or [`DBOSAgent.run_sync()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run_sync], and tool outputs can be serialized using jsonpickle. You may also want to keep the inputs and outputs small (under \~2 MB). PostgreSQL and SQLite support up to 1 GB per field, but large objects may impact performance.
113
+
DBOS checkpoints workflow inputs/outputs and step outputs into a database using `jsonpickle`. This means you need to make sure [dependencies](../dependencies.md) object provided to [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] or [`DBOSAgent.run_sync()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run_sync], and tool outputs can be serialized using jsonpickle. You may also want to keep the inputs and outputs small (under \~2 MB). PostgreSQL and SQLite support up to 1 GB per field, but large objects may impact performance.
116
114
117
115
### Streaming
118
116
119
117
Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] is not supported when running inside of a DBOS workflow.
120
118
121
119
Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `DBOSAgent` instance and using [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run].
122
-
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](agents.md#streaming-all-events).
120
+
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).
123
121
124
122
125
123
## Step Configuration
@@ -136,13 +134,13 @@ For custom tools, you can annotate them directly with [`@DBOS.step`](https://doc
136
134
137
135
On top of the automatic retries for request failures that DBOS will perform, Pydantic AI and various provider API clients also have their own request retry logic. Enabling these at the same time may cause the request to be retried more often than expected, with improper `Retry-After` handling.
138
136
139
-
When using DBOS, it's recommended to not use [HTTP Request Retries](retries.md) and to turn off your provider API client's own retry logic, for example by setting `max_retries=0` on a [custom `OpenAIProvider` API client](models/openai.md#custom-openai-client).
137
+
When using DBOS, it's recommended to not use [HTTP Request Retries](../retries.md) and to turn off your provider API client's own retry logic, for example by setting `max_retries=0` on a [custom `OpenAIProvider` API client](../models/openai.md#custom-openai-client).
140
138
141
139
You can customize DBOS's retry policy using [step configuration](#step-configuration).
142
140
143
141
## Observability with Logfire
144
142
145
-
DBOS emits OpenTelemetry traces and events for every workflow and step, and Pydantic AI generates events for each agent run, model request and tool call. These can be sent to [Pydantic Logfire](logfire.md) to get a complete picture of what's happening in your application.
143
+
DBOS emits OpenTelemetry traces and events for every workflow and step, and Pydantic AI generates events for each agent run, model request and tool call. These can be sent to [Pydantic Logfire](../logfire.md) to get a complete picture of what's happening in your application.
146
144
147
145
To disable DBOS traces and logs, you can set `disable_otlp=True` in `DBOSConfig`. For example:
Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](../agents.md#streaming-all-events) and [MCP](../mcp/client.md), with the added benefit of fault tolerance.
4
+
5
+
Pydantic AI natively supports two durable execution solutions:
6
+
7
+
-[Temporal](./temporal.md)
8
+
-[DBOS](./dbos.md)
9
+
10
+
These integrations only uses Pydantic AI's public interface, so they also serve as a reference for integrating with other durable systems.
Copy file name to clipboardExpand all lines: docs/durable_execution/temporal.md
+7-10Lines changed: 7 additions & 10 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -1,11 +1,8 @@
1
1
# Durable Execution with Temporal
2
2
3
-
Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](agents.md#streaming-all-events) and [MCP](mcp/client.md), with the added benefit of fault tolerance.
4
-
5
3
[Temporal](https://temporal.io) is a popular [durable execution](https://docs.temporal.io/evaluate/understanding-temporal#durable-execution) platform that's natively supported by Pydantic AI.
6
-
The integration only uses Pydantic AI's public interface, so it can also serve as a reference for how to integrate with another durable execution systems.
7
4
8
-
###Durable Execution
5
+
## Durable Execution
9
6
10
7
In Temporal's durable execution implementation, a program that crashes or encounters an exception while interacting with a model or API will retry until it can successfully complete.
11
8
@@ -29,7 +26,7 @@ Activity code faces no restrictions on I/O or external interactions, but if an a
29
26
30
27
See the [Temporal documentation](https://docs.temporal.io/evaluate/understanding-temporal#temporal-application-the-building-blocks) for more information
31
28
32
-
In the case of Pydantic AI agents, integration with Temporal means that [model requests](models/overview.md), [tool calls](tools.md) that may require I/O, and [MCP server communication](mcp/client.md) all need to be offloaded to Temporal activities due to their I/O requirements, while the logic that coordinates them (i.e. the agent run) lives in the workflow. Code that handles a scheduled job or web request can then execute the workflow, which will in turn execute the activities as needed.
29
+
In the case of Pydantic AI agents, integration with Temporal means that [model requests](../models/overview.md), [tool calls](../tools.md) that may require I/O, and [MCP server communication](../mcp/client.md) all need to be offloaded to Temporal activities due to their I/O requirements, while the logic that coordinates them (i.e. the agent run) lives in the workflow. Code that handles a scheduled job or web request can then execute the workflow, which will in turn execute the activities as needed.
33
30
34
31
The diagram below shows the overall architecture of an agentic application in Temporal.
35
32
The Temporal Server is responsible for tracking program execution and making sure the associated state is preserved reliably (i.e., stored to an internal database, and possibly replicated across cloud regions).
@@ -71,7 +68,7 @@ See the [Temporal documentation](https://docs.temporal.io/evaluate/understanding
71
68
72
69
Any agent can be wrapped in a [`TemporalAgent`][pydantic_ai.durable_exec.temporal.TemporalAgent] to get a durable agent that can be used inside a deterministic Temporal workflow, by automatically offloading all work that requires I/O (namely model requests, tool calls, and MCP server communication) to non-deterministic activities.
73
70
74
-
At the time of wrapping, the agent's [model](models/overview.md) and [toolsets](toolsets.md) (including function tools registered on the agent and MCP servers) are frozen, activities are dynamically created for each, and the original model and toolsets are wrapped to call on the worker to execute the corresponding activities instead of directly performing the actions inside the workflow. The original agent can still be used as normal outside the Temporal workflow, but any changes to its model or toolsets after wrapping will not be reflected in the durable agent.
71
+
At the time of wrapping, the agent's [model](../models/overview.md) and [toolsets](../toolsets.md) (including function tools registered on the agent and MCP servers) are frozen, activities are dynamically created for each, and the original model and toolsets are wrapped to call on the worker to execute the corresponding activities instead of directly performing the actions inside the workflow. The original agent can still be used as normal outside the Temporal workflow, but any changes to its model or toolsets after wrapping will not be reflected in the durable agent.
75
72
76
73
Here is a simple but complete example of wrapping an agent for durable execution, creating a Temporal workflow with durable execution logic, connecting to a Temporal server, and running the workflow from non-durable code. All it requires is a Temporal server to be [running locally](https://github.com/temporalio/temporal#download-and-start-temporal-server-locally):
77
74
@@ -167,7 +164,7 @@ Other than that, any agent and toolset will just work!
167
164
168
165
As workflows and activities run in separate processes, any values passed between them need to be serializable. As these payloads are stored in the workflow execution event history, Temporal limits their size to 2MB.
169
166
170
-
To account for these limitations, tool functions and the [event stream handler](#streaming) running inside activities receive a limited version of the agent's [`RunContext`][pydantic_ai.tools.RunContext], and it's your responsibility to make sure that the [dependencies](dependencies.md) object provided to [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] can be serialized using Pydantic.
167
+
To account for these limitations, tool functions and the [event stream handler](#streaming) running inside activities receive a limited version of the agent's [`RunContext`][pydantic_ai.tools.RunContext], and it's your responsibility to make sure that the [dependencies](../dependencies.md) object provided to [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] can be serialized using Pydantic.
171
168
172
169
Specifically, only the `deps`, `retries`, `tool_call_id`, `tool_name`, `tool_call_approved`, `retry`, and `run_step` fields are available by default, and trying to access `model`, `usage`, `prompt`, `messages`, or `tracer` will raise an error.
173
170
If you need one or more of these attributes to be available inside activities, you can create a [`TemporalRunContext`][pydantic_ai.durable_exec.temporal.TemporalRunContext] subclass with custom `serialize_run_context` and `deserialize_run_context` class methods and pass it to [`TemporalAgent`][pydantic_ai.durable_exec.temporal.TemporalAgent] as `run_context_type`.
@@ -177,7 +174,7 @@ If you need one or more of these attributes to be available inside activities, y
177
174
Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported.
178
175
179
176
Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `TemporalAgent` instance and using [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] inside the workflow.
180
-
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](agents.md#streaming-all-events).
177
+
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events).
181
178
182
179
As the streaming model request activity, workflow, and workflow execution call all take place in separate processes, passing data between them requires some care:
183
180
@@ -200,13 +197,13 @@ Temporal activity configuration, like timeouts and retry policies, can be custom
200
197
201
198
On top of the automatic retries for request failures that Temporal will perform, Pydantic AI and various provider API clients also have their own request retry logic. Enabling these at the same time may cause the request to be retried more often than expected, with improper `Retry-After` handling.
202
199
203
-
When using Temporal, it's recommended to not use [HTTP Request Retries](retries.md) and to turn off your provider API client's own retry logic, for example by setting `max_retries=0` on a [custom `OpenAIProvider` API client](models/openai.md#custom-openai-client).
200
+
When using Temporal, it's recommended to not use [HTTP Request Retries](../retries.md) and to turn off your provider API client's own retry logic, for example by setting `max_retries=0` on a [custom `OpenAIProvider` API client](../models/openai.md#custom-openai-client).
204
201
205
202
You can customize Temporal's retry policy using [activity configuration](#activity-configuration).
206
203
207
204
## Observability with Logfire
208
205
209
-
Temporal generates telemetry events and metrics for each workflow and activity execution, and Pydantic AI generates events for each agent run, model request and tool call. These can be sent to [Pydantic Logfire](logfire.md) to get a complete picture of what's happening in your application.
206
+
Temporal generates telemetry events and metrics for each workflow and activity execution, and Pydantic AI generates events for each agent run, model request and tool call. These can be sent to [Pydantic Logfire](../logfire.md) to get a complete picture of what's happening in your application.
210
207
211
208
To use Logfire with Temporal, you need to pass a [`LogfirePlugin`][pydantic_ai.durable_exec.temporal.LogfirePlugin] object to Temporal's `Client.connect()`:
0 commit comments