Skip to content

Commit 205566d

Browse files
authored
Handle Serialization Errors in List Steps (#478)
1 parent 9aabef8 commit 205566d

File tree

3 files changed

+49
-15
lines changed

3 files changed

+49
-15
lines changed

dbos/_debouncer.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def debouncer_workflow(
8686
dbos = _get_dbos_instance()
8787

8888
workflow_inputs: WorkflowInputs = {"args": args, "kwargs": kwargs}
89+
8990
# Every time the debounced workflow is called, a message is sent to this workflow.
9091
# It waits until debounce_period_sec have passed since the last message or until
9192
# debounce_timeout_sec has elapsed.
@@ -95,7 +96,10 @@ def get_debounce_deadline_epoch_sec() -> float:
9596
if options["debounce_timeout_sec"]
9697
else math.inf
9798
)
98-
debounce_deadline_epoch_sec = dbos._sys_db.call_function_as_step(get_debounce_deadline_epoch_sec, "get_debounce_deadline_epoch_sec")
99+
100+
debounce_deadline_epoch_sec = dbos._sys_db.call_function_as_step(
101+
get_debounce_deadline_epoch_sec, "get_debounce_deadline_epoch_sec"
102+
)
99103
debounce_period_sec = initial_debounce_period_sec
100104
while time.time() < debounce_deadline_epoch_sec:
101105
time_until_deadline = max(debounce_deadline_epoch_sec - time.time(), 0)

dbos/_sys_db.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,24 +1077,23 @@ def get_workflow_steps(self, workflow_id: str) -> List[StepInfo]:
10771077
SystemSchema.operation_outputs.c.child_workflow_id,
10781078
).where(SystemSchema.operation_outputs.c.workflow_uuid == workflow_id)
10791079
).fetchall()
1080-
return [
1081-
StepInfo(
1080+
steps = []
1081+
for row in rows:
1082+
_, output, exception = _serialization.safe_deserialize(
1083+
workflow_id,
1084+
serialized_input=None,
1085+
serialized_output=row[2],
1086+
serialized_exception=row[3],
1087+
)
1088+
step = StepInfo(
10821089
function_id=row[0],
10831090
function_name=row[1],
1084-
output=(
1085-
_serialization.deserialize(row[2])
1086-
if row[2] is not None
1087-
else row[2]
1088-
),
1089-
error=(
1090-
_serialization.deserialize_exception(row[3])
1091-
if row[3] is not None
1092-
else row[3]
1093-
),
1091+
output=output,
1092+
error=exception,
10941093
child_workflow_id=row[4],
10951094
)
1096-
for row in rows
1097-
]
1095+
steps.append(step)
1096+
return steps
10981097

10991098
def _record_operation_result_txn(
11001099
self, result: OperationResultInternal, conn: sa.Connection

tests/test_failures.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from sqlalchemy.exc import InvalidRequestError, OperationalError
1010

1111
from dbos import DBOS, Queue, SetWorkflowID
12+
from dbos._client import DBOSClient
1213
from dbos._dbos_config import DBOSConfig
1314
from dbos._error import (
1415
DBOSAwaitedWorkflowCancelledError,
@@ -502,6 +503,36 @@ def test_error_serialization() -> None:
502503
assert isinstance(exception, str)
503504

504505

506+
def test_workflow_error_serialization(dbos: DBOS, client: DBOSClient) -> None:
507+
508+
@DBOS.step()
509+
def step() -> None:
510+
raise BadException(1, 2)
511+
512+
@DBOS.workflow()
513+
def workflow() -> None:
514+
step()
515+
516+
handle = DBOS.start_workflow(workflow)
517+
518+
with pytest.raises(BadException):
519+
handle.get_result()
520+
521+
workflows = DBOS.list_workflows()
522+
assert len(workflows) == 1
523+
assert workflows[0].error is not None
524+
525+
steps = DBOS.list_workflow_steps(handle.workflow_id)
526+
assert len(steps) == 1
527+
assert steps[0]["error"] is not None
528+
529+
status = handle.get_status()
530+
assert status.error is not None
531+
532+
status = client.retrieve_workflow(handle.workflow_id).get_status()
533+
assert status.error is not None
534+
535+
505536
def test_unregistered_workflow(dbos: DBOS, config: DBOSConfig) -> None:
506537

507538
@DBOS.workflow()

0 commit comments

Comments
 (0)