Skip to content

Commit 9636c7f

Browse files
committed
use download queue
1 parent 5a961d5 commit 9636c7f

File tree

7 files changed

+284
-10
lines changed

7 files changed

+284
-10
lines changed

jupyter_scheduler/download_manager.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
from multiprocessing import Queue
4+
from typing import List, Optional
5+
6+
from jupyter_scheduler.orm import DownloadCacheRecord, create_session, generate_uuid
7+
from jupyter_scheduler.utils import get_utc_timestamp
8+
from jupyter_scheduler.pydantic_v1 import BaseModel
9+
10+
11+
class DescribeDownloadCache(BaseModel):
    """Validated description of one cached download request.

    Field names mirror the columns this module reads from
    ``DownloadCacheRecord``, so an instance can be built from a row with
    ``from_orm`` and turned back into row kwargs with ``dict()``.
    """

    # Identifier of the job whose files are being downloaded.
    job_id: str
    # Identifier of this particular download request.
    download_id: str
    # Time the download was requested.
    # NOTE(review): presumably epoch milliseconds (DownloadTask divides the
    # same value by 1e3 before calling datetime.fromtimestamp) -- confirm.
    download_initiated_time: int

    class Config:
        # Lets pydantic read attribute values off ORM objects (``from_orm``).
        orm_mode = True
18+
19+
20+
@dataclass
class DownloadTask:
    """A queued request to download the files of one job.

    Tasks order by ``download_initiated_time`` so that earlier requests
    sort first.
    """

    job_id: str
    download_id: str
    # Divided by 1e3 before being passed to datetime.fromtimestamp in
    # __str__, i.e. treated there as milliseconds.
    download_initiated_time: int

    def __lt__(self, other):
        """Return True when this task was initiated before ``other``.

        Returns ``NotImplemented`` for operands that are not
        ``DownloadTask`` instances so Python can try the reflected
        comparison or raise ``TypeError``, instead of failing with an
        ``AttributeError`` on a missing attribute.
        """
        if not isinstance(other, DownloadTask):
            return NotImplemented
        return self.download_initiated_time < other.download_initiated_time

    def __str__(self):
        """Return a short human-readable summary of the task."""
        initiated_at = datetime.fromtimestamp(self.download_initiated_time / 1e3)
        return f"Id: {self.job_id}, Download initiated: {initiated_at}"
32+
33+
34+
class MultiprocessQueue:
    """A multiprocess-safe queue using multiprocessing.Queue()"""

    def __init__(self):
        self.queue = Queue()

    def put(self, download: "DownloadTask"):
        """Add ``download`` to the queue."""
        self.queue.put(download)

    def get(self) -> Optional["DownloadTask"]:
        """Return the next queued task, or ``None`` if none is available.

        This never blocks. Checking ``empty()`` and then calling a blocking
        ``get()`` is racy: another consumer can take the last item between
        the two calls, leaving this caller blocked indefinitely.
        """
        # Local import keeps the fix self-contained; multiprocessing.Queue
        # raises queue.Empty from get_nowait().
        from queue import Empty

        try:
            return self.queue.get_nowait()
        except Empty:
            return None

    def isempty(self) -> bool:
        """Return whether the underlying queue currently reports empty."""
        return self.queue.empty()
48+
49+
50+
class DownloadCache:
    """Database-backed store of pending downloads (``DownloadCacheRecord`` rows)."""

    def __init__(self, db_url):
        # ``self.session`` is called in every method to open a fresh,
        # context-managed session.
        self.session = create_session(db_url)

    def put(self, download: DescribeDownloadCache):
        """Insert ``download`` as a new row and commit."""
        with self.session() as session:
            session.add(DownloadCacheRecord(**download.dict()))
            session.commit()

    def get(self, job_id: str) -> Optional[DescribeDownloadCache]:
        """Return the first cached download for ``job_id``, or ``None``."""
        with self.session() as session:
            record = (
                session.query(DownloadCacheRecord)
                .filter(DownloadCacheRecord.job_id == job_id)
                .first()
            )
            # Convert while the session is open so callers never hold a
            # detached ORM object.
            return DescribeDownloadCache.from_orm(record) if record else None

    def get_tasks(self) -> List[DescribeDownloadCache]:
        """Return every cached download, ordered by initiation time.

        Rows are converted to ``DescribeDownloadCache`` inside the session,
        so the result matches the annotation and ``get()`` rather than
        leaking detached ORM rows.
        """
        with self.session() as session:
            records = (
                session.query(DownloadCacheRecord)
                .order_by(DownloadCacheRecord.download_initiated_time)
                .all()
            )
            return [DescribeDownloadCache.from_orm(record) for record in records]

    def delete_download(self, download_id: str):
        """Delete the row(s) whose ``download_id`` matches and commit."""
        with self.session() as session:
            session.query(DownloadCacheRecord).filter(
                DownloadCacheRecord.download_id == download_id
            ).delete()
            session.commit()

    def delete_job_downloads(self, job_id: str):
        """Delete every cached download belonging to ``job_id`` and commit."""
        with self.session() as session:
            session.query(DownloadCacheRecord).filter(
                DownloadCacheRecord.job_id == job_id
            ).delete()
            session.commit()
92+
93+
94+
class DownloadManager:
    """Couples the persistent download cache with the in-memory task queue."""

    def __init__(self, db_url: str):
        self.cache = DownloadCache(db_url=db_url)
        self.queue = MultiprocessQueue()

    def download_from_staging(self, job_id: str):
        """Record a new download for ``job_id`` in the cache, then queue it."""
        initiated_at = get_utc_timestamp()
        new_download_id = generate_uuid()
        # The cache entry and the queued task carry exactly the same fields.
        fields = {
            "job_id": job_id,
            "download_id": new_download_id,
            "download_initiated_time": initiated_at,
        }
        self.cache.put(DescribeDownloadCache(**fields))
        self.queue.put(DownloadTask(**fields))

    def delete_download(self, download_id: str):
        """Remove a single download from the cache."""
        self.cache.delete_download(download_id)

    def delete_job_downloads(self, job_id: str):
        """Remove every cached download of ``job_id``."""
        self.cache.delete_job_downloads(job_id)

    def populate_queue(self):
        """Queue one task for every download currently stored in the cache."""
        for cached in self.cache.get_tasks():
            self.queue.put(
                DownloadTask(
                    job_id=cached.job_id,
                    download_id=cached.download_id,
                    download_initiated_time=cached.download_initiated_time,
                )
            )

jupyter_scheduler/download_runner.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import asyncio
2+
3+
from jupyter_scheduler.download_manager import DownloadManager
4+
from jupyter_scheduler.job_files_manager import JobFilesManager
5+
6+
7+
class BaseDownloadRunner:
    """Interface for objects that process queued downloads."""

    async def start(self):
        """Run the download loop; subclasses must override.

        Declared as a coroutine so the base contract matches
        ``DownloadRunner.start`` and callers can schedule any runner with
        ``loop.create_task(runner.start())``.

        Raises:
            NotImplementedError: always, when awaited on the base class.
        """
        raise NotImplementedError("Must be implemented by subclass")
10+
11+
12+
class DownloadRunner(BaseDownloadRunner):
    """Drains the download queue, copying each job's files out of staging."""

    def __init__(
        self,
        download_manager: DownloadManager,
        job_files_manager: JobFilesManager,
        poll_interval: int = 10,
    ):
        """Store collaborators.

        Args:
            download_manager: Supplies the task ``queue`` and the ``cache``.
            job_files_manager: Performs the copy via ``copy_from_staging``.
            poll_interval: Delay passed to ``asyncio.sleep`` between passes
                over the queue.
        """
        self.download_manager = download_manager
        self.job_files_manager = job_files_manager
        self.poll_interval = poll_interval

    async def process_download_queue(self):
        """Process queued tasks until the queue reports empty.

        For each task, look up the cached download of its job, copy the
        job's files from staging, then delete the cached download.
        """
        queue = self.download_manager.queue
        cache_store = self.download_manager.cache
        while not queue.isempty():
            download = queue.get()
            if download is None:
                # get() is Optional; check before touching download.job_id.
                continue
            cache = cache_store.get(download.job_id)
            if not cache:
                # No cache row for this job (e.g. it was removed through
                # delete_download / delete_job_downloads); nothing to copy.
                continue
            await self.job_files_manager.copy_from_staging(cache.job_id)
            cache_store.delete_download(cache.download_id)

    async def start(self):
        """Re-queue cached downloads, then poll the queue forever."""
        self.download_manager.populate_queue()
        while True:
            await self.process_download_queue()
            await asyncio.sleep(self.poll_interval)

jupyter_scheduler/executors.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import io
2+
import multiprocessing
23
import os
34
import shutil
45
import tarfile
@@ -11,8 +12,14 @@
1112
import nbformat
1213
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1314

15+
from jupyter_scheduler.download_manager import (
16+
DescribeDownloadCache,
17+
DownloadCacheRecord,
18+
DownloadTask,
19+
MultiprocessQueue,
20+
)
1421
from jupyter_scheduler.models import DescribeJob, JobFeature, JobFile, Status
15-
from jupyter_scheduler.orm import Job, create_session
22+
from jupyter_scheduler.orm import Job, create_session, generate_uuid
1623
from jupyter_scheduler.parameterize import add_parameters
1724
from jupyter_scheduler.utils import get_utc_timestamp
1825

@@ -29,11 +36,19 @@ class ExecutionManager(ABC):
2936
_model = None
3037
_db_session = None
3138

32-
def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
39+
def __init__(
40+
self,
41+
job_id: str,
42+
root_dir: str,
43+
db_url: str,
44+
staging_paths: Dict[str, str],
45+
download_queue,
46+
):
3347
self.job_id = job_id
3448
self.staging_paths = staging_paths
3549
self.root_dir = root_dir
3650
self.db_url = db_url
51+
self.download_queue = download_queue
3752

3853
@property
3954
def model(self):
@@ -147,6 +162,31 @@ def execute(self):
147162
output, _ = cls().from_notebook_node(nb)
148163
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
149164
f.write(output)
165+
self.download_from_staging(job.job_id)
166+
167+
def download_from_staging(self, job_id: str):
    """Persist a download-cache row for ``job_id`` and queue the matching task."""
    initiated_at = get_utc_timestamp()
    new_download_id = generate_uuid()
    cache_entry = DescribeDownloadCache(
        job_id=job_id,
        download_id=new_download_id,
        download_initiated_time=initiated_at,
    )
    with self.db_session() as session:
        session.add(DownloadCacheRecord(**cache_entry.dict()))
        session.commit()
    # Queue the task only after the row has been committed.
    self.download_queue.put(
        DownloadTask(
            job_id=job_id,
            download_id=new_download_id,
            download_initiated_time=initiated_at,
        )
    )
150190

151191
def add_side_effects_files(self, staging_dir):
152192
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""

jupyter_scheduler/extension.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from jupyter_server.transutils import _i18n
66
from traitlets import Bool, Type, Unicode, default
77

8+
from jupyter_scheduler.download_manager import DownloadManager
9+
from jupyter_scheduler.download_runner import DownloadRunner
810
from jupyter_scheduler.orm import create_tables
911

1012
from .handlers import (
@@ -73,21 +75,33 @@ def initialize_settings(self):
7375

7476
environments_manager = self.environment_manager_class()
7577

78+
download_manager = DownloadManager(db_url=self.db_url)
79+
7680
scheduler = self.scheduler_class(
7781
root_dir=self.serverapp.root_dir,
7882
environments_manager=environments_manager,
7983
db_url=self.db_url,
8084
config=self.config,
85+
download_queue=download_manager.queue,
8186
)
8287

8388
job_files_manager = self.job_files_manager_class(scheduler=scheduler)
8489

90+
download_runner = DownloadRunner(
91+
download_manager=download_manager, job_files_manager=job_files_manager
92+
)
93+
8594
self.settings.update(
8695
environments_manager=environments_manager,
8796
scheduler=scheduler,
8897
job_files_manager=job_files_manager,
98+
download_from_staging=download_manager.download_from_staging,
8999
)
90100

91101
if scheduler.task_runner:
92102
loop = asyncio.get_event_loop()
93103
loop.create_task(scheduler.task_runner.start())
104+
105+
if download_runner:
106+
loop = asyncio.get_event_loop()
107+
loop.create_task(download_runner.start())

jupyter_scheduler/handlers.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -394,20 +394,29 @@ def get(self):
394394

395395

396396
class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler):
397-
_job_files_manager = None
397+
# _job_files_manager = None
398+
_download_from_staging = None
399+
400+
# @property
401+
# def job_files_manager(self):
402+
# if not self._job_files_manager:
403+
# self._job_files_manager = self.settings.get("job_files_manager", None)
404+
405+
# return self._job_files_manager
398406

399407
@property
400-
def job_files_manager(self):
401-
if not self._job_files_manager:
402-
self._job_files_manager = self.settings.get("job_files_manager", None)
408+
def download_from_staging(self):
409+
if not self._download_from_staging:
410+
self._download_from_staging = self.settings.get("download_from_staging", None)
403411

404-
return self._job_files_manager
412+
return self._download_from_staging
405413

406414
@authenticated
407415
async def get(self, job_id):
408-
redownload = self.get_query_argument("redownload", False)
416+
# redownload = self.get_query_argument("redownload", False)
409417
try:
410-
await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload)
418+
# await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload)
419+
self.download_from_staging(job_id)
411420
except Exception as e:
412421
self.log.exception(e)
413422
raise HTTPError(500, str(e)) from e

jupyter_scheduler/orm.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import os
32
from sqlite3 import OperationalError
43
from uuid import uuid4
54

@@ -112,6 +111,13 @@ class JobDefinition(CommonColumns, Base):
112111
active = Column(Boolean, default=True)
113112

114113

114+
class DownloadCacheRecord(Base):
    """ORM row in the ``download_cache`` table, one per download request."""

    __tablename__ = "download_cache"

    # job_id and download_id are both flagged primary_key, so together they
    # form a composite primary key.
    job_id = Column(String(36), primary_key=True)
    download_id = Column(String(36), primary_key=True)
    # Time the download was requested, stored as an integer.
    download_initiated_time = Column(Integer)
119+
120+
115121
def create_tables(db_url, drop_tables=False):
116122
engine = create_engine(db_url)
117123
try:

0 commit comments

Comments
 (0)