Skip to content

Commit f18071e

Browse files
committed
Remove Dask
1 parent 69b34ca commit f18071e

File tree

4 files changed

+0
-53
lines changed

4 files changed

+0
-53
lines changed

jupyter_scheduler/executors.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1515
from prefect import flow, task
1616
from prefect.futures import as_completed
17-
from prefect_dask.task_runners import DaskTaskRunner
1817

1918
from jupyter_scheduler.models import CreateJob, DescribeJob, JobFeature, Status
2019
from jupyter_scheduler.orm import Job, Workflow, create_session

jupyter_scheduler/extension.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -107,25 +107,3 @@ def initialize_settings(self):
107107
if scheduler.task_runner:
108108
loop = asyncio.get_event_loop()
109109
loop.create_task(scheduler.task_runner.start())
110-
111-
async def stop_extension(self):
112-
"""
113-
Public method called by Jupyter Server when the server is stopping.
114-
This calls the cleanup code defined in `self._stop_exception()` inside
115-
an exception handler, as the server halts if this method raises an
116-
exception.
117-
"""
118-
try:
119-
await self._stop_extension()
120-
except Exception as e:
121-
self.log.error("Jupyter Scheduler raised an exception while stopping:")
122-
self.log.exception(e)
123-
124-
async def _stop_extension(self):
125-
"""
126-
Private method that defines the cleanup code to run when the server is
127-
stopping.
128-
"""
129-
if "scheduler" in self.settings:
130-
scheduler: SchedulerApp = self.settings["scheduler"]
131-
await scheduler.stop_extension()

jupyter_scheduler/scheduler.py

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import fsspec
77
import psutil
8-
from dask.distributed import Client as DaskClient
98
from distributed import LocalCluster
109
from jupyter_core.paths import jupyter_data_dir
1110
from jupyter_server.transutils import _i18n
@@ -399,12 +398,6 @@ def get_local_output_path(
399398
else:
400399
return os.path.join(self.root_dir, self.output_directory, output_dir_name)
401400

402-
async def stop_extension(self):
403-
"""
404-
Placeholder method for a cleanup code to run when the server is stopping.
405-
"""
406-
pass
407-
408401

409402
class Scheduler(BaseScheduler):
410403
_db_session = None
@@ -419,12 +412,6 @@ class Scheduler(BaseScheduler):
419412
),
420413
)
421414

422-
dask_cluster_url = Unicode(
423-
allow_none=True,
424-
config=True,
425-
help="URL of the Dask cluster to connect to.",
426-
)
427-
428415
db_url = Unicode(help=_i18n("Scheduler database url"))
429416

430417
task_runner = Instance(allow_none=True, klass="jupyter_scheduler.task_runner.BaseTaskRunner")
@@ -444,15 +431,6 @@ def __init__(
444431
if self.task_runner_class:
445432
self.task_runner = self.task_runner_class(scheduler=self, config=config)
446433

447-
self.dask_client: DaskClient = self._get_dask_client()
448-
449-
def _get_dask_client(self):
450-
"""Creates and configures a Dask client."""
451-
if self.dask_cluster_url:
452-
return DaskClient(self.dask_cluster_url)
453-
cluster = LocalCluster(processes=True)
454-
return DaskClient(cluster)
455-
456434
@property
457435
def db_session(self):
458436
if not self._db_session:
@@ -875,13 +853,6 @@ def get_staging_paths(model: Union[DescribeJob, DescribeJobDefinition]) -> Dict[
875853

876854
return staging_paths
877855

878-
async def stop_extension(self):
879-
"""
880-
Cleanup code to run when the server is stopping.
881-
"""
882-
if self.dask_client:
883-
await self.dask_client.close()
884-
885856

886857
class ArchivingScheduler(Scheduler):
887858
"""Scheduler that captures all files in output directory in an archive."""

jupyter_scheduler/workflows.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ async def get(self, workflow_id: str = None):
6363
class WorkflowsTasksHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
6464
@authenticated
6565
async def post(self, workflow_id: str):
66-
print("WorkflowsTasksHandler post")
6766
payload = self.get_json_body()
6867
if workflow_id != payload.get("workflow_id"):
6968
raise HTTPError(

0 commit comments

Comments
 (0)