Skip to content

Commit 03a0316

Browse files
committed
ongoing
1 parent 7cfe1c3 commit 03a0316

File tree

2 files changed

+64
-31
lines changed

2 files changed

+64
-31
lines changed
Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,61 @@
1-
from typing import TypeAlias
1+
from dataclasses import asdict, dataclass
2+
from typing import Any, Final, TypeAlias
3+
4+
from dask.typing import Key
5+
from distributed.scheduler import TaskStateState as SchedulerTaskState
6+
from distributed.worker_state_machine import TaskStateState as WorkerTaskState
7+
from models_library.projects_state import RunningState
28

39
DaskJobID: TypeAlias = str
410
DaskResources: TypeAlias = dict[str, int | float]
11+
12+
TASK_LIFE_CYCLE_EVENT: Final[str] = "task-lifecycle-{key}"
13+
_SCHEDULER_TASK_STATE_TO_RUNNING_STATE: Final[
14+
dict[SchedulerTaskState, RunningState]
15+
] = {
16+
"waiting": RunningState.PENDING,
17+
"no-worker": RunningState.WAITING_FOR_RESOURCES,
18+
"queued": RunningState.WAITING_FOR_RESOURCES,
19+
"processing": RunningState.PENDING,
20+
"memory": RunningState.SUCCESS,
21+
"erred": RunningState.FAILED,
22+
"forgotten": RunningState.UNKNOWN,
23+
}
24+
25+
_WORKER_TASK_STATE_TO_RUNNING_STATE: Final[dict[WorkerTaskState, RunningState]] = {
26+
"cancelled": RunningState.UNKNOWN,
27+
"constrained": RunningState.UNKNOWN,
28+
"error": RunningState.UNKNOWN,
29+
"executing": RunningState.UNKNOWN,
30+
"fetch": RunningState.UNKNOWN,
31+
"flight": RunningState.UNKNOWN,
32+
"forgotten": RunningState.UNKNOWN,
33+
"long-running": RunningState.UNKNOWN,
34+
"memory": RunningState.UNKNOWN,
35+
"missing": RunningState.UNKNOWN,
36+
"ready": RunningState.UNKNOWN,
37+
"released": RunningState.UNKNOWN,
38+
"rescheduled": RunningState.UNKNOWN,
39+
"resumed": RunningState.UNKNOWN,
40+
"waiting": RunningState.UNKNOWN,
41+
}
42+
43+
44+
@dataclass
45+
class TaskLifeCycleState:
46+
key: Key
47+
worker: str | None
48+
state: RunningState
49+
50+
@classmethod
51+
def from_scheduler_task_state(
52+
cls, key: Key, worker: str | None, task_state: SchedulerTaskState
53+
) -> "TaskLifeCycleState":
54+
return cls(
55+
key=key,
56+
worker=worker,
57+
state=_SCHEDULER_TASK_STATE_TO_RUNNING_STATE[task_state],
58+
)
59+
60+
def model_dump(self) -> dict[str, Any]:
61+
return asdict(self)
Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,15 @@
11
import logging
2-
from dataclasses import asdict, dataclass
3-
from typing import Any, Final
2+
from typing import Any
43

54
from dask.typing import Key
5+
from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState
66
from distributed import Scheduler, SchedulerPlugin
77
from distributed.scheduler import TaskStateState
8-
from models_library.projects_state import RunningState
98
from servicelib.logging_utils import log_context
109

1110
_logger = logging.getLogger(__name__)
1211

1312

14-
_TASK_LIFE_CYCLE_EVENT: Final[str] = "task-lifecycle-{key}"
15-
_SCHEDULER_TASK_STATE_TO_RUNNING_STATE: Final[dict[TaskStateState, RunningState]] = {}
16-
17-
18-
@dataclass
19-
class TaskLifeCycleState:
20-
key: Key
21-
worker: str | None
22-
state: RunningState
23-
24-
@classmethod
25-
def from_scheduler_task_state(
26-
cls, key: Key, worker: str | None, task_state: TaskStateState
27-
) -> "TaskLifeCycleState":
28-
return cls(
29-
key=key,
30-
worker=worker,
31-
state=_SCHEDULER_TASK_STATE_TO_RUNNING_STATE[task_state],
32-
)
33-
34-
3513
class TaskLifecycleSchedulerPlugin(SchedulerPlugin):
3614
def __init__(self) -> None:
3715
with log_context(
@@ -66,10 +44,8 @@ def transition(
6644
):
6745
assert self.scheduler # nosec
6846
self.scheduler.log_event(
69-
_TASK_LIFE_CYCLE_EVENT.format(key=key),
70-
asdict(
71-
TaskLifeCycleState.from_scheduler_task_state(
72-
key, kwargs.get("worker"), finish
73-
)
74-
),
47+
TASK_LIFE_CYCLE_EVENT.format(key=key),
48+
TaskLifeCycleState.from_scheduler_task_state(
49+
key, kwargs.get("worker"), finish
50+
).model_dump(),
7551
)

0 commit comments

Comments
 (0)