Skip to content

Commit ab0735f

Browse files
committed
ongoing new scheduler
1 parent 5350b3f commit ab0735f

File tree

4 files changed

+67
-16
lines changed

4 files changed

+67
-16
lines changed

services/director-v2/src/simcore_service_director_v2/core/settings.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
import datetime
66
from functools import cached_property
7+
from typing import cast
78

9+
from fastapi import FastAPI
810
from models_library.basic_types import (
911
BootModeEnum,
1012
BuildTargetEnum,
@@ -240,3 +242,7 @@ class AppSettings(BaseCustomSettings, MixinLoggingSettings):
240242
def _validate_loglevel(cls, value: str) -> str:
241243
log_level: str = cls.validate_log_level(value)
242244
return log_level
245+
246+
247+
def get_application_settings(app: FastAPI) -> AppSettings:
248+
return cast(AppSettings, app.state.settings)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import datetime
2+
from typing import Final
3+
4+
from fastapi import FastAPI
5+
from servicelib.redis import RedisClientSDK
6+
from servicelib.redis_utils import exclusive
7+
from settings_library.redis import RedisDatabase
8+
9+
from ...core.settings import get_application_settings
10+
from ...utils.comp_scheduler import SCHEDULED_STATES
11+
from ..db import get_db_engine
12+
from ..db.repositories.comp_runs import CompRunsRepository
13+
from ..redis import get_redis_client_manager
14+
15+
_SCHEDULER_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5)
16+
17+
18+
def _redis_client_getter(*args, **kwargs) -> RedisClientSDK:
19+
assert kwargs # nosec
20+
app = args[0]
21+
assert isinstance(app, FastAPI) # nosec
22+
return get_redis_client_manager(app).client(RedisDatabase.LOCKS)
23+
24+
25+
@exclusive(_redis_client_getter, lock_key="computational-distributed-scheduler")
26+
async def schedule_pipelines(app: FastAPI) -> None:
27+
app_settings = get_application_settings(app)
28+
db_engine = get_db_engine(app)
29+
runs_to_schedule = CompRunsRepository.instance(db_engine).list(
30+
filter_by_state=SCHEDULED_STATES, scheduled_since=_SCHEDULER_INTERVAL
31+
)

services/director-v2/src/simcore_service_director_v2/modules/db/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from typing import cast
2+
3+
from aiopg.sa import Engine
14
from fastapi import FastAPI
25
from settings_library.postgres import PostgresSettings
36

@@ -13,3 +16,7 @@ async def on_shutdown() -> None:
1316

1417
app.add_event_handler("startup", on_startup)
1518
app.add_event_handler("shutdown", on_shutdown)
19+
20+
21+
def get_db_engine(app: FastAPI) -> Engine:
22+
return cast(Engine, app.state.engine)

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_runs.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import datetime
22
import logging
3-
from collections import deque
43
from typing import Any
54

65
import arrow
@@ -55,24 +54,32 @@ async def get(
5554
return CompRunsAtDB.from_orm(row)
5655

5756
async def list(
58-
self, filter_by_state: set[RunningState] | None = None
57+
self,
58+
filter_by_state: set[RunningState] | None = None,
59+
scheduled_since: datetime.timedelta | None = None,
5960
) -> list[CompRunsAtDB]:
60-
if not filter_by_state:
61-
filter_by_state = set()
62-
runs_in_db: deque[CompRunsAtDB] = deque()
61+
62+
conditions = []
63+
if filter_by_state:
64+
conditions.append(
65+
or_(
66+
*[
67+
comp_runs.c.result == RUNNING_STATE_TO_DB[s]
68+
for s in filter_by_state
69+
]
70+
)
71+
)
72+
if scheduled_since is not None:
73+
scheduled_cutoff = arrow.utcnow().datetime - scheduled_since
74+
conditions.append(comp_runs.c.last_scheduled >= scheduled_cutoff)
75+
6376
async with self.db_engine.acquire() as conn:
64-
async for row in conn.execute(
65-
sa.select(comp_runs).where(
66-
or_(
67-
*[
68-
comp_runs.c.result == RUNNING_STATE_TO_DB[s]
69-
for s in filter_by_state
70-
]
71-
)
77+
return [
78+
CompRunsAtDB.from_orm(row)
79+
async for row in conn.execute(
80+
sa.select(comp_runs).where(sa.and_(*conditions))
7281
)
73-
):
74-
runs_in_db.append(CompRunsAtDB.from_orm(row))
75-
return list(runs_in_db)
82+
]
7683

7784
async def create(
7885
self,

0 commit comments

Comments
 (0)