Skip to content

Commit 4afd4d9

Browse files
authored
Test & Improve Queue Semantics (#202)
- Cancelling a workflow removes it from any queue it is in. This way, cancelled workflows don't block queues. - Resume sets status to `PENDING` and resets recovery attempts. It also removes the workflow from any queue it is in. This way, resume can safely start DLQ'ed, `CANCELLED`, and `ENQUEUED` workflows. - Lots and lots of tests.
1 parent 97562ec commit 4afd4d9

File tree

8 files changed

+316
-126
lines changed

8 files changed

+316
-126
lines changed

dbos/_dbos.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -800,14 +800,13 @@ def recover_pending_workflows(
800800
@classmethod
801801
def cancel_workflow(cls, workflow_id: str) -> None:
802802
"""Cancel a workflow by ID."""
803-
_get_dbos_instance()._sys_db.set_workflow_status(
804-
workflow_id, WorkflowStatusString.CANCELLED
805-
)
803+
_get_dbos_instance()._sys_db.cancel_workflow(workflow_id)
806804

807805
@classmethod
808-
def resume_workflow(cls, workflow_id: str) -> None:
806+
def resume_workflow(cls, workflow_id: str) -> WorkflowHandle[Any]:
809807
"""Resume a workflow by ID."""
810-
execute_workflow_by_id(_get_dbos_instance(), workflow_id, False)
808+
_get_dbos_instance()._sys_db.resume_workflow(workflow_id)
809+
return execute_workflow_by_id(_get_dbos_instance(), workflow_id, False)
811810

812811
@classproperty
813812
def logger(cls) -> Logger:

dbos/_sys_db.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -390,20 +390,55 @@ def update_workflow_status(
390390
if status["workflow_uuid"] in self._temp_txn_wf_ids:
391391
self._exported_temp_txn_wf_status.add(status["workflow_uuid"])
392392

393-
def set_workflow_status(
393+
def cancel_workflow(
394394
self,
395-
workflow_uuid: str,
396-
status: WorkflowStatusString,
395+
workflow_id: str,
397396
) -> None:
398397
with self.engine.begin() as c:
399-
stmt = (
398+
# Remove the workflow from the queues table so it does not block the table
399+
c.execute(
400+
sa.delete(SystemSchema.workflow_queue).where(
401+
SystemSchema.workflow_queue.c.workflow_uuid == workflow_id
402+
)
403+
)
404+
# Set the workflow's status to CANCELLED
405+
c.execute(
400406
sa.update(SystemSchema.workflow_status)
401-
.where(SystemSchema.workflow_status.c.workflow_uuid == workflow_uuid)
407+
.where(SystemSchema.workflow_status.c.workflow_uuid == workflow_id)
402408
.values(
403-
status=status,
409+
status=WorkflowStatusString.CANCELLED.value,
410+
)
411+
)
412+
413+
def resume_workflow(
414+
self,
415+
workflow_id: str,
416+
) -> None:
417+
with self.engine.begin() as c:
418+
# Check the status of the workflow. If it is complete, do nothing.
419+
row = c.execute(
420+
sa.select(
421+
SystemSchema.workflow_status.c.status,
422+
).where(SystemSchema.workflow_status.c.workflow_uuid == workflow_id)
423+
).fetchone()
424+
if (
425+
row is None
426+
or row[0] == WorkflowStatusString.SUCCESS.value
427+
or row[0] == WorkflowStatusString.ERROR.value
428+
):
429+
return
430+
# Remove the workflow from the queues table so resume can safely be called on an ENQUEUED workflow
431+
c.execute(
432+
sa.delete(SystemSchema.workflow_queue).where(
433+
SystemSchema.workflow_queue.c.workflow_uuid == workflow_id
404434
)
405435
)
406-
c.execute(stmt)
436+
# Set the workflow's status to PENDING and clear its recovery attempts.
437+
c.execute(
438+
sa.update(SystemSchema.workflow_status)
439+
.where(SystemSchema.workflow_status.c.workflow_uuid == workflow_id)
440+
.values(status=WorkflowStatusString.PENDING.value, recovery_attempts=0)
441+
)
407442

408443
def get_workflow_status(
409444
self, workflow_uuid: str

dbos/_workflow_commands.py

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,14 @@
1-
import importlib
2-
import os
3-
import sys
4-
from typing import Any, List, Optional, cast
1+
from typing import List, Optional, cast
52

63
import typer
7-
from rich import print
84

9-
from dbos import DBOS
10-
11-
from . import _serialization, load_config
12-
from ._core import execute_workflow_by_id
13-
from ._dbos_config import ConfigFile, _is_valid_app_name
5+
from . import _serialization
6+
from ._dbos_config import ConfigFile
147
from ._sys_db import (
158
GetWorkflowsInput,
169
GetWorkflowsOutput,
1710
SystemDatabase,
1811
WorkflowStatuses,
19-
WorkflowStatusInternal,
20-
WorkflowStatusString,
2112
)
2213

2314

@@ -41,7 +32,7 @@ class WorkflowInformation:
4132
queue_name: Optional[str]
4233

4334

44-
def _list_workflows(
35+
def list_workflows(
4536
config: ConfigFile,
4637
li: int,
4738
user: Optional[str],
@@ -91,17 +82,13 @@ def _list_workflows(
9182
sys_db.destroy()
9283

9384

94-
def _get_workflow(
85+
def get_workflow(
9586
config: ConfigFile, uuid: str, request: bool
9687
) -> Optional[WorkflowInformation]:
97-
sys_db = None
98-
9988
try:
10089
sys_db = SystemDatabase(config)
101-
10290
info = _get_workflow_info(sys_db, uuid, request)
10391
return info
104-
10592
except Exception as e:
10693
typer.echo(f"Error getting workflow: {e}")
10794
return None
@@ -110,18 +97,13 @@ def _get_workflow(
11097
sys_db.destroy()
11198

11299

113-
def _cancel_workflow(config: ConfigFile, uuid: str) -> None:
114-
# config = load_config()
115-
sys_db = None
116-
100+
def cancel_workflow(config: ConfigFile, uuid: str) -> None:
117101
try:
118102
sys_db = SystemDatabase(config)
119-
sys_db.set_workflow_status(uuid, WorkflowStatusString.CANCELLED)
120-
return
121-
103+
sys_db.cancel_workflow(uuid)
122104
except Exception as e:
123105
typer.echo(f"Failed to connect to DBOS system database: {e}")
124-
return None
106+
raise e
125107
finally:
126108
if sys_db:
127109
sys_db.destroy()

dbos/cli/cli.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from .._app_db import ApplicationDatabase
2020
from .._dbos_config import _is_valid_app_name
2121
from .._sys_db import SystemDatabase, reset_system_database
22-
from .._workflow_commands import _cancel_workflow, _get_workflow, _list_workflows
22+
from .._workflow_commands import cancel_workflow, get_workflow, list_workflows
2323
from ..cli._github_init import create_template_from_github
2424
from ._template_init import copy_template, get_project_name, get_templates_directory
2525

@@ -282,7 +282,7 @@ def list(
282282
] = None,
283283
) -> None:
284284
config = load_config()
285-
workflows = _list_workflows(
285+
workflows = list_workflows(
286286
config, limit, user, starttime, endtime, status, request, appversion
287287
)
288288
print(jsonpickle.encode(workflows, unpicklable=False))
@@ -301,7 +301,7 @@ def get(
301301
] = True,
302302
) -> None:
303303
config = load_config()
304-
print(jsonpickle.encode(_get_workflow(config, uuid, request), unpicklable=False))
304+
print(jsonpickle.encode(get_workflow(config, uuid, request), unpicklable=False))
305305

306306

307307
@workflow.command(
@@ -315,7 +315,7 @@ def cancel(
315315
] = None,
316316
) -> None:
317317
config = load_config()
318-
_cancel_workflow(config, uuid)
318+
cancel_workflow(config, uuid)
319319
print(f"Workflow {uuid} has been cancelled")
320320

321321

tests/test_admin_server.py

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -151,54 +151,59 @@ def test_admin_diff_port(cleanup_test_databases: None) -> None:
151151

152152

153153
def test_admin_workflow_resume(dbos: DBOS, config: ConfigFile) -> None:
154+
counter: int = 0
154155

155156
@DBOS.workflow()
156157
def simple_workflow() -> None:
157-
print("Executed Simple workflow")
158-
return
158+
nonlocal counter
159+
counter += 1
159160

160-
# run the workflow
161+
# Run the workflow and flush its results
161162
simple_workflow()
162-
time.sleep(1)
163+
assert counter == 1
164+
dbos._sys_db.wait_for_buffer_flush()
163165

164-
# get the workflow list
165-
output = _workflow_commands._list_workflows(
166+
# Verify the workflow has succeeded
167+
output = _workflow_commands.list_workflows(
166168
config, 10, None, None, None, None, False, None
167169
)
168170
assert len(output) == 1, f"Expected list length to be 1, but got {len(output)}"
169-
170171
assert output[0] != None, "Expected output to be not None"
171-
172172
wfUuid = output[0].workflowUUID
173-
174-
info = _workflow_commands._get_workflow(config, wfUuid, True)
173+
info = _workflow_commands.get_workflow(config, wfUuid, True)
175174
assert info is not None, "Expected output to be not None"
176-
177175
assert info.status == "SUCCESS", f"Expected status to be SUCCESS"
178176

177+
# Cancel the workflow. Verify it was cancelled
179178
response = requests.post(
180179
f"http://localhost:3001/workflows/{wfUuid}/cancel", json=[], timeout=5
181180
)
182181
assert response.status_code == 204
182+
info = _workflow_commands.get_workflow(config, wfUuid, True)
183+
assert info is not None
184+
assert info.status == "CANCELLED", f"Expected status to be CANCELLED"
183185

184-
info = _workflow_commands._get_workflow(config, wfUuid, True)
185-
if info is not None:
186-
assert info.status == "CANCELLED", f"Expected status to be CANCELLED"
187-
else:
188-
assert False, "Expected info to be not None"
189-
186+
# Resume the workflow. Verify that it succeeds again.
190187
response = requests.post(
191188
f"http://localhost:3001/workflows/{wfUuid}/resume", json=[], timeout=5
192189
)
193190
assert response.status_code == 204
191+
dbos._sys_db.wait_for_buffer_flush()
192+
assert counter == 2
193+
info = _workflow_commands.get_workflow(config, wfUuid, True)
194+
assert info is not None
195+
assert info.status == "SUCCESS", f"Expected status to be SUCCESS"
194196

195-
time.sleep(1)
196-
197-
info = _workflow_commands._get_workflow(config, wfUuid, True)
198-
if info is not None:
199-
assert info.status == "SUCCESS", f"Expected status to be SUCCESS"
200-
else:
201-
assert False, "Expected info to be not None"
197+
# Resume the workflow. Verify it does not run and status remains SUCCESS
198+
response = requests.post(
199+
f"http://localhost:3001/workflows/{wfUuid}/resume", json=[], timeout=5
200+
)
201+
assert response.status_code == 204
202+
dbos._sys_db.wait_for_buffer_flush()
203+
info = _workflow_commands.get_workflow(config, wfUuid, True)
204+
assert info is not None
205+
assert info.status == "SUCCESS", f"Expected status to be SUCCESS"
206+
assert counter == 2
202207

203208

204209
def test_admin_workflow_restart(dbos: DBOS, config: ConfigFile) -> None:
@@ -213,7 +218,7 @@ def simple_workflow() -> None:
213218
time.sleep(1)
214219

215220
# get the workflow list
216-
output = _workflow_commands._list_workflows(
221+
output = _workflow_commands.list_workflows(
217222
config, 10, None, None, None, None, False, None
218223
)
219224
assert len(output) == 1, f"Expected list length to be 1, but got {len(output)}"
@@ -222,7 +227,7 @@ def simple_workflow() -> None:
222227

223228
wfUuid = output[0].workflowUUID
224229

225-
info = _workflow_commands._get_workflow(config, wfUuid, True)
230+
info = _workflow_commands.get_workflow(config, wfUuid, True)
226231
assert info is not None, "Expected output to be not None"
227232

228233
assert info.status == "SUCCESS", f"Expected status to be SUCCESS"
@@ -232,7 +237,7 @@ def simple_workflow() -> None:
232237
)
233238
assert response.status_code == 204
234239

235-
info = _workflow_commands._get_workflow(config, wfUuid, True)
240+
info = _workflow_commands.get_workflow(config, wfUuid, True)
236241
if info is not None:
237242
assert info.status == "CANCELLED", f"Expected status to be CANCELLED"
238243
else:
@@ -245,13 +250,13 @@ def simple_workflow() -> None:
245250

246251
time.sleep(1)
247252

248-
info = _workflow_commands._get_workflow(config, wfUuid, True)
253+
info = _workflow_commands.get_workflow(config, wfUuid, True)
249254
if info is not None:
250255
assert info.status == "CANCELLED", f"Expected status to be CANCELLED"
251256
else:
252257
assert False, "Expected info to be not None"
253258

254-
output = _workflow_commands._list_workflows(
259+
output = _workflow_commands.list_workflows(
255260
config, 10, None, None, None, None, False, None
256261
)
257262
assert len(output) == 2, f"Expected list length to be 2, but got {len(output)}"
@@ -261,7 +266,7 @@ def simple_workflow() -> None:
261266
else:
262267
new_wfUuid = output[0].workflowUUID
263268

264-
info = _workflow_commands._get_workflow(config, new_wfUuid, True)
269+
info = _workflow_commands.get_workflow(config, new_wfUuid, True)
265270
if info is not None:
266271
assert info.status == "SUCCESS", f"Expected status to be SUCCESS"
267272
else:

0 commit comments

Comments
 (0)