99
1010
1111import asyncio
12+ import contextlib
1213import datetime
1314import logging
1415from collections .abc import AsyncIterator , Awaitable , Callable
4243 CompRunsRepository ,
4344)
4445from sqlalchemy .ext .asyncio import AsyncEngine
46+ from tenacity import (
47+ RetryError ,
48+ retry ,
49+ retry_if_exception_type ,
50+ retry_unless_exception_type ,
51+ )
4552
4653pytest_simcore_core_services_selection = ["postgres" , "rabbit" , "redis" ]
4754pytest_simcore_ops_services_selection = ["adminer" , "redis-commander" ]
@@ -102,7 +109,7 @@ async def test_schedule_all_pipelines_empty_db(
102109 await schedule_all_pipelines (initialized_app )
103110
104111 # check nothing was distributed
105- scheduler_rabbit_client_parser . assert_not_called ( )
112+ _assert_scheduler_client_not_called ( scheduler_rabbit_client_parser )
106113
107114 # check comp_runs is still empty
108115 await assert_comp_runs_empty (sqlalchemy_async_engine )
@@ -142,6 +149,35 @@ async def slow_limited_gather(*args, **kwargs):
142149 mock_function .assert_called_once ()
143150
144151
152+ @retry (
153+ retry = retry_if_exception_type (AssertionError ),
154+ stop = retry .stop_after_attempt (3 ),
155+ wait = retry .wait_fixed (0.5 ),
156+ reraise = True ,
157+ )
158+ def _assert_scheduler_client_called_once_with (
159+ scheduler_rabbit_client_parser : mock .AsyncMock ,
160+ expected_message : SchedulePipelineRabbitMessage ,
161+ ):
162+ scheduler_rabbit_client_parser .assert_called_once_with (expected_message .body ())
163+
164+
165+ def _assert_scheduler_client_not_called (
166+ scheduler_rabbit_client_parser : mock .AsyncMock ,
167+ ):
168+ @retry (
169+ retry = retry_unless_exception_type (AssertionError ),
170+ stop = retry .stop_after_attempt (2 ),
171+ wait = retry .wait_fixed (1 ),
172+ reraise = True ,
173+ ) # pylint: disable=unused-variable
174+ def _ ():
175+ scheduler_rabbit_client_parser .assert_not_called ()
176+
177+ with contextlib .suppress (RetryError ):
178+ _ ()
179+
180+
145181async def test_schedule_all_pipelines (
146182 with_disabled_auto_scheduling : mock .Mock ,
147183 with_disabled_scheduler_worker : mock .Mock ,
@@ -164,12 +200,13 @@ async def test_schedule_all_pipelines(
164200 collection_run_id = fake_collection_run_id ,
165201 )
166202 # this directly schedule a new pipeline
167- scheduler_rabbit_client_parser .assert_called_once_with (
203+ _assert_scheduler_client_called_once_with (
204+ scheduler_rabbit_client_parser ,
168205 SchedulePipelineRabbitMessage (
169206 user_id = published_project .project .prj_owner ,
170207 project_id = published_project .project .uuid ,
171208 iteration = 1 ,
172- ). body ()
209+ ),
173210 )
174211 scheduler_rabbit_client_parser .reset_mock ()
175212 comp_run = (await assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 ))[0 ]
@@ -186,7 +223,7 @@ async def test_schedule_all_pipelines(
186223
187224 # this will now not schedule the pipeline since it was already scheduled
188225 await schedule_all_pipelines (initialized_app )
189- scheduler_rabbit_client_parser . assert_not_called ( )
226+ _assert_scheduler_client_not_called ( scheduler_rabbit_client_parser )
190227 comp_runs = await assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
191228 comp_run = comp_runs [0 ]
192229 assert comp_run .scheduled
@@ -206,12 +243,13 @@ async def test_schedule_all_pipelines(
206243 # now we schedule a pipeline again, but we wait for the scheduler interval to pass
207244 # this will trigger a new schedule
208245 await schedule_all_pipelines (initialized_app )
209- scheduler_rabbit_client_parser .assert_called_once_with (
246+ _assert_scheduler_client_called_once_with (
247+ scheduler_rabbit_client_parser ,
210248 SchedulePipelineRabbitMessage (
211249 user_id = published_project .project .prj_owner ,
212250 project_id = published_project .project .uuid ,
213251 iteration = 1 ,
214- ). body ()
252+ ),
215253 )
216254 scheduler_rabbit_client_parser .reset_mock ()
217255 comp_runs = await assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
@@ -229,12 +267,13 @@ async def test_schedule_all_pipelines(
229267 project_id = published_project .project .uuid ,
230268 )
231269 await schedule_all_pipelines (initialized_app )
232- scheduler_rabbit_client_parser .assert_called_once_with (
270+ _assert_scheduler_client_called_once_with (
271+ scheduler_rabbit_client_parser ,
233272 SchedulePipelineRabbitMessage (
234273 user_id = published_project .project .prj_owner ,
235274 project_id = published_project .project .uuid ,
236275 iteration = 1 ,
237- ). body ()
276+ ),
238277 )
239278 scheduler_rabbit_client_parser .reset_mock ()
240279 comp_runs = await assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
@@ -267,12 +306,13 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
267306 collection_run_id = fake_collection_run_id ,
268307 )
269308 # this directly schedule a new pipeline
270- scheduler_rabbit_client_parser .assert_called_once_with (
309+ _assert_scheduler_client_called_once_with (
310+ scheduler_rabbit_client_parser ,
271311 SchedulePipelineRabbitMessage (
272312 user_id = published_project .project .prj_owner ,
273313 project_id = published_project .project .uuid ,
274314 iteration = 1 ,
275- ). body ()
315+ ),
276316 )
277317 scheduler_rabbit_client_parser .reset_mock ()
278318 comp_run = (await assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 ))[0 ]
@@ -288,7 +328,7 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
288328
289329 # this will now not schedule the pipeline since it was already scheduled
290330 await schedule_all_pipelines (initialized_app )
291- scheduler_rabbit_client_parser . assert_not_called ( )
331+ _assert_scheduler_client_not_called ( scheduler_rabbit_client_parser )
292332 comp_runs = await assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
293333 comp_run = comp_runs [0 ]
294334 assert comp_run .scheduled == start_schedule_time , "scheduled time changed!"
@@ -308,12 +348,13 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
308348 assert (
309349 "found 1 lost pipelines, they will be re-scheduled now" in caplog .messages
310350 )
311- scheduler_rabbit_client_parser .assert_called_once_with (
351+ _assert_scheduler_client_called_once_with (
352+ scheduler_rabbit_client_parser ,
312353 SchedulePipelineRabbitMessage (
313354 user_id = published_project .project .prj_owner ,
314355 project_id = published_project .project .uuid ,
315356 iteration = 1 ,
316- ). body ()
357+ ),
317358 )
318359 scheduler_rabbit_client_parser .reset_mock ()
319360 comp_runs = await assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
@@ -353,7 +394,7 @@ async def test_empty_pipeline_is_not_scheduled(
353394 collection_run_id = fake_collection_run_id ,
354395 )
355396 await assert_comp_runs_empty (sqlalchemy_async_engine )
356- scheduler_rabbit_client_parser . assert_not_called ( )
397+ _assert_scheduler_client_not_called ( scheduler_rabbit_client_parser )
357398
358399 # create the empty pipeline now
359400 await create_pipeline (project_id = f"{ empty_project .uuid } " )
@@ -371,4 +412,4 @@ async def test_empty_pipeline_is_not_scheduled(
371412 assert len (caplog .records ) == 1
372413 assert "no computational dag defined" in caplog .records [0 ].message
373414 await assert_comp_runs_empty (sqlalchemy_async_engine )
374- scheduler_rabbit_client_parser . assert_not_called ( )
415+ _assert_scheduler_client_not_called ( scheduler_rabbit_client_parser )
0 commit comments