Skip to content

Commit ba44f7c

Browse files
committed
Define `initiate_download_standalone` function and use it in DownloadManager and ExecutionManager
1 parent 56f095d commit ba44f7c

File tree

4 files changed

+39
-38
lines changed

4 files changed

+39
-38
lines changed

jupyter_scheduler/download_manager.py

Lines changed: 23 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,24 @@
77
from jupyter_scheduler.pydantic_v1 import BaseModel
88

99

10+
def initiate_download_standalone(job_id: str, queue: Queue, db_session, redownload: bool = False):
11+
"""
12+
This standalone function initiates a download independently of any DownloadManager instance. It is suitable for use in a multiprocessing environment where a direct reference to a DownloadManager instance is not feasible.
13+
"""
14+
download_initiated_time = get_utc_timestamp()
15+
download_id = generate_uuid()
16+
download = DescribeDownload(
17+
job_id=job_id,
18+
download_id=download_id,
19+
download_initiated_time=download_initiated_time,
20+
redownload=redownload,
21+
)
22+
download_record = Download(**download.dict())
23+
db_session.add(download_record)
24+
db_session.commit()
25+
queue.put(download)
26+
27+
1028
class DownloadRecordManager:
1129
def __init__(self, db_url):
1230
self.session = create_session(db_url)
@@ -46,17 +64,11 @@ def __init__(self, db_url: str):
4664
self.record_manager = DownloadRecordManager(db_url=db_url)
4765
self.queue = Queue()
4866

49-
def download_from_staging(self, job_id: str, redownload: bool):
50-
download_initiated_time = get_utc_timestamp()
51-
download_id = generate_uuid()
52-
download = DescribeDownload(
53-
job_id=job_id,
54-
download_id=download_id,
55-
download_initiated_time=download_initiated_time,
56-
redownload=redownload,
57-
)
58-
self.record_manager.put(download)
59-
self.queue.put(download)
67+
def initiate_download(self, job_id: str, redownload: bool):
68+
with self.record_manager.session() as session:
69+
initiate_download_standalone(
70+
job_id=job_id, queue=self.queue, db_session=session, redownload=redownload
71+
)
6072

6173
def delete_download(self, download_id: str):
6274
self.record_manager.delete_download(download_id)

jupyter_scheduler/executors.py

Lines changed: 8 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -11,11 +11,9 @@
1111
import nbformat
1212
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1313

14-
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1514
from jupyter_scheduler.orm import Job, create_session
16-
from jupyter_scheduler.download_manager import DescribeDownload, Download
15+
from jupyter_scheduler.download_manager import initiate_download_standalone
1716
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
18-
from jupyter_scheduler.orm import Job, create_session, generate_uuid
1917
from jupyter_scheduler.parameterize import add_parameters
2018
from jupyter_scheduler.utils import get_utc_timestamp
2119

@@ -154,22 +152,13 @@ def execute(self):
154152
finally:
155153
self.add_side_effects_files(staging_dir)
156154
self.create_output_files(job, nb)
157-
self._download_from_staging(job.job_id)
158-
159-
def _download_from_staging(self, job_id: str):
160-
download_initiated_time = get_utc_timestamp()
161-
download_id = generate_uuid()
162-
download = DescribeDownload(
163-
job_id=job_id,
164-
download_id=download_id,
165-
download_initiated_time=download_initiated_time,
166-
redownload=True,
167-
)
168-
with self.db_session() as session:
169-
download_record = Download(**download.dict())
170-
session.add(download_record)
171-
session.commit()
172-
self.download_queue.put(download)
155+
with self.db_session() as session:
156+
initiate_download_standalone(
157+
job_id=job.job_id,
158+
queue=self.download_queue,
159+
db_session=session,
160+
redownload=True,
161+
)
173162

174163
def add_side_effects_files(self, staging_dir: str):
175164
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""

jupyter_scheduler/extension.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -81,8 +81,8 @@ def initialize_settings(self):
8181
root_dir=self.serverapp.root_dir,
8282
environments_manager=environments_manager,
8383
db_url=self.db_url,
84-
config=self.config,
8584
download_queue=download_manager.queue,
85+
config=self.config,
8686
)
8787

8888
job_files_manager = self.job_files_manager_class(scheduler=scheduler)
@@ -95,7 +95,7 @@ def initialize_settings(self):
9595
environments_manager=environments_manager,
9696
scheduler=scheduler,
9797
job_files_manager=job_files_manager,
98-
download_from_staging=download_manager.download_from_staging,
98+
initiate_download=download_manager.initiate_download,
9999
)
100100

101101
if scheduler.task_runner:

jupyter_scheduler/handlers.py

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -395,20 +395,20 @@ def get(self):
395395

396396

397397
class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler):
398-
_download_from_staging = None
398+
_initiate_download = None
399399

400400
@property
401-
def download_from_staging(self):
402-
if not self._download_from_staging:
403-
self._download_from_staging = self.settings.get("download_from_staging", None)
401+
def initiate_download(self):
402+
if not self._initiate_download:
403+
self._initiate_download = self.settings.get("initiate_download", None)
404404

405-
return self._download_from_staging
405+
return self._initiate_download
406406

407407
@authenticated
408408
async def get(self, job_id):
409409
redownload = self.get_query_argument("redownload", False)
410410
try:
411-
self.download_from_staging(job_id, redownload)
411+
self.initiate_download(job_id, redownload)
412412
except Exception as e:
413413
self.log.exception(e)
414414
raise HTTPError(500, str(e)) from e

0 commit comments

Comments
 (0)