Skip to content

Commit 20a4cc0

Browse files
committed
added setting to control scheduling concurrency
1 parent 8530b71 commit 20a4cc0

File tree

3 files changed: +46 additions, -15 deletions

services/director-v2/src/simcore_service_director_v2/core/settings.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,7 @@
88

99
from common_library.pydantic_validators import validate_numeric_string_as_timedelta
1010
from fastapi import FastAPI
11-
from models_library.basic_types import (
12-
LogLevel,
13-
PortInt,
14-
VersionTag,
15-
)
11+
from models_library.basic_types import LogLevel, PortInt, VersionTag
1612
from models_library.clusters import (
1713
DEFAULT_CLUSTER_ID,
1814
Cluster,
@@ -26,6 +22,7 @@
2622
AnyUrl,
2723
Field,
2824
NonNegativeInt,
25+
PositiveInt,
2926
field_validator,
3027
)
3128
from servicelib.logging_utils_filtering import LoggerName, MessageSubstring
@@ -77,6 +74,10 @@ class ComputationalBackendSettings(BaseCustomSettings):
7774
COMPUTATIONAL_BACKEND_ENABLED: bool = Field(
7875
default=True,
7976
)
77+
COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY: PositiveInt = Field(
78+
default=50,
79+
description="defines how many pipelines the application can schedule concurrently",
80+
)
8081
COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED: bool = Field(
8182
default=True,
8283
)

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import contextlib
23
import functools
34
import logging
@@ -10,6 +11,7 @@
1011
from servicelib.redis import CouldNotAcquireLockError
1112
from servicelib.redis_utils import exclusive
1213

14+
from ...core.settings import get_application_settings
1315
from ...models.comp_runs import Iteration
1416
from ..rabbitmq import get_rabbitmq_client
1517
from ._constants import MODULE_NAME_WORKER
@@ -62,17 +64,31 @@ async def _handle_apply_distributed_schedule(app: FastAPI, data: bytes) -> bool:
6264

6365

6466
async def setup_worker(app: FastAPI) -> None:
67+
app_settings = get_application_settings(app)
6568
rabbitmq_client = get_rabbitmq_client(app)
66-
await rabbitmq_client.subscribe(
67-
SchedulePipelineRabbitMessage.get_channel_name(),
68-
functools.partial(_handle_apply_distributed_schedule, app),
69-
exclusive_queue=False,
69+
app.state.scheduler_worker_consumers = await asyncio.gather(
70+
*(
71+
rabbitmq_client.subscribe(
72+
SchedulePipelineRabbitMessage.get_channel_name(),
73+
functools.partial(_handle_apply_distributed_schedule, app),
74+
exclusive_queue=False,
75+
)
76+
for _ in range(
77+
app_settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND.COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY
78+
)
79+
)
7080
)
7181

7282
app.state.scheduler_worker = create_scheduler(app)
7383

7484

7585
async def shutdown_worker(app: FastAPI) -> None:
7686
assert app.state.scheduler_worker # nosec
77-
# TODO: we might need to cancel stuff here. not sure yet what
78-
# unsubscribing is maybe not a good idea if we want to keep the data in the queue
87+
rabbitmq_client = get_rabbitmq_client(app)
88+
await asyncio.gather(
89+
*(
90+
rabbitmq_client.unsubscribe_consumer(*consumer)
91+
for consumer in app.state.scheduler_worker_consumers
92+
),
93+
return_exceptions=False,
94+
)

services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@
88
# pylint: disable=too-many-statements
99

1010
import asyncio
11-
from typing import Awaitable, Callable
11+
from collections.abc import Awaitable, Callable
1212
from unittest import mock
1313

1414
import pytest
1515
from _helpers import PublishedProject
1616
from fastapi import FastAPI
1717
from models_library.clusters import DEFAULT_CLUSTER_ID
1818
from pytest_mock import MockerFixture
19+
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
20+
from pytest_simcore.helpers.typing_env import EnvVarsDict
1921
from simcore_service_director_v2.models.comp_runs import RunMetadataDict
2022
from simcore_service_director_v2.modules.comp_scheduler._manager import run_new_pipeline
2123
from simcore_service_director_v2.modules.comp_scheduler._models import (
@@ -83,10 +85,23 @@ async def mocked_scheduler_api(mocker: MockerFixture) -> mock.Mock:
8385
)
8486

8587

88+
@pytest.fixture
89+
def with_scheduling_concurrency(
90+
mock_env: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, scheduling_concurrency: int
91+
) -> EnvVarsDict:
92+
return mock_env | setenvs_from_dict(
93+
monkeypatch,
94+
{"COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY": f"{scheduling_concurrency}"},
95+
)
96+
97+
98+
@pytest.mark.parametrize("scheduling_concurrency", [1, 50, 100])
8699
@pytest.mark.parametrize(
87100
"queue_name", [SchedulePipelineRabbitMessage.get_channel_name()]
88101
)
89102
async def test_worker_scheduling_parallelism(
103+
scheduling_concurrency: int,
104+
with_scheduling_concurrency: EnvVarsDict,
90105
with_disabled_auto_scheduling: mock.Mock,
91106
mocked_scheduler_api: mock.Mock,
92107
initialized_app: FastAPI,
@@ -113,9 +128,8 @@ async def _project_pipeline_creation_workflow() -> None:
113128
use_on_demand_clusters=False,
114129
)
115130

116-
num_concurrent_calls = 10
117131
await asyncio.gather(
118-
*(_project_pipeline_creation_workflow() for _ in range(num_concurrent_calls))
132+
*(_project_pipeline_creation_workflow() for _ in range(scheduling_concurrency))
119133
)
120134
mocked_scheduler_api.assert_called()
121-
assert mocked_scheduler_api.call_count == num_concurrent_calls
135+
assert mocked_scheduler_api.call_count == scheduling_concurrency

0 commit comments

Comments (0)