Skip to content

Commit aadb582

Browse files
GitHKAndrei Neagu
andauthored
🎨 adds required initial operation context key (#8495)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent fd9641d commit aadb582

File tree

8 files changed

+123
-10
lines changed

8 files changed

+123
-10
lines changed

packages/models-library/tests/test__pydantic_models_and_enums.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def test_equivalent_enums_are_not_strictly_equal():
3030
#
3131
# Here two equivalent enum BUT of type str-enum
3232
#
33-
# SEE from models_library.utils.enums.AutoStrEnum
33+
# SEE from models_library.utils.enums.StrAutoEnum
3434
# SEE https://docs.pydantic.dev/dev-v2/usage/types/enums/
3535
#
3636

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
get_step_group_proxy,
1111
get_step_store_proxy,
1212
)
13+
from ._errors import NoDataFoundError
1314
from ._event_after_registration import (
1415
register_to_start_after_on_executed_completed,
1516
register_to_start_after_on_reverted_completed,
@@ -24,21 +25,28 @@
2425
)
2526
from ._operation import (
2627
BaseStep,
28+
BaseStepGroup,
2729
Operation,
2830
OperationRegistry,
2931
ParallelStepGroup,
3032
SingleStepGroup,
3133
)
32-
from ._store import OperationContextProxy, StepGroupProxy, StepStoreProxy
34+
from ._store import (
35+
OperationContextProxy,
36+
StepGroupProxy,
37+
StepStoreProxy,
38+
)
3339

3440
__all__: tuple[str, ...] = (
3541
"BaseStep",
42+
"BaseStepGroup",
3643
"cancel_operation",
3744
"generic_scheduler_lifespan",
3845
"get_operation_context_proxy",
3946
"get_operation_name_or_none",
4047
"get_step_group_proxy",
4148
"get_step_store_proxy",
49+
"NoDataFoundError",
4250
"Operation",
4351
"OperationContextProxy",
4452
"OperationName",

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from ._errors import (
3131
CannotCancelWhileWaitingForManualInterventionError,
3232
NoDataFoundError,
33+
OperationInitialContextKeyNotFoundError,
3334
OperationNotCancellableError,
3435
StepNameNotInCurrentGroupError,
3536
StepNotInErrorStateError,
@@ -106,6 +107,12 @@ async def start_operation(
106107
# check if operation is registered
107108
operation = OperationRegistry.get_operation(operation_name)
108109

110+
for required_key in operation.initial_context_required_keys:
111+
if required_key not in initial_operation_context:
112+
raise OperationInitialContextKeyNotFoundError(
113+
operation_name=operation_name, required_key=required_key
114+
)
115+
109116
# NOTE: to ensure reproducibility of operations, the
110117
# operation steps cannot overwrite keys in the
111118
# initial context with their results
@@ -237,6 +244,8 @@ async def restart_operation_step_stuck_in_error(
237244
"""
238245
Force a step stuck in an error state to retry.
239246
Will raise errors if step cannot be retried.
247+
248+
raises NoDataFoundError
240249
"""
241250
schedule_data_proxy = ScheduleDataStoreProxy(
242251
store=self._store, schedule_id=schedule_id
@@ -742,6 +751,8 @@ async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None:
742751
743752
`reverting` refers to the act of reverting the effects of a step
744753
that has already been completed (eg: remove a created network)
754+
755+
raises NoDataFoundError
745756
"""
746757
await Core.get_from_app_state(app).cancel_operation(schedule_id)
747758

@@ -763,6 +774,8 @@ async def restart_operation_step_stuck_in_manual_intervention_during_execute(
763774
`waiting for manual intervention` refers to a step that has failed and exhausted
764775
all retries and is now waiting for a human to fix the issue (eg: storage service
765776
is reachable once again)
777+
778+
raises NoDataFoundError
766779
"""
767780
await Core.get_from_app_state(app).restart_operation_step_stuck_in_error(
768781
schedule_id, step_name, in_manual_intervention=True
@@ -778,6 +791,8 @@ async def restart_operation_step_stuck_during_revert(
778791
`stuck step` is a step that has failed and exhausted all retries
779792
`reverting` refers to the act of reverting the effects of a step
780793
that has already been completed (eg: remove a created network)
794+
795+
raises NoDataFoundError
781796
"""
782797
await Core.get_from_app_state(app).restart_operation_step_stuck_in_error(
783798
schedule_id, step_name, in_manual_intervention=False

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,13 @@ class OperationNotFoundError(BaseGenericSchedulerError):
1919
)
2020

2121

22-
class StepNotFoundInoperationError(BaseGenericSchedulerError):
22+
class OperationInitialContextKeyNotFoundError(BaseGenericSchedulerError):
23+
msg_template: str = (
24+
"Operation '{operation_name}' required_key='{required_key}' not in initial_operation_context"
25+
)
26+
27+
28+
class StepNotFoundInOperationError(BaseGenericSchedulerError):
2329
msg_template: str = (
2430
"Step '{step_name}' not found steps_names='{steps_names}' for operation '{operation_name}'"
2531
)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from servicelib.logging_utils import log_context
66

77
from ._core import start_operation
8+
from ._errors import OperationInitialContextKeyNotFoundError
89
from ._models import (
910
EventType,
1011
OperationContext,
@@ -50,7 +51,12 @@ async def register_to_start_after(
5051
return
5152

5253
# ensure operation exists
53-
OperationRegistry.get_operation(to_start.operation_name)
54+
operation = OperationRegistry.get_operation(to_start.operation_name)
55+
for required_key in operation.initial_context_required_keys:
56+
if required_key not in to_start.initial_context:
57+
raise OperationInitialContextKeyNotFoundError(
58+
operation_name=to_start.operation_name, required_key=required_key
59+
)
5460

5561
await events_proxy.create_or_update_multiple(
5662
{

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from ._errors import (
1010
OperationAlreadyRegisteredError,
1111
OperationNotFoundError,
12-
StepNotFoundInoperationError,
12+
StepNotFoundInOperationError,
1313
)
1414
from ._models import (
1515
ALL_RESERVED_CONTEXT_KEYS,
@@ -232,9 +232,17 @@ def get_step_subgroup_to_run(self) -> StepsSubGroup:
232232

233233
class Operation:
234234
def __init__(
235-
self, *step_groups: BaseStepGroup, is_cancellable: bool = True
235+
self,
236+
*step_groups: BaseStepGroup,
237+
initial_context_required_keys: set[str] | None = None,
238+
is_cancellable: bool = True,
236239
) -> None:
237240
self.step_groups = list(step_groups)
241+
self.initial_context_required_keys = (
242+
set()
243+
if initial_context_required_keys is None
244+
else initial_context_required_keys
245+
)
238246
self.is_cancellable = is_cancellable
239247

240248
def __repr__(self) -> str:
@@ -382,7 +390,7 @@ def get_step(
382390

383391
steps_names = set(cls._OPERATIONS[operation_name]["steps"].keys())
384392
if step_name not in steps_names:
385-
raise StepNotFoundInoperationError(
393+
raise StepNotFoundInOperationError(
386394
step_name=step_name,
387395
operation_name=operation_name,
388396
steps_names=steps_names,

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import (
66
OperationAlreadyRegisteredError,
77
OperationNotFoundError,
8-
StepNotFoundInoperationError,
8+
StepNotFoundInOperationError,
99
)
1010
from simcore_service_dynamic_scheduler.services.generic_scheduler._models import (
1111
ALL_RESERVED_CONTEXT_KEYS,
@@ -237,7 +237,7 @@ def test_operation_registry_raises_errors():
237237
with pytest.raises(OperationNotFoundError):
238238
OperationRegistry.get_step("non_existing", "BS1")
239239

240-
with pytest.raises(StepNotFoundInoperationError):
240+
with pytest.raises(StepNotFoundInOperationError):
241241
OperationRegistry.get_step("op1", "non_existing")
242242

243243

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@
4040
register_to_start_after_on_reverted_completed,
4141
start_operation,
4242
)
43+
from simcore_service_dynamic_scheduler.services.generic_scheduler._core import (
44+
OperationContext,
45+
)
46+
from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import (
47+
OperationInitialContextKeyNotFoundError,
48+
)
4349
from utils import (
4450
BaseExpectedStepOrder,
4551
ExecuteRandom,
@@ -438,8 +444,8 @@ async def test_can_recover_from_interruption(
438444
],
439445
)
440446
async def test_run_operation_after(
441-
app: FastAPI,
442447
preserve_caplog_for_async_logging: None,
448+
app: FastAPI,
443449
steps_call_order: list[tuple[str, str]],
444450
register_operation: Callable[[OperationName, Operation], None],
445451
register_at_creation: bool,
@@ -481,3 +487,67 @@ async def test_run_operation_after(
481487

482488
await ensure_expected_order(steps_call_order, expected_order)
483489
await ensure_keys_in_store(app, expected_keys=set())
490+
491+
492+
async def test_missing_initial_context_key_from_operation(
493+
preserve_caplog_for_async_logging: None,
494+
app: FastAPI,
495+
register_operation: Callable[[OperationName, Operation], None],
496+
):
497+
good_operation_name: OperationName = "good"
498+
bad_operation_name: OperationName = "bad"
499+
500+
operation = Operation(
501+
SingleStepGroup(_ShortSleep), initial_context_required_keys={"required_key"}
502+
)
503+
register_operation(good_operation_name, operation)
504+
register_operation(bad_operation_name, operation)
505+
506+
common_initial_context = {"unused1": "value1", "unused2": "value2"}
507+
good_initial_context: OperationContext = {
508+
"required_key": "some_value",
509+
**common_initial_context,
510+
}
511+
bad_initial_context: OperationContext = {**common_initial_context}
512+
513+
bad_operation_to_start = OperationToStart(
514+
operation_name=bad_operation_name, initial_context=bad_initial_context
515+
)
516+
517+
# 1. check it works
518+
await start_operation(app, bad_operation_name, good_initial_context)
519+
520+
# 2. check it raises with a bad context
521+
with pytest.raises(OperationInitialContextKeyNotFoundError):
522+
await start_operation(app, bad_operation_name, bad_initial_context)
523+
524+
with pytest.raises(OperationInitialContextKeyNotFoundError):
525+
await start_operation(
526+
app,
527+
good_operation_name,
528+
good_initial_context,
529+
on_execute_completed=bad_operation_to_start,
530+
on_revert_completed=None,
531+
)
532+
533+
with pytest.raises(OperationInitialContextKeyNotFoundError):
534+
await start_operation(
535+
app,
536+
good_operation_name,
537+
good_initial_context,
538+
on_execute_completed=None,
539+
on_revert_completed=bad_operation_to_start,
540+
)
541+
542+
# 3. register_to_start_after... raises with a bad context
543+
schedule_id = await start_operation(app, bad_operation_name, good_initial_context)
544+
545+
with pytest.raises(OperationInitialContextKeyNotFoundError):
546+
await register_to_start_after_on_executed_completed(
547+
app, schedule_id, to_start=bad_operation_to_start
548+
)
549+
550+
with pytest.raises(OperationInitialContextKeyNotFoundError):
551+
await register_to_start_after_on_reverted_completed(
552+
app, schedule_id, to_start=bad_operation_to_start
553+
)

0 commit comments

Comments
 (0)