Skip to content

Commit 044e3bb

Browse files
committed
add workflow/{workflow_id}/tasks endpoint
1 parent b07a765 commit 044e3bb

File tree

3 files changed

+87
-9
lines changed

3 files changed

+87
-9
lines changed

jupyter_scheduler/extension.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from traitlets import Bool, Type, Unicode, default
77

88
from jupyter_scheduler.orm import create_tables
9-
from jupyter_scheduler.workflows import WorkflowHandler, WorkflowRunHandler
9+
from jupyter_scheduler.workflows import WorkflowsHandler, WorkflowsRunHandler, WorkflowsTasksHandler
1010

1111
from .handlers import (
1212
BatchJobHandler,
@@ -37,11 +37,15 @@ class SchedulerApp(ExtensionApp):
3737
(r"scheduler/job_definitions/%s/jobs" % JOB_DEFINITION_ID_REGEX, JobFromDefinitionHandler),
3838
(r"scheduler/runtime_environments", RuntimeEnvironmentsHandler),
3939
(r"scheduler/config", ConfigHandler),
40-
(r"scheduler/worklows", WorkflowHandler),
41-
(fr"scheduler/worklows/{WORKFLOW_ID_REGEX}", WorkflowHandler),
40+
(r"scheduler/worklows", WorkflowsHandler),
41+
(rf"scheduler/worklows/{WORKFLOW_ID_REGEX}", WorkflowsHandler),
4242
(
4343
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/run",
44-
WorkflowRunHandler,
44+
WorkflowsRunHandler,
45+
),
46+
(
47+
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks",
48+
WorkflowsTasksHandler,
4549
),
4650
]
4751

jupyter_scheduler/scheduler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
473473
destination_dir=staging_dir,
474474
)
475475

476-
def create_job(self, model: CreateJob) -> str:
476+
def create_job(self, model: CreateJob, run: bool = True) -> str:
477477
if not model.job_definition_id and not self.file_exists(model.input_uri):
478478
raise InputUriError(model.input_uri)
479479

@@ -514,6 +514,9 @@ def create_job(self, model: CreateJob) -> str:
514514
else:
515515
self.copy_input_file(model.input_uri, staging_paths["input"])
516516

517+
if not run:
518+
return job.job_id
519+
517520
# The MP context forces new processes to not be forked on Linux.
518521
# This is necessary because `asyncio.get_event_loop()` is bugged in
519522
# forked processes in Python versions below 3.12. This method is

jupyter_scheduler/workflows.py

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
ExtensionHandlerMixin,
1515
JobHandlersMixin,
1616
)
17-
from jupyter_scheduler.models import Status
17+
from jupyter_scheduler.models import CreateJob, Status, UpdateJob
1818
from jupyter_scheduler.pydantic_v1 import BaseModel, ValidationError
1919

2020

21-
class WorkflowHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
21+
class WorkflowsHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
2222
@authenticated
2323
async def post(self):
2424
payload = self.get_json_body()
@@ -60,7 +60,76 @@ async def get(self, workflow_id: str = None):
6060
self.finish(workflow.json())
6161

6262

63-
class WorkflowRunHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
63+
class WorkflowsTasksHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
64+
@authenticated
65+
async def post(self, workflow_id: str):
66+
print("WorkflowsTasksHandler post")
67+
payload = self.get_json_body()
68+
if workflow_id != payload.get("workflow_id"):
69+
raise HTTPError(
70+
400,
71+
"Error during workflow job creation. workflow_id in the URL and payload don't match.",
72+
)
73+
try:
74+
job_id = await ensure_async(self.scheduler.create_job(CreateJob(**payload), run=False))
75+
except ValidationError as e:
76+
self.log.exception(e)
77+
raise HTTPError(500, str(e)) from e
78+
except InputUriError as e:
79+
self.log.exception(e)
80+
raise HTTPError(500, str(e)) from e
81+
except IdempotencyTokenError as e:
82+
self.log.exception(e)
83+
raise HTTPError(409, str(e)) from e
84+
except SchedulerError as e:
85+
self.log.exception(e)
86+
raise HTTPError(500, str(e)) from e
87+
except Exception as e:
88+
self.log.exception(e)
89+
raise HTTPError(
90+
500, "Unexpected error occurred during creation of workflow job."
91+
) from e
92+
else:
93+
self.finish(json.dumps(dict(job_id=job_id)))
94+
95+
@authenticated
96+
async def patch(self, workflow_id: str, job_id: str):
97+
payload = self.get_json_body()
98+
if workflow_id != payload.get("workflow_id", None):
99+
raise HTTPError(
100+
400,
101+
"Error during workflow job creation. workflow_id in the URL and payload don't match.",
102+
)
103+
status = payload.get("status")
104+
status = Status(status) if status else None
105+
106+
if status and status != Status.STOPPED:
107+
raise HTTPError(
108+
500,
109+
"Invalid value for field 'status'. Workflow job status can only be updated to status 'STOPPED' after creation.",
110+
)
111+
try:
112+
if status:
113+
await ensure_async(self.scheduler.stop_job(job_id))
114+
else:
115+
await ensure_async(self.scheduler.update_job(job_id, UpdateJob(**payload)))
116+
except ValidationError as e:
117+
self.log.exception(e)
118+
raise HTTPError(500, str(e)) from e
119+
except SchedulerError as e:
120+
self.log.exception(e)
121+
raise HTTPError(500, str(e)) from e
122+
except Exception as e:
123+
self.log.exception(e)
124+
raise HTTPError(
125+
500, "Unexpected error occurred while updating the workflow job."
126+
) from e
127+
else:
128+
self.set_status(204)
129+
self.finish()
130+
131+
132+
class WorkflowsRunHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
64133
@authenticated
65134
async def post(self, workflow_id: str):
66135
try:
@@ -79,7 +148,9 @@ async def post(self, workflow_id: str):
79148
raise HTTPError(500, str(e)) from e
80149
except Exception as e:
81150
self.log.exception(e)
82-
raise HTTPError(500, "Unexpected error occurred during creation of a workflow.") from e
151+
raise HTTPError(
152+
500, "Unexpected error occurred during attempt to run a workflow."
153+
) from e
83154
else:
84155
self.finish(json.dumps(dict(workflow_id=workflow_id)))
85156

0 commit comments

Comments
 (0)