1212from unittest import mock
1313
1414import pytest
15+ from tenacity import retry , stop_after_delay , wait_fixed
1516from _helpers import PublishedProject
1617from fastapi import FastAPI
1718from pytest_mock import MockerFixture
2526from simcore_service_director_v2 .modules .comp_scheduler ._worker import (
2627 _get_scheduler_worker ,
2728)
29+ from settings_library .rabbit import RabbitSettings
2830
2931pytest_simcore_core_services_selection = ["postgres" , "rabbit" , "redis" ]
3032pytest_simcore_ops_services_selection = ["adminer" ]
@@ -93,19 +95,21 @@ def with_scheduling_concurrency(
9395 )
9496
9597
98+ @pytest .mark .testit
9699@pytest .mark .parametrize ("scheduling_concurrency" , [1 , 50 , 100 ])
97100@pytest .mark .parametrize (
98101 "queue_name" , [SchedulePipelineRabbitMessage .get_channel_name ()]
99102)
100103async def test_worker_scheduling_parallelism (
104+ rabbit_service : RabbitSettings ,
105+ ensure_parametrized_queue_is_empty : None ,
101106 scheduling_concurrency : int ,
102107 with_scheduling_concurrency : EnvVarsDict ,
103108 with_disabled_auto_scheduling : mock .Mock ,
104109 mocked_scheduler_api : mock .Mock ,
105110 initialized_app : FastAPI ,
106111 publish_project : Callable [[], Awaitable [PublishedProject ]],
107112 run_metadata : RunMetadataDict ,
108- ensure_parametrized_queue_is_empty : None ,
109113):
110114 with_disabled_auto_scheduling .assert_called_once ()
111115
@@ -125,8 +129,15 @@ async def _project_pipeline_creation_workflow() -> None:
125129 use_on_demand_clusters = False ,
126130 )
127131
132+ # whatever scheduling concurrency we call in here, we shall always see the same number of calls to the scheduler
128133 await asyncio .gather (
129134 * (_project_pipeline_creation_workflow () for _ in range (scheduling_concurrency ))
130135 )
136+ # the call to run the pipeline is async so we need to wait here
131137 mocked_scheduler_api .assert_called ()
132- assert mocked_scheduler_api .call_count == scheduling_concurrency
138+
139+ @retry (stop = stop_after_delay (5 ), reraise = True , wait = wait_fixed (0.5 ))
140+ def _assert_expected_called ():
141+ assert mocked_scheduler_api .call_count == scheduling_concurrency
142+
143+ _assert_expected_called ()
0 commit comments