Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_equivalent_enums_are_not_strictly_equal():
#
# Here two equivalent enum BUT of type str-enum
#
# SEE from models_library.utils.enums.AutoStrEnum
# SEE from models_library.utils.enums.StrAutoEnum
# SEE https://docs.pydantic.dev/dev-v2/usage/types/enums/
#

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
get_step_group_proxy,
get_step_store_proxy,
)
from ._errors import NoDataFoundError
from ._event_after_registration import (
register_to_start_after_on_executed_completed,
register_to_start_after_on_reverted_completed,
Expand All @@ -29,7 +30,11 @@
ParallelStepGroup,
SingleStepGroup,
)
from ._store import OperationContextProxy, StepGroupProxy, StepStoreProxy
from ._store import (
OperationContextProxy,
StepGroupProxy,
StepStoreProxy,
)

__all__: tuple[str, ...] = (
"BaseStep",
Expand All @@ -39,6 +44,7 @@
"get_operation_name_or_none",
"get_step_group_proxy",
"get_step_store_proxy",
"NoDataFoundError",
"Operation",
"OperationContextProxy",
"OperationName",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ._errors import (
CannotCancelWhileWaitingForManualInterventionError,
NoDataFoundError,
OperationInitialContextKeyNotFoundError,
OperationNotCancellableError,
StepNameNotInCurrentGroupError,
StepNotInErrorStateError,
Expand Down Expand Up @@ -106,6 +107,12 @@ async def start_operation(
# check if operation is registered
operation = OperationRegistry.get_operation(operation_name)

for required_key in operation.initial_context_required_keys:
if required_key not in initial_operation_context:
raise OperationInitialContextKeyNotFoundError(
operation_name=operation_name, required_key=required_key
)

# NOTE: to ensure reproducibility of operations, the
# operation steps cannot overwrite keys in the
# initial context with their results
Expand Down Expand Up @@ -237,6 +244,8 @@ async def restart_operation_step_stuck_in_error(
"""
Force a step stuck in an error state to retry.
Will raise errors if step cannot be retried.

raises NoDataFoundError
"""
schedule_data_proxy = ScheduleDataStoreProxy(
store=self._store, schedule_id=schedule_id
Expand Down Expand Up @@ -742,6 +751,8 @@ async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None:

`reverting` refers to the act of reverting the effects of a step
that has already been completed (eg: remove a created network)

raises NoDataFoundError
"""
await Core.get_from_app_state(app).cancel_operation(schedule_id)

Expand All @@ -763,6 +774,8 @@ async def restart_operation_step_stuck_in_manual_intervention_during_execute(
`waiting for manual intervention` refers to a step that has failed and exhausted
all retries and is now waiting for a human to fix the issue (eg: storage service
is reachable once again)

raises NoDataFoundError
"""
await Core.get_from_app_state(app).restart_operation_step_stuck_in_error(
schedule_id, step_name, in_manual_intervention=True
Expand All @@ -778,6 +791,8 @@ async def restart_operation_step_stuck_during_revert(
`stuck step` is a step that has failed and exhausted all retries
`reverting` refers to the act of reverting the effects of a step
that has already been completed (eg: remove a created network)

raises NoDataFoundError
"""
await Core.get_from_app_state(app).restart_operation_step_stuck_in_error(
schedule_id, step_name, in_manual_intervention=False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ class OperationNotFoundError(BaseGenericSchedulerError):
)


class StepNotFoundInoperationError(BaseGenericSchedulerError):
class OperationInitialContextKeyNotFoundError(BaseGenericSchedulerError):
msg_template: str = (
"Operation '{operation_name}' required_key='{required_key}' not in initial_operation_context"
)


class StepNotFoundInOperationError(BaseGenericSchedulerError):
msg_template: str = (
"Step '{step_name}' not found steps_names='{steps_names}' for operation '{operation_name}'"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from servicelib.logging_utils import log_context

from ._core import start_operation
from ._errors import OperationInitialContextKeyNotFoundError
from ._models import (
EventType,
OperationContext,
Expand Down Expand Up @@ -50,7 +51,12 @@ async def register_to_start_after(
return

# ensure operation exists
OperationRegistry.get_operation(to_start.operation_name)
operation = OperationRegistry.get_operation(to_start.operation_name)
for required_key in operation.initial_context_required_keys:
if required_key not in to_start.initial_context:
raise OperationInitialContextKeyNotFoundError(
operation_name=to_start.operation_name, required_key=required_key
)

await events_proxy.create_or_update_multiple(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ._errors import (
OperationAlreadyRegisteredError,
OperationNotFoundError,
StepNotFoundInoperationError,
StepNotFoundInOperationError,
)
from ._models import (
ALL_RESERVED_CONTEXT_KEYS,
Expand Down Expand Up @@ -232,9 +232,17 @@ def get_step_subgroup_to_run(self) -> StepsSubGroup:

class Operation:
def __init__(
self, *step_groups: BaseStepGroup, is_cancellable: bool = True
self,
*step_groups: BaseStepGroup,
initial_context_required_keys: set[str] | None = None,
is_cancellable: bool = True,
) -> None:
self.step_groups = list(step_groups)
self.initial_context_required_keys = (
set()
if initial_context_required_keys is None
else initial_context_required_keys
)
self.is_cancellable = is_cancellable

def __repr__(self) -> str:
Expand Down Expand Up @@ -382,7 +390,7 @@ def get_step(

steps_names = set(cls._OPERATIONS[operation_name]["steps"].keys())
if step_name not in steps_names:
raise StepNotFoundInoperationError(
raise StepNotFoundInOperationError(
step_name=step_name,
operation_name=operation_name,
steps_names=steps_names,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import (
OperationAlreadyRegisteredError,
OperationNotFoundError,
StepNotFoundInoperationError,
StepNotFoundInOperationError,
)
from simcore_service_dynamic_scheduler.services.generic_scheduler._models import (
ALL_RESERVED_CONTEXT_KEYS,
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_operation_registry_raises_errors():
with pytest.raises(OperationNotFoundError):
OperationRegistry.get_step("non_existing", "BS1")

with pytest.raises(StepNotFoundInoperationError):
with pytest.raises(StepNotFoundInOperationError):
OperationRegistry.get_step("op1", "non_existing")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@
register_to_start_after_on_reverted_completed,
start_operation,
)
from simcore_service_dynamic_scheduler.services.generic_scheduler._core import (
OperationContext,
)
from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import (
OperationInitialContextKeyNotFoundError,
)
from utils import (
BaseExpectedStepOrder,
ExecuteRandom,
Expand Down Expand Up @@ -438,8 +444,8 @@ async def test_can_recover_from_interruption(
],
)
async def test_run_operation_after(
app: FastAPI,
preserve_caplog_for_async_logging: None,
app: FastAPI,
steps_call_order: list[tuple[str, str]],
register_operation: Callable[[OperationName, Operation], None],
register_at_creation: bool,
Expand Down Expand Up @@ -481,3 +487,67 @@ async def test_run_operation_after(

await ensure_expected_order(steps_call_order, expected_order)
await ensure_keys_in_store(app, expected_keys=set())


async def test_missing_initial_context_key_from_operation(
preserve_caplog_for_async_logging: None,
app: FastAPI,
register_operation: Callable[[OperationName, Operation], None],
):
good_operation_name: OperationName = "good"
bad_operation_name: OperationName = "bad"

operation = Operation(
SingleStepGroup(_ShortSleep), initial_context_required_keys={"required_key"}
)
register_operation(good_operation_name, operation)
register_operation(bad_operation_name, operation)

common_initial_context = {"unused1": "value1", "unused2": "value2"}
good_initial_context: OperationContext = {
"required_key": "some_value",
**common_initial_context,
}
bad_initial_context: OperationContext = {**common_initial_context}

bad_operation_to_start = OperationToStart(
operation_name=bad_operation_name, initial_context=bad_initial_context
)

# 1. check it works
await start_operation(app, bad_operation_name, good_initial_context)

# 2. check it raises with a bad context
with pytest.raises(OperationInitialContextKeyNotFoundError):
await start_operation(app, bad_operation_name, bad_initial_context)

with pytest.raises(OperationInitialContextKeyNotFoundError):
await start_operation(
app,
good_operation_name,
good_initial_context,
on_execute_completed=bad_operation_to_start,
on_revert_completed=None,
)

with pytest.raises(OperationInitialContextKeyNotFoundError):
await start_operation(
app,
good_operation_name,
good_initial_context,
on_execute_completed=None,
on_revert_completed=bad_operation_to_start,
)

# 3. register_to_start_after... raises with a bad context
schedule_id = await start_operation(app, bad_operation_name, good_initial_context)

with pytest.raises(OperationInitialContextKeyNotFoundError):
await register_to_start_after_on_executed_completed(
app, schedule_id, to_start=bad_operation_to_start
)

with pytest.raises(OperationInitialContextKeyNotFoundError):
await register_to_start_after_on_reverted_completed(
app, schedule_id, to_start=bad_operation_to_start
)
Loading