Skip to content

Commit aa3667b

Browse files
committed
ensure cancellation of task is not only in 1 process
1 parent 99909bd commit aa3667b

File tree

2 files changed

+4
-8
lines changed

2 files changed

+4
-8
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ async def _triage_changed_tasks(
140140
class ScheduledPipelineParams:
141141
cluster_id: ClusterID
142142
run_metadata: RunMetadataDict
143-
mark_for_cancellation: datetime.datetime | None
144143
use_on_demand_clusters: bool
145144

146145
scheduler_task: asyncio.Task | None = None
@@ -198,7 +197,6 @@ async def run_new_pipeline(
198197
cluster_id=cluster_id,
199198
run_metadata=new_run.metadata,
200199
use_on_demand_clusters=use_on_demand_clusters,
201-
mark_for_cancellation=None,
202200
)
203201
await publish_project_log(
204202
self.rabbitmq_client,
@@ -236,9 +234,6 @@ async def stop_pipeline(
236234
)
237235
if updated_comp_run:
238236
assert updated_comp_run.cancelled is not None # nosec
239-
self.scheduled_pipelines[
240-
(user_id, project_id, selected_iteration)
241-
].mark_for_cancellation = updated_comp_run.cancelled
242237
# ensure the scheduler starts right away
243238
self.scheduled_pipelines[
244239
(user_id, project_id, selected_iteration)
@@ -714,7 +709,10 @@ async def _schedule_pipeline(
714709
project_id, dag
715710
)
716711
# 3. do we want to stop the pipeline now?
717-
if pipeline_params.mark_for_cancellation:
712+
comp_run = await CompRunsRepository.instance(self.db_engine).get(
713+
user_id, project_id, iteration
714+
)
715+
if comp_run.cancelled:
718716
await self._schedule_tasks_to_stop(
719717
user_id, project_id, comp_tasks, pipeline_params
720718
)

services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,6 @@ async def test_misconfigured_pipeline_is_not_scheduled(
402402
assert u_id == user["id"]
403403
assert p_id == sleepers_project.uuid
404404
assert it > 0
405-
assert params.mark_for_cancellation is None
406405
# check the database was properly updated
407406
async with aiopg_engine.acquire() as conn:
408407
result = await conn.execute(
@@ -450,7 +449,6 @@ async def _assert_start_pipeline(
450449
assert u_id == published_project.project.prj_owner
451450
assert p_id == published_project.project.uuid
452451
assert it > 0
453-
assert params.mark_for_cancellation is None
454452
assert params.run_metadata == run_metadata
455453

456454
# check the database is correctly updated, the run is published

0 commit comments

Comments
 (0)