Skip to content

Commit daffec0

Browse files
authored
Fix Queue Timeout (#346)
- Fix an issue where the deadline of an enqueued workflow might be reset during recovery. - `SetWorkflowTimeout(None)` on a child workflow now properly unsets its timeout. - Timeout and deadline are now exposed in workflow status.
1 parent 752569d commit daffec0

File tree

7 files changed

+178
-19
lines changed

7 files changed

+178
-19
lines changed

dbos/_context.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ def __init__(self, workflow_timeout_sec: Optional[float]) -> None:
392392
else None
393393
)
394394
self.saved_workflow_timeout: Optional[int] = None
395+
self.saved_workflow_deadline_epoch_ms: Optional[int] = None
395396

396397
def __enter__(self) -> SetWorkflowTimeout:
397398
# Code to create a basic context
@@ -402,6 +403,8 @@ def __enter__(self) -> SetWorkflowTimeout:
402403
ctx = assert_current_dbos_context()
403404
self.saved_workflow_timeout = ctx.workflow_timeout_ms
404405
ctx.workflow_timeout_ms = self.workflow_timeout_ms
406+
self.saved_workflow_deadline_epoch_ms = ctx.workflow_deadline_epoch_ms
407+
ctx.workflow_deadline_epoch_ms = None
405408
return self
406409

407410
def __exit__(
@@ -411,6 +414,9 @@ def __exit__(
411414
traceback: Optional[TracebackType],
412415
) -> Literal[False]:
413416
assert_current_dbos_context().workflow_timeout_ms = self.saved_workflow_timeout
417+
assert_current_dbos_context().workflow_deadline_epoch_ms = (
418+
self.saved_workflow_deadline_epoch_ms
419+
)
414420
# Code to clean up the basic context if we created it
415421
if self.created_ctx:
416422
_clear_local_dbos_context()

dbos/_error.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class DBOSErrorCode(Enum):
6262
WorkflowCancelled = 10
6363
UnexpectedStep = 11
6464
QueueDeduplicated = 12
65+
AwaitedWorkflowCancelled = 13
6566
ConflictingRegistrationError = 25
6667

6768

@@ -206,6 +207,19 @@ def __reduce__(self) -> Any:
206207
)
207208

208209

210+
class DBOSAwaitedWorkflowCancelledError(DBOSException):
211+
def __init__(self, workflow_id: str):
212+
self.workflow_id = workflow_id
213+
super().__init__(
214+
f"Awaited workflow {workflow_id} was cancelled",
215+
dbos_error_code=DBOSErrorCode.AwaitedWorkflowCancelled.value,
216+
)
217+
218+
def __reduce__(self) -> Any:
219+
# Tell jsonpickle how to reconstruct this object
220+
return (self.__class__, (self.workflow_id,))
221+
222+
209223
#######################################
210224
## BaseException
211225
#######################################

dbos/_sys_db.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from . import _serialization
3333
from ._context import get_local_dbos_context
3434
from ._error import (
35+
DBOSAwaitedWorkflowCancelledError,
3536
DBOSConflictingWorkflowError,
3637
DBOSDeadLetterQueueError,
3738
DBOSNonExistentWorkflowError,
@@ -96,6 +97,10 @@ class WorkflowStatus:
9697
executor_id: Optional[str]
9798
# The application version on which this workflow was started
9899
app_version: Optional[str]
100+
# The start-to-close timeout of the workflow in ms
101+
workflow_timeout_ms: Optional[int]
102+
# The deadline of a workflow, computed by adding its timeout to its start time.
103+
workflow_deadline_epoch_ms: Optional[int]
99104

100105
# INTERNAL FIELDS
101106

@@ -761,9 +766,9 @@ def await_workflow_result(self, workflow_id: str) -> Any:
761766
error = row[2]
762767
raise _serialization.deserialize_exception(error)
763768
elif status == WorkflowStatusString.CANCELLED.value:
764-
# Raise a normal exception here, not the cancellation exception
769+
# Raise AwaitedWorkflowCancelledError here, not the cancellation exception
765770
# because the awaiting workflow is not being cancelled.
766-
raise Exception(f"Awaited workflow {workflow_id} was cancelled")
771+
raise DBOSAwaitedWorkflowCancelledError(workflow_id)
767772
else:
768773
pass # CB: I guess we're assuming the WF will show up eventually.
769774
time.sleep(1)
@@ -837,6 +842,8 @@ def get_workflows(self, input: GetWorkflowsInput) -> List[WorkflowStatus]:
837842
SystemSchema.workflow_inputs.c.inputs,
838843
SystemSchema.workflow_status.c.output,
839844
SystemSchema.workflow_status.c.error,
845+
SystemSchema.workflow_status.c.workflow_deadline_epoch_ms,
846+
SystemSchema.workflow_status.c.workflow_timeout_ms,
840847
).join(
841848
SystemSchema.workflow_inputs,
842849
SystemSchema.workflow_status.c.workflow_uuid
@@ -918,6 +925,8 @@ def get_workflows(self, input: GetWorkflowsInput) -> List[WorkflowStatus]:
918925
info.input = inputs
919926
info.output = output
920927
info.error = exception
928+
info.workflow_deadline_epoch_ms = row[18]
929+
info.workflow_timeout_ms = row[19]
921930

922931
infos.append(info)
923932
return infos
@@ -947,6 +956,8 @@ def get_queued_workflows(
947956
SystemSchema.workflow_inputs.c.inputs,
948957
SystemSchema.workflow_status.c.output,
949958
SystemSchema.workflow_status.c.error,
959+
SystemSchema.workflow_status.c.workflow_deadline_epoch_ms,
960+
SystemSchema.workflow_status.c.workflow_timeout_ms,
950961
).select_from(
951962
SystemSchema.workflow_queue.join(
952963
SystemSchema.workflow_status,
@@ -1024,6 +1035,8 @@ def get_queued_workflows(
10241035
info.input = inputs
10251036
info.output = output
10261037
info.error = exception
1038+
info.workflow_deadline_epoch_ms = row[18]
1039+
info.workflow_timeout_ms = row[19]
10271040

10281041
infos.append(info)
10291042

@@ -1827,8 +1840,13 @@ def start_queued_workflows(
18271840
# If a timeout is set, set the deadline on dequeue
18281841
workflow_deadline_epoch_ms=sa.case(
18291842
(
1830-
SystemSchema.workflow_status.c.workflow_timeout_ms.isnot(
1831-
None
1843+
sa.and_(
1844+
SystemSchema.workflow_status.c.workflow_timeout_ms.isnot(
1845+
None
1846+
),
1847+
SystemSchema.workflow_status.c.workflow_deadline_epoch_ms.is_(
1848+
None
1849+
),
18321850
),
18331851
sa.func.extract("epoch", sa.func.now()) * 1000
18341852
+ SystemSchema.workflow_status.c.workflow_timeout_ms,

tests/test_dbos.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
# Private API because this is a test
2525
from dbos._context import assert_current_dbos_context, get_local_dbos_context
2626
from dbos._error import (
27+
DBOSAwaitedWorkflowCancelledError,
2728
DBOSConflictingRegistrationError,
2829
DBOSMaxStepRetriesExceeded,
2930
DBOSWorkflowCancelledError,
@@ -1507,7 +1508,14 @@ def blocked_workflow() -> None:
15071508
with SetWorkflowID(wfid):
15081509
blocked_workflow()
15091510
assert assert_current_dbos_context().workflow_deadline_epoch_ms is None
1511+
start_time = time.time() * 1000
15101512
handle = DBOS.start_workflow(blocked_workflow)
1513+
status = handle.get_status()
1514+
assert status.workflow_timeout_ms == 100
1515+
assert (
1516+
status.workflow_deadline_epoch_ms is not None
1517+
and status.workflow_deadline_epoch_ms > start_time
1518+
)
15111519
with pytest.raises(DBOSWorkflowCancelledError):
15121520
handle.get_result()
15131521

@@ -1555,13 +1563,11 @@ def parent_workflow() -> None:
15551563
with pytest.raises(DBOSWorkflowCancelledError):
15561564
parent_workflow()
15571565

1558-
with pytest.raises(Exception) as exc_info:
1566+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
15591567
DBOS.retrieve_workflow(start_child).get_result()
1560-
assert "was cancelled" in str(exc_info.value)
15611568

1562-
with pytest.raises(Exception) as exc_info:
1569+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
15631570
DBOS.retrieve_workflow(direct_child).get_result()
1564-
assert "was cancelled" in str(exc_info.value)
15651571

15661572
# Verify the context variables are set correctly
15671573
with SetWorkflowTimeout(1.0):

tests/test_failures.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from dbos import DBOS, Queue, SetWorkflowID
1111
from dbos._error import (
12+
DBOSAwaitedWorkflowCancelledError,
1213
DBOSDeadLetterQueueError,
1314
DBOSMaxStepRetriesExceeded,
1415
DBOSNotAuthorizedError,
@@ -461,6 +462,11 @@ def test_error_serialization() -> None:
461462
d = deserialize_exception(serialize_exception(e))
462463
assert isinstance(d, DBOSQueueDeduplicatedError)
463464
assert str(d) == str(e)
465+
# AwaitedWorkflowCancelledError
466+
e = DBOSAwaitedWorkflowCancelledError("id")
467+
d = deserialize_exception(serialize_exception(e))
468+
assert isinstance(d, DBOSAwaitedWorkflowCancelledError)
469+
assert str(d) == str(e)
464470

465471
# Test safe_deserialize
466472
class BadException(Exception):

tests/test_queue.py

Lines changed: 116 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
)
2727
from dbos._context import assert_current_dbos_context
2828
from dbos._dbos import WorkflowHandleAsync
29+
from dbos._error import DBOSAwaitedWorkflowCancelledError, DBOSWorkflowCancelledError
2930
from dbos._schemas.system_database import SystemSchema
3031
from dbos._sys_db import WorkflowStatusString
3132
from dbos._utils import GlobalParams
@@ -853,9 +854,8 @@ def regular_workflow() -> None:
853854

854855
# Complete the blocked workflow
855856
blocking_event.set()
856-
with pytest.raises(Exception) as exc_info:
857+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
857858
blocked_handle.get_result()
858-
assert "was cancelled" in str(exc_info.value)
859859

860860
# Verify all queue entries eventually get cleaned up.
861861
assert queue_entries_are_cleaned_up(dbos)
@@ -891,9 +891,8 @@ def normal_workflow() -> None:
891891

892892
# Verify the blocked workflows are cancelled
893893
for handle in handles:
894-
with pytest.raises(Exception) as exc_info:
894+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
895895
handle.get_result()
896-
assert "was cancelled" in str(exc_info.value)
897896

898897
# Verify the normal workflow succeeds
899898
normal_handle.get_result()
@@ -911,17 +910,14 @@ def parent_workflow() -> None:
911910

912911
with SetWorkflowTimeout(1.0):
913912
handle = queue.enqueue(parent_workflow)
914-
with pytest.raises(Exception) as exc_info:
913+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
915914
handle.get_result()
916-
assert "was cancelled" in str(exc_info.value)
917915

918-
with pytest.raises(Exception) as exc_info:
916+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
919917
DBOS.retrieve_workflow(child_id).get_result()
920-
assert "was cancelled" in str(exc_info.value)
921918

922919
# Verify if a parent called with a timeout enqueues a blocked child
923920
# then exits the deadline propagates and the child is cancelled.
924-
child_id = str(uuid.uuid4())
925921
queue = Queue("regular_queue")
926922

927923
@DBOS.workflow()
@@ -931,9 +927,41 @@ def exiting_parent_workflow() -> str:
931927

932928
with SetWorkflowTimeout(1.0):
933929
child_id = exiting_parent_workflow()
934-
with pytest.raises(Exception) as exc_info:
930+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
931+
DBOS.retrieve_workflow(child_id).get_result()
932+
933+
# Verify if a parent called with a timeout enqueues a child that
934+
# never starts because the queue is blocked, the deadline propagates
935+
# and both parent and child are cancelled.
936+
child_id = str(uuid.uuid4())
937+
queue = Queue("stuck_queue", concurrency=1)
938+
939+
start_event = threading.Event()
940+
blocking_event = threading.Event()
941+
942+
@DBOS.workflow()
943+
def stuck_workflow() -> None:
944+
start_event.set()
945+
blocking_event.wait()
946+
947+
stuck_handle = queue.enqueue(stuck_workflow)
948+
start_event.wait()
949+
950+
@DBOS.workflow()
951+
def blocked_parent_workflow() -> None:
952+
with SetWorkflowID(child_id):
953+
queue.enqueue(blocking_workflow)
954+
while True:
955+
DBOS.sleep(0.1)
956+
957+
with SetWorkflowTimeout(1.0):
958+
handle = DBOS.start_workflow(blocked_parent_workflow)
959+
with pytest.raises(DBOSWorkflowCancelledError):
960+
handle.get_result()
961+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
935962
DBOS.retrieve_workflow(child_id).get_result()
936-
assert "was cancelled" in str(exc_info.value)
963+
blocking_event.set()
964+
stuck_handle.get_result()
937965

938966
# Verify all queue entries eventually get cleaned up.
939967
assert queue_entries_are_cleaned_up(dbos)
@@ -1341,3 +1369,80 @@ def test_workflow() -> str:
13411369
# Change the version, verify the other version complets
13421370
GlobalParams.app_version = other_version
13431371
assert other_version_handle.get_result()
1372+
1373+
1374+
def test_timeout_queue_recovery(dbos: DBOS) -> None:
1375+
queue = Queue("test_queue")
1376+
evt = threading.Event()
1377+
1378+
@DBOS.workflow()
1379+
def blocking_workflow() -> None:
1380+
evt.set()
1381+
while True:
1382+
DBOS.sleep(0.1)
1383+
1384+
timeout = 3.0
1385+
enqueue_time = time.time()
1386+
with SetWorkflowTimeout(timeout):
1387+
original_handle = queue.enqueue(blocking_workflow)
1388+
1389+
# Verify the workflow's timeout is properly configured
1390+
evt.wait()
1391+
original_status = original_handle.get_status()
1392+
assert original_status.workflow_timeout_ms == timeout * 1000
1393+
assert (
1394+
original_status.workflow_deadline_epoch_ms is not None
1395+
and original_status.workflow_deadline_epoch_ms > enqueue_time * 1000
1396+
)
1397+
1398+
# Recover the workflow. Verify its deadline remains the same
1399+
evt.clear()
1400+
handles = DBOS._recover_pending_workflows()
1401+
assert len(handles) == 1
1402+
evt.wait()
1403+
recovered_handle = handles[0]
1404+
recovered_status = recovered_handle.get_status()
1405+
assert recovered_status.workflow_timeout_ms == timeout * 1000
1406+
assert (
1407+
recovered_status.workflow_deadline_epoch_ms
1408+
== original_status.workflow_deadline_epoch_ms
1409+
)
1410+
1411+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
1412+
original_handle.get_result()
1413+
1414+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
1415+
recovered_handle.get_result()
1416+
1417+
1418+
def test_unsetting_timeout(dbos: DBOS) -> None:
1419+
1420+
queue = Queue("test_queue")
1421+
1422+
@DBOS.workflow()
1423+
def child() -> str:
1424+
for _ in range(5):
1425+
DBOS.sleep(1)
1426+
return DBOS.workflow_id
1427+
1428+
@DBOS.workflow()
1429+
def parent(child_one: str, child_two: str) -> None:
1430+
with SetWorkflowID(child_two):
1431+
with SetWorkflowTimeout(None):
1432+
queue.enqueue(child)
1433+
1434+
with SetWorkflowID(child_one):
1435+
queue.enqueue(child)
1436+
1437+
child_one, child_two = str(uuid.uuid4()), str(uuid.uuid4())
1438+
with SetWorkflowTimeout(1.0):
1439+
queue.enqueue(parent, child_one, child_two).get_result()
1440+
1441+
# Verify child one, which has a propagated timeout, is cancelled
1442+
handle: WorkflowHandle[str] = DBOS.retrieve_workflow(child_one)
1443+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
1444+
handle.get_result()
1445+
1446+
# Verify child two, which doesn't have a timeout, succeeds
1447+
handle = DBOS.retrieve_workflow(child_two)
1448+
assert handle.get_result() == child_two

tests/test_workflow_introspection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ def simple_workflow(x: int) -> int:
4141
assert output.app_version == GlobalParams.app_version
4242
assert output.app_id == ""
4343
assert output.recovery_attempts == 1
44+
assert output.workflow_timeout_ms is None
45+
assert output.workflow_deadline_epoch_ms is None
4446

4547
# Test searching by status
4648
outputs = DBOS.list_workflows(status="PENDING")
@@ -222,6 +224,8 @@ def blocking_step(i: int) -> int:
222224
assert workflow.created_at is not None and workflow.created_at > 0
223225
assert workflow.updated_at is not None and workflow.updated_at > 0
224226
assert workflow.recovery_attempts == 1
227+
assert workflow.workflow_timeout_ms is None
228+
assert workflow.workflow_deadline_epoch_ms is None
225229

226230
# Test sort_desc inverts the order
227231
workflows = DBOS.list_queued_workflows(sort_desc=True)

0 commit comments

Comments
 (0)