Skip to content

Commit 760bcbe

Browse files
author
Andrei Neagu
committed
first working version
1 parent 7d9939c commit 760bcbe

File tree

9 files changed

+222
-70
lines changed

9 files changed

+222
-70
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import logging
33
from collections.abc import AsyncIterator, Iterable
4+
from contextlib import asynccontextmanager
45
from datetime import datetime, timedelta
56
from typing import Final
67
from uuid import uuid4
@@ -14,7 +15,7 @@
1415
from servicelib.utils import limited_gather
1516

1617
from ._deferred_runner import DeferredRunner
17-
from ._dependencies import enqueue_event
18+
from ._dependencies import enqueue_schedule_event
1819
from ._errors import UnexpectedStepHandlingError
1920
from ._models import (
2021
OperationContext,
@@ -28,6 +29,7 @@
2829
from ._store import (
2930
KeyNotFoundInHashError,
3031
ScheduleDataStoreProxy,
32+
StepGroupProxy,
3133
StepStoreProxy,
3234
Store,
3335
get_store,
@@ -92,14 +94,18 @@ async def _is_operation_in_progress_status(
9294

9395

9496
async def _start_and_mark_as_started(
95-
step_proxy: StepStoreProxy, *, is_creating: bool
97+
step_proxy: StepStoreProxy,
98+
*,
99+
is_creating: bool,
100+
expected_steps_count: NonNegativeInt,
96101
) -> None:
97102
await DeferredRunner.start(
98103
schedule_id=step_proxy.schedule_id,
99104
operation_name=step_proxy.operation_name,
100105
step_group_name=step_proxy.step_group_name,
101106
step_name=step_proxy.step_name,
102107
is_creating=is_creating,
108+
expected_steps_count=expected_steps_count,
103109
)
104110
await step_proxy.set_multiple(
105111
{
@@ -150,16 +156,13 @@ async def create(
150156
"is_creating": True,
151157
}
152158
)
153-
await enqueue_event(self.app, schedule_id)
159+
await enqueue_schedule_event(self.app, schedule_id)
154160
return schedule_id
155161

156-
async def safe_on_schedule_event(self, schedule_id: ScheduleId) -> None:
157-
"""
158-
NOTE: do not call this directly, you are doing something wrong
159-
runs scheduling actions for a given schedule_id
160-
"""
162+
@asynccontextmanager
163+
async def _safe_event(self, schedule_id: ScheduleId) -> AsyncIterator[None]:
161164
try:
162-
await self._on_schedule_event(schedule_id)
165+
yield
163166
except KeyNotFoundInHashError as err:
164167
_logger.debug(
165168
"Cannot process schedule_id='%s' since it's data was not found: %s",
@@ -182,6 +185,11 @@ async def safe_on_schedule_event(self, schedule_id: ScheduleId) -> None:
182185
message=log_kwargs["msg"],
183186
)
184187

188+
async def safe_on_schedule_event(self, schedule_id: ScheduleId) -> None:
189+
# NOTE: do not call this directly, you are doing something wrong
190+
async with self._safe_event(schedule_id):
191+
await self._on_schedule_event(schedule_id)
192+
185193
async def cancel_schedule(self, schedule_id: ScheduleId) -> None:
186194
"""
187195
Cancels and runs destruction of the operation
@@ -282,6 +290,7 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
282290
step_group=step_group,
283291
is_creating=is_creating,
284292
)
293+
group_step_count = len(step_group)
285294

286295
# get steps to start
287296
if to_start_step_proxies := await _get_steps_to_start(
@@ -297,7 +306,11 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
297306
):
298307
await limited_gather(
299308
*(
300-
_start_and_mark_as_started(step_proxy, is_creating=is_creating)
309+
_start_and_mark_as_started(
310+
step_proxy,
311+
is_creating=is_creating,
312+
expected_steps_count=group_step_count,
313+
)
301314
for step_proxy in to_start_step_proxies
302315
),
303316
limit=_PARALLEL_STATUS_REQUESTS,
@@ -356,7 +369,7 @@ async def _continue_handling_as_creation(
356369
# does a next group exist?
357370
_ = operation[next_group_index]
358371
await schedule_data_proxy.set("group_index", value=next_group_index)
359-
await enqueue_event(self.app, schedule_id)
372+
await enqueue_schedule_event(self.app, schedule_id)
360373
except IndexError:
361374
# does the step need repeating?
362375
if current_step_group.repeat_steps is True:
@@ -381,7 +394,18 @@ async def _continue_handling_as_creation(
381394
)
382395
await step_rproxy.remove()
383396

384-
await enqueue_event(self.app, schedule_id)
397+
group_proxy = StepGroupProxy(
398+
store=self._store,
399+
schedule_id=schedule_id,
400+
operation_name=operation_name,
401+
step_group_name=current_step_group.get_step_group_name(
402+
index=group_index
403+
),
404+
is_creating=True,
405+
)
406+
await group_proxy.remove()
407+
408+
await enqueue_schedule_event(self.app, schedule_id)
385409
else:
386410
# TODO: the end has bean reached, do nothing from now on
387411
_logger.debug(
@@ -439,7 +463,7 @@ async def _continue_handling_as_creation(
439463
f"{operation_name=} was not successfull: {steps_statuses=}, moving to revert",
440464
):
441465
await schedule_data_proxy.set("is_creating", value=False)
442-
await enqueue_event(self.app, schedule_id)
466+
await enqueue_schedule_event(self.app, schedule_id)
443467
return
444468

445469
raise UnexpectedStepHandlingError(
@@ -461,7 +485,7 @@ async def _continue_handling_as_reverting(
461485
return
462486

463487
await schedule_data_proxy.set("group_index", value=previous_group_index)
464-
await enqueue_event(self.app, schedule_id)
488+
await enqueue_schedule_event(self.app, schedule_id)
465489
return
466490

467491
# if any in FAILED this is unexpected falg to be investigated

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from datetime import timedelta
22

33
from fastapi import FastAPI
4+
from pydantic import NonNegativeInt
45
from servicelib.deferred_tasks import BaseDeferredHandler, DeferredContext, TaskUID
56
from servicelib.deferred_tasks._models import TaskResultError
67

7-
from ._dependencies import enqueue_event
8+
from ._dependencies import enqueue_schedule_event
89
from ._models import OperationName, ScheduleId, StepGroupName, StepName, StepStatus
910
from ._operation import BaseStep, OperationRegistry
10-
from ._store import StepStoreProxy, Store, get_store
11+
from ._store import StepGroupProxy, StepStoreProxy, Store, get_store
1112

1213

1314
def _get_step_store_proxy(context: DeferredContext) -> StepStoreProxy:
@@ -29,16 +30,39 @@ def _get_step_store_proxy(context: DeferredContext) -> StepStoreProxy:
2930
)
3031

3132

33+
def _get_step_group_proxy(context: DeferredContext) -> StepGroupProxy:
34+
app: FastAPI = context["app"]
35+
schedule_id: ScheduleId = context["schedule_id"]
36+
operation_name: OperationName = context["operation_name"]
37+
step_group_name: StepGroupName = context["step_group_name"]
38+
is_creating = context["is_creating"]
39+
40+
store: Store = get_store(app)
41+
return StepGroupProxy(
42+
store=store,
43+
schedule_id=schedule_id,
44+
operation_name=operation_name,
45+
step_group_name=step_group_name,
46+
is_creating=is_creating,
47+
)
48+
49+
3250
def _get_step(context: DeferredContext) -> type[BaseStep]:
3351
operation_name: OperationName = context["operation_name"]
3452
step_name: StepName = context["step_name"]
3553
return OperationRegistry.get_step(operation_name, step_name)
3654

3755

38-
async def _send_schedule_event(context: DeferredContext) -> None:
56+
async def _send_group_done_check_event(context: DeferredContext) -> None:
3957
app: FastAPI = context["app"]
4058
schedule_id: ScheduleId = context["schedule_id"]
41-
await enqueue_event(app, schedule_id)
59+
expected_steps_count: NonNegativeInt = context["expected_steps_count"]
60+
61+
if (
62+
await _get_step_group_proxy(context).increment_and_get_done_steps_count()
63+
== expected_steps_count
64+
):
65+
await enqueue_schedule_event(app, schedule_id)
4266

4367

4468
class DeferredRunner(BaseDeferredHandler[None]):
@@ -51,13 +75,15 @@ async def start( # type:ignore[override] # pylint:disable=arguments-differ
5175
step_group_name: StepGroupName,
5276
step_name: StepName,
5377
is_creating: bool,
78+
expected_steps_count: NonNegativeInt,
5479
) -> DeferredContext:
5580
return {
5681
"schedule_id": schedule_id,
5782
"operation_name": operation_name,
5883
"step_group_name": step_group_name,
5984
"step_name": step_name,
6085
"is_creating": is_creating,
86+
"expected_steps_count": expected_steps_count,
6187
}
6288

6389
@classmethod
@@ -104,7 +130,8 @@ async def run(cls, context: DeferredContext) -> None:
104130
async def on_result(cls, result: None, context: DeferredContext) -> None:
105131
_ = result
106132
await _get_step_store_proxy(context).set("status", StepStatus.SUCCESS)
107-
await _send_schedule_event(context)
133+
134+
await _send_group_done_check_event(context)
108135

109136
@classmethod
110137
async def on_finished_with_error(
@@ -113,9 +140,11 @@ async def on_finished_with_error(
113140
await _get_step_store_proxy(context).set_multiple(
114141
{"status": StepStatus.FAILED, "error_traceback": error.format_error()}
115142
)
116-
await _send_schedule_event(context)
143+
144+
await _send_group_done_check_event(context)
117145

118146
@classmethod
119147
async def on_cancelled(cls, context: DeferredContext) -> None:
120148
await _get_step_store_proxy(context).set("status", StepStatus.CANCELLED)
121-
await _send_schedule_event(context)
149+
150+
await _send_group_done_check_event(context)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_dependencies.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@
88
from ._event_scheduler import EventScheduler
99

1010

11-
async def enqueue_event(app: FastAPI, schedule_id: ScheduleId) -> None:
11+
async def enqueue_schedule_event(app: FastAPI, schedule_id: ScheduleId) -> None:
1212
event_scheduler: EventScheduler = app.state.generic_scheduler_event_scheduler
13-
await event_scheduler.enqueue_event(schedule_id)
13+
await event_scheduler.enqueue_schedule_event(schedule_id)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_scheduler.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import functools
22
import logging
33
from collections.abc import AsyncIterator
4-
from typing import Final
4+
from typing import Final, NotRequired, TypedDict
55

66
from fastapi import FastAPI
77
from fastapi_lifespan_manager import State
@@ -13,6 +13,7 @@
1313
RabbitQueue,
1414
RabbitRouter,
1515
)
16+
from faststream.rabbit.schemas.queue import ClassicQueueArgs
1617

1718
from ...core.settings import ApplicationSettings
1819
from ._core import get_core
@@ -21,13 +22,20 @@
2122
_logger = logging.getLogger(__name__)
2223

2324

25+
class QueuConfiguration(TypedDict):
26+
queue_name: str
27+
arguments: NotRequired[ClassicQueueArgs | None]
28+
29+
2430
_EXCHANGE_NAME: Final[str] = __name__
25-
_QUEUE_NAME: Final[str] = "event_scheduler"
26-
_NO_PARALLE_MESSAGE_CONSUMING: Final[int] = 1
2731

2832

29-
def _get_global_queue(queue_name: str) -> RabbitQueue:
30-
return RabbitQueue(f"{_EXCHANGE_NAME}_{queue_name}", durable=True)
33+
def _get_global_queue(
34+
queue_name: str, arguments: ClassicQueueArgs | None = None
35+
) -> RabbitQueue:
36+
return RabbitQueue(
37+
f"{_EXCHANGE_NAME}_{queue_name}", durable=True, arguments=arguments
38+
)
3139

3240

3341
def _stop_retry_for_unintended_errors(func):
@@ -66,47 +74,45 @@ def __init__(self, app: FastAPI) -> None:
6674

6775
settings: ApplicationSettings = app.state.settings
6876

69-
self.broker: RabbitBroker = RabbitBroker(
70-
settings.DYNAMIC_SCHEDULER_RABBITMQ.dsn,
71-
log_level=logging.DEBUG,
72-
max_consumers=_NO_PARALLE_MESSAGE_CONSUMING,
77+
self._broker: RabbitBroker = RabbitBroker(
78+
settings.DYNAMIC_SCHEDULER_RABBITMQ.dsn, log_level=logging.DEBUG
7379
)
74-
self.router: RabbitRouter = RabbitRouter()
75-
self.exchange = RabbitExchange(
80+
self._router: RabbitRouter = RabbitRouter()
81+
self._exchange = RabbitExchange(
7682
_EXCHANGE_NAME, durable=True, type=ExchangeType.DIRECT
7783
)
84+
self._queue_schedule_event = _get_global_queue(queue_name="schedule_queue")
7885

7986
@_stop_retry_for_unintended_errors
80-
async def _on_secure_schedule_event( # pylint:disable=method-hidden
87+
async def _on_schedule_secure_event( # pylint:disable=method-hidden
8188
self, schedule_id: ScheduleId
8289
) -> None:
83-
# advance operation
84-
# NOTE: should no longer forward operation if nothing needs doing
85-
# an unexpected error might be raised
8690
await get_core(self.app).safe_on_schedule_event(schedule_id)
8791

88-
async def enqueue_event(self, schedule_id: ScheduleId) -> None:
89-
await self.broker.publish(
90-
schedule_id, queue=_get_global_queue(_QUEUE_NAME), exchange=self.exchange
92+
async def enqueue_schedule_event(self, schedule_id: ScheduleId) -> None:
93+
await self._broker.publish(
94+
schedule_id,
95+
queue=self._queue_schedule_event,
96+
exchange=self._exchange,
9197
)
9298

9399
def _register_subscribers(self) -> None:
94100
# pylint:disable=unexpected-keyword-arg
95101
# pylint:disable=no-value-for-parameter
96-
self._on_secure_schedule_event = self.router.subscriber(
97-
queue=_get_global_queue(_QUEUE_NAME),
98-
exchange=self.exchange,
102+
self._on_schedule_secure_event = self._router.subscriber(
103+
queue=self._queue_schedule_event,
104+
exchange=self._exchange,
99105
retry=True,
100-
)(self._on_secure_schedule_event)
106+
)(self._on_schedule_secure_event)
101107

102108
async def setup(self) -> None:
103109
self._register_subscribers()
104-
self.broker.include_router(self.router)
110+
self._broker.include_router(self._router)
105111

106-
await self.broker.start()
112+
await self._broker.start()
107113

108114
async def shutdown(self) -> None:
109-
await self.broker.close()
115+
await self._broker.close()
110116

111117

112118
async def lifespan(app: FastAPI) -> AsyncIterator[State]:

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ def __init__(self, *, repeat_steps: bool, wait_before_repeat: timedelta) -> None
9191
self.repeat_steps = repeat_steps
9292
self.wait_before_repeat = wait_before_repeat
9393

94+
@abstractmethod
95+
def __len__(self) -> int:
96+
"""number of steps in this group"""
97+
9498
@abstractmethod
9599
def get_step_group_name(self, *, index: NonNegativeInt) -> StepGroupName:
96100
"""returns the name of this step group"""
@@ -121,6 +125,9 @@ def __init__(
121125
repeat_steps=repeat_steps, wait_before_repeat=wait_before_repeat
122126
)
123127

128+
def __len__(self) -> int:
129+
return 1
130+
124131
def get_step_group_name(self, *, index: NonNegativeInt) -> StepGroupName:
125132
return f"{index}S{'R' if self.repeat_steps else ''}"
126133

@@ -148,6 +155,9 @@ def __init__(
148155
repeat_steps=repeat_steps, wait_before_repeat=wait_before_repeat
149156
)
150157

158+
def __len__(self) -> int:
159+
return len(self._steps)
160+
151161
@property
152162
def steps(self) -> list[type[BaseStep]]:
153163
return self._steps

0 commit comments

Comments
 (0)