From 3f11102444d40c9abccb81a1d4db9f033e0dd77a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:41:42 +0100 Subject: [PATCH 01/10] added cancellation mark --- .../8bfe65a5e294_add_cancellation_mark.py | 29 +++++++++++++++++++ .../models/comp_runs.py | 6 ++++ 2 files changed, 35 insertions(+) create mode 100644 packages/postgres-database/src/simcore_postgres_database/migration/versions/8bfe65a5e294_add_cancellation_mark.py diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/8bfe65a5e294_add_cancellation_mark.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/8bfe65a5e294_add_cancellation_mark.py new file mode 100644 index 000000000000..ecbe20b40e81 --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/8bfe65a5e294_add_cancellation_mark.py @@ -0,0 +1,29 @@ +"""add cancellation mark + +Revision ID: 8bfe65a5e294 +Revises: 5ad02358751a +Create Date: 2024-11-08 14:40:59.266181+00:00 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "8bfe65a5e294" +down_revision = "5ad02358751a" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "comp_runs", sa.Column("cancelled", sa.DateTime(timezone=True), nullable=True) + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("comp_runs", "cancelled") + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py index e402a1715624..eb84cefaa764 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/comp_runs.py @@ -99,6 +99,12 @@ nullable=True, doc="When the run was finished", ), + sa.Column( + "cancelled", + sa.DateTime(timezone=True), + nullable=True, + doc="If filled, when cancellation was requested", + ), sa.Column("metadata", JSONB, nullable=True, doc="the run optional metadata"), sa.Column( "use_on_demand_clusters", From df6811f8dea41adcf5274c4208f46b5e3029f647 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:44:06 +0100 Subject: [PATCH 02/10] added cancellation mark to model --- .../src/simcore_service_director_v2/models/comp_runs.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py index 1d7800b97884..b67bb7e8cec5 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py @@ -46,6 +46,7 @@ class CompRunsAtDB(BaseModel): modified: datetime.datetime started: datetime.datetime | None ended: datetime.datetime | None + cancelled: datetime.datetime | None metadata: RunMetadataDict = RunMetadataDict() use_on_demand_clusters: bool @@ -96,6 +97,7 @@ class Config: "result": "NOT_STARTED", "created": "2021-03-01 13:07:34.19161", "modified": "2021-03-01 13:07:34.19161", + "cancelled": None, "use_on_demand_clusters": False, }, { @@ -109,6 +111,7 @@ class Config: "modified": "2021-03-01 13:07:34.19161", "started": "2021-03-01 8:07:34.19161", "ended": "2021-03-01 13:07:34.10", + "cancelled": None, "metadata": { "node_id_names_map": {}, "product_name": "osparc", From 0f62a9d4092667f8279b2706555212c5ea073a62 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:56:09 +0100 Subject: [PATCH 03/10] comp_runs model fully covered --- .../models/comp_runs.py | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py index b67bb7e8cec5..2af0646c3d33 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/models/comp_runs.py @@ -73,7 +73,7 @@ def convert_null_to_default_cluster_id(cls, v): @classmethod def ensure_utc(cls, v: datetime.datetime | None) -> datetime.datetime | None: if v is not None and v.tzinfo is None: - v = v.replace(tzinfo=datetime.timezone.utc) + v = v.replace(tzinfo=datetime.UTC) return v @validator("metadata", pre=True) @@ -94,6 +94,18 @@ class Config: "user_id": 132, "cluster_id": 0, "iteration": 42, + "result": "UNKNOWN", + "created": "2021-03-01 13:07:34.19161", + "modified": "2021-03-01 13:07:34.19161", + "cancelled": None, + "use_on_demand_clusters": False, + }, + { + "run_id": 432, + "project_uuid": "65fee9d2-e030-452c-a29c-45d288577ca5", + "user_id": 132, + "cluster_id": None, # this default to DEFAULT_CLUSTER_ID + "iteration": 42, "result": "NOT_STARTED", "created": "2021-03-01 13:07:34.19161", "modified": "2021-03-01 13:07:34.19161", @@ -121,5 +133,20 @@ class Config: }, "use_on_demand_clusters": False, }, + { + "run_id": 43243, + "project_uuid": "65fee9d2-e030-452c-a29c-45d288577ca5", + "user_id": 132, + "cluster_id": 123, + "iteration": 12, + "result": "SUCCESS", + "created": "2021-03-01 13:07:34.19161", + "modified": "2021-03-01 13:07:34.19161", + "started": "2021-03-01 8:07:34.19161", + "ended": "2021-03-01 13:07:34.10", + "cancelled": None, + "metadata": None, + "use_on_demand_clusters": False, + }, ] } From 61a9cc6979114e6730d3060389ee7c55a5065074 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:15:59 +0100 Subject: [PATCH 04/10] removed deprecation --- services/director-v2/tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index 4e4152544862..63abe3d09842 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -218,7 +218,7 @@ async def initialized_app(mock_env: EnvVarsDict) -> AsyncIterable[FastAPI]: @pytest.fixture() async def async_client(initialized_app: FastAPI) -> AsyncIterable[httpx.AsyncClient]: async with httpx.AsyncClient( - app=initialized_app, + transport=httpx.ASGITransport(app=initialized_app), base_url="http://director-v2.testserver.io", headers={"Content-Type": "application/json"}, ) as client: From b24b89226a374e10ba29929da9bbc842aee1f9e4 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:16:25 +0100 Subject: [PATCH 05/10] ruff --- .../with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index fbc90204f836..d33867fc96f3 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -103,9 +103,9 @@ def _assert_dask_client_correctly_initialized( ) mocked_dask_client.register_handlers.assert_called_once_with( TaskHandlers( - cast( + cast( # noqa: SLF001 DaskScheduler, scheduler - )._task_progress_change_handler, # noqa: SLF001 + )._task_progress_change_handler, cast(DaskScheduler, scheduler)._task_log_change_handler, # noqa: SLF001 ) ) From 7d5e19d26f9a9510a94435f760ee7e4f2b6d9aa5 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:17:40 +0100 Subject: [PATCH 06/10] ruff --- .../test_modules_comp_scheduler_dask_scheduler.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index d33867fc96f3..c56735182e8e 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -280,9 +280,10 @@ def test_scheduler_raises_exception_for_missing_dependencies( settings = AppSettings.create_from_envs() app = init_app(settings) - with pytest.raises(ConfigurationError): - with TestClient(app, raise_server_exceptions=True) as _: - pass + with pytest.raises(ConfigurationError), TestClient( + app, raise_server_exceptions=True + ) as _: + pass async def test_empty_pipeline_is_not_scheduled( @@ -1029,11 +1030,9 @@ async def test_task_progress_triggers( parent_project_id=None, ), ) - await cast( + await cast( # noqa: SLF001 DaskScheduler, scheduler - )._task_progress_change_handler( # noqa: SLF001 - progress_event.json() - ) + )._task_progress_change_handler(progress_event.json()) # NOTE: not sure whether it should switch to STARTED.. it would make sense await _assert_comp_tasks_db( aiopg_engine, From 9e9ea6db489d6d6ba7e3360447418afac95a0807 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:41:06 +0100 Subject: [PATCH 07/10] cancellation is now a datetime --- .../modules/comp_scheduler/_base_scheduler.py | 26 ++++++++++++------- .../comp_scheduler/_scheduler_factory.py | 2 +- .../modules/db/repositories/comp_runs.py | 13 +++++++++- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py index 08396686e431..cae539596d41 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py @@ -47,7 +47,7 @@ ) from ...core.settings import ComputationalBackendSettings from ...models.comp_pipelines import CompPipelineAtDB -from ...models.comp_runs import CompRunsAtDB, RunMetadataDict +from ...models.comp_runs import RunMetadataDict from ...models.comp_tasks import CompTaskAtDB from ...utils.comp_scheduler import ( COMPLETED_STATES, @@ -131,7 +131,7 @@ async def _triage_changed_tasks( class ScheduledPipelineParams: cluster_id: ClusterID run_metadata: RunMetadataDict - mark_for_cancellation: bool = False + mark_for_cancellation: datetime.datetime | None use_on_demand_clusters: bool @@ -169,7 +169,7 @@ async def run_new_pipeline( return runs_repo = CompRunsRepository.instance(self.db_engine) - new_run: CompRunsAtDB = await runs_repo.create( + new_run = await runs_repo.create( user_id=user_id, project_id=project_id, cluster_id=cluster_id, @@ -182,6 +182,7 @@ async def run_new_pipeline( cluster_id=cluster_id, run_metadata=new_run.metadata, use_on_demand_clusters=use_on_demand_clusters, + mark_for_cancellation=None, ) await publish_project_log( self.rabbitmq_client, @@ -212,11 +213,18 @@ async def stop_pipeline( selected_iteration = iteration # mark the scheduled pipeline for stopping - self.scheduled_pipelines[ - (user_id, project_id, selected_iteration) - ].mark_for_cancellation = True - # ensure the scheduler starts right away - self._wake_up_scheduler_now() + updated_comp_run = await CompRunsRepository.instance( + self.db_engine + ).mark_for_cancellation( + user_id=user_id, project_id=project_id, iteration=selected_iteration + ) + if updated_comp_run: + assert updated_comp_run.cancelled is not None # nosec + self.scheduled_pipelines[ + (user_id, project_id, selected_iteration) + ].mark_for_cancellation = updated_comp_run.cancelled + # ensure the scheduler starts right away + self._wake_up_scheduler_now() async def schedule_all_pipelines(self) -> None: self.wake_up_event.clear() @@ -343,7 +351,7 @@ def _need_heartbeat(task: CompTaskAtDB) -> bool: if task.last_heartbeat is None: assert task.start # nosec return bool( - (utc_now - task.start.replace(tzinfo=datetime.timezone.utc)) + (utc_now - task.start.replace(tzinfo=datetime.UTC)) > self.service_runtime_heartbeat_interval ) return bool( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py index 458950e9798d..8dc6c347018b 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py @@ -47,7 +47,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler: r.cluster_id if r.cluster_id is not None else DEFAULT_CLUSTER_ID ), run_metadata=r.metadata, - mark_for_cancellation=False, + mark_for_cancellation=bool(r.cancelled is not None), use_on_demand_clusters=r.use_on_demand_clusters, ) for r in runs diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py index 4f9a8e42b53c..955b9dd5858e 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py @@ -3,6 +3,7 @@ from collections import deque from typing import Any +import arrow import sqlalchemy as sa from aiopg.sa.result import RowProxy from models_library.clusters import DEFAULT_CLUSTER_ID, ClusterID @@ -146,10 +147,20 @@ async def set_run_result( ) -> CompRunsAtDB | None: values: dict[str, Any] = {"result": RUNNING_STATE_TO_DB[result_state]} if final_state: - values.update({"ended": datetime.datetime.now(tz=datetime.UTC)}) + values.update({"ended": arrow.utcnow().datetime}) return await self.update( user_id, project_id, iteration, **values, ) + + async def mark_for_cancellation( + self, *, user_id: UserID, project_id: ProjectID, iteration: PositiveInt + ) -> CompRunsAtDB | None: + return await self.update( + user_id, + project_id, + iteration, + cancelled=arrow.utcnow().datetime, + ) From cc7105464c8627e44257f2d0bc924da11794c69b Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 17:37:52 +0100 Subject: [PATCH 08/10] test cancellation is done properly --- .../tests/unit/with_dbs/conftest.py | 47 +++++++++- ...t_modules_comp_scheduler_dask_scheduler.py | 89 ++++++++++++++++++- 2 files changed, 133 insertions(+), 3 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py index 8dd5527f00a3..516730d4e14b 100644 --- a/services/director-v2/tests/unit/with_dbs/conftest.py +++ b/services/director-v2/tests/unit/with_dbs/conftest.py @@ -11,6 +11,7 @@ from typing import Any, cast from uuid import uuid4 +import arrow import pytest import sqlalchemy as sa from _helpers import PublishedProject, RunningProject @@ -318,6 +319,7 @@ async def running_project( ) -> RunningProject: user = registered_user() created_project = await project(user, workbench=fake_workbench_without_outputs) + now_time = arrow.utcnow().datetime return RunningProject( project=created_project, pipeline=pipeline( @@ -329,9 +331,50 @@ async def running_project( project=created_project, state=StateType.RUNNING, progress=0.0, - start=datetime.datetime.now(tz=datetime.UTC), + start=now_time, + ), + runs=runs( + user=user, + project=created_project, + started=now_time, + result=StateType.RUNNING, + ), + ) + + +@pytest.fixture +async def running_project_mark_for_cancellation( + registered_user: Callable[..., dict[str, Any]], + project: Callable[..., Awaitable[ProjectAtDB]], + pipeline: Callable[..., CompPipelineAtDB], + tasks: Callable[..., list[CompTaskAtDB]], + runs: Callable[..., CompRunsAtDB], + fake_workbench_without_outputs: dict[str, Any], + fake_workbench_adjacency: dict[str, Any], +) -> RunningProject: + user = registered_user() + created_project = await project(user, workbench=fake_workbench_without_outputs) + now_time = arrow.utcnow().datetime + return RunningProject( + project=created_project, + pipeline=pipeline( + project_id=f"{created_project.uuid}", + dag_adjacency_list=fake_workbench_adjacency, + ), + tasks=tasks( + user=user, + project=created_project, + state=StateType.RUNNING, + progress=0.0, + start=now_time, + ), + runs=runs( + user=user, + project=created_project, + result=StateType.RUNNING, + started=now_time, + cancelled=now_time + datetime.timedelta(seconds=5), ), - runs=runs(user=user, project=created_project, result=StateType.RUNNING), ) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index c56735182e8e..14746f914f3b 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -1206,7 +1206,7 @@ async def test_handling_scheduling_after_reboot( mocked_clean_task_output_fct: mock.MagicMock, reboot_state: RebootState, ): - """After the dask client is rebooted, or that the director-v2 reboots the scheduler + """After the dask client is rebooted, or that the director-v2 reboots the dv-2 internal scheduler shall continue scheduling correctly. Even though the task might have continued to run in the dask-scheduler.""" @@ -1278,6 +1278,93 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData: ) +async def test_handling_cancellation_of_jobs_after_reboot( + with_disabled_scheduler_task: None, + mocked_dask_client: mock.MagicMock, + aiopg_engine: aiopg.sa.engine.Engine, + running_project_mark_for_cancellation: RunningProject, + scheduler: BaseCompScheduler, + mocked_parse_output_data_fct: mock.MagicMock, + mocked_clean_task_output_fct: mock.MagicMock, +): + """A running pipeline was cancelled by a user and the DV-2 was restarted BEFORE + It could actually cancel the task. On reboot the DV-2 shall recover + and actually cancel the pipeline properly""" + + # check initial status + await _assert_comp_run_db( + aiopg_engine, running_project_mark_for_cancellation, RunningState.STARTED + ) + await _assert_comp_tasks_db( + aiopg_engine, + running_project_mark_for_cancellation.project.uuid, + [t.node_id for t in running_project_mark_for_cancellation.tasks], + expected_state=RunningState.STARTED, + expected_progress=0, + ) + + # the backend shall report the tasks as running + async def mocked_get_tasks_status(job_ids: list[str]) -> list[DaskClientTaskState]: + return [DaskClientTaskState.PENDING_OR_STARTED for j in job_ids] + + mocked_dask_client.get_tasks_status.side_effect = mocked_get_tasks_status + # Running the scheduler, should actually cancel the run now + await run_comp_scheduler(scheduler) + mocked_dask_client.abort_computation_task.assert_called() + assert mocked_dask_client.abort_computation_task.call_count == len( + [ + t.node_id + for t in running_project_mark_for_cancellation.tasks + if t.node_class == NodeClass.COMPUTATIONAL + ] + ) + # in the DB they are still running, they will be stopped in the next iteration + await _assert_comp_tasks_db( + aiopg_engine, + running_project_mark_for_cancellation.project.uuid, + [ + t.node_id + for t in running_project_mark_for_cancellation.tasks + if t.node_class == NodeClass.COMPUTATIONAL + ], + expected_state=RunningState.STARTED, + expected_progress=0, + ) + await _assert_comp_run_db( + aiopg_engine, running_project_mark_for_cancellation, RunningState.STARTED + ) + + # the backend shall now report the tasks as aborted + async def mocked_get_tasks_status_aborted( + job_ids: list[str], + ) -> list[DaskClientTaskState]: + return [DaskClientTaskState.ABORTED for j in job_ids] + + mocked_dask_client.get_tasks_status.side_effect = mocked_get_tasks_status_aborted + + async def _return_random_task_result(job_id) -> TaskOutputData: + raise TaskCancelledError + + mocked_dask_client.get_task_result.side_effect = _return_random_task_result + await run_comp_scheduler(scheduler) + # now should be stopped + await _assert_comp_tasks_db( + aiopg_engine, + running_project_mark_for_cancellation.project.uuid, + [ + t.node_id + for t in running_project_mark_for_cancellation.tasks + if t.node_class == NodeClass.COMPUTATIONAL + ], + expected_state=RunningState.ABORTED, + expected_progress=1, + ) + await _assert_comp_run_db( + aiopg_engine, running_project_mark_for_cancellation, RunningState.ABORTED + ) + mocked_clean_task_output_fct.assert_called() + + @pytest.fixture def with_fast_service_heartbeat_s(monkeypatch: pytest.MonkeyPatch) -> int: seconds = 1 From 17364693e3ac6890090073d30eec232e0b29243c Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 17:42:25 +0100 Subject: [PATCH 09/10] wrong syntax --- .../modules/comp_scheduler/_scheduler_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py index 8dc6c347018b..f8b648eaf489 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py @@ -47,7 +47,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler: r.cluster_id if r.cluster_id is not None else DEFAULT_CLUSTER_ID ), run_metadata=r.metadata, - mark_for_cancellation=bool(r.cancelled is not None), + mark_for_cancellation=r.cancelled, use_on_demand_clusters=r.use_on_demand_clusters, ) for r in runs From 1f981f90766c5499b9770b6a500da36585cae62a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 8 Nov 2024 18:10:22 +0100 Subject: [PATCH 10/10] fix assertion --- .../with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 14746f914f3b..f9e5ff33c4b3 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -368,7 +368,7 @@ async def test_misconfigured_pipeline_is_not_scheduled( assert u_id == user["id"] assert p_id == sleepers_project.uuid assert it > 0 - assert params.mark_for_cancellation is False + assert params.mark_for_cancellation is None # check the database was properly updated async with aiopg_engine.acquire() as conn: result = await conn.execute( @@ -419,7 +419,7 @@ async def _assert_start_pipeline( assert u_id == published_project.project.prj_owner assert p_id == published_project.project.uuid assert it > 0 - assert params.mark_for_cancellation is False + assert params.mark_for_cancellation is None assert params.run_metadata == run_metadata # check the database is correctly updated, the run is published