Skip to content

Commit 69b34ca

Browse files
committed
Use prefect task to execute jobs, download files. Add flow and run names
1 parent b9c466b commit 69b34ca

File tree

3 files changed

+23
-32
lines changed

3 files changed

+23
-32
lines changed

jupyter_scheduler/executors.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ def on_complete_workflow(self):
188188
class DefaultExecutionManager(ExecutionManager):
189189
"""Default execution manager that executes notebooks"""
190190

191-
@task
191+
@task(name="Execute workflow task")
192192
def execute_task(self, job: Job):
193193
with self.db_session() as session:
194194
staging_paths = Scheduler.get_staging_paths(DescribeJob.from_orm(job))
@@ -206,14 +206,14 @@ def execute_task(self, job: Job):
206206

207207
return job_id
208208

209-
@task
209+
@task(name="Get workflow task records")
210210
def get_tasks_records(self, task_ids: List[str]) -> List[Job]:
211211
with self.db_session() as session:
212212
tasks = session.query(Job).filter(Job.job_id.in_(task_ids)).all()
213213

214214
return tasks
215215

216-
@flow
216+
@flow(name="Execute workflow", flow_run_name="Execute workflow run")
217217
def execute_workflow(self):
218218
tasks_info: List[Job] = self.get_tasks_records(self.model.tasks)
219219
tasks = {task.job_id: task for task in tasks_info}
@@ -232,6 +232,7 @@ def make_task(task_id):
232232
for future in as_completed(final_tasks):
233233
future.result()
234234

235+
@flow(name="Execute job", flow_run_name="Execute job run")
235236
def execute(self):
236237
job = self.model
237238

@@ -254,6 +255,7 @@ def execute(self):
254255
self.add_side_effects_files(staging_dir)
255256
self.create_output_files(job, nb)
256257

258+
@task(name="Check for and add side effect files")
257259
def add_side_effects_files(self, staging_dir: str):
258260
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
259261
input_notebook = os.path.relpath(self.staging_paths["input"])
@@ -276,6 +278,7 @@ def add_side_effects_files(self, staging_dir: str):
276278
)
277279
session.commit()
278280

281+
@task(name="Create output files")
279282
def create_output_files(self, job: DescribeJob, notebook_node):
280283
for output_format in job.output_formats:
281284
cls = nbconvert.get_exporter(output_format)

jupyter_scheduler/job_files_manager.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import fsspec
88
from jupyter_server.utils import ensure_async
9+
from prefect import task
910

1011
from jupyter_scheduler.exceptions import SchedulerError
1112
from jupyter_scheduler.scheduler import BaseScheduler
@@ -23,17 +24,14 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
2324
output_filenames = self.scheduler.get_job_filenames(job)
2425
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)
2526

26-
p = Process(
27-
target=Downloader(
28-
output_formats=job.output_formats,
29-
output_filenames=output_filenames,
30-
staging_paths=staging_paths,
31-
output_dir=output_dir,
32-
redownload=redownload,
33-
include_staging_files=job.package_input_folder,
34-
).download
35-
)
36-
p.start()
27+
target = Downloader(
28+
output_formats=job.output_formats,
29+
output_filenames=output_filenames,
30+
staging_paths=staging_paths,
31+
output_dir=output_dir,
32+
redownload=redownload,
33+
include_staging_files=job.package_input_folder,
34+
).download
3735

3836

3937
class Downloader:
@@ -77,6 +75,7 @@ def download_tar(self, archive_format: str = "tar"):
7775
with tarfile.open(fileobj=f, mode=read_mode) as tar:
7876
tar.extractall(self.output_dir)
7977

78+
@task(name="Download job files")
8079
def download(self):
8180
# ensure presence of staging paths
8281
if not self.staging_paths:

jupyter_scheduler/scheduler.py

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import multiprocessing as mp
21
import os
32
import random
43
import shutil
@@ -526,25 +525,15 @@ def create_job(self, model: CreateJob, run: bool = True) -> str:
526525

527526
def run_job(self, job: Job, staging_paths: Dict[str, str]) -> str:
528527
with self.db_session() as session:
529-
# The MP context forces new processes to not be forked on Linux.
530-
# This is necessary because `asyncio.get_event_loop()` is bugged in
531-
# forked processes in Python versions below 3.12. This method is
532-
# called by `jupyter_core` by `nbconvert` in the default executor.
533-
#
534-
# See: https://github.com/python/cpython/issues/66285
535-
# See also: https://github.com/jupyter/jupyter_core/pull/362
536-
mp_ctx = mp.get_context("spawn")
537-
p = mp_ctx.Process(
538-
target=self.execution_manager_class(
539-
job_id=job.job_id,
540-
staging_paths=staging_paths,
541-
root_dir=self.root_dir,
542-
db_url=self.db_url,
543-
).process
528+
execution_manager = self.execution_manager_class(
529+
job_id=job.job_id,
530+
staging_paths=staging_paths,
531+
root_dir=self.root_dir,
532+
db_url=self.db_url,
544533
)
545-
p.start()
534+
execution_manager.process()
546535

547-
job.pid = p.pid
536+
job.pid = 1 # TODO: fix pid hardcode
548537
session.commit()
549538

550539
job_id = job.job_id

0 commit comments

Comments (0)