You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: docs/temporal.md
+31-20Lines changed: 31 additions & 20 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -3,7 +3,7 @@
3
3
!!! note
4
4
Durable execution support is in beta and the public interface is subject to change based on user feedback. We expect it to be stable by the release of Pydantic AI v1 at the end of August. Questions and feedback are welcome in [GitHub issues](https://github.com/pydantic/pydantic-ai/issues) and the [`#pydantic-ai` Slack channel](https://logfire.pydantic.dev/docs/join-slack/).
5
5
6
-
Pydantic AI allows you to build durable agents that never lose their progress and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](agents.md#streaming-all-events) and [MCP](mcp/client.md), with the added benefit of fault tolerance.
6
+
Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](agents.md#streaming-all-events) and [MCP](mcp/client.md), with the added benefit of fault tolerance.
7
7
8
8
[Temporal](https://temporal.io) is a popular [durable execution](https://docs.temporal.io/evaluate/understanding-temporal#durable-execution) platform that's natively supported by Pydantic AI.
9
9
The integration only uses Pydantic AI's public interface, so it can also serve as a reference for how to integrate with another durable execution systems.
@@ -15,18 +15,27 @@ In Temporal's durable execution implementation, a program that crashes or encoun
15
15
Temporal relies primarily on a replay mechanism to recover from failures.
16
16
As the program makes progress, Temporal saves key inputs and decisions, allowing a re-started program to pick up right where it left off.
17
17
18
-
The key to making this work is to separate the applications repeatable (deterministic) and non-repeatable (non-deterministic) parts:
18
+
The key to making this work is to separate the application's repeatable (deterministic) and non-repeatable (non-deterministic) parts:
19
19
20
20
1. Deterministic pieces, termed [**workflows**](https://docs.temporal.io/workflow-definition), execute the same way when re-run with the same inputs.
21
21
2. Non-deterministic pieces, termed [**activities**](https://docs.temporal.io/activities), can run arbitrary code, performing I/O and any other operations.
22
22
23
23
Workflow code can run for extended periods and, if interrupted, resume exactly where it left off.
24
-
Activity code faces no restrictions on I/O or external interactions, but if it fails part-way through it restarts from the beginning.
24
+
Critically, workflow code generally _cannot_ include any kind of I/O, over the network, disk, etc.
25
+
Activity code faces no restrictions on I/O or external interactions, but if an activity fails part-way through it is restarted from the beginning.
25
26
26
-
In the case of Pydantic AI agents, this means that [model requests](models/index.md), [tool calls](tools.md) that may require I/O, and [MCP server communication](mcp/client.md) all need to be offloaded to Temporal activities, while the logic that coordinates them (i.e. the agent run) lives in the workflow. Code that handles a scheduled job or web request can then execute the workflow, which will in turn execute the activities as needed.
27
+
28
+
!!! note
29
+
30
+
If you are familiar with celery, it may be helpful to think of Temporal activities as similar to celery tasks, but where you wait for the task to complete and obtain its result before proceeding to the next step in the workflow.
31
+
However, Temporal workflows and activities offer a great deal more flexibility and functionality than celery tasks.
32
+
33
+
See the [Temporal documentation](https://docs.temporal.io/evaluate/understanding-temporal#temporal-application-the-building-blocks) for more information
34
+
35
+
In the case of Pydantic AI agents, integration with Temporal means that [model requests](models/index.md), [tool calls](tools.md) that may require I/O, and [MCP server communication](mcp/client.md) all need to be offloaded to Temporal activities due to their I/O requirements, while the logic that coordinates them (i.e. the agent run) lives in the workflow. Code that handles a scheduled job or web request can then execute the workflow, which will in turn execute the activities as needed.
27
36
28
37
The diagram below shows the overall architecture of an agentic application in Temporal.
29
-
The Temporal Server is responsible to tracking program execution and making sure associated state is preserved reliably (i.e., stored to a database, possibly replicated across cloud regions).
38
+
The Temporal Server is responsible for tracking program execution and making sure the associated state is preserved reliably (i.e., stored to an internal database, and possibly replicated across cloud regions).
30
39
Temporal Server manages data in encrypted form, so all data processing occurs on the Worker, which runs the workflow and activities.
31
40
32
41
@@ -59,17 +68,15 @@ Temporal Server manages data in encrypted form, so all data processing occurs on
59
68
[External APIs, services, databases, etc.]
60
69
```
61
70
62
-
See the [Temporal documentation](https://docs.temporal.io/evaluate/understanding-temporal#temporal-application-the-building-blocks)for more information.
71
+
See the [Temporal documentation](https://docs.temporal.io/evaluate/understanding-temporal#temporal-application-the-building-blocks) for more information.
63
72
64
73
## Durable Agent
65
74
66
-
Any agent can be wrapped in a [`TemporalAgent`][pydantic_ai.durable_exec.temporal.TemporalAgent] to get a durable agent that can be used inside a deterministic Temporal workflow, by automatically offloading all work that requires IO (namely model requests, tool calls, and MCP server communication) to non-deterministic activities.
67
-
68
-
At the time of wrapping, the agent's [model](models/index.md) and [toolsets](toolsets.md) (including function tools registered on the agent and MCP servers) are frozen, activities are dynamically created for each, and the original model and toolsets are wrapped to call on the worker to execute the corresponding activity instead of directly performing the action inside the workflow. The original agent can still be used as normal outside of the Temporal workflow, but any changes to its model or toolsets after wrapping will not be reflected in the durable agent.
75
+
Any agent can be wrapped in a [`TemporalAgent`][pydantic_ai.durable_exec.temporal.TemporalAgent] to get a durable agent that can be used inside a deterministic Temporal workflow, by automatically offloading all work that requires I/O (namely model requests, tool calls, and MCP server communication) to non-deterministic activities.
69
76
70
-
This is a simple but complete example of wrapping an agent for durable execution, creating a Temporal workflow with durable execution logic, connecting to a Temporal server, and running the workflow from non-durable code.
77
+
At the time of wrapping, the agent's [model](models/index.md) and [toolsets](toolsets.md) (including function tools registered on the agent and MCP servers) are frozen, activities are dynamically created for each, and the original model and toolsets are wrapped to call on the worker to execute the corresponding activities instead of directly performing the actions inside the workflow. The original agent can still be used as normal outside the Temporal workflow, but any changes to its model or toolsets after wrapping will not be reflected in the durable agent.
71
78
72
-
All it requires is a Temporal server to be [running locally](https://github.com/temporalio/temporal#download-and-start-temporal-server-locally):
79
+
Here is a simple but complete example of wrapping an agent for durable execution, creating a Temporal workflow with durable execution logic, connecting to a Temporal server, and running the workflow from non-durable code. All it requires is a Temporal server to be [running locally](https://github.com/temporalio/temporal#download-and-start-temporal-server-locally):
73
80
74
81
```sh
75
82
brew install temporal
@@ -126,12 +133,12 @@ async def main():
126
133
```
127
134
128
135
1. The original `Agent` cannot be used inside a deterministic Temporal workflow, but the `TemporalAgent` can.
129
-
2. As explained above, the workflow represents a deterministic piece of code that can use non-deterministic activities for operations that require IO.
136
+
2. As explained above, the workflow represents a deterministic piece of code that can use non-deterministic activities for operations that require I/O.
130
137
3.[`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] works just like [`Agent.run()`][pydantic_ai.Agent.run], but it will automatically offload model requests, tool calls, and MCP server communication to Temporal activities.
131
138
4. We connect to the Temporal server which keeps track of workflow and activity execution.
132
139
5. This assumes the Temporal server is [running locally](https://github.com/temporalio/temporal#download-and-start-temporal-server-locally).
133
140
6. The [`PydanticAIPlugin`][pydantic_ai.durable_exec.temporal.PydanticAIPlugin] tells Temporal to use Pydantic for serialization and deserialization, and to treat [`UserError`][pydantic_ai.exceptions.UserError] exceptions as non-retryable.
134
-
7. We start the worker process that will listen on the specified task queue and run workflows and activities.
141
+
7. We start the worker that will listen on the specified task queue and run workflows and activities. In a real world application, this might be run in a separate service.
135
142
8. The [`AgentPlugin`][pydantic_ai.durable_exec.temporal.AgentPlugin] registers the `TemporalAgent`'s activities with the worker.
136
143
9. We call on the server to execute the workflow on a worker that's listening on the specified task queue.
137
144
10. The agent's `name` is used to uniquely identify its activities.
@@ -143,15 +150,19 @@ Because Temporal workflows need to be defined at the top level of the file and t
143
150
144
151
For more information on how to use Temporal in Python applications, see their [Python SDK guide](https://docs.temporal.io/develop/python).
145
152
146
-
## Agent and Toolset Requirements
153
+
## Temporal Integration Considerations
154
+
155
+
There are a few considerations specific to agents and toolsets when using Temporal for durable execution. These are important to understand to ensure that your agents and toolsets work correctly with Temporal's workflow and activity model.
156
+
157
+
### Agent and Toolset Requirements
147
158
148
159
To ensure that Temporal knows what code to run when an activity fails or is interrupted and then restarted, even if your code is changed in between, each activity needs to have a name that's stable and unique.
149
160
150
-
When `TemporalAgent` dynamically creates activities for the wrapped agent's model requests and toolsets (specifically those that implement their own tool listing and calling, i.e. [`FunctionToolset`][pydantic_ai.toolsets.FunctionToolset] and [`MCPServer`][pydantic_ai.mcp.MCPServer]), their names are derived from the agent's [`name`][pydantic_ai.agent.AbstractAgent.name] and the toolsets' [`id`s][pydantic_ai.toolsets.AbstractToolset.id]. These fields are normally optional, but are required to be set when using Temporal. They should not be changed once the temporal agent has been deployed to production as this would break active workflows.
161
+
When `TemporalAgent` dynamically creates activities for the wrapped agent's model requests and toolsets (specifically those that implement their own tool listing and calling, i.e. [`FunctionToolset`][pydantic_ai.toolsets.FunctionToolset] and [`MCPServer`][pydantic_ai.mcp.MCPServer]), their names are derived from the agent's [`name`][pydantic_ai.agent.AbstractAgent.name] and the toolsets' [`id`s][pydantic_ai.toolsets.AbstractToolset.id]. These fields are normally optional, but are required to be set when using Temporal. They should not be changed once the durable agent has been deployed to production as this would break active workflows.
151
162
152
163
Other than that, any agent and toolset will just work!
153
164
154
-
## Agent Run Context and Dependencies
165
+
###Agent Run Context and Dependencies
155
166
156
167
As workflows and activities run in separate processes, any values passed between them need to be serializable. As these payloads are stored in the workflow execution event history, Temporal limits their size to 2MB.
157
168
@@ -160,7 +171,7 @@ To account for these limitations, tool functions and the [event stream handler](
160
171
Specifically, only the `deps`, `retries`, `tool_call_id`, `tool_name`, `retry`, and `run_step` fields are available by default, and trying to access `model`, `usage`, `prompt`, `messages`, or `tracer` will raise an error.
161
172
If you need one or more of these attributes to be available inside activities, you can create a [`TemporalRunContext`][pydantic_ai.durable_exec.temporal.TemporalRunContext] subclass with custom `serialize_run_context` and `deserialize_run_context` class methods and pass it to [`TemporalAgent`][pydantic_ai.durable_exec.temporal.TemporalAgent] as `run_context_type`.
162
173
163
-
## Streaming
174
+
###Streaming
164
175
165
176
Because Temporal activities cannot stream output directly to the activity call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] and [`Agent.iter()`][pydantic_ai.Agent.iter] are not supported.
166
177
@@ -182,19 +193,19 @@ Temporal activity configuration, like timeouts and retry policies, can be custom
182
193
-`tool_activity_config`: The Temporal activity config to use for specific tool call activities identified by toolset ID and tool name.
183
194
This is merged with the base and toolset-specific activity configs.
184
195
185
-
If a tool does not use IO, you can specify `False` to disable using an activity. Note that the tool is required to be defined as an `async` function as non-async tools are run in threads which are non-deterministic and thus not supported outside of activities.
196
+
If a tool does not use I/O, you can specify `False` to disable using an activity. Note that the tool is required to be defined as an `async` function as non-async tools are run in threads which are non-deterministic and thus not supported outside of activities.
186
197
187
198
## Activity Retries
188
199
189
-
On top of the automatic retries for request failures that Temporal will perform, Pydantic AI and various provider API clients also have their own request retry logic. Enabling these at the same time will cause the request to be retried more often than expected, with improper `Retry-After` handling.
200
+
On top of the automatic retries for request failures that Temporal will perform, Pydantic AI and various provider API clients also have their own request retry logic. Enabling these at the same time may cause the request to be retried more often than expected, with improper `Retry-After` handling.
190
201
191
202
When using Temporal, it's recommended to not use [HTTP Request Retries](retries.md) and to turn off your provider API client's own retry logic, for example by setting `max_retries=0` on a [custom `OpenAIProvider` API client](models/openai.md#custom-openai-client).
192
203
193
204
You can customize Temporal's retry policy using [activity configuration](#activity-configuration).
194
205
195
206
## Observability with Logfire
196
207
197
-
Temporal generates telemetry events and metrics for each workflow and activity execution, and Pydantic AI generates events for each agent run, model request and tool call. These can be sent to [Logfire](logfire.md) to get a complete picture of what's happening in your application.
208
+
Temporal generates telemetry events and metrics for each workflow and activity execution, and Pydantic AI generates events for each agent run, model request and tool call. These can be sent to [Pydantic Logfire](logfire.md) to get a complete picture of what's happening in your application.
198
209
199
210
To use Logfire with Temporal, you need to pass a [`LogfirePlugin`][pydantic_ai.durable_exec.temporal.LogfirePlugin] object to Temporal's `Client.connect()`:
0 commit comments