Skip to content
Merged
1 change: 1 addition & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS=[]
DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS={}
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT=01:00:00
DIRECTOR_V2_TRACING={}
DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED=1

# DYNAMIC_SCHEDULER ----
DYNAMIC_SCHEDULER_LOGLEVEL=INFO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
- `cancel`: (called by the user) [optional]:
send a message to cancel the current task. A warning will be logged but no call to either
`on_result` or `on_finished_with_error` will occur.
- `on_cancelled` (called by state `ManuallyCancelled`) [optional] {can be overwritten by the user}:
called after the cancellation is handled by the worker executing the `run`
## DeferredHandler lifecycle
```mermaid
stateDiagram-v2
* --> Scheduled: via [start]
** --> ManuallyCancelled: via [cancel]
(1) --> Scheduled: via [start]
(2) --> ManuallyCancelled: via [cancel]
ManuallyCancelled --> Worker: attempts to cancel task in
Expand All @@ -41,9 +43,10 @@
ErrorResult --> FinishedWithError: gives up when out of retries or if cancelled
Worker --> DeferredResult: success
DeferredResult --> °: calls [on_result]
FinishedWithError --> °°: calls [on_finished_with_error]
Worker --> °°°: task cancelled
DeferredResult --> (3): calls [on_result]
FinishedWithError --> (4): calls [on_finished_with_error]
Worker --> Removed*: task cancelled
Removed* --> (5): calls [on_cancelled]
```
### States
Expand All @@ -57,6 +60,7 @@
- `FinishedWIthError`: logs error, invokes `on_finished_with_error` and removes the schedule
- `DeferredResult`: invokes `on_result` and removes the schedule
- `ManuallyCancelled`: sends message to all instances to cancel. The instance handling the task will cancel the task and remove the schedule
- `Removed*`: is a fake state that does not exist only used to convey the information that the cancellation event is triggered after removal
"""

from ._base_deferred_handler import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ async def get_retries(cls, context: DeferredContext) -> NonNegativeInt:
assert context # nosec
return 0

@classmethod
async def get_retry_delay(
cls,
context: DeferredContext,
remaining_attempts: NonNegativeInt,
total_attempts: NonNegativeInt,
) -> timedelta:
"""
returns: the delay between eatch retry attempt (default: 0s)
"""
assert context # nosec
assert remaining_attempts # nosec
assert total_attempts # nosec
return timedelta(seconds=0)

@classmethod
@abstractmethod
async def get_timeout(cls, context: DeferredContext) -> timedelta:
Expand Down Expand Up @@ -84,6 +99,11 @@ async def on_finished_with_error(
NOTE: by design the default action is to do nothing
"""

@classmethod
@abstractmethod
async def on_cancelled(cls, context: DeferredContext) -> None:
"""called after handling ``cancel`` request by the copy executing ``run``"""

@classmethod
async def cancel(cls, task_uid: TaskUID) -> None:
"""cancels a deferred"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import inspect
import logging
from collections.abc import Awaitable, Callable, Iterable
from datetime import timedelta
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Final

Expand Down Expand Up @@ -118,6 +118,14 @@ def _raise_if_not_type(task_result: Any, expected_types: Iterable[type]) -> None
raise TypeError(msg)


async def _wait_until_future_date(possible_future_date: datetime) -> None:
while True:
now = arrow.utcnow().datetime
if now >= possible_future_date:
return
await asyncio.sleep(1)


class DeferredManager: # pylint:disable=too-many-instance-attributes
def __init__(
self,
Expand Down Expand Up @@ -297,18 +305,21 @@ async def __start(
subclass = self.__get_subclass(class_unique_reference)
deferred_context = self.__get_deferred_context(start_context)

retry_count = await subclass.get_retries(deferred_context)
task_schedule = TaskScheduleModel(
timeout=await subclass.get_timeout(deferred_context),
execution_attempts=await subclass.get_retries(deferred_context) + 1,
total_attempts=retry_count,
execution_attempts=retry_count + 1,
class_unique_reference=class_unique_reference,
start_context=start_context,
state=TaskState.SCHEDULED,
)

await self._task_tracker.save(task_uid, task_schedule)

with log_catch(_logger, reraise=False):
await subclass.on_created(task_uid, deferred_context)

await self._task_tracker.save(task_uid, task_schedule)
_logger.debug("Scheduled task '%s' with entry: %s", task_uid, task_schedule)
await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SCHEDULED)

Expand Down Expand Up @@ -460,7 +471,29 @@ async def _fs_handle_error_result( # pylint:disable=method-hidden
task_schedule.result, TaskResultCancelledError
):
_logger.debug("Schedule retry attempt for task_uid '%s'", task_uid)
# does not retry if task was cancelled

# resilenet wait before retrying
if task_schedule.wait_cancellation_until is None:
# save the new one
subclass = self.__get_subclass(task_schedule.class_unique_reference)
deferred_context = self.__get_deferred_context(
task_schedule.start_context
)
sleep_interval = await subclass.get_retry_delay(
context=deferred_context,
remaining_attempts=task_schedule.execution_attempts,
total_attempts=task_schedule.total_attempts,
)
task_schedule.wait_cancellation_until = (
arrow.utcnow().datetime + sleep_interval
)
await self._task_tracker.save(task_uid, task_schedule)

await _wait_until_future_date(task_schedule.wait_cancellation_until)
task_schedule.wait_cancellation_until = None
await self._task_tracker.save(task_uid, task_schedule)

# waiting is done can proceed with retry
task_schedule.state = TaskState.SUBMIT_TASK
await self._task_tracker.save(task_uid, task_schedule)
await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SUBMIT_TASK)
Expand Down Expand Up @@ -570,6 +603,11 @@ async def _fs_handle_manually_cancelled( # pylint:disable=method-hidden
_logger.info("Found and cancelled run for '%s'", task_uid)
await self.__remove_task(task_uid, task_schedule)

subclass = self.__get_subclass(task_schedule.class_unique_reference)
deferred_context = self.__get_deferred_context(task_schedule.start_context)
with log_catch(_logger, reraise=False):
await subclass.on_cancelled(deferred_context)

async def __is_present(self, task_uid: TaskUID) -> bool:
task_schedule: TaskScheduleModel | None = await self._task_tracker.get(task_uid)
return task_schedule is not None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from datetime import datetime, timedelta
from enum import Enum
from typing import Annotated

import arrow
from common_library.basic_types import DEFAULT_FACTORY
from pydantic import BaseModel, Field, NonNegativeInt

from ._base_deferred_handler import StartContext
Expand All @@ -23,37 +25,64 @@ class TaskState(str, Enum):


class TaskScheduleModel(BaseModel):
timeout: timedelta = Field(
..., description="Amount of time after which the task execution will time out"
)
class_unique_reference: ClassUniqueReference = Field(
...,
description="reference to the class containing the code and handlers for the execution of the task",
)
start_context: StartContext = Field(
...,
description="data used to assemble the ``StartContext``",
)

state: TaskState = Field(
..., description="represents the execution step of the task"
)

execution_attempts: NonNegativeInt = Field(
...,
description="remaining attempts to run the code, only retries if this is > 0",
)

time_started: datetime = Field(
default_factory=lambda: arrow.utcnow().datetime,
description="time when task schedule was created, used for statistics",
)

result: TaskExecutionResult | None = Field(
default=None,
description=(
f"Populated by {TaskState.WORKER}. It always has a value after worker handles it."
"Will be used "
timeout: Annotated[
timedelta,
Field(
description="Amount of time after which the task execution will time out"
),
discriminator="result_type",
)
]
class_unique_reference: Annotated[
ClassUniqueReference,
Field(
description="reference to the class containing the code and handlers for the execution of the task",
),
]
start_context: Annotated[
StartContext,
Field(
description="data used to assemble the ``StartContext``",
),
]

state: Annotated[
TaskState, Field(description="represents the execution step of the task")
]

total_attempts: Annotated[
NonNegativeInt,
Field(
description="maximum number of attempts before giving up (0 means no retries)"
),
]

execution_attempts: Annotated[
NonNegativeInt,
Field(
description="remaining attempts to run the code, only retries if this is > 0",
),
]

wait_cancellation_until: Annotated[
datetime | None,
Field(description="when set has to wait till this before cancelling the task"),
] = None

time_started: Annotated[
datetime,
Field(
default_factory=lambda: arrow.utcnow().datetime,
description="time when task schedule was created, used for statistics",
),
] = DEFAULT_FACTORY

result: Annotated[
TaskExecutionResult | None,
Field(
default=None,
description=(
f"Populated by {TaskState.WORKER}. It always has a value after worker handles it."
"Will be used "
),
discriminator="result_type",
),
] = None
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class MockKeys(StrAutoEnum):
RUN_DEFERRED_AFTER_HANDLER = auto()
ON_DEFERRED_RESULT = auto()
ON_FINISHED_WITH_ERROR = auto()
ON_CANCELLED = auto()


@pytest.fixture
Expand Down Expand Up @@ -132,6 +133,10 @@ async def run(cls, context: DeferredContext) -> Any:
async def on_result(cls, result: Any, context: DeferredContext) -> None:
mocks[MockKeys.ON_DEFERRED_RESULT](result, context)

@classmethod
async def on_cancelled(cls, context: DeferredContext) -> None:
mocks[MockKeys.ON_CANCELLED](context)

@classmethod
async def on_finished_with_error(
cls, error: TaskResultError, context: DeferredContext
Expand Down Expand Up @@ -324,6 +329,7 @@ async def _run_to_cancel(_: DeferredContext) -> None:
await _assert_mock_call(mocks, key=MockKeys.RUN_DEFERRED_BEFORE_HANDLER, count=1)
await mocked_deferred_handler.cancel(task_uid)

await _assert_mock_call(mocks, key=MockKeys.ON_CANCELLED, count=1)
await _assert_mock_call(mocks, key=MockKeys.ON_FINISHED_WITH_ERROR, count=0)

assert (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def task_schedule() -> TaskScheduleModel:
return TypeAdapter(TaskScheduleModel).validate_python(
{
"timeout": timedelta(seconds=1),
"execution_attempts": 1,
"total_attempts": 1,
"execution_attempts": 2,
"class_unique_reference": "mock",
"start_context": {},
"state": TaskState.SCHEDULED,
Expand Down
2 changes: 2 additions & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ services:
DIRECTOR_V2_PROFILING: ${DIRECTOR_V2_PROFILING}
DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL: ${DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL}

DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED: ${DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED}

DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED: ${DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED}
DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS: ${DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS}
DYNAMIC_SIDECAR_IMAGE: ${DYNAMIC_SIDECAR_IMAGE}
Expand Down
15 changes: 8 additions & 7 deletions services/dynamic-scheduler/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ grpcio==1.70.0
h11==0.16.0
# via
# httpcore
# nicegui
# uvicorn
# wsproto
h2==4.2.0
Expand Down Expand Up @@ -232,7 +233,7 @@ multidict==6.1.0
# via
# aiohttp
# yarl
nicegui==2.12.1
nicegui==2.23.3
# via -r requirements/_base.in
opentelemetry-api==1.34.1
# via
Expand Down Expand Up @@ -441,8 +442,10 @@ python-dotenv==1.0.1
# via
# pydantic-settings
# uvicorn
python-engineio==4.11.2
# via python-socketio
python-engineio==4.12.2
# via
# nicegui
# python-socketio
python-multipart==0.0.20
# via
# fastapi
Expand Down Expand Up @@ -505,9 +508,7 @@ referencing==0.35.1
# jsonschema
# jsonschema-specifications
requests==2.32.4
# via
# nicegui
# opentelemetry-exporter-otlp-proto-http
# via opentelemetry-exporter-otlp-proto-http
rich==14.1.0
# via
# -r requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/_base.in
Expand Down Expand Up @@ -571,6 +572,7 @@ starlette==0.47.2
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../requirements/constraints.txt
# fastapi
# nicegui
stream-zip==0.0.83
# via -r requirements/../../../packages/service-library/requirements/_base.in
tenacity==9.0.0
Expand Down Expand Up @@ -628,7 +630,6 @@ urllib3==2.5.0
# -c requirements/../../../packages/settings-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
# -c requirements/../../../requirements/constraints.txt
# nicegui
# requests
# sentry-sdk
uvicorn==0.34.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ async def delete(self, node_id: NodeID) -> None:
await self.redis_client_sdk.redis.delete(_get_key(node_id))

async def all(self) -> dict[NodeID, TrackedServiceModel]:
found_keys = await self.redis_client_sdk.redis.keys(f"{_KEY_PREFIX}*")
found_keys = [
x
async for x in self.redis_client_sdk.redis.scan_iter(
match=f"{_KEY_PREFIX}*"
)
]
found_values = await self.redis_client_sdk.redis.mget(found_keys)

return {
Expand Down
Loading