Skip to content

Commit 009d00c

Browse files
committed
Merge branch 'master' into 1090-implement-sampling-tracing-strategy
2 parents 00e643a + 98bd68a commit 009d00c

File tree

10 files changed

+169
-35
lines changed

10 files changed

+169
-35
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from ._core import (
22
cancel_operation,
3+
get_operation_name_or_none,
34
restart_operation_step_stuck_during_revert,
45
restart_operation_step_stuck_in_manual_intervention_during_execute,
56
start_operation,
@@ -35,6 +36,7 @@
3536
"cancel_operation",
3637
"generic_scheduler_lifespan",
3738
"get_operation_context_proxy",
39+
"get_operation_name_or_none",
3840
"get_step_group_proxy",
3941
"get_step_store_proxy",
4042
"Operation",

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
OperationErrorType,
5252
OperationName,
5353
OperationToStart,
54+
ReservedContextKeys,
5455
ScheduleId,
5556
StepName,
5657
StepStatus,
@@ -100,6 +101,8 @@ async def start_operation(
100101
"""start an operation by it's given name and providing an initial context"""
101102
schedule_id: ScheduleId = f"{uuid4()}"
102103

104+
initial_operation_context[ReservedContextKeys.SCHEDULE_ID] = schedule_id
105+
103106
# check if operation is registered
104107
operation = OperationRegistry.get_operation(operation_name)
105108

@@ -213,6 +216,17 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
213216
limit=PARALLEL_REQUESTS,
214217
)
215218

219+
async def get_operation_name_or_none(
220+
self, schedule_id: ScheduleId
221+
) -> OperationName | None:
222+
schedule_data_proxy = ScheduleDataStoreProxy(
223+
store=self._store, schedule_id=schedule_id
224+
)
225+
try:
226+
return await schedule_data_proxy.read("operation_name")
227+
except NoDataFoundError:
228+
return None
229+
216230
async def restart_operation_step_stuck_in_error(
217231
self,
218232
schedule_id: ScheduleId,
@@ -732,6 +746,13 @@ async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None:
732746
await Core.get_from_app_state(app).cancel_operation(schedule_id)
733747

734748

749+
async def get_operation_name_or_none(
750+
app: FastAPI, schedule_id: ScheduleId
751+
) -> OperationName | None:
752+
"""returns the name of the operation or None if not found"""
753+
return await Core.get_from_app_state(app).get_operation_name_or_none(schedule_id)
754+
755+
735756
async def restart_operation_step_stuck_in_manual_intervention_during_execute(
736757
app: FastAPI, schedule_id: ScheduleId, step_name: StepName
737758
) -> None:

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,23 @@ async def register_to_start_after(
3535
schedule_id: ScheduleId,
3636
event_type: EventType,
3737
*,
38-
to_start: OperationToStart,
38+
to_start: OperationToStart | None,
3939
) -> None:
40+
41+
events_proxy = OperationEventsProxy(self._store, schedule_id, event_type)
42+
if to_start is None:
43+
# unregister any previously registered operation
44+
await events_proxy.delete()
45+
_logger.debug(
46+
"Unregistered event_type='%s' to_start for schedule_id='%s'",
47+
event_type,
48+
schedule_id,
49+
)
50+
return
51+
4052
# ensure operation exists
4153
OperationRegistry.get_operation(to_start.operation_name)
4254

43-
events_proxy = OperationEventsProxy(self._store, schedule_id, event_type)
4455
await events_proxy.create_or_update_multiple(
4556
{
4657
"initial_context": to_start.initial_context,

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after_registration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ def _get_after_event_manager(app: FastAPI) -> "AfterEventManager":
1616

1717

1818
async def register_to_start_after_on_executed_completed(
19-
app: FastAPI, schedule_id: ScheduleId, *, to_start: OperationToStart
19+
app: FastAPI, schedule_id: ScheduleId, *, to_start: OperationToStart | None
2020
) -> None:
2121
await _get_after_event_manager(app).register_to_start_after(
2222
schedule_id, EventType.ON_EXECUTEDD_COMPLETED, to_start=to_start
2323
)
2424

2525

2626
async def register_to_start_after_on_reverted_completed(
27-
app: FastAPI, schedule_id: ScheduleId, *, to_start: OperationToStart
27+
app: FastAPI, schedule_id: ScheduleId, *, to_start: OperationToStart | None
2828
) -> None:
2929
await _get_after_event_manager(app).register_to_start_after(
3030
schedule_id, EventType.ON_REVERT_COMPLETED, to_start=to_start

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_models.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass
2-
from enum import auto
2+
from enum import Enum, auto
33
from typing import Annotated, Any, Final, TypeAlias
44

55
from models_library.basic_types import UUIDStr
@@ -51,3 +51,10 @@ class EventType(StrAutoEnum):
5151
class OperationToStart:
5252
operation_name: OperationName
5353
initial_context: OperationContext
54+
55+
56+
class ReservedContextKeys(str, Enum):
57+
SCHEDULE_ID = "_schedule_id"
58+
59+
60+
ALL_RESERVED_CONTEXT_KEYS: Final[set[str]] = {x.value for x in ReservedContextKeys}

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
StepNotFoundInoperationError,
1313
)
1414
from ._models import (
15+
ALL_RESERVED_CONTEXT_KEYS,
1516
OperationName,
1617
ProvidedOperationContext,
1718
RequiredOperationContext,
@@ -245,7 +246,7 @@ def _has_abstract_methods(cls: type[object]) -> bool:
245246

246247

247248
@validate_call(config={"arbitrary_types_allowed": True})
248-
def _validate_operation( # noqa: C901
249+
def _validate_operation( # noqa: C901, PLR0912 # pylint: disable=too-many-branches
249250
operation: Operation,
250251
) -> dict[StepName, type[BaseStep]]:
251252
if len(operation.step_groups) == 0:
@@ -285,14 +286,27 @@ def _validate_operation( # noqa: C901
285286
detected_steps_names[step_name] = step
286287

287288
for key in step.get_execute_provides_context_keys():
289+
if key in ALL_RESERVED_CONTEXT_KEYS:
290+
msg = (
291+
f"Step {step_name=} provides {key=} which is part of reserved keys "
292+
f"{ALL_RESERVED_CONTEXT_KEYS=}"
293+
)
294+
raise ValueError(msg)
288295
if key in execute_provided_keys:
289296
msg = (
290297
f"Step {step_name=} provides already provided {key=} in "
291298
f"{step.get_execute_provides_context_keys.__name__}()"
292299
)
293300
raise ValueError(msg)
294301
execute_provided_keys.add(key)
302+
295303
for key in step.get_revert_provides_context_keys():
304+
if key in ALL_RESERVED_CONTEXT_KEYS:
305+
msg = (
306+
f"Step {step_name=} provides {key=} which is part of reserved keys "
307+
f"{ALL_RESERVED_CONTEXT_KEYS=}"
308+
)
309+
raise ValueError(msg)
296310
if key in revert_provided_keys:
297311
msg = (
298312
f"Step {step_name=} provides already provided {key=} in "

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__core.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
SingleStepGroup,
3333
StepStoreProxy,
3434
cancel_operation,
35+
get_operation_name_or_none,
3536
restart_operation_step_stuck_during_revert,
3637
restart_operation_step_stuck_in_manual_intervention_during_execute,
3738
start_operation,
@@ -722,6 +723,7 @@ async def test_execute_revert_order(
722723
],
723724
{
724725
"SCH:{schedule_id}",
726+
"SCH:{schedule_id}:OP_CTX:test_op",
725727
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
726728
"SCH:{schedule_id}:GROUPS:test_op:0S:R",
727729
"SCH:{schedule_id}:STEPS:test_op:0S:E:_FCR1",
@@ -740,6 +742,7 @@ async def test_execute_revert_order(
740742
],
741743
{
742744
"SCH:{schedule_id}",
745+
"SCH:{schedule_id}:OP_CTX:test_op",
743746
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
744747
"SCH:{schedule_id}:GROUPS:test_op:1S:E",
745748
"SCH:{schedule_id}:GROUPS:test_op:1S:R",
@@ -761,6 +764,7 @@ async def test_execute_revert_order(
761764
],
762765
{
763766
"SCH:{schedule_id}",
767+
"SCH:{schedule_id}:OP_CTX:test_op",
764768
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
765769
"SCH:{schedule_id}:GROUPS:test_op:1P:E",
766770
"SCH:{schedule_id}:GROUPS:test_op:1P:R",
@@ -786,6 +790,7 @@ async def test_execute_revert_order(
786790
],
787791
{
788792
"SCH:{schedule_id}",
793+
"SCH:{schedule_id}:OP_CTX:test_op",
789794
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
790795
"SCH:{schedule_id}:GROUPS:test_op:1P:E",
791796
"SCH:{schedule_id}:GROUPS:test_op:1P:R",
@@ -1014,6 +1019,7 @@ async def test_repeating_step(
10141019
],
10151020
{
10161021
"SCH:{schedule_id}",
1022+
"SCH:{schedule_id}:OP_CTX:test_op",
10171023
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
10181024
"SCH:{schedule_id}:GROUPS:test_op:1P:E",
10191025
"SCH:{schedule_id}:GROUPS:test_op:2S:E",
@@ -1049,6 +1055,7 @@ async def test_repeating_step(
10491055
],
10501056
{
10511057
"SCH:{schedule_id}",
1058+
"SCH:{schedule_id}:OP_CTX:test_op",
10521059
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
10531060
"SCH:{schedule_id}:GROUPS:test_op:1P:E",
10541061
"SCH:{schedule_id}:GROUPS:test_op:2P:E",
@@ -1174,6 +1181,7 @@ async def test_operation_is_not_cancellable(
11741181
],
11751182
{
11761183
"SCH:{schedule_id}",
1184+
"SCH:{schedule_id}:OP_CTX:test_op",
11771185
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
11781186
"SCH:{schedule_id}:GROUPS:test_op:1P:E",
11791187
"SCH:{schedule_id}:GROUPS:test_op:2S:E",
@@ -1213,6 +1221,7 @@ async def test_operation_is_not_cancellable(
12131221
],
12141222
{
12151223
"SCH:{schedule_id}",
1224+
"SCH:{schedule_id}:OP_CTX:test_op",
12161225
"SCH:{schedule_id}:GROUPS:test_op:0S:E",
12171226
"SCH:{schedule_id}:GROUPS:test_op:1P:E",
12181227
"SCH:{schedule_id}:GROUPS:test_op:2P:E",
@@ -1753,3 +1762,23 @@ async def test_step_does_not_provide_declared_key_or_is_none(
17531762

17541763
formatted_expected_keys = {k.format(schedule_id=schedule_id) for k in expected_keys}
17551764
await ensure_keys_in_store(selected_app, expected_keys=formatted_expected_keys)
1765+
1766+
1767+
@pytest.mark.parametrize("app_count", [10])
1768+
async def test_get_operation_name_or_none(
1769+
preserve_caplog_for_async_logging: None,
1770+
operation_name: OperationName,
1771+
selected_app: FastAPI,
1772+
register_operation: Callable[[OperationName, Operation], None],
1773+
):
1774+
assert (
1775+
await get_operation_name_or_none(selected_app, "non_existing_schedule_id")
1776+
is None
1777+
)
1778+
1779+
operation = Operation(SingleStepGroup(_S1))
1780+
register_operation(operation_name, operation)
1781+
1782+
schedule_id = await start_operation(selected_app, operation_name, {})
1783+
1784+
assert await get_operation_name_or_none(selected_app, schedule_id) == operation_name

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
StepNotFoundInoperationError,
99
)
1010
from simcore_service_dynamic_scheduler.services.generic_scheduler._models import (
11+
ALL_RESERVED_CONTEXT_KEYS,
1112
ProvidedOperationContext,
1213
RequiredOperationContext,
14+
ReservedContextKeys,
1315
)
1416
from simcore_service_dynamic_scheduler.services.generic_scheduler._operation import (
1517
BaseStep,
@@ -57,6 +59,12 @@ def get_execute_provides_context_keys(cls) -> set[str]:
5759
return {"execute_key"}
5860

5961

62+
class WrongBS3C(BaseBS):
63+
@classmethod
64+
def get_execute_provides_context_keys(cls) -> set[str]:
65+
return {ReservedContextKeys.SCHEDULE_ID}
66+
67+
6068
class WrongBS1R(BaseBS):
6169
@classmethod
6270
def get_revert_provides_context_keys(cls) -> set[str]:
@@ -69,6 +77,22 @@ def get_revert_provides_context_keys(cls) -> set[str]:
6977
return {"revert_key"}
7078

7179

80+
class WrongBS3R(BaseBS):
81+
@classmethod
82+
def get_revert_provides_context_keys(cls) -> set[str]:
83+
return {ReservedContextKeys.SCHEDULE_ID}
84+
85+
86+
class AllowedKeysBS(BaseBS):
87+
@classmethod
88+
def get_execute_requires_context_keys(cls) -> set[str]:
89+
return {ReservedContextKeys.SCHEDULE_ID}
90+
91+
@classmethod
92+
def get_revert_requires_context_keys(cls) -> set[str]:
93+
return {ReservedContextKeys.SCHEDULE_ID}
94+
95+
7296
@pytest.mark.parametrize(
7397
"operation",
7498
[
@@ -104,6 +128,9 @@ def get_revert_provides_context_keys(cls) -> set[str]:
104128
Operation(
105129
ParallelStepGroup(BS1, BS3, repeat_steps=True),
106130
),
131+
Operation(
132+
SingleStepGroup(AllowedKeysBS),
133+
),
107134
],
108135
)
109136
def test_validate_operation_passes(operation: Operation):
@@ -166,6 +193,14 @@ def test_validate_operation_passes(operation: Operation):
166193
),
167194
"cannot have steps that require manual intervention",
168195
),
196+
(
197+
Operation(SingleStepGroup(WrongBS3C)),
198+
"which is part of reserved keys ALL_RESERVED_CONTEXT_KEYS",
199+
),
200+
(
201+
Operation(SingleStepGroup(WrongBS3R)),
202+
"which is part of reserved keys ALL_RESERVED_CONTEXT_KEYS",
203+
),
169204
],
170205
)
171206
def test_validate_operations_fails(operation: Operation, match: str):
@@ -204,3 +239,9 @@ def test_operation_registry_raises_errors():
204239

205240
with pytest.raises(StepNotFoundInoperationError):
206241
OperationRegistry.get_step("op1", "non_existing")
242+
243+
244+
def test_reserved_context_keys_existence():
245+
for e in ReservedContextKeys:
246+
assert e.value in ALL_RESERVED_CONTEXT_KEYS
247+
assert "missing_key" not in ALL_RESERVED_CONTEXT_KEYS

0 commit comments

Comments
 (0)