11import logging
2- from typing import Any
2+ from dataclasses import asdict , dataclass
3+ from typing import Any , Final
34
45from dask .typing import Key
56from distributed import Scheduler , SchedulerPlugin
67from distributed .scheduler import TaskStateState
8+ from models_library .projects_state import RunningState
79from servicelib .logging_utils import log_context
810
911_logger = logging .getLogger (__name__ )
1012
1113
14+ _TASK_LIFE_CYCLE_EVENT : Final [str ] = "task-lifecycle-{key}"
15+ _SCHEDULER_TASK_STATE_TO_RUNNING_STATE : Final [dict [TaskStateState , RunningState ]] = {}
16+
17+
18+ @dataclass
19+ class TaskLifeCycleState :
20+ key : Key
21+ worker : str | None
22+ state : RunningState
23+
24+ @classmethod
25+ def from_scheduler_task_state (
26+ cls , key : Key , worker : str | None , task_state : TaskStateState
27+ ) -> "TaskLifeCycleState" :
28+ return cls (
29+ key = key ,
30+ worker = worker ,
31+ state = _SCHEDULER_TASK_STATE_TO_RUNNING_STATE [task_state ],
32+ )
33+
34+
1235class TaskLifecycleSchedulerPlugin (SchedulerPlugin ):
1336 def __init__ (self ) -> None :
1437 with log_context (
@@ -31,7 +54,7 @@ def transition(
3154 key : Key ,
3255 start : TaskStateState ,
3356 finish : TaskStateState ,
34- * args : Any ,
57+ * args : Any , # noqa: ARG002
3558 stimulus_id : str ,
3659 ** kwargs : Any ,
3760 ):
@@ -43,12 +66,10 @@ def transition(
4366 ):
4467 assert self .scheduler # nosec
4568 self .scheduler .log_event (
46- f"task-lifecycle-{ key } " ,
47- {
48- "key" : key ,
49- "worker" : kwargs .get ("worker" ),
50- "start" : start ,
51- "finish" : finish ,
52- "stimulus_id" : stimulus_id ,
53- },
69+ _TASK_LIFE_CYCLE_EVENT .format (key = key ),
70+ asdict (
71+ TaskLifeCycleState .from_scheduler_task_state (
72+ key , kwargs .get ("worker" ), finish
73+ )
74+ ),
5475 )
0 commit comments