Skip to content

Commit c3cf181

Browse files
committed
now we also test the scheduler preload
1 parent 437adab commit c3cf181

File tree

4 files changed

+47
-2
lines changed

4 files changed

+47
-2
lines changed

services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@ async def dask_setup(scheduler: distributed.Scheduler) -> None:
2222
print_dask_scheduler_banner()
2323

2424

25-
async def dask_teardown(_worker: distributed.Worker) -> None:
26-
with log_context(_logger, logging.INFO, "Tear down dask scheduler"):
25+
async def dask_teardown(scheduler: distributed.Scheduler) -> None:
26+
with log_context(_logger, logging.INFO, f"Tear down dask {scheduler.address}"):
2727
...
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from typing import Any
2+
3+
from dask.typing import Key
4+
from distributed import SchedulerPlugin
5+
from distributed.scheduler import TaskStateState
6+
7+
8+
class SchedulerLifecyclePlugin(SchedulerPlugin):
9+
def __init__(self) -> None:
10+
self.scheduler = None
11+
12+
def add_task(self, key, **kwargs):
13+
"""Task published to cluster"""
14+
self.scheduler.log_event(
15+
"task-published",
16+
{"key": key, "timestamp": time.time(), "client": kwargs.get("client")},
17+
)
18+
19+
def transition(
20+
self,
21+
key: Key,
22+
start: TaskStateState,
23+
finish: TaskStateState,
24+
**kwargs: Any,
25+
):
26+
"""State transitions"""
27+
if finish in ("waiting", "processing", "memory", "erred"):
28+
self.scheduler.log_event(
29+
f"task-{finish}",
30+
{
31+
"key": key,
32+
"worker": kwargs.get("worker"),
33+
"duration": time.time() - self.scheduler.tasks[key].start_time,
34+
},
35+
)

services/dask-sidecar/tests/unit/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def local_cluster(app_environment: EnvVarsDict) -> Iterator[distributed.LocalClu
117117
with distributed.LocalCluster(
118118
worker_class=distributed.Worker,
119119
resources={"CPU": 10, "GPU": 10},
120+
scheduler_kwargs={"preload": "simcore_service_dask_sidecar.scheduler"},
120121
preload="simcore_service_dask_sidecar.worker",
121122
) as cluster:
122123
assert cluster
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import distributed
2+
3+
pytest_simcore_core_services_selection = [
4+
"rabbit",
5+
]
6+
7+
8+
def test_scheduler(dask_client: distributed.Client) -> None:
9+
assert True

0 commit comments

Comments
 (0)