Skip to content

Commit c5da5b0

Browse files
committed
split create_workflow and run_workflow endpoints and methods
1 parent d77310e commit c5da5b0

File tree

3 files changed

+36
-16
lines changed

3 files changed

+36
-16
lines changed

jupyter_scheduler/extension.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from traitlets import Bool, Type, Unicode, default
77

88
from jupyter_scheduler.orm import create_tables
9-
from jupyter_scheduler.workflows import WorkflowHandler
9+
from jupyter_scheduler.workflows import WorkflowHandler, WorkflowRunHandler
1010

1111
from .handlers import (
1212
BatchJobHandler,
@@ -38,9 +38,10 @@ class SchedulerApp(ExtensionApp):
3838
(r"scheduler/runtime_environments", RuntimeEnvironmentsHandler),
3939
(r"scheduler/config", ConfigHandler),
4040
(r"scheduler/worklows", WorkflowHandler),
41-
(r"scheduler/worklows/{}/run".format(WORKFLOW_ID_REGEX, JOB_ID_REGEX), WorkflowHandler),
42-
(r"scheduler/worklows/%s/jobs" % WORKFLOW_ID_REGEX, WorkflowHandler),
43-
(r"scheduler/worklows/{}/jobs/{}".format(WORKFLOW_ID_REGEX, JOB_ID_REGEX), WorkflowHandler),
41+
(
42+
r"scheduler/worklows/{}/run".format(WORKFLOW_ID_REGEX),
43+
WorkflowRunHandler,
44+
),
4445
]
4546

4647
drop_tables = Bool(False, config=True, help="Drop the database tables before starting.")

jupyter_scheduler/scheduler.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -532,23 +532,18 @@ def create_job(self, model: CreateJob) -> str:
532532
return job_id
533533

534534
def create_workflow(self, model: CreateWorkflow) -> str:
535-
536535
with self.db_session() as session:
537-
538536
workflow = Workflow(**model.dict(exclude_none=True))
539-
540537
session.add(workflow)
541538
session.commit()
539+
return workflow.workflow_id
542540

543-
execution_manager = self.execution_manager_class(
544-
workflow_id=workflow.workflow_id,
545-
db_url=self.db_url,
546-
)
547-
execution_manager.process_workflow()
548-
session.commit()
549-
550-
workflow_id = workflow.workflow_id
551-
541+
def run_workflow(self, workflow_id: str) -> str:
542+
execution_manager = self.execution_manager_class(
543+
workflow_id=workflow_id,
544+
db_url=self.db_url,
545+
)
546+
execution_manager.process_workflow()
552547
return workflow_id
553548

554549
def update_job(self, job_id: str, model: UpdateJob):

jupyter_scheduler/workflows.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,30 @@ async def post(self):
4545
self.finish(json.dumps(dict(workflow_id=workflow_id)))
4646

4747

48+
class WorkflowRunHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
49+
@authenticated
50+
async def post(self, workflow_id: str):
51+
try:
52+
workflow_id = await ensure_async(self.scheduler.run_workflow(workflow_id))
53+
except ValidationError as e:
54+
self.log.exception(e)
55+
raise HTTPError(500, str(e)) from e
56+
except InputUriError as e:
57+
self.log.exception(e)
58+
raise HTTPError(500, str(e)) from e
59+
except IdempotencyTokenError as e:
60+
self.log.exception(e)
61+
raise HTTPError(409, str(e)) from e
62+
except SchedulerError as e:
63+
self.log.exception(e)
64+
raise HTTPError(500, str(e)) from e
65+
except Exception as e:
66+
self.log.exception(e)
67+
raise HTTPError(500, "Unexpected error occurred during creation of a workflow.") from e
68+
else:
69+
self.finish(json.dumps(dict(workflow_id=workflow_id)))
70+
71+
4872
class CreateWorkflow(BaseModel):
4973
tasks: List[str]
5074

0 commit comments

Comments
 (0)