Skip to content

Commit 3fc2cc0

Browse files
committed
rename Download tables, use mp.Queue directly without a wrapper
1 parent 25e17a5 commit 3fc2cc0

File tree

5 files changed

+32
-99
lines changed

5 files changed

+32
-99
lines changed

jupyter_scheduler/download_manager.py

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
from multiprocessing import Queue
44
from typing import List, Optional
55

6-
from jupyter_scheduler.orm import DownloadCacheRecord, create_session, generate_uuid
6+
from jupyter_scheduler.orm import Downloads, create_session, generate_uuid
7+
from jupyter_scheduler.pydantic_v1 import BaseModel
78
from jupyter_scheduler.utils import get_utc_timestamp
89
from jupyter_scheduler.pydantic_v1 import BaseModel
910

1011

11-
class DescribeDownloadCache(BaseModel):
12+
class DescribeDownload(BaseModel):
1213
job_id: str
1314
download_id: str
1415
download_initiated_time: int
@@ -31,80 +32,54 @@ def __str__(self):
3132
return f"Id: {self.job_id}, Download initiated: {download_initiated_time}"
3233

3334

34-
class MultiprocessQueue:
35-
"""A multiprocess-safe queue using multiprocessing.Queue()"""
36-
37-
def __init__(self):
38-
self.queue = Queue()
39-
40-
def put(self, download: DownloadTask):
41-
self.queue.put(download)
42-
43-
def get(self) -> Optional[DownloadTask]:
44-
return self.queue.get() if not self.queue.empty() else None
45-
46-
def isempty(self) -> bool:
47-
return self.queue.empty()
48-
49-
50-
class DownloadCache:
35+
class DownloadRecordManager:
5136
def __init__(self, db_url):
5237
self.session = create_session(db_url)
5338

54-
def put(self, download: DescribeDownloadCache):
39+
def put(self, download: DescribeDownload):
5540
with self.session() as session:
56-
new_download = DownloadCacheRecord(**download.dict())
41+
new_download = Downloads(**download.dict())
5742
session.add(new_download)
5843
session.commit()
5944

60-
def get(self, job_id: str) -> Optional[DescribeDownloadCache]:
45+
def get(self, job_id: str) -> Optional[DescribeDownload]:
6146
with self.session() as session:
62-
download = (
63-
session.query(DownloadCacheRecord)
64-
.filter(DownloadCacheRecord.job_id == job_id)
65-
.first()
66-
)
47+
download = session.query(Downloads).filter(Downloads.job_id == job_id).first()
6748

6849
if download:
69-
return DescribeDownloadCache.from_orm(download)
50+
return DescribeDownload.from_orm(download)
7051
else:
7152
return None
7253

73-
def get_tasks(self) -> List[DescribeDownloadCache]:
54+
def get_tasks(self) -> List[DescribeDownload]:
7455
with self.session() as session:
75-
return (
76-
session.query(DownloadCacheRecord)
77-
.order_by(DownloadCacheRecord.download_initiated_time)
78-
.all()
79-
)
56+
return session.query(Downloads).order_by(Downloads.download_initiated_time).all()
8057

8158
def delete_download(self, download_id: str):
8259
with self.session() as session:
83-
session.query(DownloadCacheRecord).filter(
84-
DownloadCacheRecord.download_id == download_id
85-
).delete()
60+
session.query(Downloads).filter(Downloads.download_id == download_id).delete()
8661
session.commit()
8762

8863
def delete_job_downloads(self, job_id: str):
8964
with self.session() as session:
90-
session.query(DownloadCacheRecord).filter(DownloadCacheRecord.job_id == job_id).delete()
65+
session.query(Downloads).filter(Downloads.job_id == job_id).delete()
9166
session.commit()
9267

9368

9469
class DownloadManager:
9570
def __init__(self, db_url: str):
96-
self.cache = DownloadCache(db_url=db_url)
97-
self.queue = MultiprocessQueue()
71+
self.record_manager = DownloadRecordManager(db_url=db_url)
72+
self.queue = Queue()
9873

9974
def download_from_staging(self, job_id: str):
10075
download_initiated_time = get_utc_timestamp()
10176
download_id = generate_uuid()
102-
download_cache = DescribeDownloadCache(
77+
download_cache = DescribeDownload(
10378
job_id=job_id,
10479
download_id=download_id,
10580
download_initiated_time=download_initiated_time,
10681
)
107-
self.cache.put(download_cache)
82+
self.record_manager.put(download_cache)
10883
download_task = DownloadTask(
10984
job_id=job_id,
11085
download_id=download_id,
@@ -113,13 +88,13 @@ def download_from_staging(self, job_id: str):
11388
self.queue.put(download_task)
11489

11590
def delete_download(self, download_id: str):
116-
self.cache.delete_download(download_id)
91+
self.record_manager.delete_download(download_id)
11792

11893
def delete_job_downloads(self, job_id: str):
119-
self.cache.delete_job_downloads(job_id)
94+
self.record_manager.delete_job_downloads(job_id)
12095

12196
def populate_queue(self):
122-
tasks = self.cache.get_tasks()
97+
tasks = self.record_manager.get_tasks()
12398
for task in tasks:
12499
download_task = DownloadTask(
125100
job_id=task.job_id,

jupyter_scheduler/download_runner.py

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,56 +14,20 @@ def __init__(
1414
self,
1515
download_manager: DownloadManager,
1616
job_files_manager: JobFilesManager,
17-
poll_interval: int = 10,
17+
poll_interval: int = 5,
1818
):
1919
self.download_manager = download_manager
2020
self.job_files_manager = job_files_manager
2121
self.poll_interval = poll_interval
2222

23-
# def add_download(self, job_id: str):
24-
# download_initiated_time = get_utc_timestamp()
25-
# download_id = generate_uuid()
26-
# download_cache = DescribeDownloadCache(
27-
# job_id=job_id,
28-
# download_id=download_id,
29-
# download_initiated_time=download_initiated_time,
30-
# )
31-
# self.download_cache.put(download_cache)
32-
# download_task = DownloadTask(
33-
# job_id=job_id,
34-
# download_id=download_id,
35-
# download_initiated_time=download_initiated_time,
36-
# )
37-
# self.download_queue.put(download_task)
38-
39-
# def delete_download(self, download_id: str):
40-
# self.download_cache.delete_download(download_id)
41-
42-
# def delete_job_downloads(self, job_id: str):
43-
# self.download_cache.delete_job_downloads(job_id)
44-
4523
async def process_download_queue(self):
46-
print("\n\n***\nDownloadRunner.process_download_queue isempty")
47-
print(self.download_manager.queue.isempty())
48-
while not self.download_manager.queue.isempty():
24+
while not self.download_manager.queue.empty():
4925
download = self.download_manager.queue.get()
50-
print(download)
51-
cache = self.download_manager.cache.get(download.job_id)
52-
print(cache)
26+
cache = self.download_manager.record_manager.get(download.job_id)
5327
if not cache or not download:
5428
continue
5529
await self.job_files_manager.copy_from_staging(cache.job_id)
56-
self.download_manager.cache.delete_download(cache.download_id)
57-
58-
# def populate_queue(self):
59-
# tasks = self.download_manager.cache.get_tasks()
60-
# for task in tasks:
61-
# download_task = DownloadTask(
62-
# job_id=task.job_id,
63-
# download_id=task.download_id,
64-
# download_initiated_time=task.download_initiated_time,
65-
# )
66-
# self.download_manager.queue.put(download_task)
30+
self.download_manager.record_manager.delete_download(cache.download_id)
6731

6832
async def start(self):
6933
self.download_manager.populate_queue()

jupyter_scheduler/executors.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1616
from jupyter_scheduler.orm import Job, create_session
1717
from jupyter_scheduler.download_manager import (
18-
DescribeDownloadCache,
19-
DownloadCacheRecord,
18+
DescribeDownload,
19+
Downloads,
2020
DownloadTask,
2121
)
2222
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
@@ -163,13 +163,13 @@ def execute(self):
163163
def download_from_staging(self, job_id: str):
164164
download_initiated_time = get_utc_timestamp()
165165
download_id = generate_uuid()
166-
download_cache = DescribeDownloadCache(
166+
download_cache = DescribeDownload(
167167
job_id=job_id,
168168
download_id=download_id,
169169
download_initiated_time=download_initiated_time,
170170
)
171171
with self.db_session() as session:
172-
new_download = DownloadCacheRecord(**download_cache.dict())
172+
new_download = Downloads(**download_cache.dict())
173173
session.add(new_download)
174174
session.commit()
175175
download_task = DownloadTask(
@@ -178,11 +178,6 @@ def download_from_staging(self, job_id: str):
178178
download_initiated_time=download_initiated_time,
179179
)
180180
self.download_queue.put(download_task)
181-
# print(
182-
# "\n\n***\n ExecutionManager.download_from_staging uuid and task being put on a qeueue"
183-
# )
184-
# print(download_id)
185-
# print(download_task)
186181

187182
def add_side_effects_files(self, staging_dir: str):
188183
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""

jupyter_scheduler/orm.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ class JobDefinition(CommonColumns, Base):
111111
active = Column(Boolean, default=True)
112112

113113

114-
class DownloadCacheRecord(Base):
115-
__tablename__ = "download_cache"
114+
class Downloads(Base):
115+
__tablename__ = "downloads"
116116
job_id = Column(String(36), primary_key=True)
117117
download_id = Column(String(36), primary_key=True)
118118
download_initiated_time = Column(Integer)

jupyter_scheduler/scheduler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from traitlets import Unicode, default
1616
from traitlets.config import LoggingConfigurable
1717

18-
from jupyter_scheduler.download_manager import MultiprocessQueue
1918
from jupyter_scheduler.environments import EnvironmentManager
2019
from jupyter_scheduler.exceptions import (
2120
IdempotencyTokenError,
@@ -405,7 +404,7 @@ def __init__(
405404
root_dir: str,
406405
environments_manager: Type[EnvironmentManager],
407406
db_url: str,
408-
download_queue: MultiprocessQueue,
407+
download_queue: mp.Queue,
409408
config=None,
410409
**kwargs,
411410
):
@@ -495,7 +494,7 @@ def create_job(self, model: CreateJob) -> str:
495494
staging_paths=staging_paths,
496495
root_dir=self.root_dir,
497496
db_url=self.db_url,
498-
download_queue=self.download_queue.queue,
497+
download_queue=self.download_queue,
499498
).process
500499
)
501500
p.start()

0 commit comments

Comments
 (0)