Skip to content

Commit 28d1137

Browse files
Merge branch 'master' into make-api-server-celery-concurrency-configurable
2 parents 238d744 + c8f61c1 commit 28d1137

File tree

42 files changed

+1439
-370
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1439
-370
lines changed

.env-devel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ DYNAMIC_SIDECAR_PROMETHEUS_MONITORING_NETWORKS=[]
130130
DYNAMIC_SIDECAR_PROMETHEUS_SERVICE_LABELS={}
131131
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT=01:00:00
132132
DIRECTOR_V2_TRACING={}
133+
DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED=1
133134

134135
# DYNAMIC_SCHEDULER ----
135136
DYNAMIC_SCHEDULER_LOGLEVEL=INFO

packages/models-library/src/models_library/functions.py

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -190,21 +190,12 @@ class FunctionJobBase(BaseModel):
190190
function_class: FunctionClass
191191

192192

193-
class RegisteredFunctionJobBase(FunctionJobBase):
194-
uid: FunctionJobID
195-
created_at: datetime.datetime
196-
197-
198193
class ProjectFunctionJob(FunctionJobBase):
199194
function_class: Literal[FunctionClass.PROJECT] = FunctionClass.PROJECT
200195
project_job_id: ProjectID | None
201196
job_creation_task_id: TaskID | None
202197

203198

204-
class RegisteredProjectFunctionJob(ProjectFunctionJob, RegisteredFunctionJobBase):
205-
pass
206-
207-
208199
class RegisteredProjectFunctionJobPatch(BaseModel):
209200
function_class: Literal[FunctionClass.PROJECT] = FunctionClass.PROJECT
210201
title: str | None
@@ -221,10 +212,6 @@ class SolverFunctionJob(FunctionJobBase):
221212
job_creation_task_id: TaskID | None
222213

223214

224-
class RegisteredSolverFunctionJob(SolverFunctionJob, RegisteredFunctionJobBase):
225-
pass
226-
227-
228215
class RegisteredSolverFunctionJobPatch(BaseModel):
229216
function_class: Literal[FunctionClass.SOLVER] = FunctionClass.SOLVER
230217
title: str | None
@@ -239,10 +226,6 @@ class PythonCodeFunctionJob(FunctionJobBase):
239226
function_class: Literal[FunctionClass.PYTHON_CODE] = FunctionClass.PYTHON_CODE
240227

241228

242-
class RegisteredPythonCodeFunctionJob(PythonCodeFunctionJob, RegisteredFunctionJobBase):
243-
pass
244-
245-
246229
class RegisteredPythonCodeFunctionJobPatch(BaseModel):
247230
function_class: Literal[FunctionClass.PYTHON_CODE] = FunctionClass.PYTHON_CODE
248231
title: str | None
@@ -256,6 +239,24 @@ class RegisteredPythonCodeFunctionJobPatch(BaseModel):
256239
Field(discriminator="function_class"),
257240
]
258241

242+
243+
class RegisteredFunctionJobBase(FunctionJobBase):
244+
uid: FunctionJobID
245+
created_at: datetime.datetime
246+
247+
248+
class RegisteredProjectFunctionJob(ProjectFunctionJob, RegisteredFunctionJobBase):
249+
pass
250+
251+
252+
class RegisteredSolverFunctionJob(SolverFunctionJob, RegisteredFunctionJobBase):
253+
pass
254+
255+
256+
class RegisteredPythonCodeFunctionJob(PythonCodeFunctionJob, RegisteredFunctionJobBase):
257+
pass
258+
259+
259260
RegisteredFunctionJob: TypeAlias = Annotated[
260261
RegisteredProjectFunctionJob
261262
| RegisteredPythonCodeFunctionJob
@@ -275,6 +276,36 @@ class FunctionJobStatus(BaseModel):
275276
status: str
276277

277278

279+
class RegisteredFunctionJobWithStatusBase(RegisteredFunctionJobBase, FunctionJobBase):
280+
status: FunctionJobStatus
281+
282+
283+
class RegisteredProjectFunctionJobWithStatus(
284+
RegisteredProjectFunctionJob, RegisteredFunctionJobWithStatusBase
285+
):
286+
pass
287+
288+
289+
class RegisteredSolverFunctionJobWithStatus(
290+
RegisteredSolverFunctionJob, RegisteredFunctionJobWithStatusBase
291+
):
292+
pass
293+
294+
295+
class RegisteredPythonCodeFunctionJobWithStatus(
296+
RegisteredPythonCodeFunctionJob, RegisteredFunctionJobWithStatusBase
297+
):
298+
pass
299+
300+
301+
RegisteredFunctionJobWithStatus: TypeAlias = Annotated[
302+
RegisteredProjectFunctionJobWithStatus
303+
| RegisteredPythonCodeFunctionJobWithStatus
304+
| RegisteredSolverFunctionJobWithStatus,
305+
Field(discriminator="function_class"),
306+
]
307+
308+
278309
class FunctionJobCollection(BaseModel):
279310
"""Model for a collection of function jobs"""
280311

@@ -309,6 +340,12 @@ class RegisteredFunctionJobDB(FunctionJobDB):
309340
created: datetime.datetime
310341

311342

343+
class RegisteredFunctionJobWithStatusDB(FunctionJobDB):
344+
uuid: FunctionJobID
345+
created: datetime.datetime
346+
status: str
347+
348+
312349
class FunctionDB(BaseModel):
313350
function_class: FunctionClass
314351
title: str = ""

packages/service-library/src/servicelib/deferred_tasks/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@
2222
- `cancel`: (called by the user) [optional]:
2323
send a message to cancel the current task. A warning will be logged but no call to either
2424
`on_result` or `on_finished_with_error` will occur.
25+
- `on_cancelled` (called by state `ManuallyCancelled`) [optional] {can be overwritten by the user}:
26+
called after the cancellation is handled by the worker executing the `run`
2527
2628
2729
## DeferredHandler lifecycle
2830
2931
```mermaid
3032
stateDiagram-v2
31-
* --> Scheduled: via [start]
32-
** --> ManuallyCancelled: via [cancel]
33+
(1) --> Scheduled: via [start]
34+
(2) --> ManuallyCancelled: via [cancel]
3335
3436
ManuallyCancelled --> Worker: attempts to cancel task in
3537
@@ -41,9 +43,10 @@
4143
ErrorResult --> FinishedWithError: gives up when out of retries or if cancelled
4244
Worker --> DeferredResult: success
4345
44-
DeferredResult --> °: calls [on_result]
45-
FinishedWithError --> °°: calls [on_finished_with_error]
46-
Worker --> °°°: task cancelled
46+
DeferredResult --> (3): calls [on_result]
47+
FinishedWithError --> (4): calls [on_finished_with_error]
48+
Worker --> Removed*: task cancelled
49+
Removed* --> (5): calls [on_cancelled]
4750
```
4851
4952
### States
@@ -57,6 +60,7 @@
5760
- `FinishedWithError`: logs error, invokes `on_finished_with_error` and removes the schedule
5861
- `DeferredResult`: invokes `on_result` and removes the schedule
5962
- `ManuallyCancelled`: sends message to all instances to cancel. The instance handling the task will cancel the task and remove the schedule
63+
- `Removed*`: a fake state that does not actually exist; it is only used to convey that the cancellation event is triggered after the schedule is removed
6064
"""
6165

6266
from ._base_deferred_handler import (

packages/service-library/src/servicelib/deferred_tasks/_base_deferred_handler.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ async def get_retries(cls, context: DeferredContext) -> NonNegativeInt:
4545
assert context # nosec
4646
return 0
4747

48+
@classmethod
49+
async def get_retry_delay(
50+
cls,
51+
context: DeferredContext,
52+
remaining_attempts: NonNegativeInt,
53+
total_attempts: NonNegativeInt,
54+
) -> timedelta:
55+
"""
56+
returns: the delay between each retry attempt (default: 0s)
57+
"""
58+
assert context # nosec
59+
assert remaining_attempts # nosec
60+
assert total_attempts # nosec
61+
return timedelta(seconds=0)
62+
4863
@classmethod
4964
@abstractmethod
5065
async def get_timeout(cls, context: DeferredContext) -> timedelta:
@@ -84,6 +99,11 @@ async def on_finished_with_error(
8499
NOTE: by design the default action is to do nothing
85100
"""
86101

102+
@classmethod
103+
@abstractmethod
104+
async def on_cancelled(cls, context: DeferredContext) -> None:
105+
"""called after handling ``cancel`` request by the copy executing ``run``"""
106+
87107
@classmethod
88108
async def cancel(cls, task_uid: TaskUID) -> None:
89109
"""cancels a deferred"""

packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import inspect
33
import logging
44
from collections.abc import Awaitable, Callable, Iterable
5-
from datetime import timedelta
5+
from datetime import datetime, timedelta
66
from enum import Enum
77
from typing import Any, Final
88

@@ -118,6 +118,14 @@ def _raise_if_not_type(task_result: Any, expected_types: Iterable[type]) -> None
118118
raise TypeError(msg)
119119

120120

121+
async def _wait_until_future_date(possible_future_date: datetime) -> None:
122+
while True:
123+
now = arrow.utcnow().datetime
124+
if now >= possible_future_date:
125+
return
126+
await asyncio.sleep(1)
127+
128+
121129
class DeferredManager: # pylint:disable=too-many-instance-attributes
122130
def __init__(
123131
self,
@@ -297,18 +305,21 @@ async def __start(
297305
subclass = self.__get_subclass(class_unique_reference)
298306
deferred_context = self.__get_deferred_context(start_context)
299307

308+
retry_count = await subclass.get_retries(deferred_context)
300309
task_schedule = TaskScheduleModel(
301310
timeout=await subclass.get_timeout(deferred_context),
302-
execution_attempts=await subclass.get_retries(deferred_context) + 1,
311+
total_attempts=retry_count,
312+
execution_attempts=retry_count + 1,
303313
class_unique_reference=class_unique_reference,
304314
start_context=start_context,
305315
state=TaskState.SCHEDULED,
306316
)
307317

318+
await self._task_tracker.save(task_uid, task_schedule)
319+
308320
with log_catch(_logger, reraise=False):
309321
await subclass.on_created(task_uid, deferred_context)
310322

311-
await self._task_tracker.save(task_uid, task_schedule)
312323
_logger.debug("Scheduled task '%s' with entry: %s", task_uid, task_schedule)
313324
await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SCHEDULED)
314325

@@ -460,7 +471,29 @@ async def _fs_handle_error_result( # pylint:disable=method-hidden
460471
task_schedule.result, TaskResultCancelledError
461472
):
462473
_logger.debug("Schedule retry attempt for task_uid '%s'", task_uid)
463-
# does not retry if task was cancelled
474+
475+
# resilient wait before retrying
476+
if task_schedule.wait_cancellation_until is None:
477+
# save the new one
478+
subclass = self.__get_subclass(task_schedule.class_unique_reference)
479+
deferred_context = self.__get_deferred_context(
480+
task_schedule.start_context
481+
)
482+
sleep_interval = await subclass.get_retry_delay(
483+
context=deferred_context,
484+
remaining_attempts=task_schedule.execution_attempts,
485+
total_attempts=task_schedule.total_attempts,
486+
)
487+
task_schedule.wait_cancellation_until = (
488+
arrow.utcnow().datetime + sleep_interval
489+
)
490+
await self._task_tracker.save(task_uid, task_schedule)
491+
492+
await _wait_until_future_date(task_schedule.wait_cancellation_until)
493+
task_schedule.wait_cancellation_until = None
494+
await self._task_tracker.save(task_uid, task_schedule)
495+
496+
# waiting is done, can proceed with retry
464497
task_schedule.state = TaskState.SUBMIT_TASK
465498
await self._task_tracker.save(task_uid, task_schedule)
466499
await self.__publish_to_queue(task_uid, _FastStreamRabbitQueue.SUBMIT_TASK)
@@ -570,6 +603,11 @@ async def _fs_handle_manually_cancelled( # pylint:disable=method-hidden
570603
_logger.info("Found and cancelled run for '%s'", task_uid)
571604
await self.__remove_task(task_uid, task_schedule)
572605

606+
subclass = self.__get_subclass(task_schedule.class_unique_reference)
607+
deferred_context = self.__get_deferred_context(task_schedule.start_context)
608+
with log_catch(_logger, reraise=False):
609+
await subclass.on_cancelled(deferred_context)
610+
573611
async def __is_present(self, task_uid: TaskUID) -> bool:
574612
task_schedule: TaskScheduleModel | None = await self._task_tracker.get(task_uid)
575613
return task_schedule is not None
Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from datetime import datetime, timedelta
22
from enum import Enum
3+
from typing import Annotated
34

45
import arrow
6+
from common_library.basic_types import DEFAULT_FACTORY
57
from pydantic import BaseModel, Field, NonNegativeInt
68

79
from ._base_deferred_handler import StartContext
@@ -23,37 +25,63 @@ class TaskState(str, Enum):
2325

2426

2527
class TaskScheduleModel(BaseModel):
26-
timeout: timedelta = Field(
27-
..., description="Amount of time after which the task execution will time out"
28-
)
29-
class_unique_reference: ClassUniqueReference = Field(
30-
...,
31-
description="reference to the class containing the code and handlers for the execution of the task",
32-
)
33-
start_context: StartContext = Field(
34-
...,
35-
description="data used to assemble the ``StartContext``",
36-
)
37-
38-
state: TaskState = Field(
39-
..., description="represents the execution step of the task"
40-
)
41-
42-
execution_attempts: NonNegativeInt = Field(
43-
...,
44-
description="remaining attempts to run the code, only retries if this is > 0",
45-
)
46-
47-
time_started: datetime = Field(
48-
default_factory=lambda: arrow.utcnow().datetime,
49-
description="time when task schedule was created, used for statistics",
50-
)
51-
52-
result: TaskExecutionResult | None = Field(
53-
default=None,
54-
description=(
55-
f"Populated by {TaskState.WORKER}. It always has a value after worker handles it."
56-
"Will be used "
28+
timeout: Annotated[
29+
timedelta,
30+
Field(
31+
description="Amount of time after which the task execution will time out"
5732
),
58-
discriminator="result_type",
59-
)
33+
]
34+
class_unique_reference: Annotated[
35+
ClassUniqueReference,
36+
Field(
37+
description="reference to the class containing the code and handlers for the execution of the task",
38+
),
39+
]
40+
start_context: Annotated[
41+
StartContext,
42+
Field(
43+
description="data used to assemble the ``StartContext``",
44+
),
45+
]
46+
47+
state: Annotated[
48+
TaskState, Field(description="represents the execution step of the task")
49+
]
50+
51+
total_attempts: Annotated[
52+
NonNegativeInt,
53+
Field(
54+
description="maximum number of attempts before giving up (0 means no retries)"
55+
),
56+
]
57+
58+
execution_attempts: Annotated[
59+
NonNegativeInt,
60+
Field(
61+
description="remaining attempts to run the code, only retries if this is > 0",
62+
),
63+
]
64+
65+
wait_cancellation_until: Annotated[
66+
datetime | None,
67+
Field(description="when set has to wait till this before cancelling the task"),
68+
] = None
69+
70+
time_started: Annotated[
71+
datetime,
72+
Field(
73+
default_factory=lambda: arrow.utcnow().datetime,
74+
description="time when task schedule was created, used for statistics",
75+
),
76+
] = DEFAULT_FACTORY
77+
78+
result: Annotated[
79+
TaskExecutionResult | None,
80+
Field(
81+
description=(
82+
f"Populated by {TaskState.WORKER}. It always has a value after worker handles it."
83+
"Will be used "
84+
),
85+
discriminator="result_type",
86+
),
87+
] = None

0 commit comments

Comments
 (0)