Skip to content

Commit f880cdb

Browse files
committed
def initiate_download_standalonefunction and use it in DownloadManager and ExecutionManager
1 parent 7938533 commit f880cdb

File tree

4 files changed

+43
-36
lines changed

4 files changed

+43
-36
lines changed

jupyter_scheduler/download_manager.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,24 @@
66
from jupyter_scheduler.utils import get_utc_timestamp
77

88

9+
def initiate_download_standalone(job_id: str, queue: Queue, db_session, redownload: bool = False):
10+
"""
11+
This static method initiates a download in a standalone manner independent of the DownloadManager instance. It is suitable for use in multiprocessing environment where a direct reference to DownloadManager instance is not feasible.
12+
"""
13+
download_initiated_time = get_utc_timestamp()
14+
download_id = generate_uuid()
15+
download = DescribeDownload(
16+
job_id=job_id,
17+
download_id=download_id,
18+
download_initiated_time=download_initiated_time,
19+
redownload=redownload,
20+
)
21+
download_record = Download(**download.dict())
22+
db_session.add(download_record)
23+
db_session.commit()
24+
queue.put(download)
25+
26+
927
class DownloadRecordManager:
1028
def __init__(self, db_url):
1129
self.session = create_session(db_url)
@@ -45,17 +63,11 @@ def __init__(self, db_url: str):
4563
self.record_manager = DownloadRecordManager(db_url=db_url)
4664
self.queue = Queue()
4765

48-
def download_from_staging(self, job_id: str, redownload: bool):
49-
download_initiated_time = get_utc_timestamp()
50-
download_id = generate_uuid()
51-
download = DescribeDownload(
52-
job_id=job_id,
53-
download_id=download_id,
54-
download_initiated_time=download_initiated_time,
55-
redownload=redownload,
56-
)
57-
self.record_manager.put(download)
58-
self.queue.put(download)
66+
def initiate_download(self, job_id: str, redownload: bool):
67+
with self.record_manager.session() as session:
68+
initiate_download_standalone(
69+
job_id=job_id, queue=self.queue, db_session=session, redownload=redownload
70+
)
5971

6072
def delete_download(self, download_id: str):
6173
self.record_manager.delete_download(download_id)

jupyter_scheduler/executors.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111
import nbformat
1212
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1313

14-
from jupyter_scheduler.download_manager import DescribeDownload, Download
14+
from jupyter_scheduler.download_manager import (
15+
DescribeDownload,
16+
Download,
17+
initiate_download_standalone,
18+
)
1519
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1620
from jupyter_scheduler.orm import Job, create_session, generate_uuid
1721
from jupyter_scheduler.parameterize import add_parameters
@@ -156,22 +160,13 @@ def execute(self):
156160
output, _ = cls().from_notebook_node(nb)
157161
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
158162
f.write(output)
159-
self._download_from_staging(job.job_id)
160-
161-
def _download_from_staging(self, job_id: str):
162-
download_initiated_time = get_utc_timestamp()
163-
download_id = generate_uuid()
164-
download = DescribeDownload(
165-
job_id=job_id,
166-
download_id=download_id,
167-
download_initiated_time=download_initiated_time,
168-
redownload=True,
169-
)
170-
with self.db_session() as session:
171-
download_record = Download(**download.dict())
172-
session.add(download_record)
173-
session.commit()
174-
self.download_queue.put(download)
163+
with self.db_session() as session:
164+
initiate_download_standalone(
165+
job_id=job.job_id,
166+
queue=self.download_queue,
167+
db_session=session,
168+
redownload=True,
169+
)
175170

176171
def add_side_effects_files(self, staging_dir):
177172
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""

jupyter_scheduler/extension.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ def initialize_settings(self):
8181
root_dir=self.serverapp.root_dir,
8282
environments_manager=environments_manager,
8383
db_url=self.db_url,
84-
config=self.config,
8584
download_queue=download_manager.queue,
85+
config=self.config,
8686
)
8787

8888
job_files_manager = self.job_files_manager_class(scheduler=scheduler)
@@ -95,7 +95,7 @@ def initialize_settings(self):
9595
environments_manager=environments_manager,
9696
scheduler=scheduler,
9797
job_files_manager=job_files_manager,
98-
download_from_staging=download_manager.download_from_staging,
98+
initiate_download=download_manager.initiate_download,
9999
)
100100

101101
if scheduler.task_runner:

jupyter_scheduler/handlers.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,20 +395,20 @@ def get(self):
395395

396396

397397
class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler):
398-
_download_from_staging = None
398+
_initiate_download = None
399399

400400
@property
401-
def download_from_staging(self):
402-
if not self._download_from_staging:
403-
self._download_from_staging = self.settings.get("download_from_staging", None)
401+
def initiate_download(self):
402+
if not self._initiate_download:
403+
self._initiate_download = self.settings.get("initiate_download", None)
404404

405-
return self._download_from_staging
405+
return self._initiate_download
406406

407407
@authenticated
408408
async def get(self, job_id):
409409
redownload = self.get_query_argument("redownload", False)
410410
try:
411-
self.download_from_staging(job_id, redownload)
411+
self.initiate_download(job_id, redownload)
412412
except Exception as e:
413413
self.log.exception(e)
414414
raise HTTPError(500, str(e)) from e

0 commit comments

Comments
 (0)