Skip to content

Commit 25e17a5

Browse files
committed
use download queue
1 parent 953aea6 commit 25e17a5

File tree

7 files changed

+283
-9
lines changed

7 files changed

+283
-9
lines changed

jupyter_scheduler/download_manager.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
from multiprocessing import Queue
4+
from typing import List, Optional
5+
6+
from jupyter_scheduler.orm import DownloadCacheRecord, create_session, generate_uuid
7+
from jupyter_scheduler.utils import get_utc_timestamp
8+
from jupyter_scheduler.pydantic_v1 import BaseModel
9+
10+
11+
class DescribeDownloadCache(BaseModel):
    """Pydantic description of a single download cache entry.

    ``orm_mode`` is switched on so that an instance can be created from an
    ORM row through ``DescribeDownloadCache.from_orm(row)``.
    """

    # Identifier of the job the download belongs to.
    job_id: str
    # Identifier of this particular download request.
    download_id: str
    # Integer timestamp recorded when the download was requested.
    download_initiated_time: int

    class Config:
        orm_mode = True
18+
19+
20+
@dataclass
class DownloadTask:
    """A download request as it travels through the download queue.

    Tasks are ordered by ``download_initiated_time`` so that the task that
    was initiated first compares as the smallest.
    """

    job_id: str
    download_id: str
    download_initiated_time: int

    def __lt__(self, other):
        """Return True when this task was initiated before ``other``.

        Returns ``NotImplemented`` for operands that are not ``DownloadTask``
        instances, so Python raises the conventional ``TypeError`` instead of
        an ``AttributeError`` from a missing attribute.
        """
        if not isinstance(other, DownloadTask):
            return NotImplemented
        return self.download_initiated_time < other.download_initiated_time

    def __str__(self):
        """Return a short human-readable summary of the task."""
        # The stored value is divided by 1e3 before conversion, i.e. it is
        # treated as milliseconds since the epoch and rendered in local time.
        initiated = datetime.fromtimestamp(self.download_initiated_time / 1e3)
        return f"Id: {self.job_id}, Download initiated: {initiated}"
32+
33+
34+
class MultiprocessQueue:
    """A multiprocess-safe queue using multiprocessing.Queue()"""

    def __init__(self):
        self.queue = Queue()

    def put(self, download: "DownloadTask"):
        """Add ``download`` to the queue."""
        self.queue.put(download)

    def get(self) -> Optional["DownloadTask"]:
        """Return the next queued item, or ``None`` when none is available.

        This never blocks. Checking ``empty()`` and then calling a blocking
        ``get()`` is racy: ``empty()`` is not reliable for a multiprocessing
        queue, and another consumer may take the item between the two calls,
        leaving the caller blocked forever. ``get_nowait()`` performs the
        check and the retrieval as one operation.
        """
        # Imported locally so the block does not depend on a new file-level import.
        from queue import Empty

        try:
            return self.queue.get_nowait()
        except Empty:
            return None

    def isempty(self) -> bool:
        """Return True when the queue currently reports no items.

        The answer is only a hint; see ``multiprocessing.Queue.empty``.
        """
        return self.queue.empty()
48+
49+
50+
class DownloadCache:
    """Database-backed store of download cache entries.

    Every method opens its own session from the factory created in
    ``__init__`` and commits before returning when it modifies data.
    """

    def __init__(self, db_url):
        # ``create_session`` yields a callable; calling it produces the
        # context-managed session used by the methods below.
        self.session = create_session(db_url)

    def put(self, download: DescribeDownloadCache):
        """Insert ``download`` as a new ``DownloadCacheRecord`` row."""
        with self.session() as session:
            new_download = DownloadCacheRecord(**download.dict())
            session.add(new_download)
            session.commit()

    def get(self, job_id: str) -> Optional[DescribeDownloadCache]:
        """Return the earliest-initiated entry for ``job_id``, or ``None``.

        A job may have several entries; ordering by initiation time makes the
        choice deterministic instead of depending on database row order.
        """
        with self.session() as session:
            download = (
                session.query(DownloadCacheRecord)
                .filter(DownloadCacheRecord.job_id == job_id)
                .order_by(DownloadCacheRecord.download_initiated_time)
                .first()
            )
            if download is None:
                return None
            # Convert while the session is still open so the returned value
            # does not depend on a detached ORM instance.
            return DescribeDownloadCache.from_orm(download)

    def get_tasks(self) -> List[DescribeDownloadCache]:
        """Return all entries, oldest first, as ``DescribeDownloadCache`` models.

        The rows are converted inside the session so the result matches the
        declared return type and stays usable after the session is closed.
        """
        with self.session() as session:
            records = (
                session.query(DownloadCacheRecord)
                .order_by(DownloadCacheRecord.download_initiated_time)
                .all()
            )
            return [DescribeDownloadCache.from_orm(record) for record in records]

    def delete_download(self, download_id: str):
        """Delete the entries whose ``download_id`` matches."""
        with self.session() as session:
            session.query(DownloadCacheRecord).filter(
                DownloadCacheRecord.download_id == download_id
            ).delete()
            session.commit()

    def delete_job_downloads(self, job_id: str):
        """Delete every entry that belongs to ``job_id``."""
        with self.session() as session:
            session.query(DownloadCacheRecord).filter(DownloadCacheRecord.job_id == job_id).delete()
            session.commit()
92+
93+
94+
class DownloadManager:
    """Pairs a persistent ``DownloadCache`` with an in-memory ``MultiprocessQueue``."""

    def __init__(self, db_url: str):
        self.cache = DownloadCache(db_url=db_url)
        self.queue = MultiprocessQueue()

    def download_from_staging(self, job_id: str):
        """Record a new download request for ``job_id`` and enqueue it.

        The cache entry is written first and the matching task is queued
        afterwards; both carry the same id and timestamp.
        """
        initiated_at = get_utc_timestamp()
        new_id = generate_uuid()

        cache_entry = DescribeDownloadCache(
            job_id=job_id, download_id=new_id, download_initiated_time=initiated_at
        )
        self.cache.put(cache_entry)

        queued_task = DownloadTask(
            job_id=job_id, download_id=new_id, download_initiated_time=initiated_at
        )
        self.queue.put(queued_task)

    def delete_download(self, download_id: str):
        """Remove the cache entry identified by ``download_id``."""
        self.cache.delete_download(download_id)

    def delete_job_downloads(self, job_id: str):
        """Remove all cache entries of ``job_id``."""
        self.cache.delete_job_downloads(job_id)

    def populate_queue(self):
        """Enqueue one task for every entry currently stored in the cache."""
        for entry in self.cache.get_tasks():
            self.queue.put(
                DownloadTask(
                    job_id=entry.job_id,
                    download_id=entry.download_id,
                    download_initiated_time=entry.download_initiated_time,
                )
            )

jupyter_scheduler/download_runner.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import asyncio
2+
3+
from jupyter_scheduler.download_manager import DownloadManager
4+
from jupyter_scheduler.job_files_manager import JobFilesManager
5+
6+
7+
class BaseDownloadRunner:
    """Base class for download runners; subclasses provide ``start``."""

    def start(self):
        """Start the runner.

        Raises:
            NotImplementedError: always, in this base class.
        """
        message = "Must be implemented by subclass"
        raise NotImplementedError(message)
10+
11+
12+
class DownloadRunner(BaseDownloadRunner):
    """Periodically drains the download queue and copies job files from staging."""

    def __init__(
        self,
        download_manager: DownloadManager,
        job_files_manager: JobFilesManager,
        poll_interval: int = 10,
    ):
        """Store the collaborators used by the polling loop.

        Args:
            download_manager: provides the ``queue`` of tasks and the ``cache`` of entries.
            job_files_manager: object whose ``copy_from_staging`` coroutine performs the copy.
            poll_interval: delay passed to ``asyncio.sleep`` between two passes over the queue.
        """
        self.download_manager = download_manager
        self.job_files_manager = job_files_manager
        self.poll_interval = poll_interval

    async def process_download_queue(self):
        """Process queued downloads until the queue reports empty.

        For each task the cache entry of its job is looked up; when one
        exists the job files are copied from staging and the entry is then
        deleted. Tasks without a cache entry are skipped.
        """
        # Imported locally so the block does not depend on a new file-level import.
        import logging

        log = logging.getLogger(__name__)
        queue = self.download_manager.queue
        cache_store = self.download_manager.cache

        while not queue.isempty():
            download = queue.get()
            if download is None:
                # ``get`` found nothing although ``isempty`` said otherwise;
                # stop this pass and let the next poll pick up the work
                # rather than dereferencing None or spinning.
                break
            log.debug("Processing download task: %s", download)

            cache = cache_store.get(download.job_id)
            if not cache:
                continue

            try:
                await self.job_files_manager.copy_from_staging(cache.job_id)
            except Exception:
                # One failed copy must not terminate the background loop. The
                # cache entry is kept so ``populate_queue`` can re-enqueue it
                # the next time the runner starts.
                log.exception("Copying files from staging failed for job %s", cache.job_id)
                continue
            cache_store.delete_download(cache.download_id)

    async def start(self):
        """Load the tasks persisted in the cache, then poll the queue forever."""
        self.download_manager.populate_queue()
        while True:
            await self.process_download_queue()
            await asyncio.sleep(self.poll_interval)

jupyter_scheduler/executors.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import io
2+
import multiprocessing
23
import os
34
import shutil
45
import tarfile
@@ -13,6 +14,13 @@
1314

1415
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1516
from jupyter_scheduler.orm import Job, create_session
17+
from jupyter_scheduler.download_manager import (
18+
DescribeDownloadCache,
19+
DownloadCacheRecord,
20+
DownloadTask,
21+
)
22+
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
23+
from jupyter_scheduler.orm import Job, create_session, generate_uuid
1624
from jupyter_scheduler.parameterize import add_parameters
1725
from jupyter_scheduler.utils import get_utc_timestamp
1826

@@ -29,11 +37,19 @@ class ExecutionManager(ABC):
2937
_model = None
3038
_db_session = None
3139

32-
def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
40+
def __init__(
41+
self,
42+
job_id: str,
43+
root_dir: str,
44+
db_url: str,
45+
staging_paths: Dict[str, str],
46+
download_queue,
47+
):
3348
self.job_id = job_id
3449
self.staging_paths = staging_paths
3550
self.root_dir = root_dir
3651
self.db_url = db_url
52+
self.download_queue = download_queue
3753

3854
@property
3955
def model(self):
@@ -144,6 +160,30 @@ def execute(self):
144160
self.add_side_effects_files(staging_dir)
145161
self.create_output_files(job, nb)
146162

163+
def download_from_staging(self, job_id: str):
    """Persist a download request for ``job_id`` and put it on the download queue.

    A ``DownloadCacheRecord`` row is committed through ``self.db_session``
    first; afterwards a ``DownloadTask`` with the same id and timestamp is
    placed on ``self.download_queue``.
    """
    initiated_at = get_utc_timestamp()
    new_id = generate_uuid()

    cache_entry = DescribeDownloadCache(
        job_id=job_id, download_id=new_id, download_initiated_time=initiated_at
    )
    with self.db_session() as session:
        row = DownloadCacheRecord(**cache_entry.dict())
        session.add(row)
        session.commit()

    queued_task = DownloadTask(
        job_id=job_id, download_id=new_id, download_initiated_time=initiated_at
    )
    self.download_queue.put(queued_task)
186+
147187
def add_side_effects_files(self, staging_dir: str):
148188
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
149189
input_notebook = os.path.relpath(self.staging_paths["input"])

jupyter_scheduler/extension.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from jupyter_server.transutils import _i18n
66
from traitlets import Bool, Type, Unicode, default
77

8+
from jupyter_scheduler.download_manager import DownloadManager
9+
from jupyter_scheduler.download_runner import DownloadRunner
810
from jupyter_scheduler.orm import create_tables
911

1012
from .handlers import (
@@ -73,21 +75,33 @@ def initialize_settings(self):
7375

7476
environments_manager = self.environment_manager_class()
7577

78+
download_manager = DownloadManager(db_url=self.db_url)
79+
7680
scheduler = self.scheduler_class(
7781
root_dir=self.serverapp.root_dir,
7882
environments_manager=environments_manager,
7983
db_url=self.db_url,
8084
config=self.config,
85+
download_queue=download_manager.queue,
8186
)
8287

8388
job_files_manager = self.job_files_manager_class(scheduler=scheduler)
8489

90+
download_runner = DownloadRunner(
91+
download_manager=download_manager, job_files_manager=job_files_manager
92+
)
93+
8594
self.settings.update(
8695
environments_manager=environments_manager,
8796
scheduler=scheduler,
8897
job_files_manager=job_files_manager,
98+
download_from_staging=download_manager.download_from_staging,
8999
)
90100

91101
if scheduler.task_runner:
92102
loop = asyncio.get_event_loop()
93103
loop.create_task(scheduler.task_runner.start())
104+
105+
if download_runner:
106+
loop = asyncio.get_event_loop()
107+
loop.create_task(download_runner.start())

jupyter_scheduler/handlers.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -395,20 +395,29 @@ def get(self):
395395

396396

397397
class FilesDownloadHandler(ExtensionHandlerMixin, APIHandler):
398-
_job_files_manager = None
398+
# _job_files_manager = None
399+
_download_from_staging = None
400+
401+
# @property
402+
# def job_files_manager(self):
403+
# if not self._job_files_manager:
404+
# self._job_files_manager = self.settings.get("job_files_manager", None)
405+
406+
# return self._job_files_manager
399407

400408
@property
401-
def job_files_manager(self):
402-
if not self._job_files_manager:
403-
self._job_files_manager = self.settings.get("job_files_manager", None)
409+
def download_from_staging(self):
410+
if not self._download_from_staging:
411+
self._download_from_staging = self.settings.get("download_from_staging", None)
404412

405-
return self._job_files_manager
413+
return self._download_from_staging
406414

407415
@authenticated
408416
async def get(self, job_id):
409-
redownload = self.get_query_argument("redownload", False)
417+
# redownload = self.get_query_argument("redownload", False)
410418
try:
411-
await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload)
419+
# await self.job_files_manager.copy_from_staging(job_id=job_id, redownload=redownload)
420+
self.download_from_staging(job_id)
412421
except Exception as e:
413422
self.log.exception(e)
414423
raise HTTPError(500, str(e)) from e

jupyter_scheduler/orm.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import os
32
from sqlite3 import OperationalError
43
from uuid import uuid4
54

@@ -112,6 +111,13 @@ class JobDefinition(CommonColumns, Base):
112111
active = Column(Boolean, default=True)
113112

114113

114+
class DownloadCacheRecord(Base):
    """ORM model for rows of the ``download_cache`` table.

    ``job_id`` and ``download_id`` are both flagged as primary key columns,
    so together they form a composite primary key.
    """

    __tablename__ = "download_cache"

    # Composite primary key columns.
    job_id = Column(String(36), primary_key=True)
    download_id = Column(String(36), primary_key=True)

    # Integer timestamp of the moment the download was requested.
    download_initiated_time = Column(Integer)
119+
120+
115121
def create_tables(db_url, drop_tables=False):
116122
engine = create_engine(db_url)
117123
try:

0 commit comments

Comments
 (0)