Skip to content

Commit 2a5e2dc

Browse files
author
Andrei Neagu
committed
added step retry
1 parent 5d1675f commit 2a5e2dc

File tree

1 file changed

+38
-6
lines changed
  • services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler

1 file changed

+38
-6
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
get_operation_provided_context_keys,
4141
)
4242
from ._store import (
43+
DeleteStepKeys,
4344
OperationContextProxy,
4445
OperationRemovalProxy,
4546
ScheduleDataStoreProxy,
@@ -125,7 +126,7 @@ async def _start_and_mark_as_started(
125126
{
126127
"deferred_created": True,
127128
"status": StepStatus.SCHEDULED,
128-
"success_processed": False,
129+
"success_processed": False, # TODO: remove this key, it is most likely unused
129130
}
130131
)
131132

@@ -371,6 +372,11 @@ async def restart_operation_step_in_error(
371372
except KeyNotFoundInHashError as exc:
372373
raise StepNotInErrorStateError(step_name=step_name) from exc
373374

375+
step_keys_to_remove: list[DeleteStepKeys] = [
376+
"deferred_created",
377+
"error_traceback",
378+
"deferred_task_uid",
379+
]
374380
if in_manual_intervention:
375381
requires_manual_intervention: bool = False
376382
with suppress(KeyNotFoundInHashError):
@@ -381,14 +387,40 @@ async def restart_operation_step_in_error(
381387
if requires_manual_intervention is False:
382388
raise StepNotWaitingForManualInterventionError(step_name=step_name)
383389

384-
await step_proxy.delete("error_traceback", "requires_manual_intervention")
385-
else:
386-
await step_proxy.delete("error_traceback")
390+
step_keys_to_remove.append("requires_manual_intervention")
387391

392+
# reset previous Run and restart this step
393+
schedule_data_proxy = ScheduleDataStoreProxy(
394+
store=self._store, schedule_id=schedule_id
395+
)
396+
group_proxy = StepGroupProxy(
397+
store=self._store,
398+
schedule_id=schedule_id,
399+
operation_name=operation_name,
400+
step_group_name=step_group_name,
401+
is_creating=is_creating,
402+
)
403+
404+
# remove previous entries for the step
405+
await step_proxy.delete(*step_keys_to_remove)
406+
await schedule_data_proxy.delete(
407+
"operation_error_type", "operation_error_message"
408+
)
409+
await group_proxy.decrement_and_get_done_steps_count()
410+
411+
_logger.debug(
412+
"Restarting step_name='%s' of operation_name='%s' for schedule_id='%s' after '%s'",
413+
step_name,
414+
operation_name,
415+
schedule_id,
416+
"manual intervention" if in_manual_intervention else "error in revert",
417+
)
418+
# restart only this step
388419
await _start_and_mark_as_started(
389-
step_proxy, is_creating=True, expected_steps_count=len(step_group)
420+
step_proxy,
421+
is_creating=is_creating,
422+
expected_steps_count=len(step_group),
390423
)
391-
await enqueue_schedule_event(self.app, schedule_id)
392424

393425
async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
394426
schedule_data_proxy = ScheduleDataStoreProxy(

0 commit comments

Comments
 (0)