Merged
46 commits
f5acf41  now we also test the scheduler preload (sanderegg, May 15, 2025)
6963a8c  improve logs (sanderegg, May 15, 2025)
4ca2bd3  show manual intervention needed if started since >2 minutes (sanderegg, May 15, 2025)
2107619  add some better reason (sanderegg, May 15, 2025)
6224bf8  added life cycle plugins (sanderegg, May 15, 2025)
8d302cb  renamed (sanderegg, May 16, 2025)
937143a  refactor (sanderegg, May 16, 2025)
4014d3d  ongoing (sanderegg, May 16, 2025)
9f5d9aa  sending events about task status (sanderegg, May 16, 2025)
a451c39  check returned states (sanderegg, May 16, 2025)
a3a3cbf  rename (sanderegg, May 16, 2025)
f46aff5  mypy (sanderegg, May 16, 2025)
38c0a7a  seems to work (sanderegg, May 16, 2025)
ced8860  added get_task_status2 (sanderegg, May 16, 2025)
93dc096  ongoing removal of old stuff (sanderegg, May 16, 2025)
c1ea3f8  ongoing (sanderegg, May 19, 2025)
b63f4fa  removed daskstate and upgrade test using running state (sanderegg, May 19, 2025)
75768fa  release constraint (sanderegg, May 19, 2025)
f6ea7c5  added dependency to dask-sidecar (sanderegg, May 19, 2025)
08438b9  removed old parts (sanderegg, May 19, 2025)
a62fd7f  ensure plugins are loaded for tests (sanderegg, May 19, 2025)
0f8b59d  added dependency (sanderegg, May 19, 2025)
63ee2c1  allows to setup plugins (sanderegg, May 19, 2025)
3f40d64  make the call async (sanderegg, May 19, 2025)
4d6ea9c  the test now runs (sanderegg, May 19, 2025)
7e0faa6  set a timeout (sanderegg, May 19, 2025)
5cb2fc0  mypy (sanderegg, May 19, 2025)
65bb64e  moved task life cycle plugins to dask-task-library (sanderegg, May 19, 2025)
65262ca  removed dependency to dask-sidecar (sanderegg, May 19, 2025)
d5e4314  fixed paths (sanderegg, May 19, 2025)
c9b3698  moved plugins down one level (sanderegg, May 19, 2025)
4084721  manage to poll task progress now (sanderegg, May 19, 2025)
3e3aab1  revert to have events (sanderegg, May 19, 2025)
eb4debb  update notes (sanderegg, May 19, 2025)
205c968  added specific events to be more effective (sanderegg, May 19, 2025)
443f596  remove servicelib from dask-task-models (sanderegg, May 19, 2025)
27d832c  added dependency for testing (sanderegg, May 19, 2025)
4ec4d40  mypy (sanderegg, May 19, 2025)
c8bec8f  fixed tests (sanderegg, May 19, 2025)
15b135e  pylint (sanderegg, May 19, 2025)
ffbc5a9  fix difference (sanderegg, May 19, 2025)
1f0ac5f  language (sanderegg, May 19, 2025)
23c1fcb  pylint (sanderegg, May 19, 2025)
3230606  pylint (sanderegg, May 19, 2025)
ab92c70  timeout increase (sanderegg, May 19, 2025)
1567bdd  revert changes (sanderegg, May 20, 2025)
@@ -1,4 +1,72 @@
from typing import TypeAlias
from typing import Final, Literal, TypeAlias

from dask.typing import Key
from distributed.scheduler import TaskStateState as SchedulerTaskState
from distributed.worker_state_machine import TaskStateState as WorkerTaskState
from models_library.projects_state import RunningState
from pydantic import BaseModel

DaskJobID: TypeAlias = str
DaskResources: TypeAlias = dict[str, int | float]

TASK_LIFE_CYCLE_EVENT: Final[str] = "task-lifecycle-{key}"
TASK_RUNNING_PROGRESS_EVENT: Final[str] = "task-progress-{key}"
_SCHEDULER_TASK_STATE_TO_RUNNING_STATE: Final[
dict[SchedulerTaskState, RunningState]
] = {
"released": RunningState.NOT_STARTED, # Known but not actively computing or in memory
"waiting": RunningState.PENDING, # On track to be computed, waiting on dependencies to arrive in memory
"no-worker": RunningState.WAITING_FOR_RESOURCES, # Ready to be computed, but no appropriate worker exists (for example because of resource restrictions, or because no worker is connected at all).
"queued": RunningState.WAITING_FOR_RESOURCES, # Ready to be computed, but all workers are already full.
"processing": RunningState.PENDING, # All dependencies are available and the task is assigned to a worker for compute (the scheduler doesn’t know whether it’s in a worker queue or actively being computed).
"memory": RunningState.SUCCESS, # In memory on one or more workers
"erred": RunningState.FAILED, # Task computation, or one of its dependencies, has encountered an error
"forgotten": RunningState.UNKNOWN, # Task is no longer needed by any client or dependent task, so it disappears from the scheduler as well. As soon as a task reaches this state, it is immediately dereferenced from the scheduler.
}

_WORKER_TASK_STATE_TO_RUNNING_STATE: Final[dict[WorkerTaskState, RunningState]] = {
"cancelled": RunningState.ABORTED, # The scheduler asked to forget about this task, but it’s technically impossible at the moment. See Task cancellation. The task can be found in whatever collections it was in its previous state.
"constrained": RunningState.PENDING, # Like ready, but the user specified resource constraints for this task. The task can be found in the WorkerState.constrained queue.
"error": RunningState.FAILED, # Task execution failed
"executing": RunningState.STARTED, # The task is currently being computed on a thread. It can be found in the WorkerState.executing set and in the distributed.worker.Worker.active_threads dict.
"fetch": RunningState.PENDING, # This task is in memory on one or more peer workers, but not on this worker. Its data is queued to be transferred over the network, either because it’s a dependency of a task in waiting state, or because the Active Memory Manager requested it to be replicated here. The task can be found in the WorkerState.data_needed heap.
"flight": RunningState.PENDING, # The task data is currently being transferred over the network from another worker. The task can be found in the WorkerState.in_flight_tasks and WorkerState.in_flight_workers collections.
"forgotten": RunningState.UNKNOWN, # The scheduler asked this worker to forget about the task, and there are neither dependents nor dependencies on the same worker.
"long-running": RunningState.STARTED, # Like executing, but the user code called distributed.secede() so the task no longer counts towards the maximum number of concurrent tasks. It can be found in the WorkerState.long_running set and in the distributed.worker.Worker.active_threads dict.
"memory": RunningState.SUCCESS, # Task execution completed, or the task was successfully transferred from another worker, and is now held in either WorkerState.data or WorkerState.actors.
"missing": RunningState.PENDING, # Like fetch, but all peer workers that were listed by the scheduler are either unreachable or have responded they don’t actually have the task data. The worker will periodically ask the scheduler if it knows of additional replicas; when it does, the task will transition again to fetch. The task can be found in the WorkerState.missing_dep_flight set.
"ready": RunningState.PENDING, # The task is ready to be computed; all of its dependencies are in memory on the current worker and it’s waiting for an available thread. The task can be found in the WorkerState.ready heap.
"released": RunningState.PENDING, # Known but not actively computing or in memory. A task can stay in this state when the scheduler asked to forget it, but it has dependent tasks on the same worker.
"rescheduled": RunningState.PENDING, # The task just raised the Reschedule exception. This is a transitory state, which is not stored permanently.
"resumed": RunningState.PENDING, # The task was recovered from cancelled state. See Task cancellation. The task can be found in whatever collections it was in its previous state.
"waiting": RunningState.PENDING, # The scheduler has added the task to the worker queue. All of its dependencies are in memory somewhere on the cluster, but not all of them are in memory on the current worker, so they need to be fetched.
}


class TaskLifeCycleState(BaseModel):
key: str
source: Literal["scheduler", "worker"]
worker: str | None
state: RunningState

@classmethod
def from_scheduler_task_state(
cls, key: Key, worker: str | None, task_state: SchedulerTaskState
) -> "TaskLifeCycleState":
return cls(
key=f"{key!r}",
source="scheduler",
worker=worker,
state=_SCHEDULER_TASK_STATE_TO_RUNNING_STATE[task_state],
)

@classmethod
def from_worker_task_state(
cls, key: Key, worker: str | None, task_state: WorkerTaskState
) -> "TaskLifeCycleState":
return cls(
key=f"{key!r}",
source="worker",
worker=worker,
state=_WORKER_TASK_STATE_TO_RUNNING_STATE[task_state],
)
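For context, the constants above define one event topic per task key. Below is a minimal sketch (not part of the PR) of reading those events back from a client, assuming a cluster whose scheduler and workers load the plugins introduced further down; the scheduler address and job id are placeholders, and the module path `dask_task_models_library.models` follows the relative imports used by the plugins.

```python
# Sketch: polling the lifecycle events emitted for one task key.
import distributed
from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState


def get_task_lifecycle_states(
    client: distributed.Client, job_id: str
) -> list[TaskLifeCycleState]:
    # Client.get_events returns the (timestamp, payload) pairs logged on a topic
    events = client.get_events(TASK_LIFE_CYCLE_EVENT.format(key=job_id))
    return [TaskLifeCycleState.model_validate(payload) for _, payload in events]


with distributed.Client("tcp://scheduler:8786") as client:  # placeholder address
    for state in get_task_lifecycle_states(client, "some-job-id"):  # placeholder key
        print(state.source, state.worker, state.state)
```

Note that distributed keeps a bounded, in-memory event log per topic, so this gives the recent transitions rather than a durable history.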
@@ -0,0 +1,54 @@
# pylint: disable=unused-argument
import logging
from typing import Any

import click
from dask.typing import Key
from distributed import Scheduler, SchedulerPlugin
from distributed.scheduler import TaskStateState

from ..models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState

_logger = logging.getLogger(__name__)


class TaskLifecycleSchedulerPlugin(SchedulerPlugin):
def __init__(self) -> None:
self.scheduler = None
_logger.info("initialized TaskLifecycleSchedulerPlugin")

async def start(self, scheduler: Scheduler) -> None:
self.scheduler = scheduler # type: ignore[assignment]
_logger.info("started TaskLifecycleSchedulerPlugin")

def transition(
self,
key: Key,
start: TaskStateState,
finish: TaskStateState,
*args: Any, # noqa: ARG002
stimulus_id: str,
**kwargs: Any,
):
_logger.debug(
"Task %s transition from %s to %s due to %s",
key,
start,
finish,
stimulus_id,
)

assert self.scheduler # nosec

self.scheduler.log_event(
TASK_LIFE_CYCLE_EVENT.format(key=key),
TaskLifeCycleState.from_scheduler_task_state(
key, kwargs.get("worker"), finish
).model_dump(mode="json"),
)


@click.command()
def dask_setup(scheduler):
plugin = TaskLifecycleSchedulerPlugin()
scheduler.add_plugin(plugin)
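The plugin is meant to be loaded as a scheduler preload (hence the click-based `dask_setup` hook and the pytest fixture changes below). As an aside, it could in principle also be registered from a client; the sketch below assumes a distributed release that ships `Client.register_plugin` for `SchedulerPlugin` instances and is not something the PR itself does.

```python
# Sketch (assumption): registering the scheduler plugin programmatically
# instead of through the preload mechanism.
import distributed
from dask_task_models_library.plugins.task_life_cycle_scheduler_plugin import (
    TaskLifecycleSchedulerPlugin,
)

client = distributed.Client("tcp://scheduler:8786")  # placeholder address
client.register_plugin(TaskLifecycleSchedulerPlugin(), name="task-lifecycle-scheduler")
```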
@@ -0,0 +1,48 @@
import logging
from collections.abc import Awaitable
from typing import Any

import click
from dask.typing import Key
from distributed import WorkerPlugin
from distributed.worker import Worker
from distributed.worker_state_machine import TaskStateState

from ..models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState

_logger = logging.getLogger(__name__)


class TaskLifecycleWorkerPlugin(WorkerPlugin):
def __init__(self) -> None:
self._worker = None
_logger.info("TaskLifecycleWorkerPlugin initialized")

def setup(self, worker: Worker) -> Awaitable[None]:
async def _() -> None:
self._worker = worker # type: ignore[assignment]
_logger.info("TaskLifecycleWorkerPlugin setup completed")

return _()

def transition(
self,
key: Key,
start: TaskStateState,
finish: TaskStateState,
**kwargs: Any,
):
_logger.info("Task '%s' transition from %s to %s", key, start, finish)
assert self._worker # nosec
self._worker.log_event(
TASK_LIFE_CYCLE_EVENT.format(key=key),
TaskLifeCycleState.from_worker_task_state(
key, kwargs.get("worker"), finish
).model_dump(mode="json"),
)


@click.command()
async def dask_setup(worker: Worker) -> None:
plugin = TaskLifecycleWorkerPlugin()
await worker.plugin_add(plugin)
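Both plugins publish to the same per-key topic, so a listener can follow a task from the scheduler-side scheduling states through the worker-side execution states. A hedged sketch of a live subscription follows; it assumes `Client.subscribe_topic` delivers each logged event to the handler as a `(timestamp, payload)` pair, with payloads being the `model_dump(mode="json")` dicts emitted above.

```python
# Sketch (not part of the PR): following one task's lifecycle as events arrive.
import distributed
from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState


def follow_task(client: distributed.Client, job_id: str) -> None:
    def _on_event(event: tuple) -> None:
        _timestamp, payload = event  # assumed event shape: (timestamp, payload)
        state = TaskLifeCycleState.model_validate(payload)
        print(f"{state.source}: {job_id} -> {state.state} (worker={state.worker})")

    client.subscribe_topic(TASK_LIFE_CYCLE_EVENT.format(key=job_id), _on_event)
```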
12 changes: 12 additions & 0 deletions packages/pytest-simcore/src/pytest_simcore/dask_scheduler.py
@@ -19,6 +19,9 @@ def dask_workers_config() -> dict[str, Any]:
"options": {
"nthreads": 2,
"resources": {"CPU": 2, "RAM": 48e9},
"preload": (
"dask_task_models_library.plugins.task_life_cycle_worker_plugin",
),
},
},
"gpu-worker": {
@@ -30,6 +33,9 @@
"GPU": 1,
"RAM": 48e9,
},
"preload": (
"dask_task_models_library.plugins.task_life_cycle_worker_plugin",
),
},
},
"large-ram-worker": {
@@ -40,6 +46,9 @@
"CPU": 8,
"RAM": 768e9,
},
"preload": (
"dask_task_models_library.plugins.task_life_cycle_worker_plugin",
),
},
},
}
@@ -54,6 +63,9 @@ def dask_scheduler_config(
"options": {
"port": unused_tcp_port_factory(),
"dashboard_address": f":{unused_tcp_port_factory()}",
"preload": (
"dask_task_models_library.plugins.task_life_cycle_scheduler_plugin",
),
},
}
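These fixture dicts have the shape consumed by distributed.SpecCluster (a class plus an options mapping). Below is a hedged sketch of standing up a comparable cluster outside of pytest with the same preloads; the `cls` entries and the single cpu-worker spec are illustrative assumptions, not copied from pytest-simcore.

```python
# Sketch (assumption): an equivalent SpecCluster where the scheduler and every
# worker load the task life cycle plugins at startup via preload modules.
import distributed

scheduler_spec = {
    "cls": distributed.Scheduler,
    "options": {
        "dashboard_address": ":0",
        "preload": (
            "dask_task_models_library.plugins.task_life_cycle_scheduler_plugin",
        ),
    },
}
workers_spec = {
    "cpu-worker": {
        "cls": distributed.Worker,
        "options": {
            "nthreads": 2,
            "resources": {"CPU": 2, "RAM": 48e9},
            "preload": (
                "dask_task_models_library.plugins.task_life_cycle_worker_plugin",
            ),
        },
    },
}

cluster = distributed.SpecCluster(workers=workers_spec, scheduler=scheduler_spec)
```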

@@ -265,7 +265,11 @@ def _needs_manual_intervention(
user_id=containers[0].user_id,
project_id=containers[0].project_id,
created_at=containers[0].created_at,
needs_manual_intervention=_needs_manual_intervention(containers),
needs_manual_intervention=_needs_manual_intervention(containers)
and (
(arrow.utcnow().datetime - containers[0].created_at)
> datetime.timedelta(minutes=2)
),
containers=[c.name for c in containers],
service_name=containers[0].service_name,
service_version=containers[0].service_version,
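The flag is now only raised once the oldest container has existed for more than two minutes, matching the "show manual intervention needed if started since >2 minutes" commit. A minimal sketch of that predicate on its own; the helper name is made up for illustration and `created_at` is assumed to be timezone-aware so the subtraction against `arrow.utcnow()` is valid.

```python
# Sketch (illustrative only): the grace-period check added above, extracted
# into a standalone helper.
import datetime

import arrow

_GRACE_PERIOD = datetime.timedelta(minutes=2)


def needs_intervention_after_grace_period(
    needs_manual_intervention: bool, created_at: datetime.datetime
) -> bool:
    return needs_manual_intervention and (
        (arrow.utcnow().datetime - created_at) > _GRACE_PERIOD
    )
```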
1 change: 1 addition & 0 deletions services/clusters-keeper/requirements/ci.txt
@@ -14,6 +14,7 @@
# installs this repo's packages
simcore-aws-library @ ../../packages/aws-library
simcore-common-library @ ../../packages/common-library
simcore-dask-task-models-library @ ../../packages/dask-task-models-library
simcore-models-library @ ../../packages/models-library
pytest-simcore @ ../../packages/pytest-simcore
simcore-service-library[fastapi] @ ../../packages/service-library
1 change: 1 addition & 0 deletions services/clusters-keeper/requirements/dev.txt
@@ -18,6 +18,7 @@
--editable ../../packages/pytest-simcore
--editable ../../packages/service-library[fastapi]
--editable ../../packages/settings-library
--editable ../../packages/dask-task-models-library

# installs current package
--editable .
@@ -24,7 +24,7 @@
class RabbitMQPlugin(distributed.WorkerPlugin):
"""Dask Worker Plugin for RabbitMQ integration"""

name = "rabbitmq_plugin"
name = "rabbitmq_worker_plugin"
_main_thread_loop: AbstractEventLoop | None = None
_client: RabbitMQClient | None = None
_settings: RabbitSettings | None = None
@@ -60,7 +60,7 @@ async def _() -> None:

if threading.current_thread() is not threading.main_thread():
_logger.warning(
"RabbitMQ client plugin setup is not in the main thread! Beware! if in pytest it's ok."
"RabbitMQ client plugin setup is not in the main thread! TIP: if in pytest it's ok."
)

with log_context(
@@ -98,7 +98,7 @@ async def _() -> None:
)
else:
_logger.warning(
"RabbitMQ client plugin setup is not the main thread!"
"RabbitMQ client plugin setup is not the main thread! TIP: if in pytest it's ok."
)

# Cancel the message processor task
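The rename is not only cosmetic: distributed keys installed worker plugins by their `name` attribute, so anything that looks the plugin up by name has to switch from "rabbitmq_plugin" to "rabbitmq_worker_plugin". The helper below is a hedged illustration of such a lookup, not the PR's actual `get_rabbitmq_client` implementation.

```python
# Sketch (assumption): worker plugins live in Worker.plugins keyed by their
# `name` attribute, so the rename changes the lookup key.
import distributed


def get_rabbitmq_plugin(worker: distributed.Worker):
    plugin = worker.plugins.get("rabbitmq_worker_plugin")
    if plugin is None:
        msg = "RabbitMQPlugin is not registered on this worker"
        raise RuntimeError(msg)
    return plugin
```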
@@ -1,6 +1,9 @@
import logging

import distributed
from dask_task_models_library.plugins.task_life_cycle_scheduler_plugin import (
TaskLifecycleSchedulerPlugin,
)
from servicelib.logging_utils import log_context

from ._meta import print_dask_scheduler_banner
@@ -19,9 +22,13 @@ async def dask_setup(scheduler: distributed.Scheduler) -> None:

with log_context(_logger, logging.INFO, "Launch dask scheduler"):
_logger.info("app settings: %s", settings.model_dump_json(indent=1))

scheduler.add_plugin(TaskLifecycleSchedulerPlugin())
print_dask_scheduler_banner()


async def dask_teardown(_worker: distributed.Worker) -> None:
with log_context(_logger, logging.INFO, "Tear down dask scheduler"):
async def dask_teardown(scheduler: distributed.Scheduler) -> None:
with log_context(
_logger, logging.INFO, f"Tear down dask scheduler at {scheduler.address}"
):
...
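This module is a scheduler preload: distributed imports it and calls `dask_setup(scheduler)` at startup and `dask_teardown(scheduler)` at shutdown. A hedged sketch of starting a scheduler with the preload in-process is shown below; the service itself is expected to be launched through the usual dask entrypoints, so this is purely illustrative.

```python
# Sketch (assumption, not from the PR): an in-process scheduler that loads the
# simcore preload module; distributed invokes its dask_setup/dask_teardown
# hooks around the scheduler's lifetime.
import asyncio

import distributed


async def main() -> None:
    async with distributed.Scheduler(
        port=8786,
        preload=["simcore_service_dask_sidecar.scheduler"],
    ) as scheduler:
        print(f"scheduler listening at {scheduler.address}")
        await asyncio.sleep(60)  # keep it alive briefly for this sketch


asyncio.run(main())
```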
@@ -13,13 +13,14 @@
)
from dask_task_models_library.container_tasks.io import TaskCancelEventName
from dask_task_models_library.container_tasks.protocol import TaskOwner
from dask_task_models_library.models import TASK_RUNNING_PROGRESS_EVENT
from distributed.worker import get_worker
from distributed.worker_state_machine import TaskState
from models_library.progress_bar import ProgressReport
from models_library.rabbitmq_messages import LoggerRabbitMessage
from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch, log_context

from ..rabbitmq_plugin import get_rabbitmq_client
from ..rabbitmq_worker_plugin import get_rabbitmq_client

_logger = logging.getLogger(__name__)

@@ -178,4 +179,10 @@ def publish_event(
log_catch(_logger, reraise=False),
log_context(_logger, logging.DEBUG, msg=f"publishing {event=}"),
):
worker.log_event(TaskProgressEvent.topic_name(), event.model_dump_json())
worker.log_event(
[
TaskProgressEvent.topic_name(),
TASK_RUNNING_PROGRESS_EVENT.format(key=event.job_id),
],
event.model_dump_json(),
)
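`Worker.log_event` accepts a list of topics, so the same JSON payload now lands both on the generic TaskProgressEvent topic and on a per-job progress topic. A minimal sketch of polling the latest report for one job follows; the `TaskProgressEvent` import path and the job id are assumptions.

```python
# Sketch (not part of the PR): reading the most recent progress report for one
# job through the new per-job topic. Payloads are JSON strings, matching the
# event.model_dump_json() published above.
import distributed
from dask_task_models_library.container_tasks.events import TaskProgressEvent
from dask_task_models_library.models import TASK_RUNNING_PROGRESS_EVENT


def latest_progress(
    client: distributed.Client, job_id: str
) -> TaskProgressEvent | None:
    events = client.get_events(TASK_RUNNING_PROGRESS_EVENT.format(key=job_id))
    if not events:
        return None
    _timestamp, payload = events[-1]
    return TaskProgressEvent.model_validate_json(payload)
```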
16 changes: 13 additions & 3 deletions services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py
@@ -11,12 +11,15 @@
ContainerTaskParameters,
LogFileUploadURL,
)
from dask_task_models_library.plugins.task_life_cycle_worker_plugin import (
TaskLifecycleWorkerPlugin,
)
from servicelib.logging_utils import log_context
from settings_library.s3 import S3Settings

from ._meta import print_dask_sidecar_banner
from .computational_sidecar.core import ComputationalSidecar
from .rabbitmq_plugin import RabbitMQPlugin
from .rabbitmq_worker_plugin import RabbitMQPlugin
from .settings import ApplicationSettings
from .utils.dask import (
TaskPublisher,
@@ -76,14 +79,21 @@ async def dask_setup(worker: distributed.Worker) -> None:
RabbitMQPlugin(settings.DASK_SIDECAR_RABBITMQ), catch_errors=False
)
except Exception:
await worker.close()
await worker.close(reason="failed to add RabbitMQ plugin")
raise
try:
await worker.plugin_add(TaskLifecycleWorkerPlugin(), catch_errors=False)
except Exception:
await worker.close(reason="failed to add TaskLifecycleWorkerPlugin")
raise

print_dask_sidecar_banner()


async def dask_teardown(worker: distributed.Worker) -> None:
with log_context(_logger, logging.INFO, f"tear down dask {worker.address}"):
with log_context(
_logger, logging.INFO, f"tear down dask worker at {worker.address}"
):
...


1 change: 1 addition & 0 deletions services/dask-sidecar/tests/unit/conftest.py
@@ -117,6 +117,7 @@ def local_cluster(app_environment: EnvVarsDict) -> Iterator[distributed.LocalCluster]:
with distributed.LocalCluster(
worker_class=distributed.Worker,
resources={"CPU": 10, "GPU": 10},
scheduler_kwargs={"preload": "simcore_service_dask_sidecar.scheduler"},
preload="simcore_service_dask_sidecar.worker",
) as cluster:
assert cluster
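With both preloads on the LocalCluster, the plugins are installed before tests run. A hedged sketch of how a test could assert the worker plugin is actually present, using `Client.run` to inspect `Worker.plugins` on each worker; the exact generated plugin name is an assumption (distributed derives it from the class name when the plugin defines no `name`).

```python
# Sketch (illustrative only): checking the preloaded worker plugin is installed.
# Client.run calls the function on every worker and injects the Worker instance
# through the special `dask_worker` keyword argument.
import distributed


def _plugin_names(dask_worker: distributed.Worker) -> list[str]:
    return list(dask_worker.plugins)


def test_lifecycle_plugin_is_preloaded(
    local_cluster: distributed.LocalCluster,
) -> None:
    with distributed.Client(local_cluster) as client:
        for worker_address, names in client.run(_plugin_names).items():
            assert any(
                "TaskLifecycleWorkerPlugin" in name for name in names
            ), worker_address
```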
@@ -113,7 +113,8 @@
)
# mock dask rabbitmq plugin
mock_dask_rabbitmq_plugin = mocker.patch(
"simcore_service_dask_sidecar.rabbitmq_plugin.RabbitMQPlugin", autospec=True
"simcore_service_dask_sidecar.rabbitmq_worker_plugin.RabbitMQPlugin",
autospec=True,
)
mock_rabbitmq_client = create_rabbitmq_client("pytest_dask_sidecar_logs_publisher")
mock_dask_rabbitmq_plugin.get_client.return_value = mock_rabbitmq_client
@@ -505,7 +506,7 @@ async def subscribe_and_process(a_mock: mock.AsyncMock):
ready_event.set()

# Wait until the test is done
while not shutdown_event.is_set():
while not shutdown_event.is_set(): # noqa: ASYNC110
await asyncio.sleep(0.1)

# Cleanup
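The `noqa: ASYNC110` silences flake8-async's busy-wait rule, which flags `while condition: await sleep(...)` loops and nudges towards awaiting an event directly. The sketch below only illustrates what the rule points at, under the assumption that the event were an `asyncio.Event`; the PR deliberately keeps the polling loop and suppresses the rule instead.

```python
# Sketch (illustrative only, assumes an asyncio.Event): the awaiting pattern
# that ASYNC110 suggests in place of a sleep-based polling loop.
import asyncio


async def wait_until_done(shutdown_event: asyncio.Event) -> None:
    await shutdown_event.wait()
```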