99
1010
1111import asyncio
12+ import logging
1213from collections .abc import AsyncIterator , Callable
14+ from typing import Any , Awaitable
1315from unittest import mock
1416
1517import pytest
1618import sqlalchemy as sa
1719from _helpers import PublishedProject
1820from fastapi import FastAPI
1921from models_library .clusters import DEFAULT_CLUSTER_ID
22+ from models_library .projects import ProjectAtDB
23+ from models_library .projects_state import RunningState
2024from pytest_mock .plugin import MockerFixture
2125from pytest_simcore .helpers .monkeypatch_envs import setenvs_from_dict
2226from pytest_simcore .helpers .typing_env import EnvVarsDict
2630from settings_library .rabbit import RabbitSettings
2731from settings_library .redis import RedisSettings
2832from simcore_postgres_database .models .comp_runs import comp_runs
33+ from simcore_service_director_v2 .core .errors import PipelineNotFoundError
34+ from simcore_service_director_v2 .models .comp_pipelines import CompPipelineAtDB
2935from simcore_service_director_v2 .models .comp_runs import CompRunsAtDB , RunMetadataDict
3036from simcore_service_director_v2 .modules .comp_scheduler ._distributed_scheduler import (
37+ _SCHEDULER_INTERVAL ,
3138 run_new_pipeline ,
3239 schedule_pipelines ,
40+ stop_pipeline ,
3341)
3442from simcore_service_director_v2 .modules .comp_scheduler ._models import (
3543 SchedulePipelineRabbitMessage ,
@@ -135,7 +143,7 @@ async def slow_limited_gather(*args, **kwargs):
135143 mock_function .assert_called_once ()
136144
137145
138- async def test_schedule_pipelines_with_non_scheduled_runs (
146+ async def test_schedule_pipelines (
139147 initialized_app : FastAPI ,
140148 published_project : PublishedProject ,
141149 sqlalchemy_async_engine : AsyncEngine ,
@@ -153,11 +161,121 @@ async def test_schedule_pipelines_with_non_scheduled_runs(
153161 run_metadata = run_metadata ,
154162 use_on_demand_clusters = False ,
155163 )
164+ # this directly schedule a new pipeline
156165 scheduler_rabbit_client_parser .assert_called_once_with (
157166 SchedulePipelineRabbitMessage (
158167 user_id = published_project .project .prj_owner ,
159168 project_id = published_project .project .uuid ,
160169 iteration = 1 ,
161170 ).body ()
162171 )
172+ scheduler_rabbit_client_parser .reset_mock ()
163173 comp_runs = await _assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
174+ comp_run = comp_runs [0 ]
175+ assert comp_run .project_uuid == published_project .project .uuid
176+ assert comp_run .user_id == published_project .project .prj_owner
177+ assert comp_run .iteration == 1
178+ assert comp_run .cancelled is None
179+ assert comp_run .cluster_id == DEFAULT_CLUSTER_ID
180+ assert comp_run .metadata == run_metadata
181+ assert comp_run .result is RunningState .PUBLISHED
182+ assert comp_run .last_scheduled is not None
183+ start_schedule_time = comp_run .last_scheduled
184+ start_modified_time = comp_run .modified
185+
186+ await asyncio .sleep (_SCHEDULER_INTERVAL .total_seconds () - 1 )
187+
188+ # this will now not schedule the pipeline since it was last scheduled
189+ await schedule_pipelines (initialized_app )
190+ scheduler_rabbit_client_parser .assert_not_called ()
191+ comp_runs = await _assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
192+ comp_run = comp_runs [0 ]
193+ assert comp_run .last_scheduled == start_schedule_time , "scheduled time changed!"
194+ assert comp_run .cancelled is None
195+ assert comp_run .modified == start_modified_time
196+
197+ await asyncio .sleep (_SCHEDULER_INTERVAL .total_seconds () + 1 )
198+ # this will now schedule the pipeline since the time passed
199+ await schedule_pipelines (initialized_app )
200+ scheduler_rabbit_client_parser .assert_called_once_with (
201+ SchedulePipelineRabbitMessage (
202+ user_id = published_project .project .prj_owner ,
203+ project_id = published_project .project .uuid ,
204+ iteration = 1 ,
205+ ).body ()
206+ )
207+ scheduler_rabbit_client_parser .reset_mock ()
208+ comp_runs = await _assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
209+ comp_run = comp_runs [0 ]
210+ assert comp_run .last_scheduled is not None
211+ assert comp_run .last_scheduled > start_schedule_time
212+ last_schedule_time = comp_run .last_scheduled
213+ assert comp_run .cancelled is None
214+ assert comp_run .modified > start_modified_time
215+
216+ # now we stop the pipeline, which should instantly trigger a schedule
217+ await stop_pipeline (
218+ initialized_app ,
219+ user_id = published_project .project .prj_owner ,
220+ project_id = published_project .project .uuid ,
221+ )
222+ await schedule_pipelines (initialized_app )
223+ scheduler_rabbit_client_parser .assert_called_once_with (
224+ SchedulePipelineRabbitMessage (
225+ user_id = published_project .project .prj_owner ,
226+ project_id = published_project .project .uuid ,
227+ iteration = 1 ,
228+ ).body ()
229+ )
230+ scheduler_rabbit_client_parser .reset_mock ()
231+ comp_runs = await _assert_comp_runs (sqlalchemy_async_engine , expected_total = 1 )
232+ comp_run = comp_runs [0 ]
233+ assert comp_run .last_scheduled is not None
234+ assert comp_run .last_scheduled > last_schedule_time
235+ assert comp_run .cancelled is not None
236+
237+
async def test_empty_pipeline_is_not_scheduled(
    initialized_app: FastAPI,
    registered_user: Callable[..., dict[str, Any]],
    project: Callable[..., Awaitable[ProjectAtDB]],
    pipeline: Callable[..., CompPipelineAtDB],
    run_metadata: RunMetadataDict,
    sqlalchemy_async_engine: AsyncEngine,
    scheduler_rabbit_client_parser: mock.AsyncMock,
    caplog: pytest.LogCaptureFixture,
):
    """A project with no computational DAG shall never create a comp_runs row
    nor publish a scheduling message to rabbitMQ."""
    await _assert_comp_runs_empty(sqlalchemy_async_engine)
    owner = registered_user()
    dagless_project = await project(owner)

    async def _attempt_run() -> None:
        # same invocation is exercised in both phases of this test
        await run_new_pipeline(
            initialized_app,
            user_id=owner["id"],
            project_id=dagless_project.uuid,
            cluster_id=DEFAULT_CLUSTER_ID,
            run_metadata=run_metadata,
            use_on_demand_clusters=False,
        )

    # phase 1: the project has no comp_pipeline entry at all --> starting it shall raise
    with pytest.raises(PipelineNotFoundError):
        await _attempt_run()
    await _assert_comp_runs_empty(sqlalchemy_async_engine)
    scheduler_rabbit_client_parser.assert_not_called()

    # phase 2: an *empty* pipeline now exists for the project
    pipeline(project_id=f"{dagless_project.uuid}")

    # starting a run on an empty pipeline is useless: the scheduler shall only warn
    # and leave both the comp_runs table and the rabbitMQ exchange untouched
    with caplog.at_level(logging.WARNING):
        await _attempt_run()
    assert len(caplog.records) == 1
    assert "no computational dag defined" in caplog.records[0].message
    await _assert_comp_runs_empty(sqlalchemy_async_engine)
    scheduler_rabbit_client_parser.assert_not_called()
0 commit comments