Skip to content

Commit 988903e

Browse files
author
Andrei Neagu
committed
refactor
1 parent 767caf0 commit 988903e

File tree

1 file changed

+52
-42
lines changed
  • services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler

1 file changed

+52
-42
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,16 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
349349
_logger.debug("Operation completed: steps_statuses=%s", steps_statuses)
350350

351351
# NOTE: at this point all steps are in a final status
352-
if is_creating:
352+
if step_group.repeat_steps is True:
353+
await self._continue_as_repeat_steps(
354+
schedule_data_proxy,
355+
schedule_id,
356+
operation_name,
357+
group_index,
358+
step_group,
359+
group_step_proxies,
360+
)
361+
elif is_creating:
353362
await self._continue_handling_as_creation(
354363
steps_statuses,
355364
schedule_data_proxy,
@@ -358,7 +367,6 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
358367
group_index,
359368
step_group,
360369
operation,
361-
group_step_proxies,
362370
)
363371
else:
364372
await self._continue_handling_as_reverting(
@@ -370,60 +378,62 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
370378
step_group,
371379
)
372380

373-
async def _continue_handling_as_creation(
381+
async def _continue_as_repeat_steps(
374382
self,
375-
steps_statuses: dict[StepName, StepStatus],
376383
schedule_data_proxy: ScheduleDataStoreProxy,
377384
schedule_id: ScheduleId,
378385
operation_name: OperationName,
379386
group_index: NonNegativeInt,
380387
current_step_group: BaseStepGroup,
381-
operation: Operation,
382388
group_step_proxies: dict[StepName, StepStoreProxy],
383389
) -> None:
384-
# does step require repeating?
385-
if current_step_group.repeat_steps is True:
386-
# TODO: all this could even be a separate function since the repeat looks like a thing on its own
387-
_logger.debug(
388-
"REPEATING step_group='%s' in operation_name='%s' for schedule_id='%s'",
389-
current_step_group.get_step_group_name(index=group_index),
390-
operation_name,
391-
schedule_id,
392-
)
393-
# wait before repeating
394-
await asyncio.sleep(current_step_group.wait_before_repeat.total_seconds())
390+
_logger.debug(
391+
"REPEATING step_group='%s' in operation_name='%s' for schedule_id='%s'",
392+
current_step_group.get_step_group_name(index=group_index),
393+
operation_name,
394+
schedule_id,
395+
)
396+
# wait before repeating
397+
await asyncio.sleep(current_step_group.wait_before_repeat.total_seconds())
395398

396-
step_proxies: Iterable[StepStoreProxy] = group_step_proxies.values()
399+
step_proxies: Iterable[StepStoreProxy] = group_step_proxies.values()
397400

398-
requeired_steps_statuses = await _get_steps_statuses(step_proxies)
399-
if any(
400-
status == StepStatus.CANCELLED
401-
for status in requeired_steps_statuses.values()
402-
):
403-
# was cancelled
404-
await schedule_data_proxy.set("is_creating", value=False)
405-
await enqueue_schedule_event(self.app, schedule_id)
406-
return
401+
requeired_steps_statuses = await _get_steps_statuses(step_proxies)
402+
if any(
403+
status == StepStatus.CANCELLED
404+
for status in requeired_steps_statuses.values()
405+
):
406+
# was cancelled
407+
await schedule_data_proxy.set("is_creating", value=False)
408+
await enqueue_schedule_event(self.app, schedule_id)
409+
return
407410

408-
await limited_gather(
409-
*(x.remove() for x in step_proxies),
410-
limit=_PARALLEL_STATUS_REQUESTS,
411-
)
411+
await limited_gather(
412+
*(x.remove() for x in step_proxies),
413+
limit=_PARALLEL_STATUS_REQUESTS,
414+
)
412415

413-
group_proxy = StepGroupProxy(
414-
store=self._store,
415-
schedule_id=schedule_id,
416-
operation_name=operation_name,
417-
step_group_name=current_step_group.get_step_group_name(
418-
index=group_index
419-
),
420-
is_creating=True,
421-
)
422-
await group_proxy.remove()
416+
group_proxy = StepGroupProxy(
417+
store=self._store,
418+
schedule_id=schedule_id,
419+
operation_name=operation_name,
420+
step_group_name=current_step_group.get_step_group_name(index=group_index),
421+
is_creating=True,
422+
)
423+
await group_proxy.remove()
423424

424-
await enqueue_schedule_event(self.app, schedule_id)
425-
return
425+
await enqueue_schedule_event(self.app, schedule_id)
426426

427+
async def _continue_handling_as_creation(
428+
self,
429+
steps_statuses: dict[StepName, StepStatus],
430+
schedule_data_proxy: ScheduleDataStoreProxy,
431+
schedule_id: ScheduleId,
432+
operation_name: OperationName,
433+
group_index: NonNegativeInt,
434+
current_step_group: BaseStepGroup,
435+
operation: Operation,
436+
) -> None:
427437
# if all in SUUCESS -> move to next
428438
if all(status == StepStatus.SUCCESS for status in steps_statuses.values()):
429439
try:

0 commit comments

Comments
 (0)