Skip to content

Commit ab119d5

Browse files
committed
Merge branch 'master' into fix_duplicate_map_inputs
2 parents 13a35b5 + 56a28d2 commit ab119d5

File tree

26 files changed

+1995
-919
lines changed

26 files changed

+1995
-919
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
from ._core import (
22
cancel_operation,
3-
restart_operation_step_stuck_during_undo,
4-
restart_operation_step_stuck_in_manual_intervention_during_create,
3+
restart_operation_step_stuck_during_revert,
4+
restart_operation_step_stuck_in_manual_intervention_during_execute,
55
start_operation,
66
)
77
from ._deferred_runner import (
88
get_operation_context_proxy,
99
get_step_group_proxy,
1010
get_step_store_proxy,
1111
)
12+
from ._event_after_registration import (
13+
register_to_start_after_on_executed_completed,
14+
register_to_start_after_on_reverted_completed,
15+
)
1216
from ._lifespan import generic_scheduler_lifespan
1317
from ._models import (
1418
OperationName,
19+
OperationToStart,
1520
ProvidedOperationContext,
1621
RequiredOperationContext,
1722
ScheduleId,
@@ -36,11 +41,14 @@
3641
"OperationContextProxy",
3742
"OperationName",
3843
"OperationRegistry",
44+
"OperationToStart",
3945
"ParallelStepGroup",
4046
"ProvidedOperationContext",
47+
"register_to_start_after_on_executed_completed",
48+
"register_to_start_after_on_reverted_completed",
4149
"RequiredOperationContext",
42-
"restart_operation_step_stuck_during_undo",
43-
"restart_operation_step_stuck_in_manual_intervention_during_create",
50+
"restart_operation_step_stuck_during_revert",
51+
"restart_operation_step_stuck_in_manual_intervention_during_execute",
4452
"ScheduleId",
4553
"SingleStepGroup",
4654
"start_operation",

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 152 additions & 71 deletions
Large diffs are not rendered by default.

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core_utils.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ async def get_steps_statuses(
7575
async def start_and_mark_as_started(
7676
step_proxy: StepStoreProxy,
7777
*,
78-
is_creating: bool,
78+
is_executing: bool,
7979
expected_steps_count: NonNegativeInt,
8080
) -> None:
8181
await DeferredRunner.start(
8282
schedule_id=step_proxy.schedule_id,
8383
operation_name=step_proxy.operation_name,
8484
step_group_name=step_proxy.step_group_name,
8585
step_name=step_proxy.step_name,
86-
is_creating=is_creating,
86+
is_executing=is_executing,
8787
expected_steps_count=expected_steps_count,
8888
)
8989
await step_proxy.create_or_update_multiple(
@@ -117,7 +117,7 @@ async def get_step_error_traceback(
117117
operation_name=operation_name,
118118
step_group_name=current_step_group.get_step_group_name(index=group_index),
119119
step_name=step_name,
120-
is_creating=False,
120+
is_executing=False,
121121
)
122122
return step_name, await step_proxy.read("error_traceback")
123123

@@ -129,7 +129,7 @@ def get_group_step_proxies(
129129
operation_name: OperationName,
130130
group_index: NonNegativeInt,
131131
step_group: BaseStepGroup,
132-
is_creating: bool,
132+
is_executing: bool,
133133
) -> dict[StepName, StepStoreProxy]:
134134
return {
135135
step.get_step_name(): StepStoreProxy(
@@ -138,7 +138,7 @@ def get_group_step_proxies(
138138
operation_name=operation_name,
139139
step_group_name=step_group.get_step_group_name(index=group_index),
140140
step_name=step.get_step_name(),
141-
is_creating=is_creating,
141+
is_executing=is_executing,
142142
)
143143
for step in step_group.get_step_subgroup_to_run()
144144
}
@@ -168,7 +168,7 @@ async def _get_steps_to_start(
168168
async def start_steps_which_were_not_started(
169169
group_step_proxies: dict[StepName, StepStoreProxy],
170170
*,
171-
is_creating: bool,
171+
is_executing: bool,
172172
group_step_count: NonNegativeInt,
173173
) -> bool:
174174
"""retruns True if any step was started"""
@@ -186,7 +186,7 @@ async def start_steps_which_were_not_started(
186186
*(
187187
start_and_mark_as_started(
188188
step_proxy,
189-
is_creating=is_creating,
189+
is_executing=is_executing,
190190
expected_steps_count=group_step_count,
191191
)
192192
for step_proxy in to_start_step_proxies
@@ -198,11 +198,11 @@ async def start_steps_which_were_not_started(
198198

199199

200200
async def cleanup_after_finishing(
201-
store: Store, *, schedule_id: ScheduleId, is_creating: bool
201+
store: Store, *, schedule_id: ScheduleId, is_executing: bool
202202
) -> None:
203203
removal_proxy = OperationRemovalProxy(store=store, schedule_id=schedule_id)
204204
await removal_proxy.delete()
205-
verb = "COMPLETED" if is_creating else "UNDONE"
205+
verb = "COMPLETED" if is_executing else "REVERTED"
206206
_logger.debug("Operation for schedule_id='%s' %s successfully", verb, schedule_id)
207207

208208

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ def get_step_store_proxy(context: DeferredContext) -> StepStoreProxy:
3434
operation_name: OperationName = context["operation_name"]
3535
step_group_name: StepGroupName = context["step_group_name"]
3636
step_name: StepName = context["step_name"]
37-
is_creating = context["is_creating"]
37+
is_executing = context["is_executing"]
3838

3939
return StepStoreProxy(
4040
store=Store.get_from_app_state(app),
4141
schedule_id=schedule_id,
4242
operation_name=operation_name,
4343
step_group_name=step_group_name,
4444
step_name=step_name,
45-
is_creating=is_creating,
45+
is_executing=is_executing,
4646
)
4747

4848

@@ -51,14 +51,14 @@ def get_step_group_proxy(context: DeferredContext) -> StepGroupProxy:
5151
schedule_id: ScheduleId = context["schedule_id"]
5252
operation_name: OperationName = context["operation_name"]
5353
step_group_name: StepGroupName = context["step_group_name"]
54-
is_creating = context["is_creating"]
54+
is_executing = context["is_executing"]
5555

5656
return StepGroupProxy(
5757
store=Store.get_from_app_state(app),
5858
schedule_id=schedule_id,
5959
operation_name=operation_name,
6060
step_group_name=step_group_name,
61-
is_creating=is_creating,
61+
is_executing=is_executing,
6262
)
6363

6464

@@ -124,36 +124,36 @@ async def start( # type:ignore[override] # pylint:disable=arguments-differ
124124
operation_name: OperationName,
125125
step_group_name: StepGroupName,
126126
step_name: StepName,
127-
is_creating: bool,
127+
is_executing: bool,
128128
expected_steps_count: NonNegativeInt,
129129
) -> DeferredContext:
130130
return {
131131
"schedule_id": schedule_id,
132132
"operation_name": operation_name,
133133
"step_group_name": step_group_name,
134134
"step_name": step_name,
135-
"is_creating": is_creating,
135+
"is_executing": is_executing,
136136
"expected_steps_count": expected_steps_count,
137137
}
138138

139139
@classmethod
140140
async def get_retries(cls, context: DeferredContext) -> int:
141-
is_creating = context["is_creating"]
141+
is_executing = context["is_executing"]
142142
step = _get_step(context)
143143
return (
144-
await step.get_create_retries(context)
145-
if is_creating
146-
else await step.get_undo_retries(context)
144+
await step.get_execute_retries(context)
145+
if is_executing
146+
else await step.get_revert_retries(context)
147147
)
148148

149149
@classmethod
150150
async def get_timeout(cls, context: DeferredContext) -> timedelta:
151-
is_creating = context["is_creating"]
151+
is_executing = context["is_executing"]
152152
step = _get_step(context)
153153
return (
154-
await step.get_create_wait_between_attempts(context)
155-
if is_creating
156-
else await step.get_undo_wait_between_attempts(context)
154+
await step.get_execute_wait_between_attempts(context)
155+
if is_executing
156+
else await step.get_revert_wait_between_attempts(context)
157157
)
158158

159159
@classmethod
@@ -165,7 +165,7 @@ async def on_created(cls, task_uid: TaskUID, context: DeferredContext) -> None:
165165
@classmethod
166166
async def run(cls, context: DeferredContext) -> None:
167167
app = context["app"]
168-
is_creating = context["is_creating"]
168+
is_executing = context["is_executing"]
169169

170170
await get_step_store_proxy(context).create_or_update(
171171
"status", StepStatus.RUNNING
@@ -175,31 +175,31 @@ async def run(cls, context: DeferredContext) -> None:
175175

176176
operation_context_proxy = get_operation_context_proxy(context)
177177

178-
if is_creating:
178+
if is_executing:
179179
required_context = await operation_context_proxy.read(
180-
*step.get_create_requires_context_keys()
180+
*step.get_execute_requires_context_keys()
181181
)
182182
_raise_if_any_context_value_is_none(required_context)
183183

184-
step_provided_operation_context = await step.create(app, required_context)
184+
step_provided_operation_context = await step.execute(app, required_context)
185185
provided_operation_context = step_provided_operation_context or {}
186-
create_provides_keys = step.get_create_provides_context_keys()
186+
execute_provides_keys = step.get_execute_provides_context_keys()
187187

188188
_raise_if_provided_context_keys_are_missing_or_none(
189-
provided_operation_context, create_provides_keys
189+
provided_operation_context, execute_provides_keys
190190
)
191191
else:
192192
required_context = await operation_context_proxy.read(
193-
*step.get_undo_requires_context_keys()
193+
*step.get_revert_requires_context_keys()
194194
)
195195
_raise_if_any_context_value_is_none(required_context)
196196

197-
step_provided_operation_context = await step.undo(app, required_context)
197+
step_provided_operation_context = await step.revert(app, required_context)
198198
provided_operation_context = step_provided_operation_context or {}
199-
undo_provides_keys = step.get_undo_provides_context_keys()
199+
revert_provides_keys = step.get_revert_provides_context_keys()
200200

201201
_raise_if_provided_context_keys_are_missing_or_none(
202-
provided_operation_context, undo_provides_keys
202+
provided_operation_context, revert_provides_keys
203203
)
204204

205205
await operation_context_proxy.create_or_update(provided_operation_context)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from typing import TYPE_CHECKING
2+
3+
from fastapi import FastAPI
4+
5+
if TYPE_CHECKING:
6+
from ._core import Core
7+
from ._event_after import AfterEventManager
8+
from ._event_scheduler import EventScheduler
9+
10+
# NOTE:
11+
# Due to circular dependencies it is not possible to use the following:
12+
# - `Core.get_from_app_state(app)`
13+
# - `AfterEventManager.get_from_app_state(app)`
14+
# - `EventScheduler.get_from_app_state(app)`
15+
# This module avoids issues with circular dependencies
16+
17+
18+
def get_core(app: FastAPI) -> "Core":
19+
core: Core = app.state.generic_scheduler_core
20+
return core
21+
22+
23+
def get_after_event_manager(app: FastAPI) -> "AfterEventManager":
24+
after_event_manager: AfterEventManager = app.state.after_event_manager
25+
return after_event_manager
26+
27+
28+
def get_event_scheduler(app: FastAPI) -> "EventScheduler":
29+
event_scheduler: EventScheduler = app.state.generic_scheduler_event_scheduler
30+
return event_scheduler

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ class InitialOperationContextKeyNotAllowedError(BaseGenericSchedulerError):
4949
)
5050

5151

52+
class OperationNotCancellableError(BaseGenericSchedulerError):
53+
msg_template: str = "Operation '{operation_name}' is not cancellable"
54+
55+
5256
class CannotCancelWhileWaitingForManualInterventionError(BaseGenericSchedulerError):
5357
msg_template: str = (
5458
"Cannot cancel schedule_id='{schedule_id}' while one or more steps are waiting for manual intervention."
Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,42 @@
1-
from typing import TYPE_CHECKING
2-
31
from fastapi import FastAPI
42

5-
from ._models import ScheduleId
6-
7-
if TYPE_CHECKING:
8-
from ._event_scheduler import EventScheduler
3+
from ._dependencies import get_event_scheduler
4+
from ._event_base_queue import OperationToStartEvent
5+
from ._event_queues import ExecuteCompletedQueue, RevertCompletedQueue, ScheduleQueue
6+
from ._models import OperationContext, OperationName, ScheduleId
97

108

119
async def enqueue_schedule_event(app: FastAPI, schedule_id: ScheduleId) -> None:
12-
event_scheduler: EventScheduler = app.state.generic_scheduler_event_scheduler
13-
await event_scheduler.enqueue_schedule_event(schedule_id)
10+
await get_event_scheduler(app).enqueue_message_for(ScheduleQueue, schedule_id)
11+
12+
13+
async def enqueue_execute_completed_event(
14+
app: FastAPI,
15+
schedule_id: ScheduleId,
16+
operation_name: OperationName,
17+
initial_context: OperationContext,
18+
) -> None:
19+
await get_event_scheduler(app).enqueue_message_for(
20+
ExecuteCompletedQueue,
21+
OperationToStartEvent(
22+
schedule_id=schedule_id,
23+
operation_name=operation_name,
24+
initial_context=initial_context,
25+
),
26+
)
27+
28+
29+
async def enqueue_revert_completed_event(
30+
app: FastAPI,
31+
schedule_id: ScheduleId,
32+
operation_name: OperationName,
33+
initial_context: OperationContext,
34+
) -> None:
35+
await get_event_scheduler(app).enqueue_message_for(
36+
RevertCompletedQueue,
37+
OperationToStartEvent(
38+
schedule_id=schedule_id,
39+
operation_name=operation_name,
40+
initial_context=initial_context,
41+
),
42+
)

0 commit comments

Comments
 (0)