Skip to content

Commit 6466c48

Browse files
committed
add workflow definitions
1 parent 650a566 commit 6466c48

File tree

3 files changed

+133
-0
lines changed

3 files changed

+133
-0
lines changed

jupyter_scheduler/extension.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
from jupyter_scheduler.orm import create_tables
99
from jupyter_scheduler.workflows import (
10+
WorkflowDefinitionsHandler,
11+
WorkflowDefinitionsTasksHandler,
1012
WorkflowsHandler,
1113
WorkflowsRunHandler,
1214
WorkflowsTasksHandler,
@@ -25,6 +27,7 @@
2527

2628
JOB_DEFINITION_ID_REGEX = r"(?P<job_definition_id>\w+(?:-\w+)+)"
2729
JOB_ID_REGEX = r"(?P<job_id>\w+(?:-\w+)+)"
30+
WORKFLOW_DEFINITION_ID_REGEX = r"(?P<workflow_definition_id>\w+(?:-\w+)+)"
2831
WORKFLOW_ID_REGEX = r"(?P<workflow_id>\w+(?:-\w+)+)"
2932

3033

@@ -51,6 +54,18 @@ class SchedulerApp(ExtensionApp):
5154
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks",
5255
WorkflowsTasksHandler,
5356
),
57+
(
58+
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}",
59+
WorkflowDefinitionsHandler,
60+
),
61+
(
62+
rf"scheduler/worklows/{WORKFLOW_DEFINITION_ID_REGEX}/run",
63+
WorkflowDefinitionsHandler,
64+
),
65+
(
66+
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks",
67+
WorkflowDefinitionsTasksHandler,
68+
),
5469
]
5570

5671
drop_tables = Bool(False, config=True, help="Drop the database tables before starting.")

jupyter_scheduler/orm.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,19 @@ class Workflow(Base):
120120
# Any default values specified for new columns will be ignored during the migration process.
121121

122122

123+
class WorkflowDefinition(Base):
124+
__tablename__ = "workflow_definitions"
125+
__table_args__ = {"extend_existing": True}
126+
workflow_id = Column(String(36), primary_key=True, default=generate_uuid)
127+
tasks = Column(JsonType)
128+
status = Column(String(64), default=Status.CREATED)
129+
active = Column(Boolean, default=False)
130+
schedule = Column(String(256))
131+
timezone = Column(String(36))
132+
# All new columns added to this table must be nullable to ensure compatibility during database migrations.
133+
# Any default values specified for new columns will be ignored during the migration process.
134+
135+
123136
class JobDefinition(CommonColumns, Base):
124137
__tablename__ = "job_definitions"
125138
__table_args__ = {"extend_existing": True}

jupyter_scheduler/workflows.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,79 @@ async def post(self, workflow_id: str):
148148
self.finish(json.dumps(dict(workflow_id=workflow_id)))
149149

150150

151+
class WorkflowDefinitionsHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
152+
@authenticated
153+
async def post(self):
154+
payload = self.get_json_body() or {}
155+
try:
156+
workflow_id = await ensure_async(
157+
self.scheduler.create_workflow(CreateWorkflow(**payload))
158+
)
159+
except ValidationError as e:
160+
self.log.exception(e)
161+
raise HTTPError(500, str(e)) from e
162+
except InputUriError as e:
163+
self.log.exception(e)
164+
raise HTTPError(500, str(e)) from e
165+
except IdempotencyTokenError as e:
166+
self.log.exception(e)
167+
raise HTTPError(409, str(e)) from e
168+
except SchedulerError as e:
169+
self.log.exception(e)
170+
raise HTTPError(500, str(e)) from e
171+
except Exception as e:
172+
self.log.exception(e)
173+
raise HTTPError(500, "Unexpected error occurred during creation of a workflow.") from e
174+
else:
175+
self.finish(json.dumps(dict(workflow_id=workflow_id)))
176+
177+
@authenticated
178+
async def get(self, workflow_id: str = None):
179+
if not workflow_id:
180+
raise HTTPError(400, "Missing workflow_id in the request URL.")
181+
try:
182+
workflow = await ensure_async(self.scheduler.get_workflow(workflow_id))
183+
except SchedulerError as e:
184+
self.log.exception(e)
185+
raise HTTPError(500, str(e)) from e
186+
except Exception as e:
187+
self.log.exception(e)
188+
raise HTTPError(500, "Unexpected error occurred while getting workflow details.") from e
189+
else:
190+
self.finish(workflow.json())
191+
192+
193+
class WorkflowDefinitionsTasksHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
194+
@authenticated
195+
async def post(self, workflow_id: str):
196+
payload = self.get_json_body()
197+
try:
198+
task_id = await ensure_async(
199+
self.scheduler.create_workflow_task(
200+
workflow_id=workflow_id, model=CreateJob(**payload)
201+
)
202+
)
203+
except ValidationError as e:
204+
self.log.exception(e)
205+
raise HTTPError(500, str(e)) from e
206+
except InputUriError as e:
207+
self.log.exception(e)
208+
raise HTTPError(500, str(e)) from e
209+
except IdempotencyTokenError as e:
210+
self.log.exception(e)
211+
raise HTTPError(409, str(e)) from e
212+
except SchedulerError as e:
213+
self.log.exception(e)
214+
raise HTTPError(500, str(e)) from e
215+
except Exception as e:
216+
self.log.exception(e)
217+
raise HTTPError(
218+
500, "Unexpected error occurred during creation of workflow job."
219+
) from e
220+
else:
221+
self.finish(json.dumps(dict(task_id=task_id)))
222+
223+
151224
class CreateWorkflow(BaseModel):
152225
tasks: List[str] = []
153226

@@ -169,3 +242,35 @@ class UpdateWorkflow(BaseModel):
169242

170243
class Config:
171244
orm_mode = True
245+
246+
247+
class CreateWorkflowDefinition(BaseModel):
248+
tasks: List[str] = []
249+
# any field added to CreateWorkflow should also be added to this model as well
250+
schedule: Optional[str] = None
251+
timezone: Optional[str] = None
252+
253+
class Config:
254+
orm_mode = True
255+
256+
257+
class DescribeWorkflowDefinition(BaseModel):
258+
workflow_definition_id: str
259+
tasks: List[str] = None
260+
schedule: Optional[str] = None
261+
timezone: Optional[str] = None
262+
status: Status = Status.CREATED
263+
active: Optional[bool] = None
264+
265+
class Config:
266+
orm_mode = True
267+
268+
269+
class UpdateWorkflowDefinition(BaseModel):
270+
tasks: Optional[List[str]] = None
271+
schedule: Optional[str] = None
272+
timezone: Optional[str] = None
273+
active: Optional[bool] = None
274+
275+
class Config:
276+
orm_mode = True

0 commit comments

Comments
 (0)