 import asyncio
 import datetime
 from collections.abc import AsyncIterator, Awaitable, Callable
 from copy import deepcopy
 from dataclasses import dataclass
@@ -32,6 +33,7 @@
 from dask_task_models_library.container_tasks.protocol import TaskOwner
 from faker import Faker
 from fastapi.applications import FastAPI
+from models_library.computations import CollectionRunID
 from models_library.projects import ProjectAtDB, ProjectID
 from models_library.projects_nodes_io import NodeID
 from models_library.projects_state import RunningState
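
The fake_collection_run_id fixture these tests request is defined outside this diff. A minimal sketch of what it plausibly looks like, assuming it lives in the suite's conftest.py and that CollectionRunID is a UUID-based type:

    import pytest
    from faker import Faker
    from models_library.computations import CollectionRunID

    @pytest.fixture
    def fake_collection_run_id(faker: Faker) -> CollectionRunID:
        # hypothetical helper: any random UUID will do for these tests
        return CollectionRunID(faker.uuid4())
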
@@ -164,6 +166,7 @@ async def _assert_start_pipeline(
     published_project: PublishedProject,
     run_metadata: RunMetadataDict,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    collection_run_id: CollectionRunID,
 ) -> tuple[CompRunsAtDB, list[CompTaskAtDB]]:
     exp_published_tasks = deepcopy(published_project.tasks)
     assert published_project.project.prj_owner
@@ -173,6 +176,7 @@ async def _assert_start_pipeline(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
+        collection_run_id=collection_run_id,
     )

     # check the database is correctly updated, the run is published
@@ -472,6 +476,7 @@ async def test_proper_pipeline_is_scheduled(  # noqa: PLR0915
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
     run_metadata: RunMetadataDict,
+    fake_collection_run_id: CollectionRunID,
 ):
     with_disabled_auto_scheduling.assert_called_once()
     _with_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
@@ -485,6 +490,7 @@ async def test_proper_pipeline_is_scheduled(  # noqa: PLR0915
         published_project=published_project,
         run_metadata=run_metadata,
         computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
+        collection_run_id=fake_collection_run_id,
     )
     with_disabled_scheduler_publisher.assert_called()

@@ -965,6 +971,7 @@ async def with_started_project(
     instrumentation_rabbit_client_parser: mock.AsyncMock,
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ) -> RunningProject:
     with_disabled_auto_scheduling.assert_called_once()
     published_project = await publish_project()
@@ -977,6 +984,7 @@ async def with_started_project(
         published_project=published_project,
         run_metadata=run_metadata,
         computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
+        collection_run_id=fake_collection_run_id,
     )
     with_disabled_scheduler_publisher.assert_called_once()

@@ -1210,6 +1218,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
     sqlalchemy_async_engine: AsyncEngine,
     run_metadata: RunMetadataDict,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ):
     """A pipeline whose comp_tasks are missing should not be scheduled.
     It shall be aborted and shown as such in the comp_runs DB."""
@@ -1230,6 +1239,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
         project_id=sleepers_project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
+        collection_run_id=fake_collection_run_id,
     )
     with_disabled_scheduler_publisher.assert_called_once()
     # we shall have a new comp_runs row with the new pipeline job
@@ -1282,6 +1292,7 @@ async def test_task_progress_triggers(
     mocked_clean_task_output_and_log_files_if_invalid: mock.Mock,
     run_metadata: RunMetadataDict,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ):
     _with_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     _run_in_db, expected_published_tasks = await _assert_start_pipeline(
@@ -1290,6 +1301,7 @@ async def test_task_progress_triggers(
         published_project=published_project,
         run_metadata=run_metadata,
         computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
+        collection_run_id=fake_collection_run_id,
     )

     # -------------------------------------------------------------------------------
@@ -1357,6 +1369,7 @@ async def test_handling_of_disconnected_scheduler_dask(
     backend_error: ComputationalSchedulerError,
     run_metadata: RunMetadataDict,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ):
     # this will create a non-connected backend issue that will trigger re-connection
     mocked_dask_client_send_task = mocker.patch(
@@ -1373,6 +1386,7 @@ async def test_handling_of_disconnected_scheduler_dask(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
+        collection_run_id=fake_collection_run_id,
     )
     await _assert_message_received(
         computational_pipeline_rabbit_client_parser,
@@ -1801,6 +1815,7 @@ async def test_running_pipeline_triggers_heartbeat(
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
     run_metadata: RunMetadataDict,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ):
     _with_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     run_in_db, expected_published_tasks = await _assert_start_pipeline(
@@ -1809,6 +1824,7 @@ async def test_running_pipeline_triggers_heartbeat(
         published_project=published_project,
         run_metadata=run_metadata,
         computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
+        collection_run_id=fake_collection_run_id,
     )
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the dask-worker can take them
@@ -1918,6 +1934,7 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
     mocked_get_or_create_cluster: mock.Mock,
     faker: Faker,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ):
     mocked_get_or_create_cluster.side_effect = (
         ComputationalBackendOnDemandNotReadyError(
@@ -1932,6 +1949,7 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=True,
+        collection_run_id=fake_collection_run_id,
     )

     # we ask to use an on-demand cluster, therefore the tasks are published first
@@ -2038,6 +2056,7 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
     mocked_get_or_create_cluster: mock.Mock,
     get_or_create_exception: Exception,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ):
     # needs to change: https://github.com/ITISFoundation/osparc-simcore/issues/6817

@@ -2050,6 +2069,7 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=True,
+        collection_run_id=fake_collection_run_id,
     )

     # we ask to use an on-demand cluster, therefore the tasks are published first
@@ -2151,6 +2171,7 @@ async def test_run_new_pipeline_called_twice_prevents_duplicate_runs(
     published_project: PublishedProject,
     run_metadata: RunMetadataDict,
     computational_pipeline_rabbit_client_parser: mock.AsyncMock,
+    fake_collection_run_id: CollectionRunID,
 ):
     # Ensure we start with an empty database
     await assert_comp_runs_empty(sqlalchemy_async_engine)
@@ -2163,6 +2184,7 @@ async def test_run_new_pipeline_called_twice_prevents_duplicate_runs(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
+        collection_run_id=fake_collection_run_id,
     )

     # Verify first run was created and published
@@ -2191,6 +2213,7 @@ async def test_run_new_pipeline_called_twice_prevents_duplicate_runs(
         project_id=published_project.project.uuid,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
+        collection_run_id=fake_collection_run_id,
     )

     # Verify still only one run exists with the same run_id
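
For orientation, the recurring pattern across these hunks, condensed into a single hedged sketch. The run_new_pipeline import path and the plain-dict stand-in for RunMetadataDict are assumptions; the parameter names mirror the hunks above:

    from fastapi import FastAPI
    from models_library.computations import CollectionRunID
    from models_library.projects import ProjectID
    from models_library.users import UserID
    # assumed import path for the scheduler entry point exercised by these tests
    from simcore_service_director_v2.modules.comp_scheduler import run_new_pipeline

    async def start_test_pipeline(
        app: FastAPI,
        user_id: UserID,
        project_id: ProjectID,
        run_metadata: dict,
        collection_run_id: CollectionRunID,
    ) -> None:
        # every pipeline started in this test module is now tagged with a
        # CollectionRunID, so runs launched together can be traced as one collection
        await run_new_pipeline(
            app,
            user_id=user_id,
            project_id=project_id,
            run_metadata=run_metadata,
            use_on_demand_clusters=False,
            collection_run_id=collection_run_id,
        )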