3030 ScheduleId ,
3131 SingleStepGroup ,
3232 cancel_operation ,
33+ restart_create_operation_step_in_manual_intervention ,
3334 start_operation ,
3435)
3536from simcore_service_dynamic_scheduler .services .generic_scheduler ._errors import (
7879
7980
8081_PARALLEL_APP_CREATION : Final [NonNegativeInt ] = 5
82+ _PARALLEL_RESTARTS : Final [NonNegativeInt ] = 5
8183
8284
8385@pytest .fixture
@@ -206,7 +208,27 @@ async def create(
206208 await asyncio .sleep (1e10 )
207209
208210
209- class _WaitManualInerventionBS (_RevertBS ):
211+ class _GlobalManualInterventionTracker :
212+ raise_on_create : bool = True
213+
214+
215+ @pytest .fixture
216+ def reset_raise_on_create () -> Iterable [None ]:
217+ _GlobalManualInterventionTracker .raise_on_create = True
218+ yield
219+ _GlobalManualInterventionTracker .raise_on_create = True
220+
221+
222+ class _WaitManualInerventionBS (_BS ):
223+ @classmethod
224+ async def create (
225+ cls , app : FastAPI , required_context : RequiredOperationContext
226+ ) -> ProvidedOperationContext | None :
227+ await super ().create (app , required_context )
228+ if _GlobalManualInterventionTracker .raise_on_create :
229+ msg = "always fails only on CREATE"
230+ raise RuntimeError (msg )
231+
210232 @classmethod
211233 def wait_for_manual_intervention (cls ) -> bool :
212234 return True
@@ -423,6 +445,20 @@ class _WMI1(_WaitManualInerventionBS): ...
423445class _WMI2 (_WaitManualInerventionBS ): ...
424446
425447
448+ class _WMI3 (_WaitManualInerventionBS ): ...
449+
450+
451+ def _get_wait_manaul_intervention_steps (
452+ operation : Operation ,
453+ ) -> list [type [_WaitManualInerventionBS ]]:
454+ result : list [type [_WaitManualInerventionBS ]] = []
455+ for group in operation :
456+ for step in group .get_step_subgroup_to_run ():
457+ if issubclass (step , _WaitManualInerventionBS ):
458+ result .append (step )
459+ return result
460+
461+
426462# Below steps which require and provide context keys
427463
428464
@@ -919,13 +955,16 @@ async def test_repeating_step(
919955
920956@pytest .mark .parametrize ("app_count" , [10 ])
921957@pytest .mark .parametrize (
922- "operation, expected_order, expected_keys" ,
958+ "operation, expected_order, expected_keys, after_restart_expected_order " ,
923959 [
924960 pytest .param (
925961 [
926962 SingleStepGroup (_S1 ),
927963 ParallelStepGroup (_S2 , _S3 , _S4 ),
928964 SingleStepGroup (_WMI1 ),
965+ # below are not included when waiting for manual intervention
966+ ParallelStepGroup (_S5 , _S6 ),
967+ SingleStepGroup (_S7 ),
929968 ],
930969 [
931970 CreateSequence (_S1 ),
@@ -943,20 +982,29 @@ async def test_repeating_step(
943982 "SCH:{schedule_id}:STEPS:test_op:1P:C:_S4" ,
944983 "SCH:{schedule_id}:STEPS:test_op:2S:C:_WMI1" ,
945984 },
985+ [
986+ CreateSequence (_S1 ),
987+ CreateRandom (_S2 , _S3 , _S4 ),
988+ CreateSequence (_WMI1 ),
989+ CreateSequence (_WMI1 ), # retried step
990+ CreateRandom (_S5 , _S6 ), # it is completed now
991+ CreateSequence (_S7 ), # it is completed now
992+ ],
946993 id = "s1-p3-s1(1mi)" ,
947994 ),
948995 pytest .param (
949996 [
950997 SingleStepGroup (_S1 ),
951998 ParallelStepGroup (_S2 , _S3 , _S4 ),
952- ParallelStepGroup (_WMI1 , _WMI2 , _S5 , _S6 , _S7 ),
953- SingleStepGroup (_S8 ), # will be ignored
954- ParallelStepGroup (_S9 , _S10 ), # will be ignored
999+ ParallelStepGroup (_WMI1 , _WMI2 , _WMI3 , _S5 , _S6 , _S7 ),
1000+ # below are not included when waiting for manual intervention
1001+ SingleStepGroup (_S8 ),
1002+ ParallelStepGroup (_S9 , _S10 ),
9551003 ],
9561004 [
9571005 CreateSequence (_S1 ),
9581006 CreateRandom (_S2 , _S3 , _S4 ),
959- CreateRandom (_WMI1 , _WMI2 , _S5 , _S6 , _S7 ),
1007+ CreateRandom (_WMI1 , _WMI2 , _WMI3 , _S5 , _S6 , _S7 ),
9601008 ],
9611009 {
9621010 "SCH:{schedule_id}" ,
@@ -972,12 +1020,22 @@ async def test_repeating_step(
9721020 "SCH:{schedule_id}:STEPS:test_op:2P:C:_S7" ,
9731021 "SCH:{schedule_id}:STEPS:test_op:2P:C:_WMI1" ,
9741022 "SCH:{schedule_id}:STEPS:test_op:2P:C:_WMI2" ,
1023+ "SCH:{schedule_id}:STEPS:test_op:2P:C:_WMI3" ,
9751024 },
976- id = "s1-p3-p5(2mi)" ,
1025+ [
1026+ CreateSequence (_S1 ),
1027+ CreateRandom (_S2 , _S3 , _S4 ),
1028+ CreateRandom (_WMI1 , _WMI2 , _WMI3 , _S5 , _S6 , _S7 ),
1029+ CreateRandom (_WMI1 , _WMI2 , _WMI3 ), # retried steps
1030+ CreateSequence (_S8 ), # it is completed now
1031+ CreateRandom (_S9 , _S10 ), # it is completed now
1032+ ],
1033+ id = "s1-p3-p6(3mi)" ,
9771034 ),
9781035 ],
9791036)
9801037async def test_wait_for_manual_intervention (
1038+ reset_raise_on_create : None ,
9811039 preserve_caplog_for_async_logging : None ,
9821040 steps_call_order : list [tuple [str , str ]],
9831041 selected_app : FastAPI ,
@@ -986,6 +1044,7 @@ async def test_wait_for_manual_intervention(
9861044 operation_name : OperationName ,
9871045 expected_order : list [BaseExpectedStepOrder ],
9881046 expected_keys : set [str ],
1047+ after_restart_expected_order : list [BaseExpectedStepOrder ],
9891048):
9901049 register_operation (operation_name , operation )
9911050
@@ -1005,6 +1064,25 @@ async def test_wait_for_manual_intervention(
10051064 await asyncio .sleep (0.1 )
10061065 await _ensure_keys_in_store (selected_app , expected_keys = formatted_expected_keys )
10071066
1067+ # unblock all waiting for manual intervention steps and restart them
1068+ steps_to_restart = _get_wait_manaul_intervention_steps (operation )
1069+ _GlobalManualInterventionTracker .raise_on_create = False
1070+ await limited_gather (
1071+ * (
1072+ restart_create_operation_step_in_manual_intervention (
1073+ selected_app , schedule_id , step .get_step_name ()
1074+ )
1075+ for step in steps_to_restart
1076+ ),
1077+ limit = _PARALLEL_RESTARTS ,
1078+ )
1079+ # should finish normally
1080+ await ensure_expected_order (steps_call_order , after_restart_expected_order )
1081+ await _ensure_keys_in_store (selected_app , expected_keys = set ())
1082+
1083+
1084+ # TODO: restart_revert_operation_step_in_error
1085+
10081086
10091087@pytest .mark .parametrize ("app_count" , [10 ])
10101088@pytest .mark .parametrize (
0 commit comments