Skip to content

Commit c8f61c1

Browse files
GitHKAndrei Neagu
andauthored
✨ dynamic-scheduler prerequisites ⚠️🚨 (#8287)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 62244f5 commit c8f61c1

File tree

10 files changed

+157
-51
lines changed

10 files changed

+157
-51
lines changed

.env-devel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS=[]
129129
DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS={}
130130
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT=01:00:00
131131
DIRECTOR_V2_TRACING={}
132+
DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED=1
132133

133134
# DYNAMIC_SCHEDULER ----
134135
DYNAMIC_SCHEDULER_LOGLEVEL=INFO

packages/service-library/src/servicelib/deferred_tasks/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@
2222
- `cancel`: (called by the user) [optional]:
2323
send a message to cancel the current task. A warning will be logged but no call to either
2424
`on_result` or `on_finished_with_error` will occur.
25+
- `on_cancelled` (called by state `ManuallyCancelled`) [optional] {can be overwritten by the user}:
26+
called after the cancellation is handled by the worker executing the `run`
2527
2628
2729
## DeferredHandler lifecycle
2830
2931
```mermaid
3032
stateDiagram-v2
31-
* --> Scheduled: via [start]
32-
** --> ManuallyCancelled: via [cancel]
33+
(1) --> Scheduled: via [start]
34+
(2) --> ManuallyCancelled: via [cancel]
3335
3436
ManuallyCancelled --> Worker: attempts to cancel task in
3537
@@ -41,9 +43,10 @@
4143
ErrorResult --> FinishedWithError: gives up when out of retries or if cancelled
4244
Worker --> DeferredResult: success
4345
44-
DeferredResult --> °: calls [on_result]
45-
FinishedWithError --> °°: calls [on_finished_with_error]
46-
Worker --> °°°: task cancelled
46+
DeferredResult --> (3): calls [on_result]
47+
FinishedWithError --> (4): calls [on_finished_with_error]
48+
Worker --> Removed*: task cancelled
49+
Removed* --> (5): calls [on_cancelled]
4750
```
4851
4952
### States
@@ -57,6 +60,7 @@
5760
- `FinishedWIthError`: logs error, invokes `on_finished_with_error` and removes the schedule
5861
- `DeferredResult`: invokes `on_result` and removes the schedule
5962
- `ManuallyCancelled`: sends message to all instances to cancel. The instance handling the task will cancel the task and remove the schedule
63+
- `Removed*`: is a fake state that does not exist only used to convey the information that the cancellation event is triggered after removal
6064
"""
6165

6266
from ._base_deferred_handler import (

packages/service-library/src/servicelib/deferred_tasks/_base_deferred_handler.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ async def get_retries(cls, context: DeferredContext) -> NonNegativeInt:
4545
assert context # nosec
4646
return 0
4747

48+
@classmethod
49+
async def get_retry_delay(
50+
cls,
51+
context: DeferredContext,
52+
remaining_attempts: NonNegativeInt,
53+
total_attempts: NonNegativeInt,
54+
) -> timedelta:
55+
"""
56+
returns: the delay between eatch retry attempt (default: 0s)
57+
"""
58+
assert context # nosec
59+
assert remaining_attempts # nosec
60+
assert total_attempts # nosec
61+
return timedelta(seconds=0)
62+
4863
@classmethod
4964
@abstractmethod
5065
async def get_timeout(cls, context: DeferredContext) -> timedelta:
@@ -84,6 +99,11 @@ async def on_finished_with_error(
8499
NOTE: by design the default action is to do nothing
85100
"""
86101

102+
@classmethod
103+
@abstractmethod
104+
async def on_cancelled(cls, context: DeferredContext) -> None:
105+
"""called after handling ``cancel`` request by the copy executing ``run``"""
106+
87107
@classmethod
88108
async def cancel(cls, task_uid: TaskUID) -> None:
89109
"""cancels a deferred"""

packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import inspect
33
import logging
44
from collections.abc import Awaitable, Callable, Iterable
5-
from datetime import timedelta
5+
from datetime import datetime, timedelta
66
from enum import Enum
77
from typing import Any, Final
88

@@ -118,6 +118,14 @@ def _raise_if_not_type(task_result: Any, expected_types: Iterable[type]) -> None
118118
raise TypeError(msg)
119119

120120

121+
async def _wait_until_future_date(possible_future_date: datetime) -> None:
122+
while True:
123+
now = arrow.utcnow().datetime
124+
if now >= possible_future_date:
125+
return
126+
await asyncio.sleep(1)
127+
128+
121129
class DeferredManager: # pylint:disable=too-many-instance-attributes
122130
def __init__(
123131
self,
@@ -297,18 +305,21 @@ async def __start(
297305
subclass = self.__get_subclass(class_unique_reference)
298306
deferred_context = self.__get_deferred_context(start_context)
299307

308+
retry_count = await subclass.get_retries(deferred_context)
300309
task_schedule = TaskScheduleModel(
301310
timeout=await subclass.get_timeout(deferred_context),
302-
execution_attempts=await subclass.get_retries(deferred_context) + 1,
311+
total_attempts=retry_count,
312+
execution_attempts=retry_count + 1,
303313
class_unique_reference=class_unique_reference,
304314
start_context=start_context,
305315
state=TaskState.SCHEDULED,
306316
)
307317

318+
await self._task_tracker.save(task_uid, task_schedule)
319+
308320
with log_catch(_logger, reraise=False):
309321
await subclass.on_created(task_uid, deferred_context)
310322

311-
await self._task_tracker.save(task_uid, task_schedule)
312323
_logger.debug("Scheduled task '%s' with entry: %s", task_uid, task_schedule)
313324
await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SCHEDULED)
314325

@@ -460,7 +471,29 @@ async def _fs_handle_error_result( # pylint:disable=method-hidden
460471
task_schedule.result, TaskResultCancelledError
461472
):
462473
_logger.debug("Schedule retry attempt for task_uid '%s'", task_uid)
463-
# does not retry if task was cancelled
474+
475+
# resilenet wait before retrying
476+
if task_schedule.wait_cancellation_until is None:
477+
# save the new one
478+
subclass = self.__get_subclass(task_schedule.class_unique_reference)
479+
deferred_context = self.__get_deferred_context(
480+
task_schedule.start_context
481+
)
482+
sleep_interval = await subclass.get_retry_delay(
483+
context=deferred_context,
484+
remaining_attempts=task_schedule.execution_attempts,
485+
total_attempts=task_schedule.total_attempts,
486+
)
487+
task_schedule.wait_cancellation_until = (
488+
arrow.utcnow().datetime + sleep_interval
489+
)
490+
await self._task_tracker.save(task_uid, task_schedule)
491+
492+
await _wait_until_future_date(task_schedule.wait_cancellation_until)
493+
task_schedule.wait_cancellation_until = None
494+
await self._task_tracker.save(task_uid, task_schedule)
495+
496+
# waiting is done can proceed with retry
464497
task_schedule.state = TaskState.SUBMIT_TASK
465498
await self._task_tracker.save(task_uid, task_schedule)
466499
await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SUBMIT_TASK)
@@ -570,6 +603,11 @@ async def _fs_handle_manually_cancelled( # pylint:disable=method-hidden
570603
_logger.info("Found and cancelled run for '%s'", task_uid)
571604
await self.__remove_task(task_uid, task_schedule)
572605

606+
subclass = self.__get_subclass(task_schedule.class_unique_reference)
607+
deferred_context = self.__get_deferred_context(task_schedule.start_context)
608+
with log_catch(_logger, reraise=False):
609+
await subclass.on_cancelled(deferred_context)
610+
573611
async def __is_present(self, task_uid: TaskUID) -> bool:
574612
task_schedule: TaskScheduleModel | None = await self._task_tracker.get(task_uid)
575613
return task_schedule is not None
Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from datetime import datetime, timedelta
22
from enum import Enum
3+
from typing import Annotated
34

45
import arrow
6+
from common_library.basic_types import DEFAULT_FACTORY
57
from pydantic import BaseModel, Field, NonNegativeInt
68

79
from ._base_deferred_handler import StartContext
@@ -23,37 +25,63 @@ class TaskState(str, Enum):
2325

2426

2527
class TaskScheduleModel(BaseModel):
26-
timeout: timedelta = Field(
27-
..., description="Amount of time after which the task execution will time out"
28-
)
29-
class_unique_reference: ClassUniqueReference = Field(
30-
...,
31-
description="reference to the class containing the code and handlers for the execution of the task",
32-
)
33-
start_context: StartContext = Field(
34-
...,
35-
description="data used to assemble the ``StartContext``",
36-
)
37-
38-
state: TaskState = Field(
39-
..., description="represents the execution step of the task"
40-
)
41-
42-
execution_attempts: NonNegativeInt = Field(
43-
...,
44-
description="remaining attempts to run the code, only retries if this is > 0",
45-
)
46-
47-
time_started: datetime = Field(
48-
default_factory=lambda: arrow.utcnow().datetime,
49-
description="time when task schedule was created, used for statistics",
50-
)
51-
52-
result: TaskExecutionResult | None = Field(
53-
default=None,
54-
description=(
55-
f"Populated by {TaskState.WORKER}. It always has a value after worker handles it."
56-
"Will be used "
28+
timeout: Annotated[
29+
timedelta,
30+
Field(
31+
description="Amount of time after which the task execution will time out"
5732
),
58-
discriminator="result_type",
59-
)
33+
]
34+
class_unique_reference: Annotated[
35+
ClassUniqueReference,
36+
Field(
37+
description="reference to the class containing the code and handlers for the execution of the task",
38+
),
39+
]
40+
start_context: Annotated[
41+
StartContext,
42+
Field(
43+
description="data used to assemble the ``StartContext``",
44+
),
45+
]
46+
47+
state: Annotated[
48+
TaskState, Field(description="represents the execution step of the task")
49+
]
50+
51+
total_attempts: Annotated[
52+
NonNegativeInt,
53+
Field(
54+
description="maximum number of attempts before giving up (0 means no retries)"
55+
),
56+
]
57+
58+
execution_attempts: Annotated[
59+
NonNegativeInt,
60+
Field(
61+
description="remaining attempts to run the code, only retries if this is > 0",
62+
),
63+
]
64+
65+
wait_cancellation_until: Annotated[
66+
datetime | None,
67+
Field(description="when set has to wait till this before cancelling the task"),
68+
] = None
69+
70+
time_started: Annotated[
71+
datetime,
72+
Field(
73+
default_factory=lambda: arrow.utcnow().datetime,
74+
description="time when task schedule was created, used for statistics",
75+
),
76+
] = DEFAULT_FACTORY
77+
78+
result: Annotated[
79+
TaskExecutionResult | None,
80+
Field(
81+
description=(
82+
f"Populated by {TaskState.WORKER}. It always has a value after worker handles it."
83+
"Will be used "
84+
),
85+
discriminator="result_type",
86+
),
87+
] = None

packages/service-library/tests/deferred_tasks/test__base_deferred_handler.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class MockKeys(StrAutoEnum):
4646
RUN_DEFERRED_AFTER_HANDLER = auto()
4747
ON_DEFERRED_RESULT = auto()
4848
ON_FINISHED_WITH_ERROR = auto()
49+
ON_CANCELLED = auto()
4950

5051

5152
@pytest.fixture
@@ -132,6 +133,10 @@ async def run(cls, context: DeferredContext) -> Any:
132133
async def on_result(cls, result: Any, context: DeferredContext) -> None:
133134
mocks[MockKeys.ON_DEFERRED_RESULT](result, context)
134135

136+
@classmethod
137+
async def on_cancelled(cls, context: DeferredContext) -> None:
138+
mocks[MockKeys.ON_CANCELLED](context)
139+
135140
@classmethod
136141
async def on_finished_with_error(
137142
cls, error: TaskResultError, context: DeferredContext
@@ -324,6 +329,7 @@ async def _run_to_cancel(_: DeferredContext) -> None:
324329
await _assert_mock_call(mocks, key=MockKeys.RUN_DEFERRED_BEFORE_HANDLER, count=1)
325330
await mocked_deferred_handler.cancel(task_uid)
326331

332+
await _assert_mock_call(mocks, key=MockKeys.ON_CANCELLED, count=1)
327333
await _assert_mock_call(mocks, key=MockKeys.ON_FINISHED_WITH_ERROR, count=0)
328334

329335
assert (

packages/service-library/tests/deferred_tasks/test__redis_task_tracker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def task_schedule() -> TaskScheduleModel:
2222
return TypeAdapter(TaskScheduleModel).validate_python(
2323
{
2424
"timeout": timedelta(seconds=1),
25-
"execution_attempts": 1,
25+
"total_attempts": 1,
26+
"execution_attempts": 2,
2627
"class_unique_reference": "mock",
2728
"start_context": {},
2829
"state": TaskState.SCHEDULED,

services/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,8 @@ services:
353353
DIRECTOR_V2_PROFILING: ${DIRECTOR_V2_PROFILING}
354354
DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL: ${DIRECTOR_V2_DYNAMIC_SIDECAR_SLEEP_AFTER_CONTAINER_REMOVAL}
355355

356+
DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED: ${DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED}
357+
356358
DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED: ${DYNAMIC_SIDECAR_ENDPOINT_SPECS_MODE_DNSRR_ENABLED}
357359
DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS: ${DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS}
358360
DYNAMIC_SIDECAR_IMAGE: ${DYNAMIC_SIDECAR_IMAGE}

services/dynamic-scheduler/requirements/_base.txt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ grpcio==1.70.0
132132
h11==0.16.0
133133
# via
134134
# httpcore
135+
# nicegui
135136
# uvicorn
136137
# wsproto
137138
h2==4.2.0
@@ -232,7 +233,7 @@ multidict==6.1.0
232233
# via
233234
# aiohttp
234235
# yarl
235-
nicegui==2.12.1
236+
nicegui==2.23.3
236237
# via -r requirements/_base.in
237238
opentelemetry-api==1.34.1
238239
# via
@@ -441,8 +442,10 @@ python-dotenv==1.0.1
441442
# via
442443
# pydantic-settings
443444
# uvicorn
444-
python-engineio==4.11.2
445-
# via python-socketio
445+
python-engineio==4.12.2
446+
# via
447+
# nicegui
448+
# python-socketio
446449
python-multipart==0.0.20
447450
# via
448451
# fastapi
@@ -505,9 +508,7 @@ referencing==0.35.1
505508
# jsonschema
506509
# jsonschema-specifications
507510
requests==2.32.4
508-
# via
509-
# nicegui
510-
# opentelemetry-exporter-otlp-proto-http
511+
# via opentelemetry-exporter-otlp-proto-http
511512
rich==14.1.0
512513
# via
513514
# -r requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/_base.in
@@ -571,6 +572,7 @@ starlette==0.47.2
571572
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
572573
# -c requirements/../../../requirements/constraints.txt
573574
# fastapi
575+
# nicegui
574576
stream-zip==0.0.83
575577
# via -r requirements/../../../packages/service-library/requirements/_base.in
576578
tenacity==9.0.0
@@ -628,7 +630,6 @@ urllib3==2.5.0
628630
# -c requirements/../../../packages/settings-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt
629631
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
630632
# -c requirements/../../../requirements/constraints.txt
631-
# nicegui
632633
# requests
633634
# sentry-sdk
634635
uvicorn==0.34.2

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_tracker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ async def delete(self, node_id: NodeID) -> None:
3434
await self.redis_client_sdk.redis.delete(_get_key(node_id))
3535

3636
async def all(self) -> dict[NodeID, TrackedServiceModel]:
37-
found_keys = await self.redis_client_sdk.redis.keys(f"{_KEY_PREFIX}*")
37+
found_keys = [
38+
x
39+
async for x in self.redis_client_sdk.redis.scan_iter(
40+
match=f"{_KEY_PREFIX}*"
41+
)
42+
]
3843
found_values = await self.redis_client_sdk.redis.mget(found_keys)
3944

4045
return {

0 commit comments

Comments
 (0)