
Commit c7534e1: sending events about task status
1 parent 03a0316

4 files changed (+66 additions, -50 deletions)
Lines changed: 40 additions & 30 deletions
@@ -1,10 +1,10 @@
-from dataclasses import asdict, dataclass
-from typing import Any, Final, TypeAlias
+from typing import Final, Literal, TypeAlias
 
 from dask.typing import Key
 from distributed.scheduler import TaskStateState as SchedulerTaskState
 from distributed.worker_state_machine import TaskStateState as WorkerTaskState
 from models_library.projects_state import RunningState
+from pydantic import BaseModel
 
 DaskJobID: TypeAlias = str
 DaskResources: TypeAlias = dict[str, int | float]
@@ -13,37 +13,38 @@
 _SCHEDULER_TASK_STATE_TO_RUNNING_STATE: Final[
     dict[SchedulerTaskState, RunningState]
 ] = {
-    "waiting": RunningState.PENDING,
-    "no-worker": RunningState.WAITING_FOR_RESOURCES,
-    "queued": RunningState.WAITING_FOR_RESOURCES,
-    "processing": RunningState.PENDING,
-    "memory": RunningState.SUCCESS,
-    "erred": RunningState.FAILED,
-    "forgotten": RunningState.UNKNOWN,
+    "released": RunningState.NOT_STARTED,  # Known but not actively computing or in memory
+    "waiting": RunningState.PENDING,  # On track to be computed, waiting on dependencies to arrive in memory
+    "no-worker": RunningState.WAITING_FOR_RESOURCES,  # Ready to be computed, but no appropriate worker exists (for example because of resource restrictions, or because no worker is connected at all).
+    "queued": RunningState.WAITING_FOR_RESOURCES,  # Ready to be computed, but all workers are already full.
+    "processing": RunningState.PENDING,  # All dependencies are available and the task is assigned to a worker for compute (the scheduler doesn’t know whether it’s in a worker queue or actively being computed).
+    "memory": RunningState.SUCCESS,  # In memory on one or more workers
+    "erred": RunningState.FAILED,  # Task computation, or one of its dependencies, has encountered an error
+    "forgotten": RunningState.UNKNOWN,  # Task is no longer needed by any client or dependent task, so it disappears from the scheduler as well. As soon as a task reaches this state, it is immediately dereferenced from the scheduler.
 }
 
 _WORKER_TASK_STATE_TO_RUNNING_STATE: Final[dict[WorkerTaskState, RunningState]] = {
-    "cancelled": RunningState.UNKNOWN,
-    "constrained": RunningState.UNKNOWN,
-    "error": RunningState.UNKNOWN,
-    "executing": RunningState.UNKNOWN,
-    "fetch": RunningState.UNKNOWN,
-    "flight": RunningState.UNKNOWN,
-    "forgotten": RunningState.UNKNOWN,
-    "long-running": RunningState.UNKNOWN,
-    "memory": RunningState.UNKNOWN,
-    "missing": RunningState.UNKNOWN,
-    "ready": RunningState.UNKNOWN,
-    "released": RunningState.UNKNOWN,
-    "rescheduled": RunningState.UNKNOWN,
-    "resumed": RunningState.UNKNOWN,
-    "waiting": RunningState.UNKNOWN,
+    "cancelled": RunningState.ABORTED,  # The scheduler asked to forget about this task, but it’s technically impossible at the moment. See Task cancellation. The task can be found in whatever collections it was in its previous state.
+    "constrained": RunningState.PENDING,  # Like ready, but the user specified resource constraints for this task. The task can be found in the WorkerState.constrained queue.
+    "error": RunningState.FAILED,  # Task execution failed
+    "executing": RunningState.STARTED,  # The task is currently being computed on a thread. It can be found in the WorkerState.executing set and in the distributed.worker.Worker.active_threads dict.
+    "fetch": RunningState.PENDING,  # This task is in memory on one or more peer workers, but not on this worker. Its data is queued to be transferred over the network, either because it’s a dependency of a task in waiting state, or because the Active Memory Manager requested it to be replicated here. The task can be found in the WorkerState.data_needed heap.
+    "flight": RunningState.PENDING,  # The task data is currently being transferred over the network from another worker. The task can be found in the WorkerState.in_flight_tasks and WorkerState.in_flight_workers collections.
+    "forgotten": RunningState.UNKNOWN,  # The scheduler asked this worker to forget about the task, and there are neither dependents nor dependencies on the same worker.
+    "long-running": RunningState.STARTED,  # Like executing, but the user code called distributed.secede() so the task no longer counts towards the maximum number of concurrent tasks. It can be found in the WorkerState.long_running set and in the distributed.worker.Worker.active_threads dict.
+    "memory": RunningState.SUCCESS,  # Task execution completed, or the task was successfully transferred from another worker, and is now held in either WorkerState.data or WorkerState.actors.
+    "missing": RunningState.PENDING,  # Like fetch, but all peer workers that were listed by the scheduler are either unreachable or have responded they don’t actually have the task data. The worker will periodically ask the scheduler if it knows of additional replicas; when it does, the task will transition again to fetch. The task can be found in the WorkerState.missing_dep_flight set.
+    "ready": RunningState.PENDING,  # The task is ready to be computed; all of its dependencies are in memory on the current worker and it’s waiting for an available thread. The task can be found in the WorkerState.ready heap.
+    "released": RunningState.PENDING,  # Known but not actively computing or in memory. A task can stay in this state when the scheduler asked to forget it, but it has dependent tasks on the same worker.
+    "rescheduled": RunningState.PENDING,  # The task just raised the Reschedule exception. This is a transitory state, which is not stored permanently.
+    "resumed": RunningState.PENDING,  # The task was recovered from cancelled state. See Task cancellation. The task can be found in whatever collections it was in its previous state.
+    "waiting": RunningState.PENDING,  # The scheduler has added the task to the worker queue. All of its dependencies are in memory somewhere on the cluster, but not all of them are in memory on the current worker, so they need to be fetched.
 }
 
 
-@dataclass
-class TaskLifeCycleState:
-    key: Key
+class TaskLifeCycleState(BaseModel):
+    key: str
+    source: Literal["scheduler", "worker"]
     worker: str | None
     state: RunningState
 
@@ -52,10 +53,19 @@ def from_scheduler_task_state(
         cls, key: Key, worker: str | None, task_state: SchedulerTaskState
     ) -> "TaskLifeCycleState":
         return cls(
-            key=key,
+            key=f"{key!r}",
+            source="scheduler",
             worker=worker,
             state=_SCHEDULER_TASK_STATE_TO_RUNNING_STATE[task_state],
         )
 
-    def model_dump(self) -> dict[str, Any]:
-        return asdict(self)
+    @classmethod
+    def from_worker_task_state(
+        cls, key: Key, worker: str | None, task_state: WorkerTaskState
+    ) -> "TaskLifeCycleState":
+        return cls(
+            key=f"{key!r}",
+            source="worker",
+            worker=worker,
+            state=_WORKER_TASK_STATE_TO_RUNNING_STATE[task_state],
+        )
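For reference, the reworked TaskLifeCycleState is a pydantic model, so the payload logged by the plugins below is a plain JSON-compatible dict that the receiving side can validate back into the model. A minimal sketch of that round trip (not part of this commit; the key and worker values are invented, and it assumes pydantic v2 plus the repo's dask_task_models_library and models_library packages on the path):

from dask_task_models_library.models import TaskLifeCycleState
from models_library.projects_state import RunningState

# build the payload the same way the scheduler plugin does
payload = TaskLifeCycleState.from_scheduler_task_state(
    key="compute-abc123",  # hypothetical dask task key
    worker="tcp://worker-1:8788",  # hypothetical worker address
    task_state="processing",
).model_dump(mode="json")

# ...the dict travels through log_event / get_events...

restored = TaskLifeCycleState.model_validate(payload)
assert restored.source == "scheduler"
assert restored.state is RunningState.PENDING  # "processing" maps to PENDING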

services/dask-sidecar/src/simcore_service_dask_sidecar/task_life_cycle_scheduler_plugin.py

Lines changed: 3 additions & 3 deletions
@@ -36,16 +36,16 @@ def transition(
         stimulus_id: str,
         **kwargs: Any,
     ):
-        # Start state: one of released, waiting, processing, memory, error
         with log_context(
            _logger,
             logging.INFO,
-            f"Task {key} transition from {start} to {finish} due to {stimulus_id=}",
+            f"Task {key!r} transition from {start} to {finish} due to {stimulus_id=}",
         ):
             assert self.scheduler  # nosec
+
             self.scheduler.log_event(
                 TASK_LIFE_CYCLE_EVENT.format(key=key),
                 TaskLifeCycleState.from_scheduler_task_state(
                     key, kwargs.get("worker"), finish
-                ).model_dump(),
+                ).model_dump(mode="json"),
             )
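The plugin above relies on distributed's standard SchedulerPlugin transition hook plus scheduler events. A stripped-down sketch of that general pattern (the class name, the literal topic string, and the minimal payload are illustrative, not this commit's code; registration of the plugin is not shown in the diff):

from distributed import Scheduler
from distributed.diagnostics.plugin import SchedulerPlugin

TASK_LIFE_CYCLE_EVENT = "task-lifecycle-{key}"  # assumed to match the shared constant


class MiniTaskLifecyclePlugin(SchedulerPlugin):
    def __init__(self) -> None:
        self.scheduler: Scheduler | None = None

    async def start(self, scheduler: Scheduler) -> None:
        # documented SchedulerPlugin hook: runs once the plugin is started on the scheduler
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, stimulus_id=None, **kwargs):
        # called by the scheduler on every task state transition; log_event stores
        # the message under a per-task topic that clients can later query
        if self.scheduler is not None:
            self.scheduler.log_event(
                TASK_LIFE_CYCLE_EVENT.format(key=key),
                {"key": str(key), "start": start, "finish": finish},
            )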

services/dask-sidecar/src/simcore_service_dask_sidecar/task_life_cycle_worker_plugin.py

Lines changed: 14 additions & 15 deletions
@@ -3,8 +3,10 @@
 from typing import Any
 
 from dask.typing import Key
-from distributed import Worker, WorkerPlugin
-from distributed.scheduler import TaskStateState
+from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState
+from distributed import WorkerPlugin
+from distributed.worker import Worker
+from distributed.worker_state_machine import TaskStateState
 from servicelib.logging_utils import log_context
 
 _logger = logging.getLogger(__name__)
@@ -17,7 +19,7 @@ def __init__(self) -> None:
             logging.INFO,
             "TaskLifecycleWorkerPlugin init",
         ):
-            self.worker = None
+            self._worker = None
 
     def setup(self, worker: Worker) -> Awaitable[None]:
         async def _() -> None:
@@ -26,7 +28,8 @@ async def _() -> None:
                 logging.INFO,
                 "TaskLifecycleWorkerPlugin start",
             ):
-                self.worker = worker
+                assert worker  # nosec
+                self._worker = worker
 
         return _()
 
@@ -37,19 +40,15 @@ def transition(
         finish: TaskStateState,
         **kwargs: Any,
     ):
-        # Start state: one of released, waiting, processing, memory, error
         with log_context(
             _logger,
             logging.INFO,
-            f"Task {key} transition from {start} to {finish}",
+            f"Task {key!r} transition from {start} to {finish}",
         ):
-            assert self.worker  # nosec
-            self.worker.log_event(
-                f"task-lifecycle-{key}",
-                {
-                    "key": key,
-                    "worker": kwargs.get("worker"),
-                    "start": start,
-                    "finish": finish,
-                },
+            assert self._worker  # nosec
+            self._worker.log_event(
+                TASK_LIFE_CYCLE_EVENT.format(key=key),
+                TaskLifeCycleState.from_worker_task_state(
+                    key, kwargs.get("worker"), finish
+                ).model_dump(mode="json"),
             )
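The worker-side plugin follows the same shape through distributed's WorkerPlugin interface: setup() stores the Worker handle and transition() republishes each state change as an event. A self-contained sketch of that mechanism (class name, topic string and payload are illustrative; how the sidecar registers its own plugin is not shown in this commit):

import distributed
from distributed import WorkerPlugin


class MiniTaskLifecycleWorkerPlugin(WorkerPlugin):
    def setup(self, worker):
        # called once per worker when the plugin is registered
        self._worker = worker

    def transition(self, key, start, finish, **kwargs):
        # Worker.log_event forwards the message to the scheduler's event log,
        # where Client.get_events(topic) can read it back
        self._worker.log_event(
            f"task-lifecycle-{key}",
            {"key": str(key), "start": start, "finish": finish},
        )


if __name__ == "__main__":
    with distributed.Client(processes=False) as client:  # in-process cluster for illustration
        client.register_worker_plugin(MiniTaskLifecycleWorkerPlugin())
        assert client.submit(sum, [1, 2, 3]).result() == 6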

services/dask-sidecar/tests/unit/test_scheduler.py

Lines changed: 9 additions & 2 deletions
@@ -1,7 +1,13 @@
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=unused-variable
+# pylint: disable=no-member
+
 import time
 
 import distributed
 import pytest
+from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT
 
 pytest_simcore_core_services_selection = [
     "rabbit",
@@ -20,7 +26,8 @@ def _some_failing_task() -> None:
 
     future = dask_client.submit(_some_task)
     assert future.result(timeout=10) == 2
-    events = dask_client.get_events(f"task-lifecycle-{future.key}")
+
+    events = dask_client.get_events(TASK_LIFE_CYCLE_EVENT.format(key=future.key))
     print("XXXX received events:")
     assert events
     assert isinstance(events, tuple)
@@ -30,7 +37,7 @@
     future = dask_client.submit(_some_failing_task)
     with pytest.raises(RuntimeError):
         future.result(timeout=10)
-    events = dask_client.get_events(f"task-lifecycle-{future.key}")
+    events = dask_client.get_events(TASK_LIFE_CYCLE_EVENT.format(key=future.key))
     print("XXXX received events:")
     assert events
    assert isinstance(events, tuple)
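Since both plugins now publish the same pydantic payload under per-task TASK_LIFE_CYCLE_EVENT topics, a test or any dask client can rebuild typed objects from the raw events. A small helper sketch, assuming each event returned by Client.get_events is the usual (timestamp, payload) pair and that the payload is the model_dump(mode="json") dict written by the plugins (the helper itself is hypothetical, not part of this commit):

import distributed
from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState


def get_task_lifecycle(client: distributed.Client, key: str) -> list[TaskLifeCycleState]:
    # each event is a (timestamp, payload) tuple; validate the payload back into the model
    events = client.get_events(TASK_LIFE_CYCLE_EVENT.format(key=key))
    return [TaskLifeCycleState.model_validate(payload) for _timestamp, payload in events]

# usage inside the test above: get_task_lifecycle(dask_client, future.key)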
