2222from pytest_simcore .helpers .typing_env import EnvVarsDict
2323from servicelib .rabbitmq ._client import RabbitMQClient
2424from servicelib .redis import CouldNotAcquireLockError
25+ from servicelib .utils import limited_gather
2526from settings_library .rabbit import RabbitSettings
2627from settings_library .redis import RedisSettings
2728from simcore_postgres_database .models .comp_runs import comp_runs
3637from sqlalchemy .ext .asyncio import AsyncEngine
3738
3839pytest_simcore_core_services_selection = ["postgres" , "rabbit" , "redis" ]
39- pytest_simcore_ops_services_selection = [
40- "adminer" ,
41- ]
40+ pytest_simcore_ops_services_selection = ["adminer" , "redis-commander" ]
4241
4342
4443@pytest .fixture
@@ -80,7 +79,7 @@ async def _assert_comp_runs(
8079 async with sqlalchemy_async_engine .connect () as conn :
8180 list_of_comp_runs = [
8281 CompRunsAtDB .from_orm (row )
83- async for row in await conn .stream (sa .select (). select_from (comp_runs ))
82+ for row in await conn .execute (sa .select (comp_runs ))
8483 ]
8584 assert len (list_of_comp_runs ) == expected_total
8685 return list_of_comp_runs
@@ -106,25 +105,34 @@ async def test_schedule_pipelines_empty_db(
106105 await _assert_comp_runs_empty (sqlalchemy_async_engine )
107106
108107
109- async def test_schedule_pipelines_concurently_raises_and_only_one_runs (
110- initialized_app : FastAPI ,
108+ async def test_schedule_pipelines_concurently_runs_exclusively_and_raises (
109+ initialized_app : FastAPI , mocker : MockerFixture , monkeypatch : pytest . MonkeyPatch
111110):
112111 CONCURRENCY = 5
113- # TODO: this can be flaky as an empty scheduling is very short
114- with pytest .raises (
115- CouldNotAcquireLockError ,
116- match = ".+ computational-distributed-scheduler" ,
117- ):
118- await asyncio .gather (
119- * (schedule_pipelines (initialized_app ) for _ in range (CONCURRENCY ))
120- )
112+ # NOTE: this ensure no flakyness as empty scheduling is very fast
113+ original_function = limited_gather
114+
115+ async def slow_limited_gather (* args , ** kwargs ):
116+ result = await original_function (* args , ** kwargs )
117+ await asyncio .sleep (3 ) # to ensure flakyness does not occur
118+ return result
119+
120+ mock_function = mocker .patch (
121+ "simcore_service_director_v2.modules.comp_scheduler._distributed_scheduler.limited_gather" ,
122+ autospec = True ,
123+ side_effect = slow_limited_gather ,
124+ )
121125
122126 results = await asyncio .gather (
123127 * (schedule_pipelines (initialized_app ) for _ in range (CONCURRENCY )),
124128 return_exceptions = True ,
125129 )
126130
127- assert results .count (None ) == 1 , "Only one task should have run"
131+ assert results .count (None ) == 1 , f"Only one task should have run: { results } "
132+ for r in results :
133+ if r :
134+ assert isinstance (r , CouldNotAcquireLockError )
135+ mock_function .assert_called_once ()
128136
129137
130138async def test_schedule_pipelines_with_non_scheduled_runs (
@@ -135,8 +143,8 @@ async def test_schedule_pipelines_with_non_scheduled_runs(
135143 scheduler_rabbit_client_parser : mock .AsyncMock ,
136144):
137145 await _assert_comp_runs_empty (sqlalchemy_async_engine )
138- # now we schedule a pipeline
139146 assert published_project .project .prj_owner
147+ # now we schedule a pipeline
140148 await run_new_pipeline (
141149 initialized_app ,
142150 user_id = published_project .project .prj_owner ,
@@ -145,5 +153,11 @@ async def test_schedule_pipelines_with_non_scheduled_runs(
145153 run_metadata = run_metadata ,
146154 use_on_demand_clusters = False ,
147155 )
148- scheduler_rabbit_client_parser .assert_called_once_with ()
156+ scheduler_rabbit_client_parser .assert_called_once_with (
157+ SchedulePipelineRabbitMessage (
158+ user_id = published_project .project .prj_owner ,
159+ project_id = published_project .project .uuid ,
160+ iteration = 1 ,
161+ ).body ()
162+ )
149163 comp_runs = await _assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
0 commit comments