Skip to content

Commit 106fc2c

Browse files
authored
Better Exception Serialization (#323)
1 parent 77f43ef commit 106fc2c

File tree

4 files changed

+133
-17
lines changed

4 files changed

+133
-17
lines changed

dbos/_error.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,17 @@ class DBOSNotAuthorizedError(DBOSException):
134134
"""Exception raised by DBOS role-based security when the user is not authorized to access a function."""
135135

136136
def __init__(self, msg: str):
137+
self.msg = msg
137138
super().__init__(
138139
msg,
139140
dbos_error_code=DBOSErrorCode.NotAuthorized.value,
140141
)
141142
self.status_code = 403
142143

144+
def __reduce__(self) -> Any:
145+
# Tell jsonpickle how to reconstruct this object
146+
return (self.__class__, (self.msg,))
147+
143148

144149
class DBOSMaxStepRetriesExceeded(DBOSException):
145150
"""Exception raised when a step was retried the maximimum number of times without success."""
@@ -185,11 +190,21 @@ class DBOSQueueDeduplicatedError(DBOSException):
185190
def __init__(
186191
self, workflow_id: str, queue_name: str, deduplication_id: str
187192
) -> None:
193+
self.workflow_id = workflow_id
194+
self.queue_name = queue_name
195+
self.deduplication_id = deduplication_id
188196
super().__init__(
189197
f"Workflow {workflow_id} was deduplicated due to an existing workflow in queue {queue_name} with deduplication ID {deduplication_id}.",
190198
dbos_error_code=DBOSErrorCode.QueueDeduplicated.value,
191199
)
192200

201+
def __reduce__(self) -> Any:
202+
# Tell jsonpickle how to reconstruct this object
203+
return (
204+
self.__class__,
205+
(self.workflow_id, self.queue_name, self.deduplication_id),
206+
)
207+
193208

194209
#######################################
195210
## BaseException

dbos/_serialization.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import types
2-
from typing import Any, Dict, Tuple, TypedDict
2+
from typing import Any, Dict, Optional, Tuple, TypedDict
33

44
import jsonpickle # type: ignore
55

6+
from ._logger import dbos_logger
7+
68

79
class WorkflowInputs(TypedDict):
810
args: Tuple[Any, ...]
@@ -51,5 +53,54 @@ def deserialize_args(serialized_data: str) -> WorkflowInputs:
5153

5254
def deserialize_exception(serialized_data: str) -> Exception:
5355
"""Deserialize JSON string back to a Python Exception using jsonpickle."""
54-
upo: Exception = jsonpickle.decode(serialized_data)
55-
return upo
56+
exc: Exception = jsonpickle.decode(serialized_data)
57+
return exc
58+
59+
60+
def safe_deserialize(
61+
workflow_id: str,
62+
*,
63+
serialized_input: Optional[str],
64+
serialized_output: Optional[str],
65+
serialized_exception: Optional[str],
66+
) -> tuple[Optional[WorkflowInputs], Optional[Any], Optional[Exception]]:
67+
"""
68+
This function safely deserializes a workflow's recorded input and output/exception.
69+
If any of them is not deserializable, it logs a warning and returns a string instead of throwing an exception.
70+
71+
This function is used in workflow introspection methods (get_workflows and get_queued_workflow)
72+
to ensure errors related to nondeserializable objects are observable.
73+
"""
74+
input: Optional[WorkflowInputs]
75+
try:
76+
input = (
77+
deserialize_args(serialized_input) if serialized_input is not None else None
78+
)
79+
except Exception as e:
80+
dbos_logger.warning(
81+
f"Warning: input object could not be deserialized for workflow {workflow_id}, returning as string: {e}"
82+
)
83+
input = serialized_input # type: ignore
84+
output: Optional[Any]
85+
try:
86+
output = (
87+
deserialize(serialized_output) if serialized_output is not None else None
88+
)
89+
except Exception as e:
90+
dbos_logger.warning(
91+
f"Warning: output object could not be deserialized for workflow {workflow_id}, returning as string: {e}"
92+
)
93+
output = serialized_output
94+
exception: Optional[Exception]
95+
try:
96+
exception = (
97+
deserialize_exception(serialized_exception)
98+
if serialized_exception is not None
99+
else None
100+
)
101+
except Exception as e:
102+
dbos_logger.warning(
103+
f"Warning: exception object could not be deserialized for workflow {workflow_id}, returning as string: {e}"
104+
)
105+
exception = serialized_exception # type: ignore
106+
return input, output, exception

dbos/_sys_db.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -901,13 +901,15 @@ def get_workflows(
901901
info.app_version = row[14]
902902
info.app_id = row[15]
903903

904-
inputs = _serialization.deserialize_args(row[16])
905-
if inputs is not None:
906-
info.input = inputs
907-
if info.status == WorkflowStatusString.SUCCESS.value:
908-
info.output = _serialization.deserialize(row[17])
909-
elif info.status == WorkflowStatusString.ERROR.value:
910-
info.error = _serialization.deserialize_exception(row[18])
904+
inputs, output, exception = _serialization.safe_deserialize(
905+
info.workflow_id,
906+
serialized_input=row[16],
907+
serialized_output=row[17],
908+
serialized_exception=row[18],
909+
)
910+
info.input = inputs
911+
info.output = output
912+
info.error = exception
911913

912914
infos.append(info)
913915
return infos
@@ -1007,13 +1009,15 @@ def get_queued_workflows(
10071009
info.app_version = row[14]
10081010
info.app_id = row[15]
10091011

1010-
inputs = _serialization.deserialize_args(row[16])
1011-
if inputs is not None:
1012-
info.input = inputs
1013-
if info.status == WorkflowStatusString.SUCCESS.value:
1014-
info.output = _serialization.deserialize(row[17])
1015-
elif info.status == WorkflowStatusString.ERROR.value:
1016-
info.error = _serialization.deserialize_exception(row[18])
1012+
inputs, output, exception = _serialization.safe_deserialize(
1013+
info.workflow_id,
1014+
serialized_input=row[16],
1015+
serialized_output=row[17],
1016+
serialized_exception=row[18],
1017+
)
1018+
info.input = inputs
1019+
info.output = output
1020+
info.error = exception
10171021

10181022
infos.append(info)
10191023

tests/test_failures.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,16 @@
1111
from dbos._error import (
1212
DBOSDeadLetterQueueError,
1313
DBOSMaxStepRetriesExceeded,
14+
DBOSNotAuthorizedError,
15+
DBOSQueueDeduplicatedError,
1416
DBOSUnexpectedStepError,
1517
)
1618
from dbos._registrations import DEFAULT_MAX_RECOVERY_ATTEMPTS
19+
from dbos._serialization import (
20+
deserialize_exception,
21+
safe_deserialize,
22+
serialize_exception,
23+
)
1724
from dbos._sys_db import WorkflowStatusString
1825

1926
from .conftest import queue_entries_are_cleaned_up
@@ -434,3 +441,42 @@ def failing_workflow() -> str:
434441
recovery_handles = DBOS._recover_pending_workflows()
435442
assert len(recovery_handles) == 1
436443
assert recovery_handles[0].get_result() == recovery_handles[0].workflow_id
444+
445+
446+
def test_error_serialization() -> None:
447+
# Verify that each exception that can be thrown in a workflow
448+
# is serializable and deserializable
449+
# DBOSMaxStepRetriesExceeded
450+
e: Exception = DBOSMaxStepRetriesExceeded("step", 1)
451+
d = deserialize_exception(serialize_exception(e))
452+
assert isinstance(d, DBOSMaxStepRetriesExceeded)
453+
assert str(d) == str(e)
454+
# DBOSNotAuthorizedError
455+
e = DBOSNotAuthorizedError("no")
456+
d = deserialize_exception(serialize_exception(e))
457+
assert isinstance(d, DBOSNotAuthorizedError)
458+
assert str(d) == str(e)
459+
# DBOSQueueDeduplicatedError
460+
e = DBOSQueueDeduplicatedError("id", "queue", "dedup")
461+
d = deserialize_exception(serialize_exception(e))
462+
assert isinstance(d, DBOSQueueDeduplicatedError)
463+
assert str(d) == str(e)
464+
465+
# Test safe_deserialize
466+
class BadException(Exception):
467+
def __init__(self, one: int, two: int) -> None:
468+
super().__init__(f"Message: {one}, {two}")
469+
470+
bad_exception = BadException(1, 2)
471+
with pytest.raises(TypeError):
472+
deserialize_exception(serialize_exception(bad_exception))
473+
input, output, exception = safe_deserialize(
474+
"my_id",
475+
serialized_input=None,
476+
serialized_exception=serialize_exception(bad_exception),
477+
serialized_output=None,
478+
)
479+
assert input is None
480+
assert output is None
481+
assert isinstance(exception, str)
482+
assert "Message: 1, 2" in exception

0 commit comments

Comments
 (0)