diff --git a/.env-devel b/.env-devel index d521ed2815cd..6996637ad3f4 100644 --- a/.env-devel +++ b/.env-devel @@ -129,6 +129,7 @@ DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS=[] DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS={} DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT=01:00:00 DIRECTOR_V2_TRACING={} +DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED=1 # DYNAMIC_SCHEDULER ---- DYNAMIC_SCHEDULER_LOGLEVEL=INFO diff --git a/packages/service-library/src/servicelib/deferred_tasks/__init__.py b/packages/service-library/src/servicelib/deferred_tasks/__init__.py index dd57b0838103..be2491ffb6f5 100644 --- a/packages/service-library/src/servicelib/deferred_tasks/__init__.py +++ b/packages/service-library/src/servicelib/deferred_tasks/__init__.py @@ -22,14 +22,16 @@ - `cancel`: (called by the user) [optional]: send a message to cancel the current task. A warning will be logged but no call to either `on_result` or `on_finished_with_error` will occur. +- `on_cancelled` (called by state `ManuallyCancelled`) [optional] {can be overwritten by the user}: + called after the cancellation is handled by the worker executing the `run` ## DeferredHandler lifecycle ```mermaid stateDiagram-v2 - * --> Scheduled: via [start] - ** --> ManuallyCancelled: via [cancel] + (1) --> Scheduled: via [start] + (2) --> ManuallyCancelled: via [cancel] ManuallyCancelled --> Worker: attempts to cancel task in @@ -41,9 +43,10 @@ ErrorResult --> FinishedWithError: gives up when out of retries or if cancelled Worker --> DeferredResult: success - DeferredResult --> °: calls [on_result] - FinishedWithError --> °°: calls [on_finished_with_error] - Worker --> °°°: task cancelled + DeferredResult --> (3): calls [on_result] + FinishedWithError --> (4): calls [on_finished_with_error] + Worker --> Removed*: task cancelled + Removed* --> (5): calls [on_cancelled] ``` ### States @@ -57,6 +60,7 @@ - `FinishedWIthError`: logs error, invokes `on_finished_with_error` and removes the schedule - `DeferredResult`: invokes `on_result` and removes the schedule - `ManuallyCancelled`: sends message to all instances to cancel. The instance handling the task will cancel the task and remove the schedule +- `Removed*`: is a fake state that does not exist only used to convey the information that the cancellation event is triggered after removal """ from ._base_deferred_handler import ( diff --git a/packages/service-library/src/servicelib/deferred_tasks/_base_deferred_handler.py b/packages/service-library/src/servicelib/deferred_tasks/_base_deferred_handler.py index 3c5110ef8f83..cf42eff26f47 100644 --- a/packages/service-library/src/servicelib/deferred_tasks/_base_deferred_handler.py +++ b/packages/service-library/src/servicelib/deferred_tasks/_base_deferred_handler.py @@ -45,6 +45,21 @@ async def get_retries(cls, context: DeferredContext) -> NonNegativeInt: assert context # nosec return 0 + @classmethod + async def get_retry_delay( + cls, + context: DeferredContext, + remaining_attempts: NonNegativeInt, + total_attempts: NonNegativeInt, + ) -> timedelta: + """ + returns: the delay between eatch retry attempt (default: 0s) + """ + assert context # nosec + assert remaining_attempts # nosec + assert total_attempts # nosec + return timedelta(seconds=0) + @classmethod @abstractmethod async def get_timeout(cls, context: DeferredContext) -> timedelta: @@ -84,6 +99,11 @@ async def on_finished_with_error( NOTE: by design the default action is to do nothing """ + @classmethod + @abstractmethod + async def on_cancelled(cls, context: DeferredContext) -> None: + """called after handling ``cancel`` request by the copy executing ``run``""" + @classmethod async def cancel(cls, task_uid: TaskUID) -> None: """cancels a deferred""" diff --git a/packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py b/packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py index 9c41d2e5f870..c1b26a3d8478 100644 --- a/packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py +++ b/packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py @@ -2,7 +2,7 @@ import inspect import logging from collections.abc import Awaitable, Callable, Iterable -from datetime import timedelta +from datetime import datetime, timedelta from enum import Enum from typing import Any, Final @@ -118,6 +118,14 @@ def _raise_if_not_type(task_result: Any, expected_types: Iterable[type]) -> None raise TypeError(msg) +async def _wait_until_future_date(possible_future_date: datetime) -> None: + while True: + now = arrow.utcnow().datetime + if now >= possible_future_date: + return + await asyncio.sleep(1) + + class DeferredManager: # pylint:disable=too-many-instance-attributes def __init__( self, @@ -297,18 +305,21 @@ async def __start( subclass = self.__get_subclass(class_unique_reference) deferred_context = self.__get_deferred_context(start_context) + retry_count = await subclass.get_retries(deferred_context) task_schedule = TaskScheduleModel( timeout=await subclass.get_timeout(deferred_context), - execution_attempts=await subclass.get_retries(deferred_context) + 1, + total_attempts=retry_count, + execution_attempts=retry_count + 1, class_unique_reference=class_unique_reference, start_context=start_context, state=TaskState.SCHEDULED, ) + await self._task_tracker.save(task_uid, task_schedule) + with log_catch(_logger, reraise=False): await subclass.on_created(task_uid, deferred_context) - await self._task_tracker.save(task_uid, task_schedule) _logger.debug("Scheduled task '%s' with entry: %s", task_uid, task_schedule) await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SCHEDULED) @@ -460,7 +471,29 @@ async def _fs_handle_error_result( # pylint:disable=method-hidden task_schedule.result, TaskResultCancelledError ): _logger.debug("Schedule retry attempt for task_uid '%s'", task_uid) - # does not retry if task was cancelled + + # resilenet wait before retrying + if task_schedule.wait_cancellation_until is None: + # save the new one + subclass = self.__get_subclass(task_schedule.class_unique_reference) + deferred_context = self.__get_deferred_context( + task_schedule.start_context + ) + sleep_interval = await subclass.get_retry_delay( + context=deferred_context, + remaining_attempts=task_schedule.execution_attempts, + total_attempts=task_schedule.total_attempts, + ) + task_schedule.wait_cancellation_until = ( + arrow.utcnow().datetime + sleep_interval + ) + await self._task_tracker.save(task_uid, task_schedule) + + await _wait_until_future_date(task_schedule.wait_cancellation_until) + task_schedule.wait_cancellation_until = None + await self._task_tracker.save(task_uid, task_schedule) + + # waiting is done can proceed with retry task_schedule.state = TaskState.SUBMIT_TASK await self._task_tracker.save(task_uid, task_schedule) await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SUBMIT_TASK) @@ -570,6 +603,11 @@ async def _fs_handle_manually_cancelled( # pylint:disable=method-hidden _logger.info("Found and cancelled run for '%s'", task_uid) await self.__remove_task(task_uid, task_schedule) + subclass = self.__get_subclass(task_schedule.class_unique_reference) + deferred_context = self.__get_deferred_context(task_schedule.start_context) + with log_catch(_logger, reraise=False): + await subclass.on_cancelled(deferred_context) + async def __is_present(self, task_uid: TaskUID) -> bool: task_schedule: TaskScheduleModel | None = await self._task_tracker.get(task_uid) return task_schedule is not None diff --git a/packages/service-library/src/servicelib/deferred_tasks/_task_schedule.py b/packages/service-library/src/servicelib/deferred_tasks/_task_schedule.py index 5a88b99568b3..8d34e1081637 100644 --- a/packages/service-library/src/servicelib/deferred_tasks/_task_schedule.py +++ b/packages/service-library/src/servicelib/deferred_tasks/_task_schedule.py @@ -1,7 +1,9 @@ from datetime import datetime, timedelta from enum import Enum +from typing import Annotated import arrow +from common_library.basic_types import DEFAULT_FACTORY from pydantic import BaseModel, Field, NonNegativeInt from ._base_deferred_handler import StartContext @@ -23,37 +25,63 @@ class TaskState(str, Enum): class TaskScheduleModel(BaseModel): - timeout: timedelta = Field( - ..., description="Amount of time after which the task execution will time out" - ) - class_unique_reference: ClassUniqueReference = Field( - ..., - description="reference to the class containing the code and handlers for the execution of the task", - ) - start_context: StartContext = Field( - ..., - description="data used to assemble the ``StartContext``", - ) - - state: TaskState = Field( - ..., description="represents the execution step of the task" - ) - - execution_attempts: NonNegativeInt = Field( - ..., - description="remaining attempts to run the code, only retries if this is > 0", - ) - - time_started: datetime = Field( - default_factory=lambda: arrow.utcnow().datetime, - description="time when task schedule was created, used for statistics", - ) - - result: TaskExecutionResult | None = Field( - default=None, - description=( - f"Populated by {TaskState.WORKER}. It always has a value after worker handles it." - "Will be used " + timeout: Annotated[ + timedelta, + Field( + description="Amount of time after which the task execution will time out" ), - discriminator="result_type", - ) + ] + class_unique_reference: Annotated[ + ClassUniqueReference, + Field( + description="reference to the class containing the code and handlers for the execution of the task", + ), + ] + start_context: Annotated[ + StartContext, + Field( + description="data used to assemble the ``StartContext``", + ), + ] + + state: Annotated[ + TaskState, Field(description="represents the execution step of the task") + ] + + total_attempts: Annotated[ + NonNegativeInt, + Field( + description="maximum number of attempts before giving up (0 means no retries)" + ), + ] + + execution_attempts: Annotated[ + NonNegativeInt, + Field( + description="remaining attempts to run the code, only retries if this is > 0", + ), + ] + + wait_cancellation_until: Annotated[ + datetime | None, + Field(description="when set has to wait till this before cancelling the task"), + ] = None + + time_started: Annotated[ + datetime, + Field( + default_factory=lambda: arrow.utcnow().datetime, + description="time when task schedule was created, used for statistics", + ), + ] = DEFAULT_FACTORY + + result: Annotated[ + TaskExecutionResult | None, + Field( + description=( + f"Populated by {TaskState.WORKER}. It always has a value after worker handles it." + "Will be used " + ), + discriminator="result_type", + ), + ] = None diff --git a/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py b/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py index a26156f2c3f2..d25c7824752c 100644 --- a/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py +++ b/packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py @@ -46,6 +46,7 @@ class MockKeys(StrAutoEnum): RUN_DEFERRED_AFTER_HANDLER = auto() ON_DEFERRED_RESULT = auto() ON_FINISHED_WITH_ERROR = auto() + ON_CANCELLED = auto() @pytest.fixture @@ -132,6 +133,10 @@ async def run(cls, context: DeferredContext) -> Any: async def on_result(cls, result: Any, context: DeferredContext) -> None: mocks[MockKeys.ON_DEFERRED_RESULT](result, context) + @classmethod + async def on_cancelled(cls, context: DeferredContext) -> None: + mocks[MockKeys.ON_CANCELLED](context) + @classmethod async def on_finished_with_error( cls, error: TaskResultError, context: DeferredContext @@ -324,6 +329,7 @@ async def _run_to_cancel(_: DeferredContext) -> None: await _assert_mock_call(mocks, key=MockKeys.RUN_DEFERRED_BEFORE_HANDLER, count=1) await mocked_deferred_handler.cancel(task_uid) + await _assert_mock_call(mocks, key=MockKeys.ON_CANCELLED, count=1) await _assert_mock_call(mocks, key=MockKeys.ON_FINISHED_WITH_ERROR, count=0) assert ( diff --git a/packages/service-library/tests/deferred_tasks/test__redis_task_tracker.py b/packages/service-library/tests/deferred_tasks/test__redis_task_tracker.py index 366759e22d3b..515ed901e98d 100644 --- a/packages/service-library/tests/deferred_tasks/test__redis_task_tracker.py +++ b/packages/service-library/tests/deferred_tasks/test__redis_task_tracker.py @@ -22,7 +22,8 @@ def task_schedule() -> TaskScheduleModel: return TypeAdapter(TaskScheduleModel).validate_python( { "timeout": timedelta(seconds=1), - "execution_attempts": 1, + "total_attempts": 1, + "execution_attempts": 2, "class_unique_reference": "mock", "start_context": {}, "state": TaskState.SCHEDULED, diff --git a/services/docker-compose.yml b/services/docker-compose.yml index dbda14eb590e..e8f7fba296a2 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -353,6 +353,8 @@ services: DIRECTOR_V2_PROFILING: ${DIRECTOR_V2_PROFILING} DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL: ${DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL} + DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED: ${DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED} + DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED: ${DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED} DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS: ${DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS} DYNAMIC_SIDECAR_IMAGE: ${DYNAMIC_SIDECAR_IMAGE} diff --git a/services/dynamic-scheduler/requirements/_base.txt b/services/dynamic-scheduler/requirements/_base.txt index c51f1c65fb5b..788876597397 100644 --- a/services/dynamic-scheduler/requirements/_base.txt +++ b/services/dynamic-scheduler/requirements/_base.txt @@ -132,6 +132,7 @@ grpcio==1.70.0 h11==0.16.0 # via # httpcore + # nicegui # uvicorn # wsproto h2==4.2.0 @@ -232,7 +233,7 @@ multidict==6.1.0 # via # aiohttp # yarl -nicegui==2.12.1 +nicegui==2.23.3 # via -r requirements/_base.in opentelemetry-api==1.34.1 # via @@ -441,8 +442,10 @@ python-dotenv==1.0.1 # via # pydantic-settings # uvicorn -python-engineio==4.11.2 - # via python-socketio +python-engineio==4.12.2 + # via + # nicegui + # python-socketio python-multipart==0.0.20 # via # fastapi @@ -505,9 +508,7 @@ referencing==0.35.1 # jsonschema # jsonschema-specifications requests==2.32.4 - # via - # nicegui - # opentelemetry-exporter-otlp-proto-http + # via opentelemetry-exporter-otlp-proto-http rich==14.1.0 # via # -r requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/_base.in @@ -571,6 +572,7 @@ starlette==0.47.2 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # fastapi + # nicegui stream-zip==0.0.83 # via -r requirements/../../../packages/service-library/requirements/_base.in tenacity==9.0.0 @@ -628,7 +630,6 @@ urllib3==2.5.0 # -c requirements/../../../packages/settings-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt - # nicegui # requests # sentry-sdk uvicorn==0.34.2 diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py index 489cee153105..da57c84f3497 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py @@ -34,7 +34,12 @@ async def delete(self, node_id: NodeID) -> None: await self.redis_client_sdk.redis.delete(_get_key(node_id)) async def all(self) -> dict[NodeID, TrackedServiceModel]: - found_keys = await self.redis_client_sdk.redis.keys(f"{_KEY_PREFIX}*") + found_keys = [ + x + async for x in self.redis_client_sdk.redis.scan_iter( + match=f"{_KEY_PREFIX}*" + ) + ] found_values = await self.redis_client_sdk.redis.mget(found_keys) return {