Skip to content

Commit e9a8f0e

Browse files
author
Andrei Neagu
committed
added tests for manual intervention
1 parent 0b3b9b2 commit e9a8f0e

File tree

3 files changed

+121
-3
lines changed
  • services/dynamic-scheduler

3 files changed

+121
-3
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from ._deferred_runner import DeferredRunner
1818
from ._dependencies import enqueue_schedule_event
1919
from ._errors import (
20+
CannotCancelWhileWaitingForManualInterventionError,
2021
InitialOperationContextKeyNotAllowedError,
2122
KeyNotFoundInHashError,
2223
UnexpectedStepHandlingError,
@@ -244,6 +245,14 @@ async def cancel_schedule(self, schedule_id: ScheduleId) -> None:
244245
operation = OperationRegistry.get_operation(operation_name)
245246
group = operation[group_index]
246247

248+
if any(
249+
step.wait_for_manual_intervention()
250+
for step in group.get_step_subgroup_to_run()
251+
):
252+
raise CannotCancelWhileWaitingForManualInterventionError(
253+
schedule_id=schedule_id
254+
)
255+
247256
for step in group.get_step_subgroup_to_run():
248257
step_name = step.get_step_name()
249258
step_proxy = StepStoreProxy(

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,9 @@ class InitialOperationContextKeyNotAllowedError(BaseGenericSchedulerError):
4646
msg_template: str = (
4747
"Initial operation context cannot contain key '{key}' since it is provided by the operation: {operation}"
4848
)
49+
50+
51+
class CannotCancelWhileWaitingForManualInterventionError(BaseGenericSchedulerError):
52+
msg_template: str = (
53+
"Cannot cancel schedule_id='{schedule_id}' while one or more steps are waiting for manual intervention."
54+
)

services/dynamic-scheduler/tests/unit/service_generic_scheduler/test__core.py

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
from simcore_service_dynamic_scheduler.services.generic_scheduler._core import (
2323
get_core,
2424
)
25+
from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import (
26+
CannotCancelWhileWaitingForManualInterventionError,
27+
)
2528
from simcore_service_dynamic_scheduler.services.generic_scheduler._models import (
2629
OperationName,
2730
ProvidedOperationContext,
@@ -207,6 +210,12 @@ async def create(
207210
await asyncio.sleep(1e10)
208211

209212

213+
class _WaitManualInerventionBS(_RevertBS):
214+
@classmethod
215+
def wait_for_manual_intervention(cls) -> bool:
216+
return True
217+
218+
210219
class _BaseExpectedStepOrder:
211220
def __init__(self, *steps: type[BaseStep]) -> None:
212221
self.steps = steps
@@ -412,6 +421,15 @@ class _SF1(_SleepsForeverBS): ...
412421
class _SF2(_SleepsForeverBS): ...
413422

414423

424+
# Below will wait for manual intervention after it fails on create
425+
426+
427+
class _WMI1(_WaitManualInerventionBS): ...
428+
429+
430+
class _WMI2(_WaitManualInerventionBS): ...
431+
432+
415433
@pytest.mark.parametrize("app_count", [10])
416434
@pytest.mark.parametrize(
417435
"operation, expected_order",
@@ -893,9 +911,94 @@ async def test_repeating_step(
893911
await _ensure_keys_in_store(selected_app, expected_keys=set())
894912

895913

896-
# TODO: test manual intervention
897-
# -> manual intervention an anction that raises an error and has the flag for manual intervention
898-
# -> manual intervention & fail on REVERT (what should happen?)-> this should be an unexpected thing
914+
@pytest.mark.parametrize("app_count", [10])
915+
@pytest.mark.parametrize(
916+
"operation, expected_order, expected_keys",
917+
[
918+
pytest.param(
919+
[
920+
SingleStepGroup(_S1),
921+
ParallelStepGroup(_S2, _S3, _S4),
922+
SingleStepGroup(_WMI1),
923+
],
924+
[
925+
_CreateSequence(_S1),
926+
_CreateRandom(_S2, _S3, _S4),
927+
_CreateSequence(_WMI1),
928+
],
929+
{
930+
"SCH:{schedule_id}",
931+
"SCH:{schedule_id}:GROUPS:test_op:0S:C",
932+
"SCH:{schedule_id}:GROUPS:test_op:1P:C",
933+
"SCH:{schedule_id}:GROUPS:test_op:2S:C",
934+
"SCH:{schedule_id}:STEPS:test_op:0S:C:_S1",
935+
"SCH:{schedule_id}:STEPS:test_op:1P:C:_S2",
936+
"SCH:{schedule_id}:STEPS:test_op:1P:C:_S3",
937+
"SCH:{schedule_id}:STEPS:test_op:1P:C:_S4",
938+
"SCH:{schedule_id}:STEPS:test_op:2S:C:_WMI1",
939+
},
940+
id="s1-p3-s1(1mi)",
941+
),
942+
pytest.param(
943+
[
944+
SingleStepGroup(_S1),
945+
ParallelStepGroup(_S2, _S3, _S4),
946+
ParallelStepGroup(_WMI1, _WMI2, _S5, _S6, _S7),
947+
SingleStepGroup(_S8), # will be ignored
948+
ParallelStepGroup(_S9, _S10), # will be ignored
949+
],
950+
[
951+
_CreateSequence(_S1),
952+
_CreateRandom(_S2, _S3, _S4),
953+
_CreateRandom(_WMI1, _WMI2, _S5, _S6, _S7),
954+
],
955+
{
956+
"SCH:{schedule_id}",
957+
"SCH:{schedule_id}:GROUPS:test_op:0S:C",
958+
"SCH:{schedule_id}:GROUPS:test_op:1P:C",
959+
"SCH:{schedule_id}:GROUPS:test_op:2P:C",
960+
"SCH:{schedule_id}:STEPS:test_op:0S:C:_S1",
961+
"SCH:{schedule_id}:STEPS:test_op:1P:C:_S2",
962+
"SCH:{schedule_id}:STEPS:test_op:1P:C:_S3",
963+
"SCH:{schedule_id}:STEPS:test_op:1P:C:_S4",
964+
"SCH:{schedule_id}:STEPS:test_op:2P:C:_S5",
965+
"SCH:{schedule_id}:STEPS:test_op:2P:C:_S6",
966+
"SCH:{schedule_id}:STEPS:test_op:2P:C:_S7",
967+
"SCH:{schedule_id}:STEPS:test_op:2P:C:_WMI1",
968+
"SCH:{schedule_id}:STEPS:test_op:2P:C:_WMI2",
969+
},
970+
id="s1-p3-p5(2mi)",
971+
),
972+
],
973+
)
974+
async def test_wait_for_manual_intervention(
975+
preserve_caplog_for_async_logging: None,
976+
steps_call_order: list[tuple[str, str]],
977+
selected_app: FastAPI,
978+
register_operation: Callable[[OperationName, Operation], None],
979+
operation: Operation,
980+
operation_name: OperationName,
981+
expected_order: list[_BaseExpectedStepOrder],
982+
expected_keys: set[str],
983+
):
984+
register_operation(operation_name, operation)
985+
986+
core = get_core(selected_app)
987+
schedule_id = await core.create(operation_name, {})
988+
assert isinstance(schedule_id, ScheduleId)
989+
990+
formatted_expected_keys = {k.format(schedule_id=schedule_id) for k in expected_keys}
991+
992+
await _ensure_expected_order(steps_call_order, expected_order)
993+
994+
await _ensure_keys_in_store(selected_app, expected_keys=formatted_expected_keys)
995+
996+
# even if cancelled, state of waiting for manual intervention remains the same
997+
with pytest.raises(CannotCancelWhileWaitingForManualInterventionError):
998+
await core.cancel_schedule(schedule_id)
999+
# give some time for a "possible cancellation" to be processed
1000+
await asyncio.sleep(0.1)
1001+
await _ensure_keys_in_store(selected_app, expected_keys=formatted_expected_keys)
8991002

9001003

9011004
# TODO: test something with initial_operation_context that requires data from a previous step,

0 commit comments

Comments
 (0)