Skip to content

Commit ecc020f

Browse files
committed
copy staging folder and side effects to output after job runs, track and redownload files
1 parent 03cc668 commit ecc020f

File tree

10 files changed

+130
-76
lines changed

10 files changed

+130
-76
lines changed

jupyter_scheduler/executors.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -149,27 +149,41 @@ def execute(self):
149149
except CellExecutionError as e:
150150
raise e
151151
finally:
152+
self.add_side_effects_files(staging_dir)
152153
for output_format in job.output_formats:
153154
cls = nbconvert.get_exporter(output_format)
154155
output, _ = cls().from_notebook_node(nb)
155156
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
156157
f.write(output)
157-
158-
self.copy_staged_files_to_output()
159-
158+
self.copy_staged_files_to_output()
159+
160+
def add_side_effects_files(self, staging_dir):
161+
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
162+
input_notebook = os.path.relpath(self.staging_paths["input"])
163+
new_files_set = set()
164+
for root, _, files in os.walk(staging_dir):
165+
for file in files:
166+
file_rel_path = os.path.relpath(os.path.join(root, file), staging_dir)
167+
if file_rel_path != input_notebook:
168+
new_files_set.add(file_rel_path)
169+
170+
if new_files_set:
171+
with self.db_session() as session:
172+
current_packaged_files_set = set(
173+
session.query(Job.packaged_files).filter(Job.job_id == self.job_id).scalar()
174+
or []
175+
)
176+
updated_packaged_files = list(current_packaged_files_set.union(new_files_set))
177+
session.query(Job).filter(Job.job_id == self.job_id).update(
178+
{"packaged_files": updated_packaged_files}
179+
)
180+
session.commit()
181+
182+
# TODO: copy via downloader and remove this function or use this function and utilize return for putting side efects into packaged_files
160183
def copy_staged_files_to_output(self):
161184
"""Copies snapshot of the original notebook and staged input files from the staging directory to the output directory and includes them into job_files."""
162185
staging_dir = os.path.dirname(self.staging_paths["input"])
163-
copied_files = copy_directory(
164-
source_dir=staging_dir, destination_dir=self.output_dir, base_dir=self.root_dir
165-
)
166-
167-
if copied_files:
168-
for rel_path in copied_files:
169-
if not any(job_file.file_path == rel_path for job_file in self.model.job_files):
170-
self.model.job_files.append(
171-
JobFile(display_name="File", file_format="File", file_path=rel_path)
172-
)
186+
copy_directory(source_dir=staging_dir, destination_dir=self.output_dir)
173187

174188
def supported_features(cls) -> Dict[JobFeature, bool]:
175189
return {

jupyter_scheduler/job_files_manager.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,17 @@ def __init__(
5757

5858
def generate_filepaths(self):
5959
"""A generator that produces filepaths"""
60+
output_formats = self.output_formats + ["input"]
61+
for output_format in output_formats:
62+
input_filepath = self.staging_paths[output_format]
63+
output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
64+
if not os.path.exists(output_filepath) or self.redownload:
65+
yield input_filepath, output_filepath
6066
if self.include_staging_files:
6167
staging_dir = os.path.dirname(self.staging_paths["input"])
62-
for root, _, files in os.walk(staging_dir):
63-
for file in files:
64-
input_filepath = os.path.join(root, file)
65-
relative_path = os.path.relpath(input_filepath, staging_dir)
66-
output_filepath = os.path.join(self.output_dir, relative_path)
67-
yield input_filepath, output_filepath
68-
else:
69-
output_formats = self.output_formats + ["input"]
70-
for output_format in output_formats:
71-
input_filepath = self.staging_paths[output_format]
72-
output_filepath = os.path.join(
73-
self.output_dir, self.output_filenames[output_format]
74-
)
68+
for file_relative_path in self.output_filenames["files"]:
69+
input_filepath = os.path.join(staging_dir, file_relative_path)
70+
output_filepath = os.path.join(self.output_dir, file_relative_path)
7571
if not os.path.exists(output_filepath) or self.redownload:
7672
yield input_filepath, output_filepath
7773

jupyter_scheduler/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class DescribeJob(BaseModel):
147147
status_message: Optional[str] = None
148148
downloaded: bool = False
149149
package_input_folder: Optional[bool] = None
150-
output_folder: Optional[str] = None
150+
packaged_files: Optional[List[str]] = []
151151

152152
class Config:
153153
orm_mode = True

jupyter_scheduler/orm.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class CommonColumns:
8686
update_time = Column(Integer, default=get_utc_timestamp, onupdate=get_utc_timestamp)
8787
create_time = Column(Integer, default=get_utc_timestamp)
8888
package_input_folder = Column(Boolean)
89+
packaged_files = Column(JsonType, default=[])
8990

9091

9192
class Job(CommonColumns, Base):

jupyter_scheduler/scheduler.py

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33
import random
44
import shutil
5-
from typing import Dict, Optional, Type, Union
5+
from typing import Dict, List, Optional, Type, Union
66

77
import fsspec
88
import psutil
@@ -252,7 +252,29 @@ def file_exists(self, path: str):
252252
else:
253253
return os.path.isfile(os_path)
254254

255-
def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
255+
def dir_exists(self, path: str):
256+
"""Returns True if the directory exists, else returns False.
257+
258+
API-style wrapper for os.path.isdir
259+
260+
Parameters
261+
----------
262+
path : string
263+
The relative path to the directory (with '/' as separator)
264+
265+
Returns
266+
-------
267+
exists : bool
268+
Whether the directory exists.
269+
"""
270+
root = os.path.abspath(self.root_dir)
271+
os_path = to_os_path(path, root)
272+
if not (os.path.abspath(os_path) + os.path.sep).startswith(root):
273+
return False
274+
else:
275+
return os.path.isdir(os_path)
276+
277+
def get_job_filenames(self, model: DescribeJob) -> Dict[str, Union[str, List[str]]]:
256278
"""Returns dictionary mapping output formats to
257279
the job filenames in the JupyterLab workspace.
258280
@@ -269,7 +291,8 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
269291
{
270292
'ipynb': 'helloworld-2022-10-10.ipynb',
271293
'html': 'helloworld-2022-10-10.html',
272-
'input': 'helloworld.ipynb'
294+
'input': 'helloworld.ipynb',
295+
'files': ['data/helloworld.csv', 'images/helloworld.png']
273296
}
274297
275298
"""
@@ -282,6 +305,9 @@ def get_job_filenames(self, model: DescribeJob) -> Dict[str, str]:
282305

283306
filenames["input"] = model.input_filename
284307

308+
if model.package_input_folder and model.packaged_files:
309+
filenames["files"] = [relative_path for relative_path in model.packaged_files]
310+
285311
return filenames
286312

287313
def add_job_files(self, model: DescribeJob):
@@ -320,8 +346,27 @@ def add_job_files(self, model: DescribeJob):
320346
)
321347
)
322348

349+
# Add link to output folder with packaged input files and side effects
350+
if model.package_input_folder and model.packaged_files:
351+
job_files.append(
352+
JobFile(
353+
display_name="Files",
354+
file_format="files",
355+
file_path=output_dir if self.dir_exists(output_dir) else None,
356+
)
357+
)
358+
323359
model.job_files = job_files
324-
model.downloaded = all(job_file.file_path for job_file in job_files)
360+
361+
packaged_files = []
362+
if model.package_input_folder and model.packaged_files:
363+
packaged_files = [
364+
os.path.join(output_dir, packaged_file_rel_path)
365+
for packaged_file_rel_path in model.packaged_files
366+
]
367+
model.downloaded = all(job_file.file_path for job_file in job_files) and all(
368+
self.file_exists(file_path) for file_path in packaged_files
369+
)
325370

326371
def get_local_output_path(
327372
self, input_filename: str, job_id: str, root_dir_relative: Optional[bool] = False
@@ -385,11 +430,11 @@ def copy_input_file(self, input_uri: str, copy_to_path: str):
385430
with fsspec.open(copy_to_path, "wb") as output_file:
386431
output_file.write(input_file.read())
387432

388-
def copy_input_folder(self, input_uri: str, nb_copy_to_path: str):
389-
"""Copies the input file along with the input directory to the staging directory"""
433+
def copy_input_folder(self, input_uri: str, nb_copy_to_path: str) -> List[str]:
434+
"""Copies the input file along with the input directory to the staging directory, returns the list of copied files relative to the staging directory"""
390435
input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri))
391436
staging_dir = os.path.dirname(nb_copy_to_path)
392-
copy_directory(
437+
return copy_directory(
393438
source_dir=input_dir_path,
394439
destination_dir=staging_dir,
395440
)
@@ -420,12 +465,18 @@ def create_job(self, model: CreateJob) -> str:
420465
model.output_formats = []
421466

422467
job = Job(**model.dict(exclude_none=True, exclude={"input_uri"}))
468+
423469
session.add(job)
424470
session.commit()
425471

426472
staging_paths = self.get_staging_paths(DescribeJob.from_orm(job))
427473
if model.package_input_folder:
428-
self.copy_input_folder(model.input_uri, staging_paths["input"])
474+
copied_files = self.copy_input_folder(model.input_uri, staging_paths["input"])
475+
input_notebook_filename = os.path.basename(model.input_uri)
476+
job.packaged_files = [
477+
file for file in copied_files if file != input_notebook_filename
478+
]
479+
session.commit()
429480
else:
430481
self.copy_input_file(model.input_uri, staging_paths["input"])
431482

@@ -497,10 +548,6 @@ def list_jobs(self, query: ListJobsQuery) -> ListJobsResponse:
497548
for job in jobs:
498549
model = DescribeJob.from_orm(job)
499550
self.add_job_files(model=model)
500-
if model.package_input_folder:
501-
model.output_folder = self.get_local_output_path(
502-
input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True
503-
)
504551
jobs_list.append(model)
505552

506553
list_jobs_response = ListJobsResponse(

jupyter_scheduler/utils.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,10 @@ def get_localized_timestamp(timezone) -> int:
9090
def copy_directory(
9191
source_dir: str,
9292
destination_dir: str,
93-
base_dir: Optional[str] = None,
9493
exclude_files: Optional[List[str]] = [],
9594
) -> List[str]:
9695
"""Copies content of source_dir to destination_dir excluding exclude_files.
97-
Returns a list of relative paths to copied files, relative to base_dir if provided.
96+
Returns a list of relative paths to copied files from destination_dir.
9897
"""
9998
copied_files = []
10099
for item in os.listdir(source_dir):
@@ -104,15 +103,13 @@ def copy_directory(
104103
shutil.copytree(source, destination, ignore=shutil.ignore_patterns(*exclude_files))
105104
for dirpath, _, filenames in os.walk(destination):
106105
for filename in filenames:
107-
rel_path = os.path.relpath(
108-
os.path.join(dirpath, filename), base_dir if base_dir else destination_dir
109-
)
106+
rel_path = os.path.relpath(os.path.join(dirpath, filename), destination_dir)
110107
copied_files.append(rel_path)
111108
elif os.path.isfile(source) and item not in exclude_files:
112109
with fsspec.open(source, "rb") as source_file:
113110
with fsspec.open(destination, "wb") as output_file:
114111
output_file.write(source_file.read())
115-
rel_path = os.path.relpath(destination, base_dir if base_dir else destination_dir)
112+
rel_path = os.path.relpath(destination, destination_dir)
116113
copied_files.append(rel_path)
117114

118115
return copied_files
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import React from 'react';
2+
import { Scheduler } from '../handler';
3+
import { JupyterFrontEnd } from '@jupyterlab/application';
4+
import { useTranslator } from '../hooks';
5+
import { Link } from '@mui/material';
6+
7+
export function FilesDirectoryLink(props: {
8+
jobFile: Scheduler.IJobFile;
9+
app: JupyterFrontEnd;
10+
}): JSX.Element | null {
11+
const trans = useTranslator('jupyterlab');
12+
return (
13+
<Link
14+
href={`/lab/tree/${props.jobFile.file_path}`}
15+
title={trans.__('Open output directory with files')}
16+
onClick={e => {
17+
e.preventDefault();
18+
props.app.commands.execute('filebrowser:open-path', {
19+
path: props.jobFile.file_path
20+
});
21+
}}
22+
>
23+
{trans.__('Files')}
24+
</Link>
25+
);
26+
}

src/components/job-file-link.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { useEventLogger, useTranslator } from '../hooks';
66
import { JupyterFrontEnd } from '@jupyterlab/application';
77

88
import { Link } from '@mui/material';
9+
import { FilesDirectoryLink } from './files-directory-link';
910

1011
export interface IJobFileLinkProps {
1112
jobFile: Scheduler.IJobFile;
@@ -30,6 +31,9 @@ export function JobFileLink(props: IJobFileLinkProps): JSX.Element | null {
3031
? trans.__('Open input file "%1"', fileBaseName)
3132
: trans.__('Open output file "%1"', fileBaseName);
3233

34+
if (props.jobFile.file_format === 'files') {
35+
return <FilesDirectoryLink jobFile={props.jobFile} app={props.app} />;
36+
}
3337
return (
3438
<Link
3539
key={props.jobFile.file_format}

src/components/job-row.tsx

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -75,33 +75,6 @@ function JobFiles(props: {
7575
);
7676
}
7777

78-
function FilesDirectoryLink(props: {
79-
job: Scheduler.IDescribeJob;
80-
app: JupyterFrontEnd;
81-
}): JSX.Element | null {
82-
if (!props.job.package_input_folder || !props.job.output_folder) {
83-
return null;
84-
}
85-
const trans = useTranslator('jupyterlab');
86-
return (
87-
<Link
88-
href={`/lab/tree/${props.job.output_folder}`}
89-
title={trans.__(
90-
'Open output directory with files for "%1"',
91-
props.job.name
92-
)}
93-
onClick={e => {
94-
e.preventDefault();
95-
props.app.commands.execute('filebrowser:open-path', {
96-
path: props.job.output_folder
97-
});
98-
}}
99-
>
100-
{trans.__('Files')}
101-
</Link>
102-
);
103-
}
104-
10578
type DownloadFilesButtonProps = {
10679
app: JupyterFrontEnd;
10780
job: Scheduler.IDescribeJob;
@@ -194,9 +167,6 @@ export function buildJobRow(
194167
/>
195168
)}
196169
<JobFiles job={job} app={app} />
197-
{(job.status === 'COMPLETED' || job.status === 'FAILED') && (
198-
<FilesDirectoryLink job={job} app={app} />
199-
)}
200170
</>,
201171
<Timestamp job={job} />,
202172
translateStatus(job.status),

src/handler.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,6 @@ export namespace Scheduler {
467467
end_time?: number;
468468
downloaded: boolean;
469469
package_input_folder?: boolean;
470-
output_folder?: string;
471470
}
472471

473472
export interface ICreateJobResponse {

0 commit comments

Comments
 (0)