Commit c4a6124

✨🐛 Computational Backend: Introduce Dask plugins for tasks lifecycle (#7686)
1 parent 01eeeaf commit c4a6124

File tree: 23 files changed, +617 -240 lines changed
packages/dask-task-models-library/src/dask_task_models_library/models.py

Lines changed: 69 additions & 1 deletion

```diff
@@ -1,4 +1,72 @@
-from typing import TypeAlias
+from typing import Final, Literal, TypeAlias
+
+from dask.typing import Key
+from distributed.scheduler import TaskStateState as SchedulerTaskState
+from distributed.worker_state_machine import TaskStateState as WorkerTaskState
+from models_library.projects_state import RunningState
+from pydantic import BaseModel
 
 DaskJobID: TypeAlias = str
 DaskResources: TypeAlias = dict[str, int | float]
+
+TASK_LIFE_CYCLE_EVENT: Final[str] = "task-lifecycle-{key}"
+TASK_RUNNING_PROGRESS_EVENT: Final[str] = "task-progress-{key}"
+_SCHEDULER_TASK_STATE_TO_RUNNING_STATE: Final[
+    dict[SchedulerTaskState, RunningState]
+] = {
+    "released": RunningState.NOT_STARTED,  # Known but not actively computing or in memory
+    "waiting": RunningState.PENDING,  # On track to be computed, waiting on dependencies to arrive in memory
+    "no-worker": RunningState.WAITING_FOR_RESOURCES,  # Ready to be computed, but no appropriate worker exists (for example because of resource restrictions, or because no worker is connected at all)
+    "queued": RunningState.WAITING_FOR_RESOURCES,  # Ready to be computed, but all workers are already full
+    "processing": RunningState.PENDING,  # All dependencies are available and the task is assigned to a worker for compute (the scheduler doesn't know whether it's in a worker queue or actively being computed)
+    "memory": RunningState.SUCCESS,  # In memory on one or more workers
+    "erred": RunningState.FAILED,  # Task computation, or one of its dependencies, has encountered an error
+    "forgotten": RunningState.UNKNOWN,  # Task is no longer needed by any client or dependent task, so it disappears from the scheduler as well. As soon as a task reaches this state, it is immediately dereferenced from the scheduler
+}
+
+_WORKER_TASK_STATE_TO_RUNNING_STATE: Final[dict[WorkerTaskState, RunningState]] = {
+    "cancelled": RunningState.ABORTED,  # The scheduler asked to forget about this task, but it's technically impossible at the moment. See Task cancellation. The task can be found in whatever collections it was in its previous state
+    "constrained": RunningState.PENDING,  # Like ready, but the user specified resource constraints for this task. The task can be found in the WorkerState.constrained queue
+    "error": RunningState.FAILED,  # Task execution failed
+    "executing": RunningState.STARTED,  # The task is currently being computed on a thread. It can be found in the WorkerState.executing set and in the distributed.worker.Worker.active_threads dict
+    "fetch": RunningState.PENDING,  # This task is in memory on one or more peer workers, but not on this worker. Its data is queued to be transferred over the network, either because it's a dependency of a task in waiting state, or because the Active Memory Manager requested it to be replicated here. The task can be found in the WorkerState.data_needed heap
+    "flight": RunningState.PENDING,  # The task data is currently being transferred over the network from another worker. The task can be found in the WorkerState.in_flight_tasks and WorkerState.in_flight_workers collections
+    "forgotten": RunningState.UNKNOWN,  # The scheduler asked this worker to forget about the task, and there are neither dependents nor dependencies on the same worker
+    "long-running": RunningState.STARTED,  # Like executing, but the user code called distributed.secede() so the task no longer counts towards the maximum number of concurrent tasks. It can be found in the WorkerState.long_running set and in the distributed.worker.Worker.active_threads dict
+    "memory": RunningState.SUCCESS,  # Task execution completed, or the task was successfully transferred from another worker, and is now held in either WorkerState.data or WorkerState.actors
+    "missing": RunningState.PENDING,  # Like fetch, but all peer workers that were listed by the scheduler are either unreachable or have responded they don't actually have the task data. The worker will periodically ask the scheduler if it knows of additional replicas; when it does, the task will transition again to fetch. The task can be found in the WorkerState.missing_dep_flight set
+    "ready": RunningState.PENDING,  # The task is ready to be computed; all of its dependencies are in memory on the current worker and it's waiting for an available thread. The task can be found in the WorkerState.ready heap
+    "released": RunningState.PENDING,  # Known but not actively computing or in memory. A task can stay in this state when the scheduler asked to forget it, but it has dependent tasks on the same worker
+    "rescheduled": RunningState.PENDING,  # The task just raised the Reschedule exception. This is a transitory state, which is not stored permanently
+    "resumed": RunningState.PENDING,  # The task was recovered from cancelled state. See Task cancellation. The task can be found in whatever collections it was in its previous state
+    "waiting": RunningState.PENDING,  # The scheduler has added the task to the worker queue. All of its dependencies are in memory somewhere on the cluster, but not all of them are in memory on the current worker, so they need to be fetched
+}
+
+
+class TaskLifeCycleState(BaseModel):
+    key: str
+    source: Literal["scheduler", "worker"]
+    worker: str | None
+    state: RunningState
+
+    @classmethod
+    def from_scheduler_task_state(
+        cls, key: Key, worker: str | None, task_state: SchedulerTaskState
+    ) -> "TaskLifeCycleState":
+        return cls(
+            key=f"{key!r}",
+            source="scheduler",
+            worker=worker,
+            state=_SCHEDULER_TASK_STATE_TO_RUNNING_STATE[task_state],
+        )
+
+    @classmethod
+    def from_worker_task_state(
+        cls, key: Key, worker: str | None, task_state: WorkerTaskState
+    ) -> "TaskLifeCycleState":
+        return cls(
+            key=f"{key!r}",
+            source="worker",
+            worker=worker,
+            state=_WORKER_TASK_STATE_TO_RUNNING_STATE[task_state],
+        )
```
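For readers of this diff: the two mapping tables normalize Dask's scheduler-side and worker-side task states into the platform's single RunningState enum, and TASK_LIFE_CYCLE_EVENT yields one event topic per task key. A minimal sketch of how these pieces compose (the task key and worker address are made up for illustration):

```python
# Illustrative only -- not part of the commit.
from models_library.projects_state import RunningState

from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState

# Each task key gets its own event topic on the Dask event system.
topic = TASK_LIFE_CYCLE_EVENT.format(key="compute_a1b2")
assert topic == "task-lifecycle-compute_a1b2"

# A scheduler-side "memory" transition normalizes to RunningState.SUCCESS,
# per _SCHEDULER_TASK_STATE_TO_RUNNING_STATE above.
state = TaskLifeCycleState.from_scheduler_task_state(
    key="compute_a1b2", worker="tcp://10.0.0.5:8788", task_state="memory"
)
assert state.source == "scheduler"
assert state.state is RunningState.SUCCESS
```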

packages/dask-task-models-library/src/dask_task_models_library/plugins/__init__.py

Whitespace-only changes.
packages/dask-task-models-library/src/dask_task_models_library/plugins/task_life_cycle_scheduler_plugin.py

Lines changed: 54 additions & 0 deletions

```diff
@@ -0,0 +1,54 @@
+# pylint: disable=unused-argument
+import logging
+from typing import Any
+
+import click
+from dask.typing import Key
+from distributed import Scheduler, SchedulerPlugin
+from distributed.scheduler import TaskStateState
+
+from ..models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState
+
+_logger = logging.getLogger(__name__)
+
+
+class TaskLifecycleSchedulerPlugin(SchedulerPlugin):
+    def __init__(self) -> None:
+        self.scheduler = None
+        _logger.info("initialized TaskLifecycleSchedulerPlugin")
+
+    async def start(self, scheduler: Scheduler) -> None:
+        self.scheduler = scheduler  # type: ignore[assignment]
+        _logger.info("started TaskLifecycleSchedulerPlugin")
+
+    def transition(
+        self,
+        key: Key,
+        start: TaskStateState,
+        finish: TaskStateState,
+        *args: Any,  # noqa: ARG002
+        stimulus_id: str,
+        **kwargs: Any,
+    ):
+        _logger.debug(
+            "Task %s transition from %s to %s due to %s",
+            key,
+            start,
+            finish,
+            stimulus_id,
+        )
+
+        assert self.scheduler  # nosec
+
+        self.scheduler.log_event(
+            TASK_LIFE_CYCLE_EVENT.format(key=key),
+            TaskLifeCycleState.from_scheduler_task_state(
+                key, kwargs.get("worker"), finish
+            ).model_dump(mode="json"),
+        )
+
+
+@click.command()
+def dask_setup(scheduler):
+    plugin = TaskLifecycleSchedulerPlugin()
+    scheduler.add_plugin(plugin)
```
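Since the plugin publishes through Scheduler.log_event, any distributed client can listen on the per-task topic. A hedged sketch of a consumer (the scheduler address and task key are placeholders; distributed delivers events as (timestamp, payload) tuples):

```python
# Illustrative only -- not part of the commit.
import distributed

from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState


def on_lifecycle_event(event: tuple) -> None:
    _timestamp, payload = event  # distributed wraps each payload with a timestamp
    state = TaskLifeCycleState.model_validate(payload)
    print(f"task {state.key} is now {state.state} (reported by {state.source})")


client = distributed.Client("tcp://scheduler:8786")  # placeholder address
client.subscribe_topic(
    TASK_LIFE_CYCLE_EVENT.format(key="compute_a1b2"), on_lifecycle_event
)
```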
packages/dask-task-models-library/src/dask_task_models_library/plugins/task_life_cycle_worker_plugin.py

Lines changed: 48 additions & 0 deletions

```diff
@@ -0,0 +1,48 @@
+import logging
+from collections.abc import Awaitable
+from typing import Any
+
+import click
+from dask.typing import Key
+from distributed import WorkerPlugin
+from distributed.worker import Worker
+from distributed.worker_state_machine import TaskStateState
+
+from ..models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState
+
+_logger = logging.getLogger(__name__)
+
+
+class TaskLifecycleWorkerPlugin(WorkerPlugin):
+    def __init__(self) -> None:
+        self._worker = None
+        _logger.info("TaskLifecycleWorkerPlugin initialized")
+
+    def setup(self, worker: Worker) -> Awaitable[None]:
+        async def _() -> None:
+            self._worker = worker  # type: ignore[assignment]
+            _logger.info("TaskLifecycleWorkerPlugin setup completed")
+
+        return _()
+
+    def transition(
+        self,
+        key: Key,
+        start: TaskStateState,
+        finish: TaskStateState,
+        **kwargs: Any,
+    ):
+        _logger.info("Task '%s' transition from %s to %s", key, start, finish)
+        assert self._worker  # nosec
+        self._worker.log_event(
+            TASK_LIFE_CYCLE_EVENT.format(key=key),
+            TaskLifeCycleState.from_worker_task_state(
+                key, kwargs.get("worker"), finish
+            ).model_dump(mode="json"),
+        )
+
+
+@click.command()
+async def dask_setup(worker: Worker) -> None:
+    plugin = TaskLifecycleWorkerPlugin()
+    await worker.plugin_add(plugin)
```
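Besides the preload wiring used in the fixtures below, the same plugin could be attached to all workers at runtime from a client. A sketch, assuming a recent distributed version where Client.register_plugin accepts worker plugins (older versions use register_worker_plugin):

```python
# Illustrative only -- not part of the commit.
import distributed

from dask_task_models_library.plugins.task_life_cycle_worker_plugin import (
    TaskLifecycleWorkerPlugin,
)

client = distributed.Client("tcp://scheduler:8786")  # placeholder address
# Ships the plugin to every current and future worker in the cluster.
client.register_plugin(TaskLifecycleWorkerPlugin())
```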

packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py

Lines changed: 12 additions & 0 deletions

```diff
@@ -19,6 +19,9 @@ def dask_workers_config() -> dict[str, Any]:
             "options": {
                 "nthreads": 2,
                 "resources": {"CPU": 2, "RAM": 48e9},
+                "preload": (
+                    "dask_task_models_library.plugins.task_life_cycle_worker_plugin",
+                ),
             },
         },
         "gpu-worker": {
@@ -30,6 +33,9 @@
                     "GPU": 1,
                     "RAM": 48e9,
                 },
+                "preload": (
+                    "dask_task_models_library.plugins.task_life_cycle_worker_plugin",
+                ),
             },
         },
         "large-ram-worker": {
@@ -40,6 +46,9 @@
                     "CPU": 8,
                     "RAM": 768e9,
                 },
+                "preload": (
+                    "dask_task_models_library.plugins.task_life_cycle_worker_plugin",
+                ),
             },
         },
     }
@@ -54,6 +63,9 @@ def dask_scheduler_config(
         "options": {
             "port": unused_tcp_port_factory(),
             "dashboard_address": f":{unused_tcp_port_factory()}",
+            "preload": (
+                "dask_task_models_library.plugins.task_life_cycle_scheduler_plugin",
+            ),
         },
     }
```
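The "preload" options above mirror what can be done when starting a cluster programmatically. A sketch, with nthreads and resource numbers copied from the fixture but otherwise illustrative:

```python
# Illustrative only -- not part of the commit.
import asyncio

from distributed import Scheduler, Worker


async def main() -> None:
    # Scheduler and Worker both accept a "preload" list of module paths.
    async with Scheduler(
        preload=["dask_task_models_library.plugins.task_life_cycle_scheduler_plugin"],
    ) as scheduler:
        async with Worker(
            scheduler.address,
            nthreads=2,
            resources={"CPU": 2, "RAM": 48e9},
            preload=["dask_task_models_library.plugins.task_life_cycle_worker_plugin"],
        ):
            await asyncio.sleep(1)  # task-lifecycle events are now being emitted


asyncio.run(main())
```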

scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py

Lines changed: 5 additions & 1 deletion

```diff
@@ -265,7 +265,11 @@ def _needs_manual_intervention(
         user_id=containers[0].user_id,
         project_id=containers[0].project_id,
         created_at=containers[0].created_at,
-        needs_manual_intervention=_needs_manual_intervention(containers),
+        needs_manual_intervention=_needs_manual_intervention(containers)
+        and (
+            (arrow.utcnow().datetime - containers[0].created_at)
+            > datetime.timedelta(minutes=2)
+        ),
         containers=[c.name for c in containers],
         service_name=containers[0].service_name,
         service_version=containers[0].service_version,
```
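The intent of this change: a cluster is only flagged for manual intervention once its first container has passed a two-minute grace period, which filters out clusters that are merely still starting up. The predicate in isolation (function name is illustrative):

```python
# Illustrative only -- not part of the commit.
import datetime

import arrow


def needs_intervention(flagged: bool, created_at: datetime.datetime) -> bool:
    """Suppress the manual-intervention flag during a 2-minute startup grace period."""
    age = arrow.utcnow().datetime - created_at
    return flagged and age > datetime.timedelta(minutes=2)
```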

services/clusters-keeper/requirements/ci.txt

Lines changed: 1 addition & 0 deletions

```diff
@@ -14,6 +14,7 @@
 # installs this repo's packages
 simcore-aws-library @ ../../packages/aws-library
 simcore-common-library @ ../../packages/common-library
+simcore-dask-task-models-library @ ../../packages/dask-task-models-library
 simcore-models-library @ ../../packages/models-library
 pytest-simcore @ ../../packages/pytest-simcore
 simcore-service-library[fastapi] @ ../../packages/service-library
```

services/clusters-keeper/requirements/dev.txt

Lines changed: 1 addition & 0 deletions

```diff
@@ -18,6 +18,7 @@
 --editable ../../packages/pytest-simcore
 --editable ../../packages/service-library[fastapi]
 --editable ../../packages/settings-library
+--editable ../../packages/dask-task-models-library
 
 # installs current package
 --editable .
```

services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_plugin.py renamed to services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_worker_plugin.py

Lines changed: 3 additions & 3 deletions

```diff
@@ -24,7 +24,7 @@
 class RabbitMQPlugin(distributed.WorkerPlugin):
     """Dask Worker Plugin for RabbitMQ integration"""
 
-    name = "rabbitmq_plugin"
+    name = "rabbitmq_worker_plugin"
     _main_thread_loop: AbstractEventLoop | None = None
     _client: RabbitMQClient | None = None
     _settings: RabbitSettings | None = None
@@ -60,7 +60,7 @@ async def _() -> None:
 
         if threading.current_thread() is not threading.main_thread():
             _logger.warning(
-                "RabbitMQ client plugin setup is not in the main thread! Beware! if in pytest it's ok."
+                "RabbitMQ client plugin setup is not in the main thread! TIP: if in pytest it's ok."
             )
 
         with log_context(
@@ -98,7 +98,7 @@ async def _() -> None:
             )
         else:
             _logger.warning(
-                "RabbitMQ client plugin setup is not the main thread!"
+                "RabbitMQ client plugin setup is not the main thread! TIP: if in pytest it's ok."
             )
 
         # Cancel the message processor task
```

services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py

Lines changed: 9 additions & 2 deletions

```diff
@@ -1,6 +1,9 @@
 import logging
 
 import distributed
+from dask_task_models_library.plugins.task_life_cycle_scheduler_plugin import (
+    TaskLifecycleSchedulerPlugin,
+)
 from servicelib.logging_utils import log_context
 
 from ._meta import print_dask_scheduler_banner
@@ -19,9 +22,13 @@ async def dask_setup(scheduler: distributed.Scheduler) -> None:
 
     with log_context(_logger, logging.INFO, "Launch dask scheduler"):
         _logger.info("app settings: %s", settings.model_dump_json(indent=1))
+
+        scheduler.add_plugin(TaskLifecycleSchedulerPlugin())
         print_dask_scheduler_banner()
 
 
-async def dask_teardown(_worker: distributed.Worker) -> None:
-    with log_context(_logger, logging.INFO, "Tear down dask scheduler"):
+async def dask_teardown(scheduler: distributed.Scheduler) -> None:
+    with log_context(
+        _logger, logging.INFO, f"Tear down dask scheduler at {scheduler.address}"
+    ):
         ...
```
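Note that this hunk also fixes a bug: dask_teardown previously took a Worker argument even though this module is a scheduler preload. For reference, a minimal sketch of the preload contract dask-distributed expects from such a module (log lines illustrative):

```python
# Illustrative only -- not part of the commit.
import distributed


async def dask_setup(scheduler: distributed.Scheduler) -> None:
    # Called once when the preloading scheduler starts.
    print(f"scheduler listening at {scheduler.address}")


async def dask_teardown(scheduler: distributed.Scheduler) -> None:
    # Called once when the scheduler shuts down.
    print(f"scheduler at {scheduler.address} shutting down")
```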
