Skip to content

Commit b897aa5

Browse files
JasonWeillpre-commit-ci[bot]dlqqq
authored
Archiving all-files scheduler (#388)
* Fix typo in comment * WIP: Adds new scheduler * writes individual files * WIP: Write zip file * WIP: Trying to get zip file to be written only on scheduled job runs * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * WIP: Removes zip type, incremental work for archiving work dir * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Create tar.gz in staging subdir * Capture side effect files in staging dir * Extracts files * Add filter * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update jupyter_scheduler/job_files_manager.py Co-authored-by: david qiu <[email protected]> * Simplifies cleanup logic * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Updates docs, deletes old Archiving*, renames AllFilesArchiving --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: david qiu <[email protected]>
1 parent 1e6f460 commit b897aa5

File tree

4 files changed

+59
-31
lines changed

4 files changed

+59
-31
lines changed

docs/operators/index.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,20 @@ jupyter lab --SchedulerApp.job_files_manager_class=jupyter_scheduler.job_files_m
8686

8787
For more information on writing a custom implementation, please see the {doc}`developer's guide </developers/index>`.
8888

89+
### Example: Capturing side effect files
90+
91+
The default scheduler and execution manager classes do not capture
92+
**side effect files**, files that are created as a side effect of executing
93+
cells in a notebook. The `ArchivingScheduler` and `ArchivingExecutionManager`
94+
classes do capture side effect files. If you intend to run notebooks that produce
95+
side effect files, you can use these classes by running:
96+
97+
```
98+
jupyter lab \
99+
--SchedulerApp.scheduler_class=jupyter_scheduler.scheduler.ArchivingScheduler \
100+
--Scheduler.execution_manager_class=jupyter_scheduler.executors.ArchivingExecutionManager
101+
```
102+
89103
## UI configuration
90104

91105
You can configure the Jupyter Scheduler UI by installing a lab extension that both:

jupyter_scheduler/executors.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import io
2+
import os
3+
import shutil
24
import tarfile
35
import traceback
46
from abc import ABC, abstractmethod
@@ -174,12 +176,12 @@ def validate(cls, input_path: str) -> bool:
174176

175177

176178
class ArchivingExecutionManager(DefaultExecutionManager):
177-
"""Execution manager that archives the output
178-
files to a compressed tar file.
179+
"""Execution manager that archives all output files in and under the
180+
output directory into a single archive file
179181
180182
Notes
181183
-----
182-
Should be used along with :class:`~jupyter_scheduler.scheduler.ArchiveDownloadingScheduler`
184+
Should be used along with :class:`~jupyter_scheduler.scheduler.ArchivingScheduler`
183185
as the `scheduler_class` during jupyter server start.
184186
"""
185187

@@ -197,27 +199,41 @@ def execute(self):
197199
store_widget_state=True,
198200
)
199201

202+
# Get the directory of the input file
203+
local_staging_dir = os.path.dirname(self.staging_paths["input"])
204+
# Directory where side-effect files are written
205+
run_dir = os.path.join(local_staging_dir, "files")
206+
os.mkdir(run_dir)
207+
200208
try:
201-
ep.preprocess(nb)
209+
ep.preprocess(nb, {"metadata": {"path": run_dir}})
202210
except CellExecutionError as e:
203211
pass
204212
finally:
213+
# Create all desired output files, other than "input" and "tar.gz"
214+
for output_format in job.output_formats:
215+
if output_format == "input" or output_format == "tar.gz":
216+
pass
217+
else:
218+
cls = nbconvert.get_exporter(output_format)
219+
output, resources = cls().from_notebook_node(nb)
220+
f = open(self.staging_paths[output_format], "wb")
221+
f.write(bytes(output, "utf-8"))
222+
f.close()
223+
224+
# Create an archive file of the staging directory for this run
225+
# and everything under it
205226
fh = io.BytesIO()
206227
with tarfile.open(fileobj=fh, mode="w:gz") as tar:
207-
output_formats = job.output_formats + ["input"]
208-
for output_format in output_formats:
209-
if output_format == "input":
210-
with open(self.staging_paths["input"]) as f:
211-
output = f.read()
212-
else:
213-
cls = nbconvert.get_exporter(output_format)
214-
output, resources = cls().from_notebook_node(nb)
215-
data = bytes(output, "utf-8")
216-
source_f = io.BytesIO(initial_bytes=data)
217-
info = tarfile.TarInfo(self.staging_paths[output_format])
218-
info.size = len(data)
219-
tar.addfile(info, source_f)
228+
for root, dirs, files in os.walk(local_staging_dir):
229+
for file in files:
230+
# This flattens the directory structure, so that in the tar
231+
# file, output files and side-effect files are side-by-side
232+
tar.add(os.path.join(root, file), file)
220233

221234
archive_filepath = self.staging_paths["tar.gz"]
222235
with fsspec.open(archive_filepath, "wb") as f:
223236
f.write(fh.getvalue())
237+
238+
# Clean up the side-effect files in the run directory
239+
shutil.rmtree(run_dir)

jupyter_scheduler/job_files_manager.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,10 @@ def generate_filepaths(self):
6868
def download_tar(self, archive_format: str = "tar"):
6969
archive_filepath = self.staging_paths[archive_format]
7070
read_mode = "r:gz" if archive_format == "tar.gz" else "tar"
71+
7172
with fsspec.open(archive_filepath) as f:
7273
with tarfile.open(fileobj=f, mode=read_mode) as tar:
73-
filepaths = self.generate_filepaths()
74-
for input_filepath, output_filepath in filepaths:
75-
try:
76-
input_file = tar.extractfile(member=input_filepath)
77-
with fsspec.open(output_filepath, mode="wb") as output_file:
78-
output_file.write(input_file.read())
79-
except Exception as e:
80-
pass
74+
tar.extractall(self.output_dir, filter="data")
8175

8276
def download(self):
8377
if not self.staging_paths:

jupyter_scheduler/scheduler.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->
686686

687687

688688
class ArchivingScheduler(Scheduler):
689-
"""Scheduler that adds archive path to staging paths."""
689+
"""Scheduler that captures all files in output directory in an archive."""
690690

691691
execution_manager_class = TType(
692692
klass="jupyter_scheduler.executors.ExecutionManager",
@@ -705,12 +705,16 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->
705705
filename = create_output_filename(
706706
model.input_filename, model.create_time, output_format
707707
)
708-
staging_paths[output_format] = filename
708+
# Use the staging directory to capture output files
709+
staging_paths[output_format] = os.path.join(self.staging_path, id, filename)
709710

710-
output_format = "tar.gz"
711-
filename = create_output_filename(model.input_filename, model.create_time, output_format)
712-
staging_paths[output_format] = os.path.join(self.staging_path, model.job_id, filename)
713-
staging_paths["input"] = os.path.join(self.staging_path, model.job_id, model.input_filename)
711+
# Create an output archive file
712+
staging_paths["tar.gz"] = os.path.join(
713+
self.staging_path,
714+
id,
715+
create_output_filename(model.input_filename, model.create_time, "tar.gz"),
716+
)
717+
staging_paths["input"] = os.path.join(self.staging_path, id, model.input_filename)
714718

715719
return staging_paths
716720

0 commit comments

Comments
 (0)