diff --git a/packages/models-library/tests/test__pydantic_models_and_enums.py b/packages/models-library/tests/test__pydantic_models_and_enums.py index 00c67c32c9b..5de9ff8b430 100644 --- a/packages/models-library/tests/test__pydantic_models_and_enums.py +++ b/packages/models-library/tests/test__pydantic_models_and_enums.py @@ -30,7 +30,7 @@ def test_equivalent_enums_are_not_strictly_equal(): # # Here two equivalent enum BUT of type str-enum # -# SEE from models_library.utils.enums.AutoStrEnum +# SEE from models_library.utils.enums.StrAutoEnum # SEE https://docs.pydantic.dev/dev-v2/usage/types/enums/ # diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py index 6344c6018a2..5af60ab2ead 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py @@ -10,6 +10,7 @@ get_step_group_proxy, get_step_store_proxy, ) +from ._errors import NoDataFoundError from ._event_after_registration import ( register_to_start_after_on_executed_completed, register_to_start_after_on_reverted_completed, @@ -24,21 +25,28 @@ ) from ._operation import ( BaseStep, + BaseStepGroup, Operation, OperationRegistry, ParallelStepGroup, SingleStepGroup, ) -from ._store import OperationContextProxy, StepGroupProxy, StepStoreProxy +from ._store import ( + OperationContextProxy, + StepGroupProxy, + StepStoreProxy, +) __all__: tuple[str, ...] = ( "BaseStep", + "BaseStepGroup", "cancel_operation", "generic_scheduler_lifespan", "get_operation_context_proxy", "get_operation_name_or_none", "get_step_group_proxy", "get_step_store_proxy", + "NoDataFoundError", "Operation", "OperationContextProxy", "OperationName", diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py index 1742332f756..3db3478252d 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py @@ -30,6 +30,7 @@ from ._errors import ( CannotCancelWhileWaitingForManualInterventionError, NoDataFoundError, + OperationInitialContextKeyNotFoundError, OperationNotCancellableError, StepNameNotInCurrentGroupError, StepNotInErrorStateError, @@ -106,6 +107,12 @@ async def start_operation( # check if operation is registered operation = OperationRegistry.get_operation(operation_name) + for required_key in operation.initial_context_required_keys: + if required_key not in initial_operation_context: + raise OperationInitialContextKeyNotFoundError( + operation_name=operation_name, required_key=required_key + ) + # NOTE: to ensure reproducibility of operations, the # operation steps cannot overwrite keys in the # initial context with their results @@ -237,6 +244,8 @@ async def restart_operation_step_stuck_in_error( """ Force a step stuck in an error state to retry. Will raise errors if step cannot be retried. + + raises NoDataFoundError """ schedule_data_proxy = ScheduleDataStoreProxy( store=self._store, schedule_id=schedule_id @@ -742,6 +751,8 @@ async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None: `reverting` refers to the act of reverting the effects of a step that has already been completed (eg: remove a created network) + + raises NoDataFoundError """ await Core.get_from_app_state(app).cancel_operation(schedule_id) @@ -763,6 +774,8 @@ async def restart_operation_step_stuck_in_manual_intervention_during_execute( `waiting for manual intervention` refers to a step that has failed and exhausted all retries and is now waiting for a human to fix the issue (eg: storage service is reachable once again) + + raises NoDataFoundError """ await Core.get_from_app_state(app).restart_operation_step_stuck_in_error( schedule_id, step_name, in_manual_intervention=True @@ -778,6 +791,8 @@ async def restart_operation_step_stuck_during_revert( `stuck step` is a step that has failed and exhausted all retries `reverting` refers to the act of reverting the effects of a step that has already been completed (eg: remove a created network) + + raises NoDataFoundError """ await Core.get_from_app_state(app).restart_operation_step_stuck_in_error( schedule_id, step_name, in_manual_intervention=False diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py index fc3b8e7d31e..32357a413b2 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py @@ -19,7 +19,13 @@ class OperationNotFoundError(BaseGenericSchedulerError): ) -class StepNotFoundInoperationError(BaseGenericSchedulerError): +class OperationInitialContextKeyNotFoundError(BaseGenericSchedulerError): + msg_template: str = ( + "Operation '{operation_name}' required_key='{required_key}' not in initial_operation_context" + ) + + +class StepNotFoundInOperationError(BaseGenericSchedulerError): msg_template: str = ( "Step '{step_name}' not found steps_names='{steps_names}' for operation '{operation_name}'" ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py index adfe9fbcf3c..9194c11e1cc 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py @@ -5,6 +5,7 @@ from servicelib.logging_utils import log_context from ._core import start_operation +from ._errors import OperationInitialContextKeyNotFoundError from ._models import ( EventType, OperationContext, @@ -50,7 +51,12 @@ async def register_to_start_after( return # ensure operation exists - OperationRegistry.get_operation(to_start.operation_name) + operation = OperationRegistry.get_operation(to_start.operation_name) + for required_key in operation.initial_context_required_keys: + if required_key not in to_start.initial_context: + raise OperationInitialContextKeyNotFoundError( + operation_name=to_start.operation_name, required_key=required_key + ) await events_proxy.create_or_update_multiple( { diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py index 5fa8c55f6c3..f9c5165ad23 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py @@ -9,7 +9,7 @@ from ._errors import ( OperationAlreadyRegisteredError, OperationNotFoundError, - StepNotFoundInoperationError, + StepNotFoundInOperationError, ) from ._models import ( ALL_RESERVED_CONTEXT_KEYS, @@ -232,9 +232,17 @@ def get_step_subgroup_to_run(self) -> StepsSubGroup: class Operation: def __init__( - self, *step_groups: BaseStepGroup, is_cancellable: bool = True + self, + *step_groups: BaseStepGroup, + initial_context_required_keys: set[str] | None = None, + is_cancellable: bool = True, ) -> None: self.step_groups = list(step_groups) + self.initial_context_required_keys = ( + set() + if initial_context_required_keys is None + else initial_context_required_keys + ) self.is_cancellable = is_cancellable def __repr__(self) -> str: @@ -382,7 +390,7 @@ def get_step( steps_names = set(cls._OPERATIONS[operation_name]["steps"].keys()) if step_name not in steps_names: - raise StepNotFoundInoperationError( + raise StepNotFoundInOperationError( step_name=step_name, operation_name=operation_name, steps_names=steps_names, diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py index ee99b210b91..0c15d8517fc 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py @@ -5,7 +5,7 @@ from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import ( OperationAlreadyRegisteredError, OperationNotFoundError, - StepNotFoundInoperationError, + StepNotFoundInOperationError, ) from simcore_service_dynamic_scheduler.services.generic_scheduler._models import ( ALL_RESERVED_CONTEXT_KEYS, @@ -237,7 +237,7 @@ def test_operation_registry_raises_errors(): with pytest.raises(OperationNotFoundError): OperationRegistry.get_step("non_existing", "BS1") - with pytest.raises(StepNotFoundInoperationError): + with pytest.raises(StepNotFoundInOperationError): OperationRegistry.get_step("op1", "non_existing") diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py index 9678df96ec6..f22dee36bbf 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py @@ -40,6 +40,12 @@ register_to_start_after_on_reverted_completed, start_operation, ) +from simcore_service_dynamic_scheduler.services.generic_scheduler._core import ( + OperationContext, +) +from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import ( + OperationInitialContextKeyNotFoundError, +) from utils import ( BaseExpectedStepOrder, ExecuteRandom, @@ -438,8 +444,8 @@ async def test_can_recover_from_interruption( ], ) async def test_run_operation_after( - app: FastAPI, preserve_caplog_for_async_logging: None, + app: FastAPI, steps_call_order: list[tuple[str, str]], register_operation: Callable[[OperationName, Operation], None], register_at_creation: bool, @@ -481,3 +487,67 @@ async def test_run_operation_after( await ensure_expected_order(steps_call_order, expected_order) await ensure_keys_in_store(app, expected_keys=set()) + + +async def test_missing_initial_context_key_from_operation( + preserve_caplog_for_async_logging: None, + app: FastAPI, + register_operation: Callable[[OperationName, Operation], None], +): + good_operation_name: OperationName = "good" + bad_operation_name: OperationName = "bad" + + operation = Operation( + SingleStepGroup(_ShortSleep), initial_context_required_keys={"required_key"} + ) + register_operation(good_operation_name, operation) + register_operation(bad_operation_name, operation) + + common_initial_context = {"unused1": "value1", "unused2": "value2"} + good_initial_context: OperationContext = { + "required_key": "some_value", + **common_initial_context, + } + bad_initial_context: OperationContext = {**common_initial_context} + + bad_operation_to_start = OperationToStart( + operation_name=bad_operation_name, initial_context=bad_initial_context + ) + + # 1. check it works + await start_operation(app, bad_operation_name, good_initial_context) + + # 2. check it raises with a bad context + with pytest.raises(OperationInitialContextKeyNotFoundError): + await start_operation(app, bad_operation_name, bad_initial_context) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await start_operation( + app, + good_operation_name, + good_initial_context, + on_execute_completed=bad_operation_to_start, + on_revert_completed=None, + ) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await start_operation( + app, + good_operation_name, + good_initial_context, + on_execute_completed=None, + on_revert_completed=bad_operation_to_start, + ) + + # 3. register_to_start_after... raises with a bad context + schedule_id = await start_operation(app, bad_operation_name, good_initial_context) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await register_to_start_after_on_executed_completed( + app, schedule_id, to_start=bad_operation_to_start + ) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await register_to_start_after_on_reverted_completed( + app, schedule_id, to_start=bad_operation_to_start + )