Skip to content

Commit e835adc

Browse files
committed
improve logs
1 parent e49c424 commit e835adc

File tree

3 files changed

+43
-15
lines changed

3 files changed

+43
-15
lines changed

services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
from ._meta import print_dask_scheduler_banner
77
from .settings import ApplicationSettings
8+
from .task_life_cycle_scheduler_plugin import (
9+
TaskLifecycleSchedulerPlugin,
10+
)
811
from .utils.logs import setup_app_logging
912

1013
_logger = logging.getLogger(__name__)
@@ -19,9 +22,13 @@ async def dask_setup(scheduler: distributed.Scheduler) -> None:
1922

2023
with log_context(_logger, logging.INFO, "Launch dask scheduler"):
2124
_logger.info("app settings: %s", settings.model_dump_json(indent=1))
25+
26+
scheduler.add_plugin(TaskLifecycleSchedulerPlugin())
2227
print_dask_scheduler_banner()
2328

2429

2530
async def dask_teardown(scheduler: distributed.Scheduler) -> None:
26-
with log_context(_logger, logging.INFO, f"Tear down dask {scheduler.address}"):
31+
with log_context(
32+
_logger, logging.INFO, f"Tear down dask scheduler at {scheduler.address}"
33+
):
2734
...
Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,54 @@
1+
import logging
12
from typing import Any
23

34
from dask.typing import Key
4-
from distributed import SchedulerPlugin
5+
from distributed import Scheduler, SchedulerPlugin
56
from distributed.scheduler import TaskStateState
7+
from servicelib.logging_utils import log_context
68

9+
_logger = logging.getLogger(__name__)
710

8-
class SchedulerLifecyclePlugin(SchedulerPlugin):
11+
12+
class TaskLifecycleSchedulerPlugin(SchedulerPlugin):
913
def __init__(self) -> None:
10-
self.scheduler = None
14+
with log_context(
15+
_logger,
16+
logging.INFO,
17+
"TaskLifecycleSchedulerPlugin init",
18+
):
19+
self.scheduler = None
1120

12-
def add_task(self, key, **kwargs):
13-
"""Task published to cluster"""
14-
self.scheduler.log_event(
15-
"task-published",
16-
{"key": key, "timestamp": time.time(), "client": kwargs.get("client")},
17-
)
21+
async def start(self, scheduler: Scheduler) -> None:
22+
with log_context(
23+
_logger,
24+
logging.INFO,
25+
"TaskLifecycleSchedulerPlugin start",
26+
):
27+
self.scheduler = scheduler
1828

1929
def transition(
2030
self,
2131
key: Key,
2232
start: TaskStateState,
2333
finish: TaskStateState,
34+
*args: Any,
35+
stimulus_id: str,
2436
**kwargs: Any,
2537
):
26-
"""State transitions"""
27-
if finish in ("waiting", "processing", "memory", "erred"):
38+
# Start state: one of released, waiting, processing, memory, error
39+
with log_context(
40+
_logger,
41+
logging.INFO,
42+
f"Task {key} transition from {start} to {finish} due to {stimulus_id=}",
43+
):
44+
assert self.scheduler # nosec
2845
self.scheduler.log_event(
29-
f"task-{finish}",
46+
f"task-lifecycle-{key}",
3047
{
3148
"key": key,
3249
"worker": kwargs.get("worker"),
33-
"duration": time.time() - self.scheduler.tasks[key].start_time,
50+
"start": start,
51+
"finish": finish,
52+
"stimulus_id": stimulus_id,
3453
},
3554
)

services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ async def dask_setup(worker: distributed.Worker) -> None:
8383

8484

8585
async def dask_teardown(worker: distributed.Worker) -> None:
86-
with log_context(_logger, logging.INFO, f"tear down dask {worker.address}"):
86+
with log_context(
87+
_logger, logging.INFO, f"tear down dask worker at {worker.address}"
88+
):
8789
...
8890

8991

0 commit comments

Comments
 (0)