Skip to content

Commit b6428ee

Browse files
committed
distribute schedule all
1 parent 6843190 commit b6428ee

File tree

2 files changed

+27
-6
lines changed

2 files changed

+27
-6
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313

1414
import asyncio
15+
import contextlib
1516
import datetime
1617
import logging
1718
from abc import ABC, abstractmethod
@@ -31,7 +32,7 @@
3132
from pydantic import PositiveInt
3233
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
3334
from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
34-
from servicelib.redis import RedisClientSDK
35+
from servicelib.redis import CouldNotAcquireLockError, RedisClientSDK
3536
from servicelib.redis_utils import exclusive
3637
from servicelib.utils import limited_gather
3738

@@ -231,10 +232,27 @@ async def stop_pipeline(
231232

232233
async def schedule_all_pipelines(self) -> None:
233234
self.wake_up_event.clear()
234-
# if one of the tasks throws, the others are NOT cancelled, which is what we want
235+
# this task might be distributed among multiple replicas of director-v2,
236+
# we do not care if CouldNotAcquireLockError is raised, as that means another dv-2 replica is taking
237+
# care of it
238+
239+
async def _distributed_schedule_pipeline(
240+
user_id: UserID,
241+
project_id: ProjectID,
242+
iteration: Iteration,
243+
pipeline_params: ScheduledPipelineParams,
244+
) -> None:
245+
with contextlib.suppress(CouldNotAcquireLockError):
246+
return await self._schedule_pipeline(
247+
user_id=user_id,
248+
project_id=project_id,
249+
iteration=iteration,
250+
pipeline_params=pipeline_params,
251+
)
252+
235253
await limited_gather(
236254
*(
237-
self._schedule_pipeline(
255+
_distributed_schedule_pipeline(
238256
user_id=user_id,
239257
project_id=project_id,
240258
iteration=iteration,
@@ -246,7 +264,6 @@ async def schedule_all_pipelines(self) -> None:
246264
iteration,
247265
), pipeline_params in self.scheduled_pipelines.items()
248266
),
249-
reraise=False,
250267
log=_logger,
251268
limit=40,
252269
tasks_group_prefix="computational-scheduled-pipeline",

services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,12 @@ async def _assert_comp_tasks_db(
156156

157157

158158
async def run_comp_scheduler(scheduler: BaseCompScheduler) -> None:
159-
await scheduler.schedule_all_pipelines()
159+
# NOTE: this simulates having 3 schedulers running in parallel
160+
await asyncio.gather(
161+
scheduler.schedule_all_pipelines(),
162+
scheduler.schedule_all_pipelines(),
163+
scheduler.schedule_all_pipelines(),
164+
)
160165

161166

162167
@pytest.fixture
@@ -185,7 +190,6 @@ def minimal_dask_scheduler_config(
185190
def scheduler(
186191
minimal_dask_scheduler_config: None,
187192
aiopg_engine: aiopg.sa.engine.Engine,
188-
# dask_spec_local_cluster: SpecCluster,
189193
minimal_app: FastAPI,
190194
) -> BaseCompScheduler:
191195
assert minimal_app.state.scheduler is not None

0 commit comments

Comments
 (0)