3535from models_library .projects_nodes_io import NodeID
3636from models_library .projects_state import RunningState
3737from models_library .rabbitmq_messages import (
38+ ComputationalPipelineStatusMessage ,
3839 InstrumentationRabbitMessage ,
3940 RabbitResourceTrackingBaseMessage ,
4041 RabbitResourceTrackingHeartbeatMessage ,
@@ -198,6 +199,7 @@ async def _assert_publish_in_dask_backend(
198199 published_tasks : list [CompTaskAtDB ],
199200 mocked_dask_client : mock .MagicMock ,
200201 scheduler : BaseCompScheduler ,
202+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
201203) -> tuple [list [CompTaskAtDB ], dict [NodeID , Callable [[], None ]]]:
202204 expected_pending_tasks = [
203205 published_tasks [1 ],
@@ -285,6 +287,11 @@ async def _return_tasks_pending(job_ids: list[str]) -> list[RunningState]:
285287 where_statement = (comp_runs .c .user_id == published_project .project .prj_owner )
286288 & (comp_runs .c .project_uuid == f"{ published_project .project .uuid } " ),
287289 )
290+ await _assert_message_received (
291+ computational_pipeline_rabbit_client_parser ,
292+ 1 ,
293+ ComputationalPipelineStatusMessage .model_validate_json ,
294+ )
288295 await assert_comp_tasks_and_comp_run_snapshot_tasks (
289296 sqlalchemy_async_engine ,
290297 project_uuid = published_project .project .uuid ,
@@ -336,6 +343,19 @@ async def resource_tracking_rabbit_client_parser(
336343 await client .unsubscribe (queue_name )
337344
338345
346+ @pytest .fixture
347+ async def computational_pipeline_rabbit_client_parser (
348+ create_rabbitmq_client : Callable [[str ], RabbitMQClient ], mocker : MockerFixture
349+ ) -> AsyncIterator [mock .AsyncMock ]:
350+ client = create_rabbitmq_client ("computational_pipeline_pytest_consumer" )
351+ mock = mocker .AsyncMock (return_value = True )
352+ queue_name , _ = await client .subscribe (
353+ ComputationalPipelineStatusMessage .get_channel_name (), mock
354+ )
355+ yield mock
356+ await client .unsubscribe (queue_name )
357+
358+
339359async def _assert_message_received (
340360 mocked_message_parser : mock .AsyncMock ,
341361 expected_call_count : int ,
@@ -422,6 +442,7 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
422442 mocked_clean_task_output_and_log_files_if_invalid : mock .Mock ,
423443 instrumentation_rabbit_client_parser : mock .AsyncMock ,
424444 resource_tracking_rabbit_client_parser : mock .AsyncMock ,
445+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
425446 run_metadata : RunMetadataDict ,
426447):
427448 with_disabled_auto_scheduling .assert_called_once ()
@@ -438,6 +459,12 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
438459 )
439460 with_disabled_scheduler_publisher .assert_called ()
440461
462+ await _assert_message_received (
463+ computational_pipeline_rabbit_client_parser ,
464+ 1 ,
465+ ComputationalPipelineStatusMessage .model_validate_json ,
466+ )
467+
441468 # -------------------------------------------------------------------------------
442469 # 1. first run will move comp_tasks to PENDING so the dask-worker can take them
443470 expected_pending_tasks , _ = await _assert_publish_in_dask_backend (
@@ -446,6 +473,7 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
446473 expected_published_tasks ,
447474 mocked_dask_client ,
448475 scheduler_api ,
476+ computational_pipeline_rabbit_client_parser ,
449477 )
450478
451479 # -------------------------------------------------------------------------------
@@ -478,6 +506,11 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[RunningState]:
478506 comp_runs .c .project_uuid == f"{ published_project .project .uuid } " ,
479507 ),
480508 )
509+ await _assert_message_received (
510+ computational_pipeline_rabbit_client_parser ,
511+ 1 ,
512+ ComputationalPipelineStatusMessage .model_validate_json ,
513+ )
481514 await assert_comp_tasks_and_comp_run_snapshot_tasks (
482515 sqlalchemy_async_engine ,
483516 project_uuid = published_project .project .uuid ,
@@ -854,6 +887,11 @@ async def _return_3rd_task_success(job_ids: list[str]) -> list[RunningState]:
854887 comp_runs .c .project_uuid == f"{ published_project .project .uuid } " ,
855888 ),
856889 )
890+ await _assert_message_received (
891+ computational_pipeline_rabbit_client_parser ,
892+ 1 ,
893+ ComputationalPipelineStatusMessage .model_validate_json ,
894+ )
857895
858896 await assert_comp_tasks_and_comp_run_snapshot_tasks (
859897 sqlalchemy_async_engine ,
@@ -903,6 +941,7 @@ async def with_started_project(
903941 scheduler_api : BaseCompScheduler ,
904942 instrumentation_rabbit_client_parser : mock .AsyncMock ,
905943 resource_tracking_rabbit_client_parser : mock .AsyncMock ,
944+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
906945) -> RunningProject :
907946 with_disabled_auto_scheduling .assert_called_once ()
908947 published_project = await publish_project ()
@@ -916,7 +955,11 @@ async def with_started_project(
916955 run_metadata = run_metadata ,
917956 )
918957 with_disabled_scheduler_publisher .assert_called_once ()
919-
958+ await _assert_message_received (
959+ computational_pipeline_rabbit_client_parser ,
960+ 1 ,
961+ ComputationalPipelineStatusMessage .model_validate_json ,
962+ )
920963 #
921964 # 2. This runs the scheduler until the project is started scheduled in the back-end
922965 #
@@ -929,6 +972,7 @@ async def with_started_project(
929972 expected_published_tasks ,
930973 mocked_dask_client ,
931974 scheduler_api ,
975+ computational_pipeline_rabbit_client_parser ,
932976 )
933977
934978 #
@@ -966,6 +1010,11 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[RunningState]:
9661010 comp_runs .c .project_uuid == f"{ published_project .project .uuid } " ,
9671011 ),
9681012 )
1013+ await _assert_message_received (
1014+ computational_pipeline_rabbit_client_parser ,
1015+ 1 ,
1016+ ComputationalPipelineStatusMessage .model_validate_json ,
1017+ )
9691018 await assert_comp_tasks_and_comp_run_snapshot_tasks (
9701019 sqlalchemy_async_engine ,
9711020 project_uuid = published_project .project .uuid ,
@@ -1140,6 +1189,7 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
11401189 fake_workbench_adjacency : dict [str , Any ],
11411190 sqlalchemy_async_engine : AsyncEngine ,
11421191 run_metadata : RunMetadataDict ,
1192+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
11431193):
11441194 """A pipeline which comp_tasks are missing should not be scheduled.
11451195 It shall be aborted and shown as such in the comp_runs db"""
@@ -1172,6 +1222,11 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
11721222 & (comp_runs .c .project_uuid == f"{ sleepers_project .uuid } " ),
11731223 )
11741224 )[0 ]
1225+ await _assert_message_received (
1226+ computational_pipeline_rabbit_client_parser ,
1227+ 1 ,
1228+ ComputationalPipelineStatusMessage .model_validate_json ,
1229+ )
11751230
11761231 #
11771232 # Trigger scheduling manually. since the pipeline is broken, it shall be aborted
@@ -1188,6 +1243,11 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
11881243 where_statement = (comp_runs .c .user_id == user ["id" ])
11891244 & (comp_runs .c .project_uuid == f"{ sleepers_project .uuid } " ),
11901245 )
1246+ await _assert_message_received (
1247+ computational_pipeline_rabbit_client_parser ,
1248+ 1 ,
1249+ ComputationalPipelineStatusMessage .model_validate_json ,
1250+ )
11911251
11921252
11931253async def test_task_progress_triggers (
@@ -1201,6 +1261,7 @@ async def test_task_progress_triggers(
12011261 mocked_parse_output_data_fct : mock .Mock ,
12021262 mocked_clean_task_output_and_log_files_if_invalid : mock .Mock ,
12031263 run_metadata : RunMetadataDict ,
1264+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
12041265):
12051266 _with_mock_send_computation_tasks (published_project .tasks , mocked_dask_client )
12061267 _run_in_db , expected_published_tasks = await _assert_start_pipeline (
@@ -1209,6 +1270,11 @@ async def test_task_progress_triggers(
12091270 published_project = published_project ,
12101271 run_metadata = run_metadata ,
12111272 )
1273+ await _assert_message_received (
1274+ computational_pipeline_rabbit_client_parser ,
1275+ 1 ,
1276+ ComputationalPipelineStatusMessage .model_validate_json ,
1277+ )
12121278 # -------------------------------------------------------------------------------
12131279 # 1. first run will move comp_tasks to PENDING so the dask-worker can take them
12141280 expected_pending_tasks , _ = await _assert_publish_in_dask_backend (
@@ -1217,6 +1283,7 @@ async def test_task_progress_triggers(
12171283 expected_published_tasks ,
12181284 mocked_dask_client ,
12191285 scheduler_api ,
1286+ computational_pipeline_rabbit_client_parser ,
12201287 )
12211288
12221289 # send some progress
@@ -1272,6 +1339,7 @@ async def test_handling_of_disconnected_scheduler_dask(
12721339 published_project : PublishedProject ,
12731340 backend_error : ComputationalSchedulerError ,
12741341 run_metadata : RunMetadataDict ,
1342+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
12751343):
12761344 # this will create a non connected backend issue that will trigger re-connection
12771345 mocked_dask_client_send_task = mocker .patch (
@@ -1289,6 +1357,11 @@ async def test_handling_of_disconnected_scheduler_dask(
12891357 run_metadata = run_metadata ,
12901358 use_on_demand_clusters = False ,
12911359 )
1360+ await _assert_message_received (
1361+ computational_pipeline_rabbit_client_parser ,
1362+ 1 ,
1363+ ComputationalPipelineStatusMessage .model_validate_json ,
1364+ )
12921365
12931366 # since there is no cluster, there is no dask-scheduler,
12941367 # the tasks shall all still be in PUBLISHED state now
@@ -1337,6 +1410,11 @@ async def test_handling_of_disconnected_scheduler_dask(
13371410 expected_progress = 1 ,
13381411 run_id = run_in_db .run_id ,
13391412 )
1413+ await _assert_message_received (
1414+ computational_pipeline_rabbit_client_parser ,
1415+ 1 ,
1416+ ComputationalPipelineStatusMessage .model_validate_json ,
1417+ )
13401418 # then we have another scheduler run
13411419 await scheduler_api .apply (
13421420 user_id = run_in_db .user_id ,
@@ -1452,6 +1530,7 @@ async def test_handling_scheduled_tasks_after_director_reboots(
14521530 mocked_parse_output_data_fct : mock .Mock ,
14531531 mocked_clean_task_output_fct : mock .Mock ,
14541532 reboot_state : RebootState ,
1533+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
14551534):
14561535 """After the dask client is rebooted, or that the director-v2 reboots the dv-2 internal scheduler
14571536 shall continue scheduling correctly. Even though the task might have continued to run
@@ -1534,6 +1613,11 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
15341613 comp_runs .c .project_uuid == f"{ running_project .project .uuid } " ,
15351614 ),
15361615 )
1616+ await _assert_message_received (
1617+ computational_pipeline_rabbit_client_parser ,
1618+ 1 ,
1619+ ComputationalPipelineStatusMessage .model_validate_json ,
1620+ )
15371621
15381622
15391623async def test_handling_cancellation_of_jobs_after_reboot (
0 commit comments