Skip to content

Commit 243667a

Browse files
author
Andrei Neagu
committed
rename interface
1 parent 4e05dbf commit 243667a

File tree

2 files changed

+97
-3
lines changed
  • services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler

2 files changed

+97
-3
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
from ._core import cancel_operation, start_operation
1+
from ._core import (
2+
cancel_operation,
3+
restart_create_operation_step_in_manual_intervention,
4+
restart_revert_operation_step_in_error,
5+
start_operation,
6+
)
27
from ._deferred_runner import (
38
get_opration_context_proxy,
49
get_step_group_proxy,
@@ -24,6 +29,8 @@
2429
"BaseStep",
2530
"cancel_operation",
2631
"get_generic_scheduler_lifespans",
32+
"restart_create_operation_step_in_manual_intervention",
33+
"restart_revert_operation_step_in_error",
2734
"get_opration_context_proxy",
2835
"get_step_group_proxy",
2936
"get_step_store_proxy",

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def __init__(
192192
self.unknown_status_wait_before_retry = unknown_status_wait_before_retry
193193
self._store: Store = get_store(app)
194194

195-
async def create(
195+
async def start_operation(
196196
self, operation_name: OperationName, initial_operation_context: OperationContext
197197
) -> ScheduleId:
198198
"""entrypoint for sceduling returns a unique schedule_id"""
@@ -325,6 +325,67 @@ async def cancel_schedule(self, schedule_id: ScheduleId) -> None:
325325
await DeferredRunner.cancel(deferred_task_uid)
326326
await step_proxy.set("status", StepStatus.CANCELLED)
327327

328+
async def restart_operation_step_in_error(
329+
self,
330+
schedule_id: ScheduleId,
331+
step_name: StepName,
332+
*,
333+
in_manual_intervention: bool,
334+
) -> None:
335+
# only if a step is waiting for manual intervention this will restart it
336+
# hwo to check if it's waitin for manual intervention?
337+
schedule_data_proxy = ScheduleDataStoreProxy(
338+
store=self._store, schedule_id=schedule_id
339+
)
340+
is_creating = await schedule_data_proxy.get("is_creating")
341+
operation_name = await schedule_data_proxy.get("operation_name")
342+
group_index = await schedule_data_proxy.get("group_index")
343+
344+
operation = OperationRegistry.get_operation(operation_name)
345+
step_group = operation[group_index]
346+
step_group_name = step_group.get_step_group_name(index=group_index)
347+
348+
if step_name not in {
349+
step.get_step_name() for step in step_group.get_step_subgroup_to_run()
350+
}:
351+
msg = f"step_name='{step_name}' not in current step_group_name='{step_group_name}' of operation_name='{operation_name}'"
352+
raise ValueError(msg)
353+
354+
step_proxy = StepStoreProxy(
355+
store=self._store,
356+
schedule_id=schedule_id,
357+
operation_name=operation_name,
358+
step_group_name=step_group_name,
359+
step_name=step_name,
360+
is_creating=is_creating,
361+
)
362+
363+
try:
364+
await step_proxy.get("error_traceback")
365+
except KeyNotFoundInHashError as exc:
366+
msg = f"Step '{step_name}' is not in error state and cannot be restarted"
367+
raise ValueError(msg) from exc
368+
369+
if in_manual_intervention:
370+
requires_manual_intervention: bool = False
371+
with suppress(KeyNotFoundInHashError):
372+
requires_manual_intervention = await step_proxy.get(
373+
"requires_manual_intervention"
374+
)
375+
376+
if requires_manual_intervention is False:
377+
msg = f"Step '{step_name}' is not waiting for manual intervention"
378+
raise ValueError(msg)
379+
380+
await step_proxy.delete("error_traceback", "requires_manual_intervention")
381+
else:
382+
await step_proxy.delete("error_traceback")
383+
384+
await _start_and_mark_as_started(
385+
step_proxy, is_creating=True, expected_steps_count=len(step_group)
386+
)
387+
await enqueue_schedule_event(self.app, schedule_id)
388+
328389
async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
329390
schedule_data_proxy = ScheduleDataStoreProxy(
330391
store=self._store, schedule_id=schedule_id
@@ -645,8 +706,34 @@ async def start_operation(
645706
operation_name: OperationName,
646707
initial_operation_context: OperationContext,
647708
) -> ScheduleId:
648-
return await get_core(app).create(operation_name, initial_operation_context)
709+
"""starts an operation by it's given name and initial context"""
710+
return await get_core(app).start_operation(
711+
operation_name, initial_operation_context
712+
)
649713

650714

651715
async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None:
716+
"""aborts an opration and triggers revert of all it's executed steps"""
652717
await get_core(app).cancel_schedule(schedule_id)
718+
719+
720+
async def restart_create_operation_step_in_manual_intervention(
721+
app: FastAPI, schedule_id: ScheduleId, step_name: StepName
722+
) -> None:
723+
"""
724+
restarts a step waiting for manual intervention
725+
NOTE: to be used only with steps with wait_for_manual_intervention=True
726+
and only to restart the `create`
727+
"""
728+
await get_core(app).restart_operation_step_in_error(
729+
schedule_id, step_name, in_manual_intervention=True
730+
)
731+
732+
733+
async def restart_revert_operation_step_in_error( # TODO: add test for this
734+
app: FastAPI, schedule_id: ScheduleId, step_name: StepName
735+
) -> None:
736+
"""restarts a step stuck in `revert` in an error state"""
737+
await get_core(app).restart_operation_step_in_error(
738+
schedule_id, step_name, in_manual_intervention=False
739+
)

0 commit comments

Comments
 (0)