15
15
from traitlets import Unicode , default
16
16
from traitlets .config import LoggingConfigurable
17
17
18
+ from jupyter_scheduler .download_manager import DownloadManager
18
19
from jupyter_scheduler .environments import EnvironmentManager
19
20
from jupyter_scheduler .exceptions import (
20
21
IdempotencyTokenError ,
@@ -404,7 +405,7 @@ def __init__(
404
405
root_dir : str ,
405
406
environments_manager : Type [EnvironmentManager ],
406
407
db_url : str ,
407
- download_queue : mp . Queue ,
408
+ download_manager : DownloadManager ,
408
409
config = None ,
409
410
** kwargs ,
410
411
):
@@ -414,7 +415,7 @@ def __init__(
414
415
self .db_url = db_url
415
416
if self .task_runner_class :
416
417
self .task_runner = self .task_runner_class (scheduler = self , config = config )
417
- self .download_queue = download_queue
418
+ self .download_manager = download_manager
418
419
419
420
@property
420
421
def db_session (self ):
@@ -494,7 +495,7 @@ def create_job(self, model: CreateJob) -> str:
494
495
staging_paths = staging_paths ,
495
496
root_dir = self .root_dir ,
496
497
db_url = self .db_url ,
497
- download_queue = self .download_queue ,
498
+ download_queue = self .download_manager . queue ,
498
499
).process
499
500
)
500
501
p .start ()
@@ -586,6 +587,7 @@ def delete_job(self, job_id: str):
586
587
587
588
session .query (Job ).filter (Job .job_id == job_id ).delete ()
588
589
session .commit ()
590
+ self .download_manager .delete_job_downloads (job_id )
589
591
590
592
def stop_job (self , job_id ):
591
593
with self .db_session () as session :
0 commit comments