Skip to content

Commit d5e65cc

Browse files
author
Andrei Neagu
committed
added initial operation context key requirements
1 parent fd9641d commit d5e65cc

File tree

5 files changed

+100
-3
lines changed

5 files changed

+100
-3
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from ._errors import (
3131
CannotCancelWhileWaitingForManualInterventionError,
3232
NoDataFoundError,
33+
OperationInitialContextKeyNotFoundError,
3334
OperationNotCancellableError,
3435
StepNameNotInCurrentGroupError,
3536
StepNotInErrorStateError,
@@ -106,6 +107,12 @@ async def start_operation(
106107
# check if operation is registered
107108
operation = OperationRegistry.get_operation(operation_name)
108109

110+
for required_key in operation.initial_context_required_keys:
111+
if required_key not in initial_operation_context:
112+
raise OperationInitialContextKeyNotFoundError(
113+
operation_name=operation_name, required_key=required_key
114+
)
115+
109116
# NOTE: to ensure reproducibility of operations, the
110117
# operation steps cannot overwrite keys in the
111118
# initial context with their results

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ class OperationNotFoundError(BaseGenericSchedulerError):
1919
)
2020

2121

22+
class OperationInitialContextKeyNotFoundError(BaseGenericSchedulerError):
23+
msg_template: str = (
24+
"Operation '{operation_name}' required_key='{required_key}' not in initial_operation_context"
25+
)
26+
27+
2228
class StepNotFoundInoperationError(BaseGenericSchedulerError):
2329
msg_template: str = (
2430
"Step '{step_name}' not found steps_names='{steps_names}' for operation '{operation_name}'"

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from servicelib.logging_utils import log_context
66

77
from ._core import start_operation
8+
from ._errors import OperationInitialContextKeyNotFoundError
89
from ._models import (
910
EventType,
1011
OperationContext,
@@ -50,7 +51,12 @@ async def register_to_start_after(
5051
return
5152

5253
# ensure operation exists
53-
OperationRegistry.get_operation(to_start.operation_name)
54+
operation = OperationRegistry.get_operation(to_start.operation_name)
55+
for required_key in operation.initial_context_required_keys:
56+
if required_key not in to_start.initial_context:
57+
raise OperationInitialContextKeyNotFoundError(
58+
operation_name=to_start.operation_name, required_key=required_key
59+
)
5460

5561
await events_proxy.create_or_update_multiple(
5662
{

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,17 @@ def get_step_subgroup_to_run(self) -> StepsSubGroup:
232232

233233
class Operation:
234234
def __init__(
235-
self, *step_groups: BaseStepGroup, is_cancellable: bool = True
235+
self,
236+
*step_groups: BaseStepGroup,
237+
initial_context_required_keys: set[str] | None = None,
238+
is_cancellable: bool = True,
236239
) -> None:
237240
self.step_groups = list(step_groups)
241+
self.initial_context_required_keys = (
242+
set()
243+
if initial_context_required_keys is None
244+
else initial_context_required_keys
245+
)
238246
self.is_cancellable = is_cancellable
239247

240248
def __repr__(self) -> str:

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@
4040
register_to_start_after_on_reverted_completed,
4141
start_operation,
4242
)
43+
from simcore_service_dynamic_scheduler.services.generic_scheduler._core import (
44+
OperationContext,
45+
)
46+
from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import (
47+
OperationInitialContextKeyNotFoundError,
48+
)
4349
from utils import (
4450
BaseExpectedStepOrder,
4551
ExecuteRandom,
@@ -438,8 +444,8 @@ async def test_can_recover_from_interruption(
438444
],
439445
)
440446
async def test_run_operation_after(
441-
app: FastAPI,
442447
preserve_caplog_for_async_logging: None,
448+
app: FastAPI,
443449
steps_call_order: list[tuple[str, str]],
444450
register_operation: Callable[[OperationName, Operation], None],
445451
register_at_creation: bool,
@@ -481,3 +487,67 @@ async def test_run_operation_after(
481487

482488
await ensure_expected_order(steps_call_order, expected_order)
483489
await ensure_keys_in_store(app, expected_keys=set())
490+
491+
492+
async def test_missing_initial_context_key_from_operation(
493+
preserve_caplog_for_async_logging: None,
494+
app: FastAPI,
495+
register_operation: Callable[[OperationName, Operation], None],
496+
):
497+
good_operation_name: OperationName = "good"
498+
bad_operation_name: OperationName = "bad"
499+
500+
operation = Operation(
501+
SingleStepGroup(_ShortSleep), initial_context_required_keys={"required_key"}
502+
)
503+
register_operation(good_operation_name, operation)
504+
register_operation(bad_operation_name, operation)
505+
506+
common_initial_context = {"unsued1": "value1", "unsued2": "value2"}
507+
good_initial_context: OperationContext = {
508+
"required_key": "soeme_value",
509+
**common_initial_context,
510+
}
511+
bad_initial_context: OperationContext = {**common_initial_context}
512+
513+
bad_operation_to_start = OperationToStart(
514+
operation_name=bad_operation_name, initial_context=bad_initial_context
515+
)
516+
517+
# 1. check it works
518+
await start_operation(app, bad_operation_name, good_initial_context)
519+
520+
# 2. check it raises with a bad context
521+
with pytest.raises(OperationInitialContextKeyNotFoundError):
522+
await start_operation(app, bad_operation_name, bad_initial_context)
523+
524+
with pytest.raises(OperationInitialContextKeyNotFoundError):
525+
await start_operation(
526+
app,
527+
good_operation_name,
528+
good_initial_context,
529+
on_execute_completed=bad_operation_to_start,
530+
on_revert_completed=None,
531+
)
532+
533+
with pytest.raises(OperationInitialContextKeyNotFoundError):
534+
await start_operation(
535+
app,
536+
good_operation_name,
537+
good_initial_context,
538+
on_execute_completed=None,
539+
on_revert_completed=bad_operation_to_start,
540+
)
541+
542+
# 3. register_to_start_after... raises with a bad context
543+
schedule_id = await start_operation(app, bad_operation_name, good_initial_context)
544+
545+
with pytest.raises(OperationInitialContextKeyNotFoundError):
546+
await register_to_start_after_on_executed_completed(
547+
app, schedule_id, to_start=bad_operation_to_start
548+
)
549+
550+
with pytest.raises(OperationInitialContextKeyNotFoundError):
551+
await register_to_start_after_on_reverted_completed(
552+
app, schedule_id, to_start=bad_operation_to_start
553+
)

0 commit comments

Comments
 (0)