From d5e65ccc67580f7887dd3ea559b359914da2ecd7 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 10 Oct 2025 09:55:36 +0200 Subject: [PATCH 1/5] added initial operation context key requirements --- .../services/generic_scheduler/_core.py | 7 ++ .../services/generic_scheduler/_errors.py | 6 ++ .../generic_scheduler/_event_after.py | 8 ++- .../services/generic_scheduler/_operation.py | 10 ++- .../test_generic_scheduler.py | 72 ++++++++++++++++++- 5 files changed, 100 insertions(+), 3 deletions(-) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py index 1742332f756..50a251f3080 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py @@ -30,6 +30,7 @@ from ._errors import ( CannotCancelWhileWaitingForManualInterventionError, NoDataFoundError, + OperationInitialContextKeyNotFoundError, OperationNotCancellableError, StepNameNotInCurrentGroupError, StepNotInErrorStateError, @@ -106,6 +107,12 @@ async def start_operation( # check if operation is registered operation = OperationRegistry.get_operation(operation_name) + for required_key in operation.initial_context_required_keys: + if required_key not in initial_operation_context: + raise OperationInitialContextKeyNotFoundError( + operation_name=operation_name, required_key=required_key + ) + # NOTE: to ensure reproducibility of operations, the # operation steps cannot overwrite keys in the # initial context with their results diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py index fc3b8e7d31e..978b27cb51e 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py @@ -19,6 +19,12 @@ class OperationNotFoundError(BaseGenericSchedulerError): ) +class OperationInitialContextKeyNotFoundError(BaseGenericSchedulerError): + msg_template: str = ( + "Operation '{operation_name}' required_key='{required_key}' not in initial_operation_context" + ) + + class StepNotFoundInoperationError(BaseGenericSchedulerError): msg_template: str = ( "Step '{step_name}' not found steps_names='{steps_names}' for operation '{operation_name}'" diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py index adfe9fbcf3c..9194c11e1cc 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py @@ -5,6 +5,7 @@ from servicelib.logging_utils import log_context from ._core import start_operation +from ._errors import OperationInitialContextKeyNotFoundError from ._models import ( EventType, OperationContext, @@ -50,7 +51,12 @@ async def register_to_start_after( return # ensure operation exists - OperationRegistry.get_operation(to_start.operation_name) + operation = OperationRegistry.get_operation(to_start.operation_name) + for required_key in operation.initial_context_required_keys: + if required_key not in to_start.initial_context: + raise OperationInitialContextKeyNotFoundError( + operation_name=to_start.operation_name, required_key=required_key + ) await events_proxy.create_or_update_multiple( { diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py index 5fa8c55f6c3..bf934d08276 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py @@ -232,9 +232,17 @@ def get_step_subgroup_to_run(self) -> StepsSubGroup: class Operation: def __init__( - self, *step_groups: BaseStepGroup, is_cancellable: bool = True + self, + *step_groups: BaseStepGroup, + initial_context_required_keys: set[str] | None = None, + is_cancellable: bool = True, ) -> None: self.step_groups = list(step_groups) + self.initial_context_required_keys = ( + set() + if initial_context_required_keys is None + else initial_context_required_keys + ) self.is_cancellable = is_cancellable def __repr__(self) -> str: diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py index 9678df96ec6..36c8ef7a030 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py @@ -40,6 +40,12 @@ register_to_start_after_on_reverted_completed, start_operation, ) +from simcore_service_dynamic_scheduler.services.generic_scheduler._core import ( + OperationContext, +) +from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import ( + OperationInitialContextKeyNotFoundError, +) from utils import ( BaseExpectedStepOrder, ExecuteRandom, @@ -438,8 +444,8 @@ async def test_can_recover_from_interruption( ], ) async def test_run_operation_after( - app: FastAPI, preserve_caplog_for_async_logging: None, + app: FastAPI, steps_call_order: list[tuple[str, str]], register_operation: Callable[[OperationName, Operation], None], register_at_creation: bool, @@ -481,3 +487,67 @@ async def test_run_operation_after( await ensure_expected_order(steps_call_order, expected_order) await ensure_keys_in_store(app, expected_keys=set()) + + +async def test_missing_initial_context_key_from_operation( + preserve_caplog_for_async_logging: None, + app: FastAPI, + register_operation: Callable[[OperationName, Operation], None], +): + good_operation_name: OperationName = "good" + bad_operation_name: OperationName = "bad" + + operation = Operation( + SingleStepGroup(_ShortSleep), initial_context_required_keys={"required_key"} + ) + register_operation(good_operation_name, operation) + register_operation(bad_operation_name, operation) + + common_initial_context = {"unsued1": "value1", "unsued2": "value2"} + good_initial_context: OperationContext = { + "required_key": "soeme_value", + **common_initial_context, + } + bad_initial_context: OperationContext = {**common_initial_context} + + bad_operation_to_start = OperationToStart( + operation_name=bad_operation_name, initial_context=bad_initial_context + ) + + # 1. check it works + await start_operation(app, bad_operation_name, good_initial_context) + + # 2. check it raises with a bad context + with pytest.raises(OperationInitialContextKeyNotFoundError): + await start_operation(app, bad_operation_name, bad_initial_context) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await start_operation( + app, + good_operation_name, + good_initial_context, + on_execute_completed=bad_operation_to_start, + on_revert_completed=None, + ) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await start_operation( + app, + good_operation_name, + good_initial_context, + on_execute_completed=None, + on_revert_completed=bad_operation_to_start, + ) + + # 3. register_to_start_after... raises with a bad context + schedule_id = await start_operation(app, bad_operation_name, good_initial_context) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await register_to_start_after_on_executed_completed( + app, schedule_id, to_start=bad_operation_to_start + ) + + with pytest.raises(OperationInitialContextKeyNotFoundError): + await register_to_start_after_on_reverted_completed( + app, schedule_id, to_start=bad_operation_to_start + ) From 84dc9aec77af17e821f50f4062c47fc6f72c750b Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 10 Oct 2025 09:59:34 +0200 Subject: [PATCH 2/5] typos --- .../services/generic_scheduler/_errors.py | 2 +- .../services/generic_scheduler/_operation.py | 4 ++-- .../tests/unit/services/generic_scheduler/test__operation.py | 4 ++-- .../unit/services/generic_scheduler/test_generic_scheduler.py | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py index 978b27cb51e..32357a413b2 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py @@ -25,7 +25,7 @@ class OperationInitialContextKeyNotFoundError(BaseGenericSchedulerError): ) -class StepNotFoundInoperationError(BaseGenericSchedulerError): +class StepNotFoundInOperationError(BaseGenericSchedulerError): msg_template: str = ( "Step '{step_name}' not found steps_names='{steps_names}' for operation '{operation_name}'" ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py index bf934d08276..f9c5165ad23 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py @@ -9,7 +9,7 @@ from ._errors import ( OperationAlreadyRegisteredError, OperationNotFoundError, - StepNotFoundInoperationError, + StepNotFoundInOperationError, ) from ._models import ( ALL_RESERVED_CONTEXT_KEYS, @@ -390,7 +390,7 @@ def get_step( steps_names = set(cls._OPERATIONS[operation_name]["steps"].keys()) if step_name not in steps_names: - raise StepNotFoundInoperationError( + raise StepNotFoundInOperationError( step_name=step_name, operation_name=operation_name, steps_names=steps_names, diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py index ee99b210b91..0c15d8517fc 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__operation.py @@ -5,7 +5,7 @@ from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import ( OperationAlreadyRegisteredError, OperationNotFoundError, - StepNotFoundInoperationError, + StepNotFoundInOperationError, ) from simcore_service_dynamic_scheduler.services.generic_scheduler._models import ( ALL_RESERVED_CONTEXT_KEYS, @@ -237,7 +237,7 @@ def test_operation_registry_raises_errors(): with pytest.raises(OperationNotFoundError): OperationRegistry.get_step("non_existing", "BS1") - with pytest.raises(StepNotFoundInoperationError): + with pytest.raises(StepNotFoundInOperationError): OperationRegistry.get_step("op1", "non_existing") diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py index 36c8ef7a030..f22dee36bbf 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py @@ -503,9 +503,9 @@ async def test_missing_initial_context_key_from_operation( register_operation(good_operation_name, operation) register_operation(bad_operation_name, operation) - common_initial_context = {"unsued1": "value1", "unsued2": "value2"} + common_initial_context = {"unused1": "value1", "unused2": "value2"} good_initial_context: OperationContext = { - "required_key": "soeme_value", + "required_key": "some_value", **common_initial_context, } bad_initial_context: OperationContext = {**common_initial_context} From b13337f7ad737ac93cfefb5a63f6e422aeb49c35 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 10 Oct 2025 10:03:49 +0200 Subject: [PATCH 3/5] expose NoDataFoundError and added note --- .../tests/test__pydantic_models_and_enums.py | 2 +- .../services/generic_scheduler/__init__.py | 8 +++++++- .../services/generic_scheduler/_core.py | 8 ++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/packages/models-library/tests/test__pydantic_models_and_enums.py b/packages/models-library/tests/test__pydantic_models_and_enums.py index 00c67c32c9b..5de9ff8b430 100644 --- a/packages/models-library/tests/test__pydantic_models_and_enums.py +++ b/packages/models-library/tests/test__pydantic_models_and_enums.py @@ -30,7 +30,7 @@ def test_equivalent_enums_are_not_strictly_equal(): # # Here two equivalent enum BUT of type str-enum # -# SEE from models_library.utils.enums.AutoStrEnum +# SEE from models_library.utils.enums.StrAutoEnum # SEE https://docs.pydantic.dev/dev-v2/usage/types/enums/ # diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py index 6344c6018a2..40410765d27 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py @@ -29,7 +29,12 @@ ParallelStepGroup, SingleStepGroup, ) -from ._store import OperationContextProxy, StepGroupProxy, StepStoreProxy +from ._store import ( + NoDataFoundError, + OperationContextProxy, + StepGroupProxy, + StepStoreProxy, +) __all__: tuple[str, ...] = ( "BaseStep", @@ -39,6 +44,7 @@ "get_operation_name_or_none", "get_step_group_proxy", "get_step_store_proxy", + "NoDataFoundError", "Operation", "OperationContextProxy", "OperationName", diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py index 50a251f3080..3db3478252d 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py @@ -244,6 +244,8 @@ async def restart_operation_step_stuck_in_error( """ Force a step stuck in an error state to retry. Will raise errors if step cannot be retried. + + raises NoDataFoundError """ schedule_data_proxy = ScheduleDataStoreProxy( store=self._store, schedule_id=schedule_id @@ -749,6 +751,8 @@ async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None: `reverting` refers to the act of reverting the effects of a step that has already been completed (eg: remove a created network) + + raises NoDataFoundError """ await Core.get_from_app_state(app).cancel_operation(schedule_id) @@ -770,6 +774,8 @@ async def restart_operation_step_stuck_in_manual_intervention_during_execute( `waiting for manual intervention` refers to a step that has failed and exhausted all retries and is now waiting for a human to fix the issue (eg: storage service is reachable once again) + + raises NoDataFoundError """ await Core.get_from_app_state(app).restart_operation_step_stuck_in_error( schedule_id, step_name, in_manual_intervention=True @@ -785,6 +791,8 @@ async def restart_operation_step_stuck_during_revert( `stuck step` is a step that has failed and exhausted all retries `reverting` refers to the act of reverting the effects of a step that has already been completed (eg: remove a created network) + + raises NoDataFoundError """ await Core.get_from_app_state(app).restart_operation_step_stuck_in_error( schedule_id, step_name, in_manual_intervention=False From 3907368e9eeae2ec0973e3b063114c8f64a1c8c3 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 10 Oct 2025 10:08:46 +0200 Subject: [PATCH 4/5] fixed bad import --- .../services/generic_scheduler/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py index 40410765d27..a1a41f05327 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py @@ -10,6 +10,7 @@ get_step_group_proxy, get_step_store_proxy, ) +from ._errors import NoDataFoundError from ._event_after_registration import ( register_to_start_after_on_executed_completed, register_to_start_after_on_reverted_completed, @@ -30,7 +31,6 @@ SingleStepGroup, ) from ._store import ( - NoDataFoundError, OperationContextProxy, StepGroupProxy, StepStoreProxy, From 6262ef5c09d1e22380a524e34bc8ac247377db99 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 10 Oct 2025 10:53:13 +0200 Subject: [PATCH 5/5] exposed extra required --- .../services/generic_scheduler/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py index a1a41f05327..5af60ab2ead 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py @@ -25,6 +25,7 @@ ) from ._operation import ( BaseStep, + BaseStepGroup, Operation, OperationRegistry, ParallelStepGroup, @@ -38,6 +39,7 @@ __all__: tuple[str, ...] = ( "BaseStep", + "BaseStepGroup", "cancel_operation", "generic_scheduler_lifespan", "get_operation_context_proxy",