Skip to content

Commit 5a961d5

Browse files
committed
remove staging to output copying logic from executor
1 parent ecc020f commit 5a961d5

File tree

3 files changed

+6
-29
lines changed

3 files changed

+6
-29
lines changed

jupyter_scheduler/executors.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from jupyter_scheduler.models import DescribeJob, JobFeature, JobFile, Status
1515
from jupyter_scheduler.orm import Job, create_session
1616
from jupyter_scheduler.parameterize import add_parameters
17-
from jupyter_scheduler.utils import copy_directory, get_utc_timestamp
17+
from jupyter_scheduler.utils import get_utc_timestamp
1818

1919

2020
class ExecutionManager(ABC):
@@ -29,19 +29,11 @@ class ExecutionManager(ABC):
2929
_model = None
3030
_db_session = None
3131

32-
def __init__(
33-
self,
34-
job_id: str,
35-
root_dir: str,
36-
db_url: str,
37-
staging_paths: Dict[str, str],
38-
output_dir: str,
39-
):
32+
def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
4033
self.job_id = job_id
4134
self.staging_paths = staging_paths
4235
self.root_dir = root_dir
4336
self.db_url = db_url
44-
self.output_dir = output_dir
4537

4638
@property
4739
def model(self):
@@ -155,7 +147,6 @@ def execute(self):
155147
output, _ = cls().from_notebook_node(nb)
156148
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
157149
f.write(output)
158-
self.copy_staged_files_to_output()
159150

160151
def add_side_effects_files(self, staging_dir):
161152
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
@@ -179,12 +170,6 @@ def add_side_effects_files(self, staging_dir):
179170
)
180171
session.commit()
181172

182-
# TODO: copy via downloader and remove this function or use this function and utilize return for putting side efects into packaged_files
183-
def copy_staged_files_to_output(self):
184-
"""Copies snapshot of the original notebook and staged input files from the staging directory to the output directory and includes them into job_files."""
185-
staging_dir = os.path.dirname(self.staging_paths["input"])
186-
copy_directory(source_dir=staging_dir, destination_dir=self.output_dir)
187-
188173
def supported_features(cls) -> Dict[JobFeature, bool]:
189174
return {
190175
JobFeature.job_name: True,

jupyter_scheduler/job_files_manager.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
2121
job = await ensure_async(self.scheduler.get_job(job_id, False))
2222
staging_paths = await ensure_async(self.scheduler.get_staging_paths(job))
2323
output_filenames = self.scheduler.get_job_filenames(job)
24-
output_dir = self.scheduler.get_local_output_path(
25-
input_filename=job.input_filename, job_id=job.job_id, root_dir_relative=True
26-
)
24+
output_dir = self.scheduler.get_local_output_path(model=job, root_dir_relative=True)
2725

2826
p = Process(
2927
target=Downloader(

jupyter_scheduler/scheduler.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -319,9 +319,7 @@ def add_job_files(self, model: DescribeJob):
319319
mapping = self.environments_manager.output_formats_mapping()
320320
job_files = []
321321
output_filenames = self.get_job_filenames(model)
322-
output_dir = self.get_local_output_path(
323-
input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True
324-
)
322+
output_dir = self.get_local_output_path(model, root_dir_relative=True)
325323

326324
for output_format in model.output_formats:
327325
filename = output_filenames[output_format]
@@ -369,13 +367,13 @@ def add_job_files(self, model: DescribeJob):
369367
)
370368

371369
def get_local_output_path(
372-
self, input_filename: str, job_id: str, root_dir_relative: Optional[bool] = False
370+
self, model: DescribeJob, root_dir_relative: Optional[bool] = False
373371
) -> str:
374372
"""Returns the local output directory path
375373
where all the job files will be downloaded
376374
from the staging location.
377375
"""
378-
output_dir_name = create_output_directory(input_filename, job_id)
376+
output_dir_name = create_output_directory(model.input_filename, model.job_id)
379377
if root_dir_relative:
380378
return os.path.relpath(
381379
os.path.join(self.root_dir, self.output_directory, output_dir_name), self.root_dir
@@ -494,10 +492,6 @@ def create_job(self, model: CreateJob) -> str:
494492
staging_paths=staging_paths,
495493
root_dir=self.root_dir,
496494
db_url=self.db_url,
497-
output_dir=self.get_local_output_path(
498-
input_filename=model.input_filename,
499-
job_id=job.job_id,
500-
),
501495
).process
502496
)
503497
p.start()

0 commit comments

Comments
 (0)