7171 BaseCompScheduler ,
7272 get_scheduler ,
7373)
74- from simcore_service_director_v2 .modules .comp_scheduler ._base_scheduler import (
75- ScheduledPipelineParams ,
76- )
7774from simcore_service_director_v2 .modules .comp_scheduler ._dask_scheduler import (
7875 DaskScheduler ,
7976)
@@ -165,14 +162,14 @@ async def schedule_all_pipelines(scheduler: BaseCompScheduler) -> None:
165162 # misconfigured pipelines that would be removed from the scheduler
166163 # NOTE: we simulate multiple dv-2 replicas by running several times
167164 # the same pipeline scheduling
168- local_pipelines = deepcopy (scheduler .scheduled_pipelines )
165+ local_pipelines = deepcopy (scheduler ._scheduled_pipelines ) # noqa: SLF001
169166 results = await asyncio .gather (
170167 * (
171168 scheduler ._schedule_pipeline ( # noqa: SLF001
172169 user_id = user_id ,
173170 project_id = project_id ,
174171 iteration = iteration ,
175- pipeline_params = params ,
172+ wake_up_callback = params . scheduler_waker . set ,
176173 )
177174 for _ in range (3 )
178175 for (
@@ -256,15 +253,16 @@ def mocked_clean_task_output_fct(mocker: MockerFixture) -> mock.MagicMock:
256253
257254@pytest .fixture
258255def with_disabled_auto_scheduling (mocker : MockerFixture ) -> mock .MagicMock :
259- """disables the scheduler task, note that it needs to be triggered manually then"""
256+ """disables the scheduler task, note that it needs to be triggered manu>ally then"""
260257
261258 def _fake_starter (
262259 self : BaseCompScheduler ,
263- pipeline_params : ScheduledPipelineParams ,
264260 * args ,
265261 ** kwargs ,
266- ) -> None :
267- pipeline_params .scheduler_task = mocker .MagicMock ()
262+ ):
263+ scheduler_task = mocker .MagicMock ()
264+ scheduler_task_wake_up_event = mocker .MagicMock ()
265+ return scheduler_task , scheduler_task_wake_up_event
268266
269267 return mocker .patch (
270268 "simcore_service_director_v2.modules.comp_scheduler._base_scheduler.BaseCompScheduler._start_scheduling" ,
@@ -358,7 +356,7 @@ async def test_empty_pipeline_is_not_scheduled(
358356 run_metadata = run_metadata ,
359357 use_on_demand_clusters = False ,
360358 )
361- assert len (scheduler .scheduled_pipelines ) == 0
359+ assert len (scheduler ._scheduled_pipelines ) == 0 # noqa: SLF001
362360 # check the database is empty
363361 async with aiopg_engine .acquire () as conn :
364362 result = await conn .scalar (
@@ -397,8 +395,12 @@ async def test_misconfigured_pipeline_is_not_scheduled(
397395 run_metadata = run_metadata ,
398396 use_on_demand_clusters = False ,
399397 )
400- assert len (scheduler .scheduled_pipelines ) == 1
401- for (u_id , p_id , it ), params in scheduler .scheduled_pipelines .items ():
398+ assert len (scheduler ._scheduled_pipelines ) == 1 # noqa: SLF001
399+ for (
400+ u_id ,
401+ p_id ,
402+ it ,
403+ ), params in scheduler ._scheduled_pipelines .items (): # noqa: SLF001
402404 assert u_id == user ["id" ]
403405 assert p_id == sleepers_project .uuid
404406 assert it > 0
@@ -415,7 +417,7 @@ async def test_misconfigured_pipeline_is_not_scheduled(
415417 # let the scheduler kick in
416418 await schedule_all_pipelines (scheduler )
417419 # check the scheduled pipelines is again empty since it's misconfigured
418- assert len (scheduler .scheduled_pipelines ) == 0
420+ assert len (scheduler ._scheduled_pipelines ) == 0 # noqa: SLF001
419421 # check the database entry is correctly updated
420422 async with aiopg_engine .acquire () as conn :
421423 result = await conn .execute (
@@ -444,12 +446,17 @@ async def _assert_start_pipeline(
444446 run_metadata = run_metadata ,
445447 use_on_demand_clusters = False ,
446448 )
447- assert len (scheduler .scheduled_pipelines ) == 1 , "the pipeline is not scheduled!"
448- for (u_id , p_id , it ), params in scheduler .scheduled_pipelines .items ():
449+ assert (
450+ len (scheduler ._scheduled_pipelines ) == 1 # noqa: SLF001
451+ ), "the pipeline is not scheduled!"
452+ for (
453+ u_id ,
454+ p_id ,
455+ it ,
456+ ) in scheduler ._scheduled_pipelines : # noqa: SLF001
449457 assert u_id == published_project .project .prj_owner
450458 assert p_id == published_project .project .uuid
451459 assert it > 0
452- assert params .run_metadata == run_metadata
453460
454461 # check the database is correctly updated, the run is published
455462 await _assert_comp_run_db (aiopg_engine , published_project , RunningState .PUBLISHED )
@@ -1017,7 +1024,7 @@ async def _return_3rd_task_success(job_ids: list[str]) -> list[DaskClientTaskSta
10171024 assert isinstance (messages [1 ], RabbitResourceTrackingStoppedMessage )
10181025
10191026 # the scheduled pipeline shall be removed
1020- assert scheduler .scheduled_pipelines == {}
1027+ assert scheduler ._scheduled_pipelines == {} # noqa: SLF001
10211028
10221029
10231030async def test_task_progress_triggers (
0 commit comments