Skip to content

Commit cb51bbe

Browse files
authored
Custom Serialization (#486)
Add support for using custom serializers and deserializers in DBOS Python. You can now supply a custom serializer in DBOS configuration, which completely replaces the default `pickle`-based serializer for storing objects in the system database.

For example, here's how you would use a JSON serializer:

```python
import json
import os
from typing import Any

from dbos import DBOS, DBOSConfig, Serializer


class JsonSerializer(Serializer):
    def serialize(self, data: Any) -> str:
        return json.dumps(data)

    def deserialize(self, serialized_data: str) -> Any:
        return json.loads(serialized_data)


serializer = JsonSerializer()
config: DBOSConfig = {
    "name": "dbos-starter",
    "system_database_url": os.environ.get("DBOS_SYSTEM_DATABASE_URL"),
    "serializer": serializer,
}
DBOS(config=config)
DBOS.launch()
```

Addresses #485
1 parent 1887532 commit cb51bbe

File tree

12 files changed

+275
-189
lines changed

12 files changed

+275
-189
lines changed

dbos/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ._debouncer import Debouncer, DebouncerClient
1313
from ._kafka_message import KafkaMessage
1414
from ._queue import Queue
15+
from ._serialization import Serializer
1516
from ._sys_db import GetWorkflowsInput, WorkflowStatus, WorkflowStatusString
1617

1718
__all__ = [
@@ -35,4 +36,5 @@
3536
"Queue",
3637
"Debouncer",
3738
"DebouncerClient",
39+
"Serializer",
3840
]

dbos/_app_db.py

Lines changed: 40 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
from sqlalchemy.orm import Session, sessionmaker
99

1010
from dbos._migration import get_sqlite_timestamp_expr
11+
from dbos._serialization import Serializer
1112

12-
from . import _serialization
1313
from ._error import DBOSUnexpectedStepError, DBOSWorkflowConflictIDError
1414
from ._logger import dbos_logger
1515
from ._schemas.application_database import ApplicationSchema
@@ -34,17 +34,52 @@ class RecordedResult(TypedDict):
3434

3535
class ApplicationDatabase(ABC):
3636

37+
@staticmethod
38+
def create(
39+
database_url: str,
40+
engine_kwargs: Dict[str, Any],
41+
schema: Optional[str],
42+
serializer: Serializer,
43+
debug_mode: bool = False,
44+
) -> "ApplicationDatabase":
45+
"""Factory method to create the appropriate ApplicationDatabase implementation based on URL."""
46+
if database_url.startswith("sqlite"):
47+
return SQLiteApplicationDatabase(
48+
database_url=database_url,
49+
engine_kwargs=engine_kwargs,
50+
schema=schema,
51+
serializer=serializer,
52+
debug_mode=debug_mode,
53+
)
54+
else:
55+
# Default to PostgreSQL for postgresql://, postgres://, or other URLs
56+
return PostgresApplicationDatabase(
57+
database_url=database_url,
58+
engine_kwargs=engine_kwargs,
59+
schema=schema,
60+
serializer=serializer,
61+
debug_mode=debug_mode,
62+
)
63+
3764
def __init__(
3865
self,
3966
*,
4067
database_url: str,
4168
engine_kwargs: Dict[str, Any],
69+
serializer: Serializer,
70+
schema: Optional[str],
4271
debug_mode: bool = False,
4372
):
73+
if database_url.startswith("sqlite"):
74+
self.schema = None
75+
else:
76+
self.schema = schema if schema else "dbos"
77+
ApplicationSchema.transaction_outputs.schema = schema
4478
self.engine = self._create_engine(database_url, engine_kwargs)
4579
self._engine_kwargs = engine_kwargs
4680
self.sessionmaker = sessionmaker(bind=self.engine)
4781
self.debug_mode = debug_mode
82+
self.serializer = serializer
4883

4984
@abstractmethod
5085
def _create_engine(
@@ -156,10 +191,12 @@ def get_transactions(self, workflow_uuid: str) -> List[StepInfo]:
156191
function_id=row[0],
157192
function_name=row[1],
158193
output=(
159-
_serialization.deserialize(row[2]) if row[2] is not None else row[2]
194+
self.serializer.deserialize(row[2])
195+
if row[2] is not None
196+
else row[2]
160197
),
161198
error=(
162-
_serialization.deserialize_exception(row[3])
199+
self.serializer.deserialize(row[3])
163200
if row[3] is not None
164201
else row[3]
165202
),
@@ -237,52 +274,10 @@ def _is_serialization_error(self, dbapi_error: DBAPIError) -> bool:
237274
"""Check if the error is a serialization/concurrency error."""
238275
pass
239276

240-
@staticmethod
241-
def create(
242-
database_url: str,
243-
engine_kwargs: Dict[str, Any],
244-
schema: Optional[str],
245-
debug_mode: bool = False,
246-
) -> "ApplicationDatabase":
247-
"""Factory method to create the appropriate ApplicationDatabase implementation based on URL."""
248-
if database_url.startswith("sqlite"):
249-
return SQLiteApplicationDatabase(
250-
database_url=database_url,
251-
engine_kwargs=engine_kwargs,
252-
debug_mode=debug_mode,
253-
)
254-
else:
255-
# Default to PostgreSQL for postgresql://, postgres://, or other URLs
256-
return PostgresApplicationDatabase(
257-
database_url=database_url,
258-
engine_kwargs=engine_kwargs,
259-
debug_mode=debug_mode,
260-
schema=schema,
261-
)
262-
263277

264278
class PostgresApplicationDatabase(ApplicationDatabase):
265279
"""PostgreSQL-specific implementation of ApplicationDatabase."""
266280

267-
def __init__(
268-
self,
269-
*,
270-
database_url: str,
271-
engine_kwargs: Dict[str, Any],
272-
schema: Optional[str],
273-
debug_mode: bool = False,
274-
):
275-
super().__init__(
276-
database_url=database_url,
277-
engine_kwargs=engine_kwargs,
278-
debug_mode=debug_mode,
279-
)
280-
if schema is None:
281-
self.schema = "dbos"
282-
else:
283-
self.schema = schema
284-
ApplicationSchema.transaction_outputs.schema = schema
285-
286281
def _create_engine(
287282
self, database_url: str, engine_kwargs: Dict[str, Any]
288283
) -> sa.Engine:

dbos/_client.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import sqlalchemy as sa
1818

19-
from dbos import _serialization
2019
from dbos._app_db import ApplicationDatabase
2120
from dbos._context import MaxPriority, MinPriority
2221
from dbos._sys_db import SystemDatabase
@@ -27,7 +26,7 @@
2726
from dbos._dbos_config import get_system_database_url, is_valid_database_url
2827
from dbos._error import DBOSException, DBOSNonExistentWorkflowError
2928
from dbos._registrations import DEFAULT_MAX_RECOVERY_ATTEMPTS
30-
from dbos._serialization import WorkflowInputs
29+
from dbos._serialization import DefaultSerializer, Serializer, WorkflowInputs
3130
from dbos._sys_db import (
3231
EnqueueOptionsInternal,
3332
StepInfo,
@@ -127,7 +126,9 @@ def __init__(
127126
system_database_engine: Optional[sa.Engine] = None,
128127
application_database_url: Optional[str] = None,
129128
dbos_system_schema: Optional[str] = "dbos",
129+
serializer: Serializer = DefaultSerializer(),
130130
):
131+
self._serializer = serializer
131132
application_database_url = (
132133
database_url if database_url else application_database_url
133134
)
@@ -150,6 +151,7 @@ def __init__(
150151
},
151152
engine=system_database_engine,
152153
schema=dbos_system_schema,
154+
serializer=serializer,
153155
)
154156
self._sys_db.check_connection()
155157
if application_database_url:
@@ -161,6 +163,7 @@ def __init__(
161163
"pool_size": 2,
162164
},
163165
schema=dbos_system_schema,
166+
serializer=serializer,
164167
)
165168

166169
def destroy(self) -> None:
@@ -217,7 +220,7 @@ def _enqueue(self, options: EnqueueOptions, *args: Any, **kwargs: Any) -> str:
217220
if enqueue_options_internal["priority"] is not None
218221
else 0
219222
),
220-
"inputs": _serialization.serialize_args(inputs),
223+
"inputs": self._serializer.serialize(inputs),
221224
}
222225

223226
self._sys_db.init_workflow(
@@ -282,7 +285,7 @@ def send(
282285
"workflow_deadline_epoch_ms": None,
283286
"deduplication_id": None,
284287
"priority": 0,
285-
"inputs": _serialization.serialize_args({"args": (), "kwargs": {}}),
288+
"inputs": self._serializer.serialize({"args": (), "kwargs": {}}),
286289
}
287290
with self._sys_db.engine.begin() as conn:
288291
self._sys_db._insert_workflow_status(

0 commit comments

Comments
 (0)