Skip to content

Commit 3642829

Browse files
GitHKAndrei Neagu
andauthored
🎨 adding new scheduling mode to dynamic-scheduler ⚠️ (#6889)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent e317f6b commit 3642829

File tree

7 files changed

+102
-25
lines changed

7 files changed

+102
-25
lines changed

.env-devel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ DIRECTOR_V2_TRACING={}
126126
# DYNAMIC_SCHEDULER ----
127127
DYNAMIC_SCHEDULER_LOGLEVEL=DEBUG
128128
DYNAMIC_SCHEDULER_PROFILING=1
129+
DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER=0
129130
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT=01:00:00
130131
DYNAMIC_SCHEDULER_TRACING={}
131132
DYNAMIC_SCHEDULER_UI_STORAGE_SECRET=adminadmin

services/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ services:
562562
REDIS_PASSWORD: ${REDIS_PASSWORD}
563563
DIRECTOR_V2_HOST: ${DIRECTOR_V2_HOST}
564564
DIRECTOR_V2_PORT: ${DIRECTOR_V2_PORT}
565+
DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: ${DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER}
565566
DYNAMIC_SCHEDULER_LOGLEVEL: ${DYNAMIC_SCHEDULER_LOGLEVEL}
566567
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: ${DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT}
567568
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
ServiceWasNotFoundError,
1313
)
1414

15-
from ...core.settings import ApplicationSettings
16-
from ...services.director_v2 import DirectorV2Client
17-
from ...services.service_tracker import set_request_as_running, set_request_as_stopped
15+
from ...services import scheduler_interface
1816

1917
router = RPCRouter()
2018

@@ -23,23 +21,16 @@
2321
async def get_service_status(
2422
app: FastAPI, *, node_id: NodeID
2523
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
26-
director_v2_client = DirectorV2Client.get_from_app_state(app)
27-
response: NodeGet | DynamicServiceGet | NodeGetIdle = (
28-
await director_v2_client.get_status(node_id)
29-
)
30-
return response
24+
return await scheduler_interface.get_service_status(app, node_id=node_id)
3125

3226

3327
@router.expose()
3428
async def run_dynamic_service(
3529
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
3630
) -> NodeGet | DynamicServiceGet:
37-
director_v2_client = DirectorV2Client.get_from_app_state(app)
38-
response: NodeGet | DynamicServiceGet = (
39-
await director_v2_client.run_dynamic_service(dynamic_service_start)
31+
return await scheduler_interface.run_dynamic_service(
32+
app, dynamic_service_start=dynamic_service_start
4033
)
41-
await set_request_as_running(app, dynamic_service_start)
42-
return response
4334

4435

4536
@router.expose(
@@ -51,12 +42,6 @@ async def run_dynamic_service(
5142
async def stop_dynamic_service(
5243
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
5344
) -> None:
54-
director_v2_client = DirectorV2Client.get_from_app_state(app)
55-
settings: ApplicationSettings = app.state.settings
56-
await director_v2_client.stop_dynamic_service(
57-
node_id=dynamic_service_stop.node_id,
58-
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
59-
save_state=dynamic_service_stop.save_state,
60-
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
45+
return await scheduler_interface.stop_dynamic_service(
46+
app, dynamic_service_stop=dynamic_service_stop
6147
)
62-
await set_request_as_stopped(app, dynamic_service_stop)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
6868
),
6969
)
7070

71+
DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
72+
default=False,
73+
description=(
74+
"this is a way to switch between different dynamic schedulers for the new style services"
75+
# NOTE: this option should be removed when the scheduling will be done via this service
76+
),
77+
)
78+
7179
@field_validator("DYNAMIC_SCHEDULER_LOGLEVEL", mode="before")
7280
@classmethod
7381
def _validate_log_level(cls, value: str) -> str:
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from fastapi import FastAPI
2+
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
3+
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
4+
DynamicServiceStart,
5+
DynamicServiceStop,
6+
)
7+
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
8+
from models_library.projects_nodes_io import NodeID
9+
10+
from ..core.settings import ApplicationSettings
11+
from .director_v2 import DirectorV2Client
12+
from .service_tracker import set_request_as_running, set_request_as_stopped
13+
14+
15+
async def get_service_status(
16+
app: FastAPI, *, node_id: NodeID
17+
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
18+
settings: ApplicationSettings = app.state.settings
19+
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
20+
raise NotImplementedError
21+
22+
director_v2_client = DirectorV2Client.get_from_app_state(app)
23+
response: NodeGet | DynamicServiceGet | NodeGetIdle = (
24+
await director_v2_client.get_status(node_id)
25+
)
26+
return response
27+
28+
29+
async def run_dynamic_service(
30+
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
31+
) -> NodeGet | DynamicServiceGet:
32+
settings: ApplicationSettings = app.state.settings
33+
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
34+
raise NotImplementedError
35+
36+
director_v2_client = DirectorV2Client.get_from_app_state(app)
37+
response: NodeGet | DynamicServiceGet = (
38+
await director_v2_client.run_dynamic_service(dynamic_service_start)
39+
)
40+
41+
await set_request_as_running(app, dynamic_service_start)
42+
return response
43+
44+
45+
async def stop_dynamic_service(
46+
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
47+
) -> None:
48+
settings: ApplicationSettings = app.state.settings
49+
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
50+
raise NotImplementedError
51+
52+
director_v2_client = DirectorV2Client.get_from_app_state(app)
53+
await director_v2_client.stop_dynamic_service(
54+
node_id=dynamic_service_stop.node_id,
55+
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
56+
save_state=dynamic_service_stop.save_state,
57+
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
58+
)
59+
60+
await set_request_as_stopped(app, dynamic_service_stop)

services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
from servicelib.deferred_tasks import BaseDeferredHandler, TaskUID
1313
from servicelib.deferred_tasks._base_deferred_handler import DeferredContext
1414

15-
from .. import service_tracker
16-
from ..director_v2 import DirectorV2Client
15+
from .. import scheduler_interface, service_tracker
1716
from ..notifier import notify_service_status_change
1817

1918
_logger = logging.getLogger(__name__)
@@ -47,9 +46,8 @@ async def run(
4746
app: FastAPI = context["app"]
4847
node_id: NodeID = context["node_id"]
4948

50-
director_v2_client: DirectorV2Client = DirectorV2Client.get_from_app_state(app)
5149
service_status: NodeGet | RunningDynamicServiceDetails | NodeGetIdle = (
52-
await director_v2_client.get_status(node_id)
50+
await scheduler_interface.get_service_status(app, node_id=node_id)
5351
)
5452
_logger.debug(
5553
"Service status type=%s, %s", type(service_status), service_status

services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from models_library.users import UserID
2121
from pydantic import TypeAdapter
2222
from pytest_mock import MockerFixture
23+
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
2324
from pytest_simcore.helpers.typing_env import EnvVarsDict
2425
from servicelib.rabbitmq import RabbitMQRPCClient, RPCServerError
2526
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler import services
@@ -133,12 +134,35 @@ def mock_director_v2_service_state(
133134
yield None
134135

135136

137+
@pytest.fixture(
138+
params=[
139+
False,
140+
pytest.param(
141+
True,
142+
marks=pytest.mark.xfail(
143+
reason="INTERNAL scheduler implementation is missing"
144+
),
145+
),
146+
]
147+
)
148+
def use_internal_scheduler(request: pytest.FixtureRequest) -> bool:
149+
return request.param
150+
151+
136152
@pytest.fixture
137153
def app_environment(
138154
app_environment: EnvVarsDict,
139155
rabbit_service: RabbitSettings,
140156
redis_service: RedisSettings,
157+
use_internal_scheduler: bool,
158+
monkeypatch: pytest.MonkeyPatch,
141159
) -> EnvVarsDict:
160+
setenvs_from_dict(
161+
monkeypatch,
162+
{
163+
"DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER": f"{use_internal_scheduler}",
164+
},
165+
)
142166
return app_environment
143167

144168

0 commit comments

Comments
 (0)