Commit f6066bd

Authored by JasonWeill, pre-commit-ci[bot], and dlqqq
[1.x] Archiving scheduler, fix JFM tests (#418)
* Archiving all-files scheduler (#388)
  * Fix typo in comment
  * WIP: Adds new scheduler
  * writes individual files
  * WIP: Write zip file
  * WIP: Trying to get zip file to be written only on scheduled job runs
  * [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
  * WIP: Removes zip type, incremental work for archiving work dir
  * [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
  * Create tar.gz in staging subdir
  * Capture side effect files in staging dir
  * Extracts files
  * Add filter
  * [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
  * Update jupyter_scheduler/job_files_manager.py (Co-authored-by: david qiu <[email protected]>)
  * Simplifies cleanup logic
  * [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
  * Updates docs, deletes old Archiving*, renames AllFilesArchiving
* Avoids option compatible only with Python 3.11
* Fix JFM tests (#424)
  * fix JFM tests
  * pre-commit
  * add minor comment

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: david qiu <[email protected]>
1 parent 3efcbb4 commit f6066bd

File tree

5 files changed: +79, -45 lines changed

docs/operators/index.md

Lines changed: 14 additions & 0 deletions
@@ -86,6 +86,20 @@ jupyter lab --SchedulerApp.job_files_manager_class=jupyter_scheduler.job_files_m

 For more information on writing a custom implementation, please see the {doc}`developer's guide </developers/index>`.

+### Example: Capturing side effect files
+
+The default scheduler and execution manager classes do not capture
+**side effect files**, files that are created as a side effect of executing
+cells in a notebook. The `ArchivingScheduler` and `ArchivingExecutionManager`
+classes do capture side effect files. If you intend to run notebooks that produce
+side effect files, you can use these classes by running:
+
+```
+jupyter lab \
+--SchedulerApp.scheduler_class=jupyter_scheduler.scheduler.ArchivingScheduler \
+--Scheduler.execution_manager_class=jupyter_scheduler.executors.ArchivingExecutionManager
+```
+
 ## UI configuration

 You can configure the Jupyter Scheduler UI by installing a lab extension that both:
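The CLI flags in the new docs example map directly onto traitlets configuration, so the same setup can also live in a Jupyter Server config file. A minimal sketch, assuming a standard `jupyter_server_config.py` picked up by Jupyter Server's usual config loading (the file name and location are conventions, not part of this change):

```
# jupyter_server_config.py -- sketch of the equivalent traitlets configuration
c = get_config()  # noqa -- provided by Jupyter's config loader
c.SchedulerApp.scheduler_class = "jupyter_scheduler.scheduler.ArchivingScheduler"
c.Scheduler.execution_manager_class = "jupyter_scheduler.executors.ArchivingExecutionManager"
```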

jupyter_scheduler/executors.py

Lines changed: 33 additions & 17 deletions
@@ -1,4 +1,6 @@
 import io
+import os
+import shutil
 import tarfile
 import traceback
 from abc import ABC, abstractmethod
@@ -174,12 +176,12 @@ def validate(cls, input_path: str) -> bool:


 class ArchivingExecutionManager(DefaultExecutionManager):
-    """Execution manager that archives the output
-    files to a compressed tar file.
+    """Execution manager that archives all output files in and under the
+    output directory into a single archive file

     Notes
     -----
-    Should be used along with :class:`~jupyter_scheduler.scheduler.ArchiveDownloadingScheduler`
+    Should be used along with :class:`~jupyter_scheduler.scheduler.ArchivingScheduler`
     as the `scheduler_class` during jupyter server start.
     """

@@ -197,27 +199,41 @@ def execute(self):
             store_widget_state=True,
         )

+        # Get the directory of the input file
+        local_staging_dir = os.path.dirname(self.staging_paths["input"])
+        # Directory where side-effect files are written
+        run_dir = os.path.join(local_staging_dir, "files")
+        os.mkdir(run_dir)
+
         try:
-            ep.preprocess(nb)
+            ep.preprocess(nb, {"metadata": {"path": run_dir}})
         except CellExecutionError as e:
             pass
         finally:
+            # Create all desired output files, other than "input" and "tar.gz"
+            for output_format in job.output_formats:
+                if output_format == "input" or output_format == "tar.gz":
+                    pass
+                else:
+                    cls = nbconvert.get_exporter(output_format)
+                    output, resources = cls().from_notebook_node(nb)
+                    f = open(self.staging_paths[output_format], "wb")
+                    f.write(bytes(output, "utf-8"))
+                    f.close()
+
+            # Create an archive file of the staging directory for this run
+            # and everything under it
             fh = io.BytesIO()
             with tarfile.open(fileobj=fh, mode="w:gz") as tar:
-                output_formats = job.output_formats + ["input"]
-                for output_format in output_formats:
-                    if output_format == "input":
-                        with open(self.staging_paths["input"]) as f:
-                            output = f.read()
-                    else:
-                        cls = nbconvert.get_exporter(output_format)
-                        output, resources = cls().from_notebook_node(nb)
-                    data = bytes(output, "utf-8")
-                    source_f = io.BytesIO(initial_bytes=data)
-                    info = tarfile.TarInfo(self.staging_paths[output_format])
-                    info.size = len(data)
-                    tar.addfile(info, source_f)
+                for root, dirs, files in os.walk(local_staging_dir):
+                    for file in files:
+                        # This flattens the directory structure, so that in the tar
+                        # file, output files and side-effect files are side-by-side
+                        tar.add(os.path.join(root, file), file)

             archive_filepath = self.staging_paths["tar.gz"]
             with fsspec.open(archive_filepath, "wb") as f:
                 f.write(fh.getvalue())
+
+            # Clean up the side-effect files in the run directory
+            shutil.rmtree(run_dir)
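The new archive loop in `execute()` walks the per-job staging directory and adds each file under its bare filename, so the directory layout is flattened inside the tar. A self-contained sketch of that flattening behavior, using hypothetical file names rather than anything produced by the scheduler:

```
import io
import os
import shutil
import tarfile
import tempfile

# Hypothetical staging layout: one top-level output plus one side-effect file
# nested under a "files/" subdirectory, mirroring the run_dir created above.
staging_dir = tempfile.mkdtemp()
os.mkdir(os.path.join(staging_dir, "files"))
for relpath in ("report.html", os.path.join("files", "plot.png")):
    with open(os.path.join(staging_dir, relpath), "w") as f:
        f.write("placeholder")

fh = io.BytesIO()
with tarfile.open(fileobj=fh, mode="w:gz") as tar:
    for root, dirs, files in os.walk(staging_dir):
        for file in files:
            # The second argument is the archive name, so nesting is dropped.
            tar.add(os.path.join(root, file), file)

fh.seek(0)
with tarfile.open(fileobj=fh, mode="r:gz") as tar:
    print(tar.getnames())  # ['report.html', 'plot.png'] -- order may vary

shutil.rmtree(staging_dir)
```

One consequence of flattening is that two files with the same basename in different subdirectories would end up with colliding member names in the archive.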

jupyter_scheduler/job_files_manager.py

Lines changed: 9 additions & 14 deletions
@@ -52,37 +52,32 @@ def __init__(

     def generate_filepaths(self):
         """A generator that produces filepaths"""
-        output_dir = self.output_dir
-        if not os.path.exists(output_dir):
-            os.makedirs(output_dir)
-
         output_formats = self.output_formats + ["input"]

         for output_format in output_formats:
             input_filepath = self.staging_paths[output_format]
-            output_filename = self.output_filenames[output_format]
-            output_filepath = os.path.join(output_dir, output_filename)
+            output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
             if not os.path.exists(output_filepath) or self.redownload:
                 yield input_filepath, output_filepath

     def download_tar(self, archive_format: str = "tar"):
         archive_filepath = self.staging_paths[archive_format]
         read_mode = "r:gz" if archive_format == "tar.gz" else "tar"
+
         with fsspec.open(archive_filepath) as f:
             with tarfile.open(fileobj=f, mode=read_mode) as tar:
-                filepaths = self.generate_filepaths()
-                for input_filepath, output_filepath in filepaths:
-                    try:
-                        input_file = tar.extractfile(member=input_filepath)
-                        with fsspec.open(output_filepath, mode="wb") as output_file:
-                            output_file.write(input_file.read())
-                    except Exception as e:
-                        pass
+                tar.extractall(self.output_dir)

     def download(self):
+        # ensure presence of staging paths
         if not self.staging_paths:
             return

+        # ensure presence of output dir
+        output_dir = self.output_dir
+        if not os.path.exists(output_dir):
+            os.makedirs(output_dir)
+
         if "tar" in self.staging_paths:
             self.download_tar()
         elif "tar.gz" in self.staging_paths:
jupyter_scheduler/scheduler.py

Lines changed: 10 additions & 6 deletions
@@ -686,7 +686,7 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->


 class ArchivingScheduler(Scheduler):
-    """Scheduler that adds archive path to staging paths."""
+    """Scheduler that captures all files in output directory in an archive."""

     execution_manager_class = TType(
         klass="jupyter_scheduler.executors.ExecutionManager",
@@ -705,12 +705,16 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->
             filename = create_output_filename(
                 model.input_filename, model.create_time, output_format
             )
-            staging_paths[output_format] = filename
+            # Use the staging directory to capture output files
+            staging_paths[output_format] = os.path.join(self.staging_path, id, filename)

-        output_format = "tar.gz"
-        filename = create_output_filename(model.input_filename, model.create_time, output_format)
-        staging_paths[output_format] = os.path.join(self.staging_path, model.job_id, filename)
-        staging_paths["input"] = os.path.join(self.staging_path, model.job_id, model.input_filename)
+        # Create an output archive file
+        staging_paths["tar.gz"] = os.path.join(
+            self.staging_path,
+            id,
+            create_output_filename(model.input_filename, model.create_time, "tar.gz"),
+        )
+        staging_paths["input"] = os.path.join(self.staging_path, id, model.input_filename)

         return staging_paths
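For concreteness, this is roughly the dict shape `ArchivingScheduler.get_staging_paths()` now returns for a job with `ipynb` and `html` output formats; the staging root, job id, and filenames below are illustrative, with the real filenames coming from `create_output_filename()`:

```
# Illustrative values only: every entry is rooted in a per-job staging
# subdirectory, and "tar.gz" points at the archive the execution manager writes.
staging_paths = {
    "ipynb": "/staging/1234-abcd/helloworld-2023-10-05.ipynb",
    "html": "/staging/1234-abcd/helloworld-2023-10-05.html",
    "tar.gz": "/staging/1234-abcd/helloworld-2023-10-05.tar.gz",
    "input": "/staging/1234-abcd/helloworld.ipynb",
}
```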

jupyter_scheduler/tests/test_job_files_manager.py

Lines changed: 13 additions & 8 deletions
@@ -2,15 +2,14 @@
 import os
 import shutil
 import tarfile
-import tempfile
+import time
 from pathlib import Path
 from unittest.mock import patch

 import pytest

 from jupyter_scheduler.job_files_manager import Downloader, JobFilesManager
 from jupyter_scheduler.models import DescribeJob, JobFile
-from jupyter_scheduler.scheduler import BaseScheduler


 async def test_copy_from_staging():
@@ -68,6 +67,9 @@ async def test_copy_from_staging():
 def clear_outputs_dir():
     yield
     shutil.rmtree(OUTPUTS_DIR)
+    # rmtree() is not synchronous; wait until it has finished running
+    while os.path.isdir(OUTPUTS_DIR):
+        time.sleep(0.01)


 @pytest.mark.parametrize(
@@ -76,9 +78,9 @@ def clear_outputs_dir():
         (
             ["ipynb", "html"],
             {
-                "ipynb": "helloworld-out.ipynb",
-                "html": "helloworld-out.html",
-                "input": "helloworld-input.ipynb",
+                "ipynb": "job-1/helloworld-out.ipynb",
+                "html": "job-1/helloworld-out.html",
+                "input": "job-1/helloworld-input.ipynb",
             },
             {
                 "ipynb": os.path.join(HERE, "test_staging_dir", "job-1", "helloworld-1.ipynb"),
@@ -91,9 +93,9 @@ def clear_outputs_dir():
         (
             ["ipynb", "html"],
             {
-                "ipynb": "helloworld-out.ipynb",
-                "html": "helloworld-out.html",
-                "input": "helloworld-input.ipynb",
+                "ipynb": "job-2/helloworld-1.ipynb",
+                "html": "job-2/helloworld-1.html",
+                "input": "job-2/helloworld.ipynb",
             },
             {
                 "tar.gz": os.path.join(HERE, "test_staging_dir", "job-2", "helloworld.tar.gz"),
@@ -120,10 +122,13 @@ def test_downloader_download(

     assert os.path.exists(output_dir)
     for format in output_formats:
+        # get path to output file corresponding to this format
         out_filepath = os.path.join(output_dir, output_filenames[format])

+        # assert each output file exists
         assert os.path.exists(out_filepath)

+        # assert integrity of each output file
         if "tar.gz" in staging_paths:
             with tarfile.open(staging_paths["tar.gz"]) as tar:
                 input_file = tar.extractfile(member=staging_paths[format])
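The hunk is truncated above, so the remainder of the integrity check is not shown. As a rough, hypothetical sketch of the kind of comparison such a check performs for the archive case (the helper name and paths below are made up, not taken from the test):

```
import tarfile

def assert_matches_archive(archive_path: str, member: str, downloaded_path: str) -> None:
    # Compare a downloaded output file against the matching member of the staged archive.
    with tarfile.open(archive_path) as tar:
        member_file = tar.extractfile(member=member)
        assert member_file is not None
        with open(downloaded_path, "rb") as downloaded:
            assert member_file.read() == downloaded.read()
```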
