Skip to content

Commit 983a028

Browse files
authored
Workflow cancel, restart, resume as admin API (#187)
1 parent 5abd162 commit 983a028

File tree

6 files changed

+273
-18
lines changed

6 files changed

+273
-18
lines changed

dbos/_admin_server.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import json
4+
import re
45
import threading
56
from functools import partial
67
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
@@ -15,6 +16,9 @@
1516
_health_check_path = "/dbos-healthz"
1617
_workflow_recovery_path = "/dbos-workflow-recovery"
1718
_deactivate_path = "/deactivate"
19+
# /workflows/:workflow_id/cancel
20+
# /workflows/:workflow_id/resume
21+
# /workflows/:workflow_id/restart
1822

1923

2024
class AdminServer:
@@ -79,12 +83,51 @@ def do_POST(self) -> None:
7983
self._end_headers()
8084
self.wfile.write(json.dumps(workflow_ids).encode("utf-8"))
8185
else:
82-
self.send_response(404)
83-
self._end_headers()
86+
87+
restart_match = re.match(
88+
r"^/workflows/(?P<workflow_id>[^/]+)/restart$", self.path
89+
)
90+
resume_match = re.match(
91+
r"^/workflows/(?P<workflow_id>[^/]+)/resume$", self.path
92+
)
93+
cancel_match = re.match(
94+
r"^/workflows/(?P<workflow_id>[^/]+)/cancel$", self.path
95+
)
96+
97+
if restart_match:
98+
workflow_id = restart_match.group("workflow_id")
99+
self._handle_restart(workflow_id)
100+
elif resume_match:
101+
workflow_id = resume_match.group("workflow_id")
102+
self._handle_resume(workflow_id)
103+
elif cancel_match:
104+
workflow_id = cancel_match.group("workflow_id")
105+
self._handle_cancel(workflow_id)
106+
else:
107+
self.send_response(404)
108+
self._end_headers()
84109

85110
def log_message(self, format: str, *args: Any) -> None:
    """Discard the message instead of logging it.

    Admin server request logging is disabled, so the arguments are ignored
    and nothing is written.
    """
    return None
87112

113+
def _handle_restart(self, workflow_id: str) -> None:
    """Restart the given workflow and reply with 204 No Content.

    Args:
        workflow_id: ID captured from the ``/workflows/<workflow_id>/restart``
            request path.
    """
    # Log before acting, as the resume and cancel handlers do, so the attempt
    # is still recorded when restart_workflow raises.
    print("Restarting workflow", workflow_id)
    self.dbos.restart_workflow(workflow_id)
    self.send_response(204)
    self._end_headers()
118+
119+
def _handle_resume(self, workflow_id: str) -> None:
    """Resume the given workflow and reply with 204 No Content.

    Args:
        workflow_id: ID captured from the ``/workflows/<workflow_id>/resume``
            request path.
    """
    print("Resuming workflow", workflow_id)
    dbos = self.dbos
    dbos.resume_workflow(workflow_id)
    # Success carries no response body.
    no_content = 204
    self.send_response(no_content)
    self._end_headers()
124+
125+
def _handle_cancel(self, workflow_id: str) -> None:
    """Cancel the given workflow and reply with 204 No Content.

    Args:
        workflow_id: ID captured from the ``/workflows/<workflow_id>/cancel``
            request path.
    """
    print("Cancelling workflow", workflow_id)
    dbos = self.dbos
    dbos.cancel_workflow(workflow_id)
    # Success carries no response body.
    no_content = 204
    self.send_response(no_content)
    self._end_headers()
130+
88131

89132
# Be consistent with DBOS-TS response.
90133
class PerfUtilization(TypedDict):

dbos/_core.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,9 @@ def _execute_workflow_wthread(
266266
raise
267267

268268

269-
def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[Any]":
269+
def execute_workflow_by_id(
270+
dbos: "DBOS", workflow_id: str, startNew: bool = False
271+
) -> "WorkflowHandle[Any]":
270272
status = dbos._sys_db.get_workflow_status(workflow_id)
271273
if not status:
272274
raise DBOSRecoveryError(workflow_id, "Workflow status not found")
@@ -293,7 +295,8 @@ def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[An
293295
workflow_id,
294296
f"Cannot execute workflow because instance '{iname}' is not registered",
295297
)
296-
with SetWorkflowID(workflow_id):
298+
299+
if startNew:
297300
return start_workflow(
298301
dbos,
299302
wf_func,
@@ -303,14 +306,26 @@ def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[An
303306
*inputs["args"],
304307
**inputs["kwargs"],
305308
)
309+
else:
310+
with SetWorkflowID(workflow_id):
311+
return start_workflow(
312+
dbos,
313+
wf_func,
314+
status["queue_name"],
315+
True,
316+
dbos._registry.instance_info_map[iname],
317+
*inputs["args"],
318+
**inputs["kwargs"],
319+
)
306320
elif status["class_name"] is not None:
307321
class_name = status["class_name"]
308322
if class_name not in dbos._registry.class_info_map:
309323
raise DBOSWorkflowFunctionNotFoundError(
310324
workflow_id,
311325
f"Cannot execute workflow because class '{class_name}' is not registered",
312326
)
313-
with SetWorkflowID(workflow_id):
327+
328+
if startNew:
314329
return start_workflow(
315330
dbos,
316331
wf_func,
@@ -320,8 +335,19 @@ def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[An
320335
*inputs["args"],
321336
**inputs["kwargs"],
322337
)
338+
else:
339+
with SetWorkflowID(workflow_id):
340+
return start_workflow(
341+
dbos,
342+
wf_func,
343+
status["queue_name"],
344+
True,
345+
dbos._registry.class_info_map[class_name],
346+
*inputs["args"],
347+
**inputs["kwargs"],
348+
)
323349
else:
324-
with SetWorkflowID(workflow_id):
350+
if startNew:
325351
return start_workflow(
326352
dbos,
327353
wf_func,
@@ -330,6 +356,16 @@ def execute_workflow_by_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[An
330356
*inputs["args"],
331357
**inputs["kwargs"],
332358
)
359+
else:
360+
with SetWorkflowID(workflow_id):
361+
return start_workflow(
362+
dbos,
363+
wf_func,
364+
status["queue_name"],
365+
True,
366+
*inputs["args"],
367+
**inputs["kwargs"],
368+
)
333369

334370

335371
@overload

dbos/_dbos.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
)
5757
from ._roles import default_required_roles, required_roles
5858
from ._scheduler import ScheduledWorkflow, scheduled
59+
from ._sys_db import WorkflowStatusString
5960
from ._tracer import dbos_tracer
6061

6162
if TYPE_CHECKING:
@@ -231,6 +232,7 @@ def __new__(
231232
f"DBOS configured multiple times with conflicting information"
232233
)
233234
config = _dbos_global_registry.config
235+
234236
_dbos_global_instance = super().__new__(cls)
235237
_dbos_global_instance.__init__(fastapi=fastapi, config=config, flask=flask) # type: ignore
236238
else:
@@ -767,13 +769,30 @@ def execute_workflow_id(cls, workflow_id: str) -> WorkflowHandle[Any]:
767769
"""Execute a workflow by ID (for recovery)."""
768770
return execute_workflow_by_id(_get_dbos_instance(), workflow_id)
769771

772+
@classmethod
def restart_workflow(cls, workflow_id: str) -> None:
    """Restart a workflow by ID.

    Calls ``execute_workflow_by_id`` with ``startNew=True``, which starts the
    workflow without pinning the original workflow ID via ``SetWorkflowID``.
    The returned workflow handle is discarded.

    Args:
        workflow_id: ID of the workflow whose recorded function and inputs
            are used for the new run.
    """
    execute_workflow_by_id(_get_dbos_instance(), workflow_id, startNew=True)
776+
770777
@classmethod
def recover_pending_workflows(
    cls, executor_ids: List[str] = ["local"]
) -> List[WorkflowHandle[Any]]:
    """Find all PENDING workflows and execute them.

    Args:
        executor_ids: Executor IDs forwarded to the module-level
            ``recover_pending_workflows`` helper; defaults to ``["local"]``.

    Returns:
        The workflow handles returned by the helper.
    """
    # NOTE(review): the list default is a single object shared across calls;
    # it is only forwarded here, but confirm the helper never mutates it.
    dbos_instance = _get_dbos_instance()
    handles = recover_pending_workflows(dbos_instance, executor_ids)
    return handles
776783

784+
@classmethod
def cancel_workflow(cls, workflow_id: str) -> None:
    """Cancel a workflow by ID.

    Sets the workflow's status to ``WorkflowStatusString.CANCELLED`` through
    the global DBOS instance's system database.

    Args:
        workflow_id: ID of the workflow to mark as cancelled.
    """
    system_db = _get_dbos_instance()._sys_db
    cancelled = WorkflowStatusString.CANCELLED
    # NOTE(review): the meaning of the trailing False flag is defined by
    # set_workflow_status, which is not visible here - confirm.
    system_db.set_workflow_status(workflow_id, cancelled, False)
790+
791+
@classmethod
def resume_workflow(cls, workflow_id: str) -> None:
    """Resume a workflow by ID.

    Calls ``execute_workflow_by_id`` with ``startNew=False``, which runs the
    workflow under ``SetWorkflowID(workflow_id)``. The returned workflow
    handle is discarded.

    Args:
        workflow_id: ID of the workflow to execute again.
    """
    dbos_instance = _get_dbos_instance()
    execute_workflow_by_id(dbos_instance, workflow_id, startNew=False)
795+
777796
@classproperty
778797
def logger(cls) -> Logger:
779798
"""Return the DBOS `Logger` for the current context."""

dbos/_workflow_commands.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import importlib
2+
import os
3+
import sys
14
from typing import Any, List, Optional, cast
25

36
import typer
@@ -6,6 +9,7 @@
69
from dbos import DBOS
710

811
from . import _serialization, load_config
12+
from ._core import execute_workflow_by_id
913
from ._dbos_config import ConfigFile, _is_valid_app_name
1014
from ._sys_db import (
1115
GetWorkflowsInput,
@@ -123,11 +127,6 @@ def _cancel_workflow(config: ConfigFile, uuid: str) -> None:
123127
sys_db.destroy()
124128

125129

126-
def _reattempt_workflow(uuid: str, startNewWorkflow: bool) -> None:
127-
print(f"Reattempt workflow info for {uuid} not implemented")
128-
return
129-
130-
131130
def _get_workflow_info(
132131
sys_db: SystemDatabase, workflowUUID: str, getRequest: bool
133132
) -> Optional[WorkflowInformation]:

dbos/cli.py

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing import Any
1010

1111
import jsonpickle # type: ignore
12+
import requests
1213
import sqlalchemy as sa
1314
import tomlkit
1415
import typer
@@ -22,12 +23,7 @@
2223
from ._app_db import ApplicationDatabase
2324
from ._dbos_config import _is_valid_app_name
2425
from ._sys_db import SystemDatabase
25-
from ._workflow_commands import (
26-
_cancel_workflow,
27-
_get_workflow,
28-
_list_workflows,
29-
_reattempt_workflow,
30-
)
26+
from ._workflow_commands import _cancel_workflow, _get_workflow, _list_workflows
3127

3228
app = typer.Typer()
3329
workflow = typer.Typer()
@@ -432,5 +428,49 @@ def cancel(
432428
print(f"Workflow {uuid} has been cancelled")
433429

434430

431+
# CLI command: ask a running DBOS admin server to resume a workflow.
#   uuid: ID of the workflow to resume.
#   host: admin server host (default "localhost").
#   port: admin server port (default 3001).
# Prints a success or failure message; returns nothing.
@workflow.command(help="Resume a workflow that has been cancelled")
def resume(
    uuid: Annotated[str, typer.Argument()],
    host: Annotated[
        typing.Optional[str],
        typer.Option("--host", "-h", help="Specify the admin host"),
    ] = "localhost",
    port: Annotated[
        typing.Optional[int],
        typer.Option("--port", "-p", help="Specify the admin port"),
    ] = 3001,
) -> None:
    response = requests.post(
        f"http://{host}:{port}/workflows/{uuid}/resume", json=[], timeout=5
    )

    # The admin server's resume handler replies 204 No Content on success,
    # so checking for 200 would report every successful resume as a failure.
    if response.status_code == 204:
        print(f"Workflow {uuid} has been resumed")
    else:
        print(f"Failed to resume workflow {uuid}. Status code: {response.status_code}")
451+
452+
453+
# CLI command: ask a running DBOS admin server to restart a workflow.
#   uuid: ID of the workflow to restart.
#   host: admin server host (default "localhost").
#   port: admin server port (default 3001).
# Prints a success or failure message; returns nothing.
@workflow.command(help="Restart a workflow from the beginning with a new id")
def restart(
    uuid: Annotated[str, typer.Argument()],
    host: Annotated[
        typing.Optional[str],
        typer.Option("--host", "-h", help="Specify the admin host"),
    ] = "localhost",
    port: Annotated[
        typing.Optional[int],
        typer.Option("--port", "-p", help="Specify the admin port"),
    ] = 3001,
) -> None:
    response = requests.post(
        f"http://{host}:{port}/workflows/{uuid}/restart", json=[], timeout=5
    )

    # The admin server's restart handler replies 204 No Content on success,
    # so checking for 200 would report every successful restart as a failure.
    if response.status_code == 204:
        print(f"Workflow {uuid} has been restarted")
    else:
        # The original message said "resume" here, a copy-paste slip.
        print(f"Failed to restart workflow {uuid}. Status code: {response.status_code}")
473+
474+
435475
if __name__ == "__main__":
436476
app()

0 commit comments

Comments
 (0)