Skip to content

Commit 3f79e14

Browse files
author
Andrei Neagu
committed
fixeed cancellation issues
1 parent b781c6a commit 3f79e14

File tree

2 files changed

+22
-11
lines changed

2 files changed

+22
-11
lines changed

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,14 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
180180
if operation.is_cancellable is False:
181181
raise OperationNotCancellableError(operation_name=operation_name)
182182

183-
group = operation.step_groups[group_index]
183+
step_group = operation.step_groups[group_index]
184184

185185
group_step_proxies = get_group_step_proxies(
186186
self._store,
187187
schedule_id=schedule_id,
188188
operation_name=operation_name,
189189
group_index=group_index,
190-
step_group=group,
190+
step_group=step_group,
191191
is_executing=is_executing,
192192
)
193193

@@ -204,6 +204,8 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
204204
schedule_id=schedule_id
205205
)
206206

207+
expected_steps_count = len(step_group)
208+
207209
async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
208210
with log_context( # noqa: SIM117
209211
_logger,
@@ -212,8 +214,25 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
212214
):
213215
with suppress(NoDataFoundError):
214216
deferred_task_uid = await step_proxy.read("deferred_task_uid")
217+
# the deferred task might not be running when this is called
218+
# e.g. cancelling a repeating operation
215219
await DeferredRunner.cancel(deferred_task_uid)
216-
await step_proxy.create_or_update("status", StepStatus.CANCELLED)
220+
221+
await step_proxy.create_or_update("status", StepStatus.CANCELLED)
222+
223+
step_group_name = step_group.get_step_group_name(index=group_index)
224+
group_proxy = StepGroupProxy(
225+
store=self._store,
226+
schedule_id=schedule_id,
227+
operation_name=operation_name,
228+
step_group_name=step_group_name,
229+
is_executing=is_executing,
230+
)
231+
if (
232+
await group_proxy.increment_and_get_done_steps_count()
233+
== expected_steps_count
234+
):
235+
await enqueue_schedule_event(self.app, schedule_id)
217236

218237
await limited_gather(
219238
*(

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,3 @@ async def on_finished_with_error(
222222
)
223223

224224
await _enqueue_schedule_event_if_group_is_done(context)
225-
226-
@classmethod
227-
async def on_cancelled(cls, context: DeferredContext) -> None:
228-
await get_step_store_proxy(context).create_or_update(
229-
"status", StepStatus.CANCELLED
230-
)
231-
232-
await _enqueue_schedule_event_if_group_is_done(context)

0 commit comments

Comments
 (0)