Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ DIRECTOR_V2_TRACING={}
# DYNAMIC_SCHEDULER ----
DYNAMIC_SCHEDULER_LOGLEVEL=DEBUG
DYNAMIC_SCHEDULER_PROFILING=1
DYNAMIC_SCHEDULER_SCHEDULING_MODE=VIA_DIRECTOR_V2
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT=01:00:00
DYNAMIC_SCHEDULER_TRACING={}

Expand Down
1 change: 1 addition & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ services:
REDIS_PASSWORD: ${REDIS_PASSWORD}
DIRECTOR_V2_HOST: ${DIRECTOR_V2_HOST}
DIRECTOR_V2_PORT: ${DIRECTOR_V2_PORT}
DYNAMIC_SCHEDULER_SCHEDULING_MODE: ${DYNAMIC_SCHEDULER_SCHEDULING_MODE}
DYNAMIC_SCHEDULER_LOGLEVEL: ${DYNAMIC_SCHEDULER_LOGLEVEL}
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: ${DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT}
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
ServiceWasNotFoundError,
)

from ...core.settings import ApplicationSettings
from ...services.director_v2 import DirectorV2Client
from ...services.service_tracker import set_request_as_running, set_request_as_stopped
from ...services import scheduler_interface

router = RPCRouter()

Expand All @@ -23,23 +21,16 @@
async def get_service_status(
app: FastAPI, *, node_id: NodeID
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet | NodeGetIdle = (
await director_v2_client.get_status(node_id)
)
return response
return await scheduler_interface.get_service_status(app, node_id=node_id)


@router.expose()
async def run_dynamic_service(
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
) -> NodeGet | DynamicServiceGet:
director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet = (
await director_v2_client.run_dynamic_service(dynamic_service_start)
return await scheduler_interface.run_dynamic_service(
app, dynamic_service_start=dynamic_service_start
)
await set_request_as_running(app, dynamic_service_start)
return response


@router.expose(
Expand All @@ -51,12 +42,6 @@ async def run_dynamic_service(
async def stop_dynamic_service(
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
) -> None:
director_v2_client = DirectorV2Client.get_from_app_state(app)
settings: ApplicationSettings = app.state.settings
await director_v2_client.stop_dynamic_service(
node_id=dynamic_service_stop.node_id,
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
save_state=dynamic_service_stop.save_state,
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
return await scheduler_interface.stop_dynamic_service(
app, dynamic_service_stop=dynamic_service_stop
)
await set_request_as_stopped(app, dynamic_service_stop)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from enum import StrEnum
from typing import Annotated

from pydantic import AliasChoices, Field, TypeAdapter, field_validator
Expand All @@ -14,6 +15,11 @@
from .._meta import API_VERSION, API_VTAG, PROJECT_NAME


class SchedulingMode(StrEnum):
INTERNAL = "INTERNAL"
VIA_DIRECTOR_V2 = "VIA_DIRECTOR_V2"


class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
"""Base settings of any osparc service's app"""

Expand Down Expand Up @@ -68,6 +74,14 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
),
)

DYNAMIC_SCHEDULER_SCHEDULING_MODE: SchedulingMode = Field(
SchedulingMode.VIA_DIRECTOR_V2,
description=(
"this is a way to switch between different dynamic schedulers for the new style services"
# NOTE: this option should be removed when the scheduling will be done via this service
),
)

@field_validator("DYNAMIC_SCHEDULER_LOGLEVEL", mode="before")
@classmethod
def _validate_log_level(cls, value: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects_nodes_io import NodeID

from ..core.settings import ApplicationSettings, SchedulingMode
from .director_v2 import DirectorV2Client
from .service_tracker import set_request_as_running, set_request_as_stopped


async def get_service_status(
app: FastAPI, *, node_id: NodeID
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_SCHEDULING_MODE == SchedulingMode.INTERNAL:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet | NodeGetIdle = (
await director_v2_client.get_status(node_id)
)
return response


async def run_dynamic_service(
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
) -> NodeGet | DynamicServiceGet:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_SCHEDULING_MODE == SchedulingMode.INTERNAL:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet = (
await director_v2_client.run_dynamic_service(dynamic_service_start)
)

await set_request_as_running(app, dynamic_service_start)
return response


async def stop_dynamic_service(
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
) -> None:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_SCHEDULING_MODE == SchedulingMode.INTERNAL:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
settings: ApplicationSettings = app.state.settings
await director_v2_client.stop_dynamic_service(
node_id=dynamic_service_stop.node_id,
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
save_state=dynamic_service_stop.save_state,
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
)

await set_request_as_stopped(app, dynamic_service_stop)
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from servicelib.deferred_tasks import BaseDeferredHandler, TaskUID
from servicelib.deferred_tasks._base_deferred_handler import DeferredContext

from .. import service_tracker
from ..director_v2 import DirectorV2Client
from .. import scheduler_interface, service_tracker
from ..notifier import notify_service_status_change

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -47,9 +46,8 @@ async def run(
app: FastAPI = context["app"]
node_id: NodeID = context["node_id"]

director_v2_client: DirectorV2Client = DirectorV2Client.get_from_app_state(app)
service_status: NodeGet | RunningDynamicServiceDetails | NodeGetIdle = (
await director_v2_client.get_status(node_id)
await scheduler_interface.get_service_status(app, node_id=node_id)
)
_logger.debug(
"Service status type=%s, %s", type(service_status), service_status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from models_library.users import UserID
from pydantic import TypeAdapter
from pytest_mock import MockerFixture
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.rabbitmq import RabbitMQRPCClient, RPCServerError
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler import services
Expand All @@ -29,6 +30,7 @@
)
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings
from simcore_service_dynamic_scheduler.core.settings import SchedulingMode

pytest_simcore_core_services_selection = [
"redis",
Expand Down Expand Up @@ -133,12 +135,31 @@ def mock_director_v2_service_state(
yield None


@pytest.fixture(
params=[
SchedulingMode.VIA_DIRECTOR_V2,
# NOTE: enable below when INTERNAL scheduler is impelmented
# SchedulingMode.INTERNAL,
]
)
def scheduling_mode(request: pytest.FixtureRequest) -> SchedulingMode:
return request.param


@pytest.fixture
def app_environment(
app_environment: EnvVarsDict,
rabbit_service: RabbitSettings,
redis_service: RedisSettings,
scheduling_mode: SchedulingMode,
monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
setenvs_from_dict(
monkeypatch,
{
"DYNAMIC_SCHEDULER_SCHEDULING_MODE": scheduling_mode,
},
)
return app_environment


Expand Down
Loading