Skip to content

Commit 7abdf27

Browse files
committed
Remove DownloadTask data class, use DescribeDownload for both queue and db records
1 parent 3fc2cc0 commit 7abdf27

File tree

4 files changed

+35 -67 lines changed

4 files changed

+35 -67 lines changed

jupyter_scheduler/download_manager.py

Lines changed: 15 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -1,68 +1,43 @@
1-
from dataclasses import dataclass
2-
from datetime import datetime
31
from multiprocessing import Queue
42
from typing import List, Optional
53

6-
from jupyter_scheduler.orm import Downloads, create_session, generate_uuid
7-
from jupyter_scheduler.pydantic_v1 import BaseModel
4+
from jupyter_scheduler.models import DescribeDownload
5+
from jupyter_scheduler.orm import Download, create_session, generate_uuid
86
from jupyter_scheduler.utils import get_utc_timestamp
97
from jupyter_scheduler.pydantic_v1 import BaseModel
108

119

12-
class DescribeDownload(BaseModel):
13-
job_id: str
14-
download_id: str
15-
download_initiated_time: int
16-
17-
class Config:
18-
orm_mode = True
19-
20-
21-
@dataclass
22-
class DownloadTask:
23-
job_id: str
24-
download_id: str
25-
download_initiated_time: int
26-
27-
def __lt__(self, other):
28-
return self.download_initiated_time < other.download_initiated_time
29-
30-
def __str__(self):
31-
download_initiated_time = datetime.fromtimestamp(self.download_initiated_time / 1e3)
32-
return f"Id: {self.job_id}, Download initiated: {download_initiated_time}"
33-
34-
3510
class DownloadRecordManager:
3611
def __init__(self, db_url):
3712
self.session = create_session(db_url)
3813

3914
def put(self, download: DescribeDownload):
4015
with self.session() as session:
41-
new_download = Downloads(**download.dict())
42-
session.add(new_download)
16+
download = Download(**download.dict())
17+
session.add(download)
4318
session.commit()
4419

4520
def get(self, job_id: str) -> Optional[DescribeDownload]:
4621
with self.session() as session:
47-
download = session.query(Downloads).filter(Downloads.job_id == job_id).first()
22+
download = session.query(Download).filter(Download.job_id == job_id).first()
4823

4924
if download:
5025
return DescribeDownload.from_orm(download)
5126
else:
5227
return None
5328

54-
def get_tasks(self) -> List[DescribeDownload]:
29+
def get_downloads(self) -> List[DescribeDownload]:
5530
with self.session() as session:
56-
return session.query(Downloads).order_by(Downloads.download_initiated_time).all()
31+
return session.query(Download).order_by(Download.download_initiated_time).all()
5732

5833
def delete_download(self, download_id: str):
5934
with self.session() as session:
60-
session.query(Downloads).filter(Downloads.download_id == download_id).delete()
35+
session.query(Download).filter(Download.download_id == download_id).delete()
6136
session.commit()
6237

6338
def delete_job_downloads(self, job_id: str):
6439
with self.session() as session:
65-
session.query(Downloads).filter(Downloads.job_id == job_id).delete()
40+
session.query(Download).filter(Download.job_id == job_id).delete()
6641
session.commit()
6742

6843

@@ -74,18 +49,13 @@ def __init__(self, db_url: str):
7449
def download_from_staging(self, job_id: str):
7550
download_initiated_time = get_utc_timestamp()
7651
download_id = generate_uuid()
77-
download_cache = DescribeDownload(
78-
job_id=job_id,
79-
download_id=download_id,
80-
download_initiated_time=download_initiated_time,
81-
)
82-
self.record_manager.put(download_cache)
83-
download_task = DownloadTask(
52+
download = DescribeDownload(
8453
job_id=job_id,
8554
download_id=download_id,
8655
download_initiated_time=download_initiated_time,
8756
)
88-
self.queue.put(download_task)
57+
self.record_manager.put(download)
58+
self.queue.put(download)
8959

9060
def delete_download(self, download_id: str):
9161
self.record_manager.delete_download(download_id)
@@ -94,11 +64,6 @@ def delete_job_downloads(self, job_id: str):
9464
self.record_manager.delete_job_downloads(job_id)
9565

9666
def populate_queue(self):
97-
tasks = self.record_manager.get_tasks()
98-
for task in tasks:
99-
download_task = DownloadTask(
100-
job_id=task.job_id,
101-
download_id=task.download_id,
102-
download_initiated_time=task.download_initiated_time,
103-
)
104-
self.queue.put(download_task)
67+
downloads = self.record_manager.get_downloads()
68+
for download in downloads:
69+
self.queue.put(download)

jupyter_scheduler/executors.py

Lines changed: 7 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
import io
2-
import multiprocessing
32
import os
43
import shutil
54
import tarfile
@@ -14,11 +13,7 @@
1413

1514
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1615
from jupyter_scheduler.orm import Job, create_session
17-
from jupyter_scheduler.download_manager import (
18-
DescribeDownload,
19-
Downloads,
20-
DownloadTask,
21-
)
16+
from jupyter_scheduler.download_manager import DescribeDownload, Download
2217
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
2318
from jupyter_scheduler.orm import Job, create_session, generate_uuid
2419
from jupyter_scheduler.parameterize import add_parameters
@@ -159,25 +154,21 @@ def execute(self):
159154
finally:
160155
self.add_side_effects_files(staging_dir)
161156
self.create_output_files(job, nb)
157+
self._download_from_staging(job.job_id)
162158

163-
def download_from_staging(self, job_id: str):
159+
def _download_from_staging(self, job_id: str):
164160
download_initiated_time = get_utc_timestamp()
165161
download_id = generate_uuid()
166-
download_cache = DescribeDownload(
162+
download = DescribeDownload(
167163
job_id=job_id,
168164
download_id=download_id,
169165
download_initiated_time=download_initiated_time,
170166
)
171167
with self.db_session() as session:
172-
new_download = Downloads(**download_cache.dict())
173-
session.add(new_download)
168+
download_record = Download(**download.dict())
169+
session.add(download_record)
174170
session.commit()
175-
download_task = DownloadTask(
176-
job_id=job_id,
177-
download_id=download_id,
178-
download_initiated_time=download_initiated_time,
179-
)
180-
self.download_queue.put(download_task)
171+
self.download_queue.put(download)
181172

182173
def add_side_effects_files(self, staging_dir: str):
183174
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""

jupyter_scheduler/models.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -295,3 +295,15 @@ class JobFeature(str, Enum):
295295
output_filename_template = "output_filename_template"
296296
stop_job = "stop_job"
297297
delete_job = "delete_job"
298+
299+
300+
class DescribeDownload(BaseModel):
301+
job_id: str
302+
download_id: str
303+
download_initiated_time: int
304+
305+
class Config:
306+
orm_mode = True
307+
308+
def __str__(self) -> str:
309+
return self.json()

jupyter_scheduler/orm.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -111,7 +111,7 @@ class JobDefinition(CommonColumns, Base):
111111
active = Column(Boolean, default=True)
112112

113113

114-
class Downloads(Base):
114+
class Download(Base):
115115
__tablename__ = "downloads"
116116
job_id = Column(String(36), primary_key=True)
117117
download_id = Column(String(36), primary_key=True)

0 commit comments

Comments
 (0)