Skip to content

Commit 28ed708

Browse files
authored
Improve Cancellation Semantics (#413)
If a workflow is cancelled, it should throw DBOSAwaitedWorkflowCancelledError, not DBOSWorkflowCancelledError, to its caller.
1 parent b488c32 commit 28ed708

File tree

7 files changed

+26
-27
lines changed

7 files changed

+26
-27
lines changed

dbos/_core.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
get_local_dbos_context,
5050
)
5151
from ._error import (
52+
DBOSAwaitedWorkflowCancelledError,
5253
DBOSException,
5354
DBOSMaxStepRetriesExceeded,
5455
DBOSNonExistentWorkflowError,
@@ -370,7 +371,7 @@ def persist(func: Callable[[], R]) -> R:
370371
r: R = dbos._sys_db.await_workflow_result(status["workflow_uuid"])
371372
return r
372373
except DBOSWorkflowCancelledError as error:
373-
raise
374+
raise DBOSAwaitedWorkflowCancelledError(status["workflow_uuid"])
374375
except Exception as error:
375376
if not dbos.debug_mode:
376377
dbos._sys_db.update_workflow_outcome(

tests/test_admin_server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
WorkflowHandle,
2121
_workflow_commands,
2222
)
23-
from dbos._error import DBOSWorkflowCancelledError
23+
from dbos._error import DBOSAwaitedWorkflowCancelledError
2424
from dbos._schemas.system_database import SystemSchema
2525
from dbos._sys_db import SystemDatabase, WorkflowStatusString
2626
from dbos._utils import INTERNAL_QUEUE_NAME, GlobalParams
@@ -306,7 +306,7 @@ def blocking_workflow() -> None:
306306
)
307307
assert response.status_code == 204
308308
event.set()
309-
with pytest.raises(DBOSWorkflowCancelledError):
309+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
310310
handle.get_result()
311311
info = _workflow_commands.get_workflow(sys_db, wfid)
312312
assert info is not None
@@ -750,7 +750,7 @@ def workflow() -> None:
750750
timeout=5,
751751
)
752752
response.raise_for_status()
753-
with pytest.raises(DBOSWorkflowCancelledError):
753+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
754754
handle.get_result()
755755

756756

tests/test_async.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from dbos._context import assert_current_dbos_context
1919
from dbos._dbos import WorkflowHandle
2020
from dbos._dbos_config import ConfigFile
21-
from dbos._error import DBOSException, DBOSWorkflowCancelledError
21+
from dbos._error import DBOSAwaitedWorkflowCancelledError, DBOSException
2222

2323

2424
@pytest.mark.asyncio
@@ -492,20 +492,20 @@ async def blocked_workflow() -> None:
492492
DBOS.sleep(0.1)
493493

494494
with SetWorkflowTimeout(0.1):
495-
with pytest.raises(DBOSWorkflowCancelledError):
495+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
496496
await blocked_workflow()
497497
handle = await DBOS.start_workflow_async(blocked_workflow)
498-
with pytest.raises(DBOSWorkflowCancelledError):
498+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
499499
await handle.get_result()
500500

501501
@DBOS.workflow()
502502
async def parent_workflow_with_timeout() -> None:
503503
assert assert_current_dbos_context().workflow_deadline_epoch_ms is None
504504
with SetWorkflowTimeout(0.1):
505-
with pytest.raises(DBOSWorkflowCancelledError):
505+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
506506
await blocked_workflow()
507507
handle = await DBOS.start_workflow_async(blocked_workflow)
508-
with pytest.raises(DBOSWorkflowCancelledError):
508+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
509509
await handle.get_result()
510510
assert assert_current_dbos_context().workflow_deadline_epoch_ms is None
511511

@@ -526,7 +526,7 @@ async def parent_workflow() -> None:
526526
# Verify if a parent called with a timeout calls a blocked child
527527
# the deadline propagates and the children are also cancelled.
528528
with SetWorkflowTimeout(1.0):
529-
with pytest.raises(DBOSWorkflowCancelledError):
529+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
530530
await parent_workflow()
531531

532532
with pytest.raises(Exception) as exc_info:

tests/test_async_workflow_management.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import pytest
88

99
from dbos import DBOS, Queue, SetWorkflowID
10-
from dbos._error import DBOSWorkflowCancelledError
10+
from dbos._error import DBOSAwaitedWorkflowCancelledError
1111
from dbos._sys_db import StepInfo, WorkflowStatus
1212
from tests.conftest import queue_entries_are_cleaned_up
1313

@@ -46,7 +46,7 @@ def simple_workflow(x: int) -> int:
4646
await DBOS.cancel_workflow_async(wfid)
4747
workflow_event.set()
4848

49-
with pytest.raises(DBOSWorkflowCancelledError):
49+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
5050
handle.get_result()
5151
assert steps_completed == 1
5252

@@ -85,7 +85,7 @@ def simple_workflow(x: int) -> int:
8585
DBOS.cancel_workflow(wfid)
8686
workflow_event.set()
8787

88-
with pytest.raises(DBOSWorkflowCancelledError):
88+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
8989
handle.get_result()
9090
assert steps_completed == 1
9191

tests/test_dbos.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
DBOSAwaitedWorkflowCancelledError,
2929
DBOSConflictingRegistrationError,
3030
DBOSMaxStepRetriesExceeded,
31-
DBOSWorkflowCancelledError,
3231
)
3332
from dbos._schemas.system_database import SystemSchema
3433
from dbos._sys_db import GetWorkflowsInput
@@ -1493,7 +1492,7 @@ def blocked_workflow() -> None:
14931492
# Verify a blocked workflow called with a timeout is cancelled
14941493
wfid = str(uuid.uuid4())
14951494
with SetWorkflowTimeout(0.1):
1496-
with pytest.raises(DBOSWorkflowCancelledError):
1495+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
14971496
with SetWorkflowID(wfid):
14981497
blocked_workflow()
14991498
assert assert_current_dbos_context().workflow_deadline_epoch_ms is None
@@ -1505,7 +1504,7 @@ def blocked_workflow() -> None:
15051504
status.workflow_deadline_epoch_ms is not None
15061505
and status.workflow_deadline_epoch_ms > start_time
15071506
)
1508-
with pytest.raises(DBOSWorkflowCancelledError):
1507+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
15091508
handle.get_result()
15101509

15111510
# Change the workflow status to pending
@@ -1518,17 +1517,17 @@ def blocked_workflow() -> None:
15181517
# Recover the workflow, verify it still times out
15191518
handles = DBOS._recover_pending_workflows()
15201519
assert len(handles) == 1
1521-
with pytest.raises(DBOSWorkflowCancelledError):
1520+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
15221521
handles[0].get_result()
15231522

15241523
@DBOS.workflow()
15251524
def parent_workflow_with_timeout() -> None:
15261525
assert assert_current_dbos_context().workflow_deadline_epoch_ms is None
15271526
with SetWorkflowTimeout(0.1):
1528-
with pytest.raises(DBOSWorkflowCancelledError):
1527+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
15291528
blocked_workflow()
15301529
handle = DBOS.start_workflow(blocked_workflow)
1531-
with pytest.raises(DBOSWorkflowCancelledError):
1530+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
15321531
handle.get_result()
15331532
assert assert_current_dbos_context().workflow_deadline_epoch_ms is None
15341533

@@ -1549,7 +1548,7 @@ def parent_workflow() -> None:
15491548
# Verify if a parent called with a timeout calls a blocked child
15501549
# the deadline propagates and the children are also cancelled.
15511550
with SetWorkflowTimeout(1.0):
1552-
with pytest.raises(DBOSWorkflowCancelledError):
1551+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
15531552
parent_workflow()
15541553

15551554
with pytest.raises(DBOSAwaitedWorkflowCancelledError):

tests/test_queue.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import asyncio
2-
import logging
32
import multiprocessing
43
import multiprocessing.synchronize
54
import os
@@ -27,7 +26,7 @@
2726
)
2827
from dbos._context import assert_current_dbos_context
2928
from dbos._dbos import WorkflowHandleAsync
30-
from dbos._error import DBOSAwaitedWorkflowCancelledError, DBOSWorkflowCancelledError
29+
from dbos._error import DBOSAwaitedWorkflowCancelledError
3130
from dbos._schemas.system_database import SystemSchema
3231
from dbos._sys_db import WorkflowStatusString
3332
from dbos._utils import GlobalParams
@@ -943,7 +942,7 @@ def blocked_parent_workflow() -> None:
943942

944943
with SetWorkflowTimeout(1.0):
945944
handle = DBOS.start_workflow(blocked_parent_workflow)
946-
with pytest.raises(DBOSWorkflowCancelledError):
945+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
947946
handle.get_result()
948947
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
949948
DBOS.retrieve_workflow(child_id).get_result()

tests/test_workflow_management.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
# Public API
1010
from dbos import DBOS, Queue, SetWorkflowID
1111
from dbos._dbos import DBOSConfiguredInstance
12-
from dbos._error import DBOSWorkflowCancelledError
12+
from dbos._error import DBOSAwaitedWorkflowCancelledError
1313
from dbos._schemas.application_database import ApplicationSchema
1414
from dbos._utils import INTERNAL_QUEUE_NAME, GlobalParams
1515
from dbos._workflow_commands import garbage_collect, global_timeout
@@ -52,7 +52,7 @@ def simple_workflow(x: int) -> int:
5252
main_thread_event.wait()
5353
DBOS.cancel_workflow(wfid)
5454
workflow_event.set()
55-
with pytest.raises(DBOSWorkflowCancelledError):
55+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
5656
handle.get_result()
5757
assert steps_completed == 1
5858

@@ -103,7 +103,7 @@ def simple_workflow(x: int) -> int:
103103
main_thread_event.wait()
104104
DBOS.cancel_workflow(wfid)
105105
workflow_event.set()
106-
with pytest.raises(DBOSWorkflowCancelledError):
106+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
107107
handle.get_result()
108108
assert txn_completed == 1
109109

@@ -746,7 +746,7 @@ def blocked_workflow() -> str:
746746

747747
# Verify all workflows started before the global timeout are cancelled
748748
for handle in handles:
749-
with pytest.raises(DBOSWorkflowCancelledError):
749+
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
750750
handle.get_result()
751751
event.set()
752752
assert final_handle.get_result() is not None

0 commit comments

Comments
 (0)