Skip to content

Commit 51fd6c5

Browse files
Merge branch 'master' into introduce-chatbot-client
2 parents 6f6e332 + 7911738 commit 51fd6c5

File tree

6 files changed

+95
-36
lines changed

6 files changed

+95
-36
lines changed

packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -585,20 +585,19 @@ async def _fs_handle_manually_cancelled( # pylint:disable=method-hidden
585585
self, task_uid: TaskUID
586586
) -> None:
587587
_log_state(TaskState.MANUALLY_CANCELLED, task_uid)
588-
_logger.info("Attempting to cancel task_uid '%s'", task_uid)
588+
_logger.info("Recevied a cancel request for task_uid '%s'", task_uid)
589589

590590
task_schedule = await self.__get_task_schedule(
591591
task_uid, expected_state=TaskState.MANUALLY_CANCELLED
592592
)
593593

594-
if task_schedule.state == TaskState.WORKER:
595-
run_was_cancelled = self._worker_tracker.cancel_run(task_uid)
596-
if not run_was_cancelled:
597-
_logger.debug(
598-
"Currently not handling task related to '%s'. Did not cancel it.",
599-
task_uid,
600-
)
601-
return
594+
run_was_cancelled = self._worker_tracker.cancel_run(task_uid)
595+
if not run_was_cancelled:
596+
_logger.debug(
597+
"Currently not handling task related to '%s'. Did not cancel it.",
598+
task_uid,
599+
)
600+
return
602601

603602
_logger.info("Found and cancelled run for '%s'", task_uid)
604603
await self.__remove_task(task_uid, task_schedule)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,14 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
180180
if operation.is_cancellable is False:
181181
raise OperationNotCancellableError(operation_name=operation_name)
182182

183-
group = operation.step_groups[group_index]
183+
step_group = operation.step_groups[group_index]
184184

185185
group_step_proxies = get_group_step_proxies(
186186
self._store,
187187
schedule_id=schedule_id,
188188
operation_name=operation_name,
189189
group_index=group_index,
190-
step_group=group,
190+
step_group=step_group,
191191
is_executing=is_executing,
192192
)
193193

@@ -204,6 +204,8 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
204204
schedule_id=schedule_id
205205
)
206206

207+
expected_steps_count = len(step_group)
208+
207209
async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
208210
with log_context( # noqa: SIM117
209211
_logger,
@@ -212,8 +214,25 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
212214
):
213215
with suppress(NoDataFoundError):
214216
deferred_task_uid = await step_proxy.read("deferred_task_uid")
217+
# the deferred task might not be running when this is called
218+
# e.g. cancelling a repeating operation
215219
await DeferredRunner.cancel(deferred_task_uid)
216-
await step_proxy.create_or_update("status", StepStatus.CANCELLED)
220+
221+
await step_proxy.create_or_update("status", StepStatus.CANCELLED)
222+
223+
step_group_name = step_group.get_step_group_name(index=group_index)
224+
group_proxy = StepGroupProxy(
225+
store=self._store,
226+
schedule_id=schedule_id,
227+
operation_name=operation_name,
228+
step_group_name=step_group_name,
229+
is_executing=is_executing,
230+
)
231+
if (
232+
await group_proxy.increment_and_get_done_steps_count()
233+
== expected_steps_count
234+
):
235+
await enqueue_schedule_event(self.app, schedule_id)
217236

218237
await limited_gather(
219238
*(

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,3 @@ async def on_finished_with_error(
222222
)
223223

224224
await _enqueue_schedule_event_if_group_is_done(context)
225-
226-
@classmethod
227-
async def on_cancelled(cls, context: DeferredContext) -> None:
228-
await get_step_store_proxy(context).create_or_update(
229-
"status", StepStatus.CANCELLED
230-
)
231-
232-
await _enqueue_schedule_event_if_group_is_done(context)

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__core.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,43 @@ async def _esnure_steps_have_status(
399399
raise AssertionError(msg) from None
400400

401401

402+
async def _ensure_one_step_in_manual_intervention(
403+
app: FastAPI,
404+
schedule_id: ScheduleId,
405+
operation_name: OperationName,
406+
*,
407+
step_group_name: StepGroupName,
408+
steps: Iterable[type[BaseStep]],
409+
) -> None:
410+
store_proxies = [
411+
StepStoreProxy(
412+
store=Store.get_from_app_state(app),
413+
schedule_id=schedule_id,
414+
operation_name=operation_name,
415+
step_group_name=step_group_name,
416+
step_name=step.get_step_name(),
417+
is_executing=True,
418+
)
419+
for step in steps
420+
]
421+
422+
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
423+
with attempt:
424+
reuires_intervention = False
425+
for proxy in store_proxies:
426+
try:
427+
requires_manual_intervention = await proxy.read(
428+
"requires_manual_intervention"
429+
)
430+
if requires_manual_intervention:
431+
reuires_intervention = True
432+
break
433+
except NoDataFoundError:
434+
pass
435+
436+
assert reuires_intervention is True
437+
438+
402439
############## TESTS ##############
403440

404441

@@ -853,7 +890,7 @@ async def test_fails_during_revert_is_in_error_state(
853890
RevertRandom(_S2, _S3, _S4),
854891
RevertSequence(_S1),
855892
],
856-
id="s1p3s1(1s)",
893+
id="s1p3s1(1sf)",
857894
),
858895
pytest.param(
859896
Operation(
@@ -870,7 +907,7 @@ async def test_fails_during_revert_is_in_error_state(
870907
RevertRandom(_S2, _S3, _S4, _SF2, _SF1),
871908
RevertSequence(_S1),
872909
],
873-
id="s1p4(1s)",
910+
id="s1p5(2sf)",
874911
),
875912
],
876913
)
@@ -1105,21 +1142,28 @@ async def test_wait_for_manual_intervention(
11051142

11061143
await ensure_keys_in_store(selected_app, expected_keys=formatted_expected_keys)
11071144

1145+
group_index = len(expected_order) - 1
1146+
step_group_name = operation.step_groups[group_index].get_step_group_name(
1147+
index=group_index
1148+
)
11081149
await _esnure_steps_have_status(
11091150
selected_app,
11101151
schedule_id,
11111152
operation_name,
1112-
step_group_name=operation.step_groups[
1113-
len(expected_order) - 1
1114-
].get_step_group_name(index=len(expected_order) - 1),
1153+
step_group_name=step_group_name,
11151154
steps=expected_order[-1].steps,
11161155
)
11171156

11181157
# even if cancelled, state of waiting for manual intervention remains the same
1119-
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
1120-
with attempt: # noqa: SIM117
1121-
with pytest.raises(CannotCancelWhileWaitingForManualInterventionError):
1122-
await cancel_operation(selected_app, schedule_id)
1158+
await _ensure_one_step_in_manual_intervention(
1159+
selected_app,
1160+
schedule_id,
1161+
operation_name,
1162+
step_group_name=step_group_name,
1163+
steps=expected_order[-1].steps,
1164+
)
1165+
with pytest.raises(CannotCancelWhileWaitingForManualInterventionError):
1166+
await cancel_operation(selected_app, schedule_id)
11231167

11241168
await ensure_keys_in_store(selected_app, expected_keys=formatted_expected_keys)
11251169

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__deferred_runner.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ async def execute(
184184
_ = app
185185
_ = required_context
186186
_StepResultStore.set_result(cls.__name__, "executed")
187-
await asyncio.sleep(10000)
187+
await asyncio.sleep(1e6)
188188
return {}
189189

190190
@classmethod
@@ -194,7 +194,7 @@ async def revert(
194194
_ = app
195195
_ = required_context
196196
_StepResultStore.set_result(cls.__name__, "destroyed")
197-
await asyncio.sleep(10000)
197+
await asyncio.sleep(1e6)
198198
return {}
199199

200200

@@ -240,14 +240,14 @@ def _get_step_group(
240240
Operation(
241241
SingleStepGroup(_StepLongRunningToCancel),
242242
),
243-
StepStatus.CANCELLED,
243+
StepStatus.RUNNING,
244244
_Action.CANCEL,
245245
1,
246246
),
247247
],
248248
)
249249
@pytest.mark.parametrize("is_executing", [True, False])
250-
async def test_something(
250+
async def test_workflow(
251251
mock_enqueue_event: AsyncMock,
252252
registed_operation: None,
253253
app: FastAPI,
@@ -304,7 +304,7 @@ async def test_something(
304304
await asyncio.sleep(0.2) # give it some time to start
305305

306306
task_uid = await step_proxy.read("deferred_task_uid")
307-
await DeferredRunner.cancel(task_uid)
307+
await asyncio.create_task(DeferredRunner.cancel(task_uid))
308308

309309
await _assert_finshed_with_status(step_proxy, expected_step_status)
310310

@@ -317,4 +317,9 @@ async def test_something(
317317
assert "I failed" in error_traceback
318318

319319
# ensure called once with arguments
320-
assert mock_enqueue_event.call_args_list == [((app, schedule_id),)]
320+
321+
assert (
322+
mock_enqueue_event.call_args_list == []
323+
if action == _Action.CANCEL
324+
else [((app, schedule_id),)]
325+
)

services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_after.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ async def execute(
137137
{"key": "value", "dict": {"some": "thing"}, "list": [1, 2, 3]},
138138
],
139139
)
140-
async def test_something(
140+
async def test_workflow(
141141
after_event_manager: AfterEventManager,
142142
store: Store,
143143
schedule_id: ScheduleId,

0 commit comments

Comments
 (0)