Skip to content

Commit 34af74b

Browse files
improve unit tests
1 parent 096a9d5 commit 34af74b

File tree

9 files changed

+57
-41
lines changed

9 files changed

+57
-41
lines changed

packages/postgres-database/src/simcore_postgres_database/utils_comp_run_snapshot_tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,6 @@ async def update_for_run_id_and_node_id(
5555
)
5656
row = await result.one_or_none()
5757
if row is None:
58-
raise ValueError("improve message")
59-
# return None
58+
msg = f"update for run_id={run_id} and node_id={node_id} did not return any row"
59+
raise ValueError(msg)
6060
return row

packages/postgres-database/src/simcore_postgres_database/utils_comp_runs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,6 @@ async def get_latest_run_id_for_project(
6868
result = await _conn.execute(base_select_query)
6969
row = result.one_or_none()
7070
if not row:
71-
raise ValueError("improve")
71+
msg = f"get_latest_run_id_for_project did not return any row for project_id={project_id}"
72+
raise ValueError(msg)
7273
return cast(PositiveInt, row.run_id)

services/director-v2/src/simcore_service_director_v2/api/routes/computations.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async def _try_start_pipeline(
199199
project: ProjectAtDB,
200200
users_repo: UsersRepository,
201201
projects_metadata_repo: ProjectsMetadataRepository,
202-
filtered_comp_tasks_in_db: list[CompTaskAtDB],
202+
tasks_to_run: list[CompTaskAtDB],
203203
) -> None:
204204
if not minimal_dag.nodes():
205205
# 2 options here: either we have cycles in the graph or it's really done
@@ -242,7 +242,7 @@ async def _try_start_pipeline(
242242
)
243243
or {},
244244
use_on_demand_clusters=computation.use_on_demand_clusters,
245-
filtered_comp_tasks_in_db=filtered_comp_tasks_in_db,
245+
tasks_to_run=tasks_to_run,
246246
)
247247

248248

@@ -359,7 +359,7 @@ async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positi
359359
project=project,
360360
users_repo=users_repo,
361361
projects_metadata_repo=projects_metadata_repo,
362-
filtered_comp_tasks_in_db=filtered_tasks,
362+
tasks_to_run=filtered_tasks,
363363
)
364364

365365
pipeline_state = utils.get_pipeline_state_from_task_states(filtered_tasks)

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ async def run_new_pipeline(
4141
project_id: ProjectID,
4242
run_metadata: RunMetadataDict,
4343
use_on_demand_clusters: bool,
44-
filtered_comp_tasks_in_db: list[CompTaskAtDB],
44+
tasks_to_run: list[CompTaskAtDB],
4545
) -> None:
4646
"""Sets a new pipeline to be scheduled on the computational resources."""
4747
# ensure the pipeline exists and is populated with something
@@ -69,7 +69,7 @@ async def run_new_pipeline(
6969
"run_id": new_run.run_id,
7070
# "submit": datetime.fromisoformat(task.submit)
7171
}
72-
for task in filtered_comp_tasks_in_db
72+
for task in tasks_to_run
7373
]
7474
await CompRunsSnapshotTasksRepository.instance(db_engine).batch_create(
7575
data=db_create_snaphot_tasks

services/director-v2/tests/unit/_helpers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class PublishedProject:
2121
project: ProjectAtDB
2222
pipeline: CompPipelineAtDB
2323
tasks: list[CompTaskAtDB]
24+
tasks_to_run: list[CompTaskAtDB]
2425

2526

2627
@dataclass(kw_only=True)

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ async def test_schedule_all_pipelines(
156156
project_id=published_project.project.uuid,
157157
run_metadata=run_metadata,
158158
use_on_demand_clusters=False,
159-
filtered_comp_tasks_in_db=[],
159+
tasks_to_run=published_project.tasks_to_run,
160160
)
161161
# this directly schedule a new pipeline
162162
scheduler_rabbit_client_parser.assert_called_once_with(
@@ -258,7 +258,7 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
258258
project_id=published_project.project.uuid,
259259
run_metadata=run_metadata,
260260
use_on_demand_clusters=False,
261-
filtered_comp_tasks_in_db=[],
261+
tasks_to_run=published_project.tasks_to_run,
262262
)
263263
# this directly schedule a new pipeline
264264
scheduler_rabbit_client_parser.assert_called_once_with(
@@ -342,7 +342,7 @@ async def test_empty_pipeline_is_not_scheduled(
342342
project_id=empty_project.uuid,
343343
run_metadata=run_metadata,
344344
use_on_demand_clusters=False,
345-
filtered_comp_tasks_in_db=[],
345+
tasks_to_run=[],
346346
)
347347
await assert_comp_runs_empty(sqlalchemy_async_engine)
348348
scheduler_rabbit_client_parser.assert_not_called()
@@ -358,7 +358,7 @@ async def test_empty_pipeline_is_not_scheduled(
358358
project_id=empty_project.uuid,
359359
run_metadata=run_metadata,
360360
use_on_demand_clusters=False,
361-
filtered_comp_tasks_in_db=[],
361+
tasks_to_run=[],
362362
)
363363
assert len(caplog.records) == 1
364364
assert "no computational dag defined" in caplog.records[0].message

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_scheduler_dask.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ async def _assert_start_pipeline(
169169
project_id=published_project.project.uuid,
170170
run_metadata=run_metadata,
171171
use_on_demand_clusters=False,
172-
filtered_comp_tasks_in_db=[],
172+
tasks_to_run=published_project.tasks_to_run,
173173
)
174174

175175
# check the database is correctly updated, the run is published
@@ -1125,7 +1125,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
11251125
project_id=sleepers_project.uuid,
11261126
run_metadata=run_metadata,
11271127
use_on_demand_clusters=False,
1128-
filtered_comp_tasks_in_db=[],
1128+
tasks_to_run=[],
11291129
)
11301130
with_disabled_scheduler_publisher.assert_called_once()
11311131
# we shall have a a new comp_runs row with the new pipeline job
@@ -1253,7 +1253,7 @@ async def test_handling_of_disconnected_scheduler_dask(
12531253
project_id=published_project.project.uuid,
12541254
run_metadata=run_metadata,
12551255
use_on_demand_clusters=False,
1256-
filtered_comp_tasks_in_db=[],
1256+
tasks_to_run=published_project.tasks_to_run,
12571257
)
12581258

12591259
# since there is no cluster, there is no dask-scheduler,
@@ -1769,7 +1769,7 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
17691769
project_id=published_project.project.uuid,
17701770
run_metadata=run_metadata,
17711771
use_on_demand_clusters=True,
1772-
filtered_comp_tasks_in_db=[],
1772+
tasks_to_run=published_project.tasks_to_run,
17731773
)
17741774

17751775
# we ask to use an on-demand cluster, therefore the tasks are published first
@@ -1874,7 +1874,7 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
18741874
project_id=published_project.project.uuid,
18751875
run_metadata=run_metadata,
18761876
use_on_demand_clusters=True,
1877-
filtered_comp_tasks_in_db=[],
1877+
tasks_to_run=published_project.tasks_to_run,
18781878
)
18791879

18801880
# we ask to use an on-demand cluster, therefore the tasks are published first

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async def test_worker_properly_autocalls_scheduler_api(
6969
project_id=published_project.project.uuid,
7070
run_metadata=run_metadata,
7171
use_on_demand_clusters=False,
72-
filtered_comp_tasks_in_db=[],
72+
tasks_to_run=published_project.tasks_to_run,
7373
)
7474
mocked_get_scheduler_worker.assert_called_once_with(initialized_app)
7575
mocked_get_scheduler_worker.return_value.apply.assert_called_once_with(
@@ -127,7 +127,7 @@ async def _project_pipeline_creation_workflow() -> None:
127127
project_id=published_project.project.uuid,
128128
run_metadata=run_metadata,
129129
use_on_demand_clusters=False,
130-
filtered_comp_tasks_in_db=[],
130+
tasks_to_run=published_project.tasks_to_run,
131131
)
132132

133133
# whatever scheduling concurrency we call in here, we shall always see the same number of calls to the scheduler

services/director-v2/tests/unit/with_dbs/conftest.py

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from pydantic.main import BaseModel
2323
from simcore_postgres_database.models.comp_pipeline import StateType, comp_pipeline
2424
from simcore_postgres_database.models.comp_runs import comp_runs
25-
from simcore_postgres_database.models.comp_tasks import comp_tasks
25+
from simcore_postgres_database.models.comp_tasks import NodeClass, comp_tasks
2626
from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
2727
from simcore_service_director_v2.models.comp_runs import (
2828
CompRunsAtDB,
@@ -231,16 +231,23 @@ async def publish_project(
231231

232232
async def _() -> PublishedProject:
233233
created_project = await project(user, workbench=fake_workbench_without_outputs)
234+
created_pipeline = await create_pipeline(
235+
project_id=f"{created_project.uuid}",
236+
dag_adjacency_list=fake_workbench_adjacency,
237+
)
238+
created_tasks = await create_tasks(
239+
user=user, project=created_project, state=StateType.PUBLISHED
240+
)
241+
tasks_to_run = [
242+
t for t in created_tasks if t.node_class == NodeClass.COMPUTATIONAL
243+
]
244+
234245
return PublishedProject(
235246
user=user,
236247
project=created_project,
237-
pipeline=await create_pipeline(
238-
project_id=f"{created_project.uuid}",
239-
dag_adjacency_list=fake_workbench_adjacency,
240-
),
241-
tasks=await create_tasks(
242-
user=user, project=created_project, state=StateType.PUBLISHED
243-
),
248+
pipeline=created_pipeline,
249+
tasks=created_tasks,
250+
tasks_to_run=tasks_to_run,
244251
)
245252

246253
return _
@@ -266,27 +273,31 @@ async def running_project(
266273
user = create_registered_user()
267274
created_project = await project(user, workbench=fake_workbench_without_outputs)
268275
now_time = arrow.utcnow().datetime
276+
created_tasks = await create_tasks(
277+
user=user,
278+
project=created_project,
279+
state=StateType.RUNNING,
280+
progress=0.0,
281+
start=now_time,
282+
)
283+
tasks_to_run = [t for t in created_tasks if t.node_class == NodeClass.COMPUTATIONAL]
284+
269285
return RunningProject(
270286
user=user,
271287
project=created_project,
272288
pipeline=await create_pipeline(
273289
project_id=f"{created_project.uuid}",
274290
dag_adjacency_list=fake_workbench_adjacency,
275291
),
276-
tasks=await create_tasks(
277-
user=user,
278-
project=created_project,
279-
state=StateType.RUNNING,
280-
progress=0.0,
281-
start=now_time,
282-
),
292+
tasks=created_tasks,
283293
runs=await create_comp_run(
284294
user=user,
285295
project=created_project,
286296
started=now_time,
287297
result=StateType.RUNNING,
288298
),
289299
task_to_callback_mapping={},
300+
tasks_to_run=tasks_to_run,
290301
)
291302

292303

@@ -302,6 +313,14 @@ async def running_project_mark_for_cancellation(
302313
) -> RunningProject:
303314
user = create_registered_user()
304315
created_project = await project(user, workbench=fake_workbench_without_outputs)
316+
created_tasks = await create_tasks(
317+
user=user,
318+
project=created_project,
319+
state=StateType.RUNNING,
320+
progress=0.0,
321+
start=now_time,
322+
)
323+
tasks_to_run = [t for t in created_tasks if t.node_class == NodeClass.COMPUTATIONAL]
305324
now_time = arrow.utcnow().datetime
306325
return RunningProject(
307326
user=user,
@@ -310,13 +329,7 @@ async def running_project_mark_for_cancellation(
310329
project_id=f"{created_project.uuid}",
311330
dag_adjacency_list=fake_workbench_adjacency,
312331
),
313-
tasks=await create_tasks(
314-
user=user,
315-
project=created_project,
316-
state=StateType.RUNNING,
317-
progress=0.0,
318-
start=now_time,
319-
),
332+
tasks=created_tasks,
320333
runs=await create_comp_run(
321334
user=user,
322335
project=created_project,
@@ -325,6 +338,7 @@ async def running_project_mark_for_cancellation(
325338
cancelled=now_time + datetime.timedelta(seconds=5),
326339
),
327340
task_to_callback_mapping={},
341+
tasks_to_run=tasks_to_run,
328342
)
329343

330344

0 commit comments

Comments
 (0)