Commit 127624d

Update docs and tests for DBOS v2.0 (#3004)
1 parent 7e74268 commit 127624d

4 files changed: +47 -406 lines changed

docs/durable_execution/dbos.md

Lines changed: 2 additions & 2 deletions
@@ -123,7 +123,7 @@ Other than that, any agent and toolset will just work!
 
 ### Agent Run Context and Dependencies
 
-DBOS checkpoints workflow inputs/outputs and step outputs into a database using `jsonpickle`. This means you need to make sure [dependencies](../dependencies.md) object provided to [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] or [`DBOSAgent.run_sync()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run_sync], and tool outputs can be serialized using jsonpickle. You may also want to keep the inputs and outputs small (under \~2 MB). PostgreSQL and SQLite support up to 1 GB per field, but large objects may impact performance.
+DBOS checkpoints workflow inputs/outputs and step outputs into a database using [`pickle`](https://docs.python.org/3/library/pickle.html). This means you need to make sure [dependencies](../dependencies.md) object provided to [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] or [`DBOSAgent.run_sync()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run_sync], and tool outputs can be serialized using pickle. You may also want to keep the inputs and outputs small (under \~2 MB). PostgreSQL and SQLite support up to 1 GB per field, but large objects may impact performance.
 
 ### Streaming
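The switch from `jsonpickle` to `pickle` is the key behavioral change this doc edit reflects. A simple way to check whether a dependencies object will survive checkpointing is to round-trip it through `pickle` before handing it to the agent. This is a minimal sketch, not part of the commit; the `AppDeps` dataclass is hypothetical:

```python
import pickle
from dataclasses import dataclass


@dataclass
class AppDeps:
    """Hypothetical dependencies object passed to DBOSAgent.run()."""

    user_id: str
    api_base_url: str


deps = AppDeps(user_id='u_123', api_base_url='https://api.example.com')

# Objects holding live resources (httpx clients, locks, sockets) raise here,
# just as they would when DBOS checkpoints the workflow input.
restored = pickle.loads(pickle.dumps(deps))
assert restored == deps
```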

@@ -153,6 +153,6 @@ You can customize DBOS's retry policy using [step configuration](#step-configura
 
 ## Observability with Logfire
 
-DBOS automatically generates OpenTelemetry spans for each workflow and step execution, and Pydantic AI emits spans for each agent run, model request, and tool invocation. You can send these spans to [Pydantic Logfire](../logfire.md) to get a full, end-to-end view of what's happening in your application.
+DBOS can be configured to generate OpenTelemetry spans for each workflow and step execution, and Pydantic AI emits spans for each agent run, model request, and tool invocation. You can send these spans to [Pydantic Logfire](../logfire.md) to get a full, end-to-end view of what's happening in your application.
 
 For more information about DBOS logging and tracing, please see the [DBOS docs](https://docs.dbos.dev/python/tutorials/logging-and-tracing) for details.
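The wording changes from "automatically generates" to "can be configured to generate" because OTLP export is now opt-in via the `enable_otlp` config flag, which also appears in the test config change below. A rough sketch of enabling it alongside Logfire follows; the app name is illustrative, `logfire.instrument_pydantic_ai()` is assumed to exist in your Logfire version, and how DBOS's own exported spans reach Logfire depends on your OTLP endpoint configuration:

```python
import logfire
from dbos import DBOS, DBOSConfig

# Pydantic AI spans (agent runs, model requests, tool calls) are emitted via Logfire.
logfire.configure()
logfire.instrument_pydantic_ai()

config: DBOSConfig = {
    'name': 'my_dbos_app',  # hypothetical application name
    'system_database_url': 'sqlite:///dbos.sqlite',
    'run_admin_server': False,
    # enable_otlp requires dbos > 1.14; without it DBOS emits no OTel spans.
    'enable_otlp': True,
}

DBOS(config=config)
DBOS.launch()
```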

pydantic_ai_slim/pydantic_ai/mcp.py

Lines changed: 1 addition & 1 deletion
@@ -540,7 +540,7 @@ def __repr__(self) -> str:
             f'args={self.args!r}',
         ]
         if self.id:
-            repr_args.append(f'id={self.id!r}')
+            repr_args.append(f'id={self.id!r}')  # pragma: lax no cover
         return f'{self.__class__.__name__}({", ".join(repr_args)})'
 
     def __eq__(self, value: object, /) -> bool:

tests/test_dbos.py

Lines changed: 26 additions & 6 deletions
@@ -43,9 +43,14 @@
 from .conftest import IsDatetime, IsStr
 
 try:
+    import importlib.metadata
+
     from dbos import DBOS, DBOSConfig, SetWorkflowID
+    from packaging.version import Version
 
     from pydantic_ai.durable_exec.dbos import DBOSAgent, DBOSMCPServer, DBOSModel
+
+    dbos_version = Version(importlib.metadata.version('dbos'))
 except ImportError:  # pragma: lax no cover
     pytest.skip('DBOS is not installed', allow_module_level=True)
 
@@ -110,9 +115,10 @@ def workflow_raises(exc_type: type[Exception], exc_message: str) -> Iterator[Non
 DBOS_SQLITE_FILE = 'dbostest.sqlite'
 DBOS_CONFIG: DBOSConfig = {
     'name': 'pydantic_dbos_tests',
-    'database_url': f'sqlite:///{DBOS_SQLITE_FILE}',
     'system_database_url': f'sqlite:///{DBOS_SQLITE_FILE}',
     'run_admin_server': False,
+    # enable_otlp requires dbos>1.14
+    'enable_otlp': True,
 }
 
 
@@ -1042,9 +1048,19 @@ async def simple_event_stream_handler(
     ):
         pass
 
-    with workflow_raises(TypeError, snapshot('Serialized function should be defined at the top level of a module')):
+    with pytest.raises(Exception) as exc_info:
         await simple_dbos_agent.run('What is the capital of Mexico?', event_stream_handler=simple_event_stream_handler)
 
+    if dbos_version <= Version('1.14'):  # pragma: lax no cover
+        # Older DBOS versions used jsonpickle
+        assert str(exc_info.value) == snapshot('Serialized function should be defined at the top level of a module')
+    else:
+        # Newer DBOS versions use pickle
+        assert (
+            "local object 'test_dbos_agent_run_in_workflow_with_event_stream_handler.<locals>.simple_event_stream_handler'"
+            in str(exc_info.value)
+        )
+
 
 async def test_dbos_agent_run_in_workflow_with_model(allow_model_requests: None, dbos: DBOS):
     # A non-DBOS model is not wrapped as steps so it's not deterministic and cannot be used in a DBOS workflow.
@@ -1142,15 +1158,19 @@ async def get_model_name(ctx: RunContext[UnserializableDeps]) -> int:
 async def test_dbos_agent_with_unserializable_deps_type(allow_model_requests: None, dbos: DBOS):
     unserializable_deps_dbos_agent = DBOSAgent(unserializable_deps_agent)
     # Test this raises a serialization error because httpx.AsyncClient is not serializable.
-    with pytest.raises(
-        Exception,
-        match='object proxy must define __reduce_ex__()',
-    ):
+    with pytest.raises(Exception) as exc_info:
         async with AsyncClient() as client:
             # This will trigger the client to be unserializable
             logfire.instrument_httpx(client, capture_all=True)
             await unserializable_deps_dbos_agent.run('What is the model name?', deps=UnserializableDeps(client=client))
 
+    if dbos_version <= Version('1.14'):  # pragma: lax no cover
+        # Older DBOS versions used jsonpickle
+        assert str(exc_info.value) == snapshot('object proxy must define __reduce_ex__()')
+    else:
+        # Newer DBOS versions use pickle
+        assert str(exc_info.value) == snapshot("cannot pickle '_thread.RLock' object")
+
 
 # Test dynamic toolsets in an agent with DBOS
