Skip to content

Commit 73eef13

Browse files
committed
use dask for input file / folder copying, make copy functions standalone
1 parent 1c5a54d commit 73eef13

File tree

2 files changed

+32
-22
lines changed

2 files changed

+32
-22
lines changed

jupyter_scheduler/scheduler.py

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
)
4141
from jupyter_scheduler.orm import Job, JobDefinition, create_session
4242
from jupyter_scheduler.utils import (
43-
copy_directory,
43+
copy_input_file,
44+
copy_input_folder,
4445
create_output_directory,
4546
create_output_filename,
4647
)
@@ -427,22 +428,6 @@ def db_session(self):
427428

428429
return self._db_session
429430

430-
def copy_input_file(self, input_uri: str, copy_to_path: str):
431-
"""Copies the input file to the staging directory"""
432-
input_filepath = os.path.join(self.root_dir, input_uri)
433-
with fsspec.open(input_filepath) as input_file:
434-
with fsspec.open(copy_to_path, "wb") as output_file:
435-
output_file.write(input_file.read())
436-
437-
def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
438-
"""Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory"""
439-
input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri))
440-
staging_dir = os.path.dirname(nb_copy_to_path)
441-
return copy_directory(
442-
source_dir=input_dir_path,
443-
destination_dir=staging_dir,
444-
)
445-
446431
async def create_job(self, model: CreateJob) -> str:
447432
if not model.job_definition_id and not self.file_exists(model.input_uri):
448433
raise InputUriError(model.input_uri)
@@ -473,19 +458,26 @@ async def create_job(self, model: CreateJob) -> str:
473458
session.add(job)
474459
session.commit()
475460

461+
dask: DaskClient = await self.dask_client_future
462+
476463
staging_paths = self.get_staging_paths(DescribeJob.from_orm(job))
477464
if model.package_input_folder:
478-
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
465+
copy_future = dask.submit(
466+
copy_input_folder, self.root_dir, model.input_uri, staging_paths["input"]
467+
)
468+
copied_files = await dask.gather(copy_future)
479469
input_notebook_filename = os.path.basename(model.input_uri)
480470
job.packaged_files = [
481471
file for file in copied_files if file != input_notebook_filename
482472
]
483473
session.commit()
484474
else:
485-
self.copy_input_file(model.input_uri, staging_paths["input"])
475+
copy_future = dask.submit(
476+
copy_input_file, self.root_dir, model.input_uri, staging_paths["input"]
477+
)
478+
await dask.gather(copy_future)
486479

487-
dask_client: DaskClient = await self.dask_client_future
488-
future = dask_client.submit(
480+
process_future = dask.submit(
489481
self.execution_manager_class(
490482
job_id=job.job_id,
491483
staging_paths=staging_paths,
@@ -494,7 +486,7 @@ async def create_job(self, model: CreateJob) -> str:
494486
).process
495487
)
496488

497-
job.pid = future.key
489+
job.pid = process_future.key
498490
session.commit()
499491

500492
job_id = job.job_id

jupyter_scheduler/utils.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,21 @@ def copy_directory(
113113
copied_files.append(rel_path)
114114

115115
return copied_files
116+
117+
118+
def copy_input_file(root_dir: str, input_uri: str, copy_to_path: str):
    """Copy the input file into the staging directory.

    Args:
        root_dir: Directory that ``input_uri`` is resolved against.
        input_uri: Path of the input file, relative to ``root_dir``.
        copy_to_path: Destination path; opened through fsspec in binary
            write mode.
    """
    # Function-scope import so the block does not depend on a file-level change.
    import shutil

    input_filepath = os.path.join(root_dir, input_uri)
    with fsspec.open(input_filepath, "rb") as input_file:
        with fsspec.open(copy_to_path, "wb") as output_file:
            # Stream in chunks instead of reading the whole file into memory.
            shutil.copyfileobj(input_file, output_file)
124+
125+
126+
def copy_input_folder(root_dir: str, input_uri: str, nb_copy_to_path: str) -> List[str]:
    """Copy the directory containing the input file into the staging directory.

    The source is the parent directory of ``input_uri`` (resolved against
    ``root_dir``); the destination is the parent directory of
    ``nb_copy_to_path``.

    Returns:
        Whatever ``copy_directory`` returns for that source/destination pair:
        the list of copied files, relative to the staging directory.
    """
    notebook_path = os.path.join(root_dir, input_uri)
    return copy_directory(
        source_dir=os.path.dirname(notebook_path),
        destination_dir=os.path.dirname(nb_copy_to_path),
    )

0 commit comments

Comments
 (0)