@@ -162,6 +162,7 @@ async def _assert_start_pipeline(
     sqlalchemy_async_engine: AsyncEngine,
     published_project: PublishedProject,
     run_metadata: RunMetadataDict,
+    computational_pipeline_rabbit_client_parser: mock.AsyncMock,
 ) -> tuple[CompRunsAtDB, list[CompTaskAtDB]]:
     exp_published_tasks = deepcopy(published_project.tasks)
     assert published_project.project.prj_owner
@@ -183,6 +184,11 @@ async def _assert_start_pipeline(
             comp_runs.c.project_uuid == f"{published_project.project.uuid}",
         ),
     )
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        1,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
     await assert_comp_tasks_and_comp_run_snapshot_tasks(
         sqlalchemy_async_engine,
         project_uuid=published_project.project.uuid,
@@ -390,9 +396,9 @@ async def _assert_message_received(
             print(
                 f"--> waiting for rabbitmq message [{attempt.retry_state.attempt_number}, {attempt.retry_state.idle_for}]"
             )
-            assert mocked_message_parser.call_count == expected_call_count, (
-                mocked_message_parser.call_args_list
-            )
+            assert (
+                mocked_message_parser.call_count == expected_call_count
+            ), mocked_message_parser.call_args_list
             print(
                 f"<-- rabbitmq message received after [{attempt.retry_state.attempt_number}, {attempt.retry_state.idle_for}]"
             )
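Note: this hunk only reshuffles the formatting of the assertion inside `_assert_message_received`. For readers without the full file, below is a minimal sketch of how such a helper can be built around tenacity's `AsyncRetrying` and an `AsyncMock` consumer; the wait/stop values and the final parsing step are assumptions, not the repository's exact implementation.

```python
# Sketch only: the timeouts and the returned parsing step are assumptions.
from collections.abc import Callable
from unittest import mock

from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_delay, wait_fixed


async def _assert_message_received(
    mocked_message_parser: mock.AsyncMock,
    expected_call_count: int,
    message_parser: Callable,
) -> list:
    # Poll until the mocked RabbitMQ consumer has been called the expected number
    # of times, re-raising the last AssertionError once the timeout is reached.
    async for attempt in AsyncRetrying(
        wait=wait_fixed(0.1),
        stop=stop_after_delay(10),
        retry=retry_if_exception_type(AssertionError),
        reraise=True,
    ):
        with attempt:
            assert (
                mocked_message_parser.call_count == expected_call_count
            ), mocked_message_parser.call_args_list
    # Decode every received payload, e.g. with
    # ComputationalPipelineStatusMessage.model_validate_json
    return [message_parser(call.args[0]) for call in mocked_message_parser.call_args_list]
```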
@@ -477,15 +483,10 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
         sqlalchemy_async_engine=sqlalchemy_async_engine,
         published_project=published_project,
         run_metadata=run_metadata,
+        computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
     )
     with_disabled_scheduler_publisher.assert_called()
 
-    await _assert_message_received(
-        computational_pipeline_rabbit_client_parser,
-        1,
-        ComputationalPipelineStatusMessage.model_validate_json,
-    )
-
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the dask-worker can take them
     expected_pending_tasks, _ = await _assert_publish_in_dask_backend(
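The `computational_pipeline_rabbit_client_parser` fixture that is now threaded into `_assert_start_pipeline` is defined outside this diff. As a rough, hypothetical sketch only (the `create_rabbitmq_client` factory, the `subscribe`/`unsubscribe` calls and the channel-name lookup below are assumptions, not the verified project API), such a fixture registers an `AsyncMock` as the consumer of the computational-pipeline status channel so the tests can count published messages:

```python
# Hypothetical sketch: helper names and signatures below are assumptions.
from collections.abc import AsyncIterator
from unittest import mock

import pytest
from pytest_mock import MockerFixture


@pytest.fixture
async def computational_pipeline_rabbit_client_parser(
    create_rabbitmq_client,  # assumed factory fixture yielding a RabbitMQ client
    mocker: MockerFixture,
) -> AsyncIterator[mock.AsyncMock]:
    client = create_rabbitmq_client("computational_pipeline_pytest_consumer")
    message_handler = mocker.AsyncMock(return_value=True)
    # Route every message published on the pipeline-status channel to the mock
    # (channel-name lookup is an assumption).
    queue_name = await client.subscribe(
        ComputationalPipelineStatusMessage.get_channel_name(), message_handler
    )
    yield message_handler
    await client.unsubscribe(queue_name)
```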
@@ -974,13 +975,10 @@ async def with_started_project(
         sqlalchemy_async_engine=sqlalchemy_async_engine,
         published_project=published_project,
         run_metadata=run_metadata,
+        computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
     )
     with_disabled_scheduler_publisher.assert_called_once()
-    await _assert_message_received(
-        computational_pipeline_rabbit_client_parser,
-        1,
-        ComputationalPipelineStatusMessage.model_validate_json,
-    )
+
     #
     # 2. This runs the scheduler until the project is started scheduled in the back-end
     #
@@ -1290,12 +1288,9 @@ async def test_task_progress_triggers(
         sqlalchemy_async_engine=sqlalchemy_async_engine,
         published_project=published_project,
         run_metadata=run_metadata,
+        computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
     )
-    await _assert_message_received(
-        computational_pipeline_rabbit_client_parser,
-        1,
-        ComputationalPipelineStatusMessage.model_validate_json,
-    )
+
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the dask-worker can take them
     expected_pending_tasks, _ = await _assert_publish_in_dask_backend(
@@ -1474,7 +1469,6 @@ class RebootState:
     expected_pipeline_state_notification: int
 
 
-@pytest.mark.testit
 @pytest.mark.parametrize(
     "reboot_state",
     [
@@ -1657,6 +1651,7 @@ async def test_handling_cancellation_of_jobs_after_reboot(
     scheduler_api: BaseCompScheduler,
     mocked_parse_output_data_fct: mock.Mock,
     mocked_clean_task_output_fct: mock.Mock,
+    computational_pipeline_rabbit_client_parser: mock.AsyncMock,
 ):
     """A running pipeline was cancelled by a user and the DV-2 was restarted BEFORE
     It could actually cancel the task. On reboot the DV-2 shall recover
@@ -1676,6 +1671,11 @@ async def test_handling_cancellation_of_jobs_after_reboot(
             ),
         )
     )[0]
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        0,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
 
     await assert_comp_tasks_and_comp_run_snapshot_tasks(
         sqlalchemy_async_engine,
@@ -1772,6 +1772,11 @@ async def _return_random_task_result(job_id) -> TaskOutputData:
         ),
     )
     mocked_clean_task_output_fct.assert_called()
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        1,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
 
 
 @pytest.fixture
@@ -1794,13 +1799,15 @@ async def test_running_pipeline_triggers_heartbeat(
     published_project: PublishedProject,
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
     run_metadata: RunMetadataDict,
+    computational_pipeline_rabbit_client_parser: mock.AsyncMock,
 ):
     _with_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     run_in_db, expected_published_tasks = await _assert_start_pipeline(
         initialized_app,
         sqlalchemy_async_engine=sqlalchemy_async_engine,
         published_project=published_project,
         run_metadata=run_metadata,
+        computational_pipeline_rabbit_client_parser=computational_pipeline_rabbit_client_parser,
     )
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the dask-worker can take them
@@ -1810,6 +1817,7 @@ async def test_running_pipeline_triggers_heartbeat(
         expected_published_tasks,
         mocked_dask_client,
         scheduler_api,
+        computational_pipeline_rabbit_client_parser,
     )
     # -------------------------------------------------------------------------------
     # 2. the "worker" starts processing a task
@@ -1908,6 +1916,7 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
     run_metadata: RunMetadataDict,
     mocked_get_or_create_cluster: mock.Mock,
     faker: Faker,
+    computational_pipeline_rabbit_client_parser: mock.AsyncMock,
 ):
     mocked_get_or_create_cluster.side_effect = (
         ComputationalBackendOnDemandNotReadyError(
@@ -1936,6 +1945,11 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
             ),
         )
     )[0]
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        1,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
     await assert_comp_tasks_and_comp_run_snapshot_tasks(
         sqlalchemy_async_engine,
         project_uuid=published_project.project.uuid,
@@ -1967,6 +1981,11 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
             comp_runs.c.project_uuid == f"{published_project.project.uuid}",
         ),
     )
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        1,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
     await assert_comp_tasks_and_comp_run_snapshot_tasks(
         sqlalchemy_async_engine,
         project_uuid=published_project.project.uuid,
@@ -2017,6 +2036,7 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
     run_metadata: RunMetadataDict,
     mocked_get_or_create_cluster: mock.Mock,
     get_or_create_exception: Exception,
+    computational_pipeline_rabbit_client_parser: mock.AsyncMock,
 ):
     # needs to change: https://github.com/ITISFoundation/osparc-simcore/issues/6817
 
@@ -2043,6 +2063,11 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
             ),
         )
     )[0]
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        1,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
     await assert_comp_tasks_and_comp_run_snapshot_tasks(
         sqlalchemy_async_engine,
         project_uuid=published_project.project.uuid,
@@ -2073,6 +2098,11 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
             comp_runs.c.project_uuid == f"{published_project.project.uuid}",
         ),
     )
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        1,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
     await assert_comp_tasks_and_comp_run_snapshot_tasks(
         sqlalchemy_async_engine,
         project_uuid=published_project.project.uuid,
@@ -2097,6 +2127,11 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
             comp_runs.c.project_uuid == f"{published_project.project.uuid}",
         ),
     )
+    await _assert_message_received(
+        computational_pipeline_rabbit_client_parser,
+        0,
+        ComputationalPipelineStatusMessage.model_validate_json,
+    )
     await assert_comp_tasks_and_comp_run_snapshot_tasks(
         sqlalchemy_async_engine,
         project_uuid=published_project.project.uuid,
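Most of the new assertions expect exactly one `ComputationalPipelineStatusMessage` per state transition, and a count of 0 where no transition should have been broadcast. When the payload itself matters, the list returned by the helper can be inspected as well; a short, assumption-laden sketch (it assumes the helper returns the parsed messages, as in the sketch above, and the `project_id` field name is hypothetical):

```python
# Sketch: field names on ComputationalPipelineStatusMessage are assumptions.
messages = await _assert_message_received(
    computational_pipeline_rabbit_client_parser,
    1,
    ComputationalPipelineStatusMessage.model_validate_json,
)
assert messages[0].project_id == published_project.project.uuid  # hypothetical field
```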