Skip to content

Commit 0c3c040

Browse files
initial commit
1 parent d432c6c commit 0c3c040

File tree

11 files changed

+267
-2
lines changed

11 files changed

+267
-2
lines changed

packages/service-library/src/servicelib/job_scheduler/__init__.py

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from typing import Any
2+
3+
from ._models import JobName, JobStatus, ScheduleID, ScheduleResult
4+
5+
6+
class ClientInterface:
7+
async def schedule(self, job_name: JobName, **kwargs: Any) -> ScheduleID:
8+
pass
9+
10+
async def cancel(self, schedule_id: ScheduleID) -> None:
11+
pass
12+
13+
async def status(self, schedule_id: ScheduleID) -> JobStatus:
14+
pass
15+
16+
async def result(self, schedule_id: ScheduleID) -> ScheduleResult:
17+
pass
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from datetime import datetime, timezone
2+
from typing import Any
3+
4+
from servicelib.redis._client import RedisClientSDK
5+
6+
from ._client import JobName
7+
from ._models import Priority, ScheduleID, WorkerID
8+
from ._redis_job_scheduler_repository import RedisJobSchedulerRepository
9+
10+
11+
class JobScheduler:
12+
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
13+
self._repository = RedisJobSchedulerRepository(redis_client_sdk)
14+
15+
async def schedule(
16+
self, job_name: JobName, job_params: dict[str, Any], priority: Priority
17+
) -> ScheduleID:
18+
pass
19+
20+
async def dequeue(self, worker_id: WorkerID) -> ScheduleID | None:
21+
"""
22+
- ho un worker morto heartbeat > 10 x 5 sec
23+
- si: cerco tutte le schedule che ha attivo questo worker doev lui owner
24+
- asseggno il nuovo worker_id
25+
- restituisco il schedule_id per il worker
26+
- no: non faccio piu nulla
27+
- leggo le priotity queus
28+
- sort dellle key prento la piu importante
29+
- vedo se ho elementi in quella coda
30+
- si restituisco primo elemnto
31+
- no passo alla prossima coda meno imoportante e ripeto
32+
33+
- se non trovo nulla restuitsco None (nulla da fare)
34+
"""
35+
36+
pass
37+
38+
async def is_worker_alive(self, worker_id: WorkerID, max_inactive_time=60):
39+
last_heartbeat = await self._repository.get_worker_heartbeat(worker_id)
40+
if last_heartbeat:
41+
time_difference = datetime.now(timezone.utc) - last_heartbeat
42+
if time_difference.total_seconds() > max_inactive_time:
43+
return False
44+
return True
45+
return False
46+
47+
async def setup(self) -> None:
48+
pass
49+
50+
async def shutdown(self) -> None:
51+
pass
52+
53+
54+
class JobSchedulerWorker:
55+
def __init__(self, job_scheduler: JobScheduler, worker_id: WorkerID) -> None:
56+
self._job_scheduler = job_scheduler
57+
self._worker_id = worker_id
58+
59+
# worker4 gira ogni 5 secondi
60+
61+
async def _worker(self) -> None:
62+
"""
63+
1. ho ancura uno slot dispobibile per avviare un job? ( aggiorna il heartbet in redis)
64+
2 no = non faccio nulla
65+
3. si prendo elemnto dalla coda
66+
4. se ho un elemento da avviare lo avvio e scrivo che lo gestisco io
67+
"""
68+
69+
# leggere se posso avviare ancora
70+
schedule_id = await self._job_scheduler.dequeue(self._worker_id)
71+
if schedule_id is None:
72+
return
73+
74+
# avvio il task
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import datetime
2+
from enum import Enum, auto
3+
from typing import Any, TypeAlias
4+
5+
from pydantic import BaseModel, NonNegativeInt
6+
7+
ScheduleID: TypeAlias = str
8+
JobName: TypeAlias = str
9+
Priority: TypeAlias = NonNegativeInt
10+
WorkerID: TypeAlias = str
11+
12+
13+
class JobStatus(Enum):
14+
PENDING = auto()
15+
RUNNING = auto()
16+
COMPLETED = auto()
17+
FAILED = auto()
18+
19+
20+
class JobResult:
21+
pass
22+
23+
24+
class JobSchedule(BaseModel):
25+
schedule_id: ScheduleID | None = None
26+
27+
priority: Priority = 0
28+
29+
job_name: JobName | None = None
30+
job_params: dict[str, Any]
31+
job_status: JobStatus | None = None
32+
33+
worker_id: WorkerID | None = None
34+
35+
36+
class WorkersHeartbeat(BaseModel):
37+
worker_id: WorkerID
38+
last_beat: datetime.datetime # in utc
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from datetime import datetime, timezone
2+
from typing import Final
3+
from uuid import uuid4
4+
5+
from pydantic import NonNegativeInt
6+
7+
from ..redis._client import RedisClientSDK
8+
from ._models import JobSchedule, ScheduleID, WorkerID
9+
10+
_ASYNC_SCHEDULER_PREFIX: Final[str] = "as::"
11+
12+
_JOB_SCHEDULES_PREFIX: Final[str] = "js:"
13+
_SCHEDULER_WORKERS_PREFIX: Final[str] = "sw:"
14+
15+
16+
def _build_key(*parts) -> str:
17+
return f"{_ASYNC_SCHEDULER_PREFIX}{''.join(parts)}"
18+
19+
20+
class RedisJobSchedulerRepository:
21+
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
22+
self.redis_client_sdk = redis_client_sdk
23+
24+
async def get_new_unique_identifier(self) -> ScheduleID:
25+
candidate_already_exists = True
26+
while candidate_already_exists:
27+
candidate = f"{uuid4()}"
28+
candidate_already_exists = await self.get_schedule(candidate) is not None
29+
return ScheduleID(candidate)
30+
31+
async def save_schedule(
32+
self, schedule_id: ScheduleID, schedule: JobSchedule
33+
) -> None:
34+
await self.redis_client_sdk.redis.set(
35+
_build_key(_JOB_SCHEDULES_PREFIX, schedule_id), schedule.model_dump_json()
36+
)
37+
38+
async def get_schedule(self, schedule_id: ScheduleID) -> JobSchedule | None:
39+
raw_data = await self.redis_client_sdk.redis.get(
40+
_build_key(_JOB_SCHEDULES_PREFIX, schedule_id)
41+
)
42+
return JobSchedule.model_validate_json(raw_data) if raw_data else None
43+
44+
async def update_worker_heartbeat(
45+
self, worker_id: WorkerID, ttl: NonNegativeInt
46+
) -> None:
47+
await self.redis_client_sdk.redis.set(
48+
_build_key(_SCHEDULER_WORKERS_PREFIX, worker_id, ":heartbeat"),
49+
datetime.now(timezone.utc).isoformat(),
50+
ex=ttl,
51+
)
52+
53+
async def get_worker_heartbeat(self, worker_id: WorkerID):
54+
last_heartbeat = await self.redis_client_sdk.redis.get(
55+
_build_key(_SCHEDULER_WORKERS_PREFIX, worker_id, ":heartbeat"),
56+
)
57+
return datetime.fromisoformat(f"{last_heartbeat}") if last_heartbeat else None
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from settings_library.rabbit import RabbitSettings
2+
from settings_library.redis import RedisSettings
3+
4+
from ..deferred_tasks import BaseDeferredHandler
5+
from ._job_scheduler import JobScheduler
6+
7+
8+
class RpcJobScheduler:
9+
def __init__(
10+
self, rabbit_settings: RabbitSettings, redis_settings: RedisSettings
11+
) -> None:
12+
self._scheduler = JobScheduler(get_redis_client(redis_settings))
13+
14+
async def register_deferred_task(self, deferred_taask: type[BaseDeferredHandler]):
15+
pass
16+
17+
async def setup(self) -> None:
18+
await self._scheduler.setup()
19+
20+
async def shutdown(self) -> None:
21+
await self._scheduler.shutdown()
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from collections.abc import AsyncIterator, Callable
2+
from contextlib import AbstractAsyncContextManager
3+
4+
import pytest
5+
from servicelib.redis import RedisClientSDK
6+
from settings_library.redis import RedisDatabase
7+
8+
9+
@pytest.fixture
10+
async def redis_client_sdk_job_scheduler(
11+
get_redis_client_sdk: Callable[
12+
[RedisDatabase, bool], AbstractAsyncContextManager[RedisClientSDK]
13+
]
14+
) -> AsyncIterator[RedisClientSDK]:
15+
async with get_redis_client_sdk(
16+
RedisDatabase.JOB_SCHEDULER, decode_response=False
17+
) as client:
18+
yield client
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import pytest
2+
from pydantic import TypeAdapter
3+
from servicelib.job_scheduler._models import JobSchedule, ScheduleID
4+
from servicelib.job_scheduler._redis_job_scheduler_repository import (
5+
RedisJobSchedulerRepository,
6+
)
7+
from servicelib.redis._client import RedisClientSDK
8+
9+
pytest_simcore_core_services_selection = [
10+
"redis",
11+
]
12+
13+
14+
@pytest.fixture
15+
def job_schedule() -> JobSchedule:
16+
return TypeAdapter(JobSchedule).validate_python(
17+
{
18+
# "timeout": timedelta(seconds=1),
19+
# "execution_attempts": 1,
20+
# "class_unique_reference": "mock",
21+
# "start_context": {},
22+
# "state": TaskState.SCHEDULED,
23+
# "result": None,
24+
},
25+
)
26+
27+
28+
async def test_job_scheduler_workflow(
29+
redis_client_sdk_job_scheduler: RedisClientSDK,
30+
job_schedule: JobSchedule,
31+
):
32+
repo = RedisJobSchedulerRepository(redis_client_sdk_job_scheduler)
33+
34+
schedule_id: ScheduleID = await repo.get_new_unique_identifier()
35+
36+
assert await repo.get_schedule(schedule_id) is None
37+
await repo.save_schedule(schedule_id, job_schedule)
38+
assert await repo.get_schedule(schedule_id) == job_schedule

packages/settings-library/src/settings_library/redis.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class RedisDatabase(IntEnum):
1818
DISTRIBUTED_IDENTIFIERS = 6
1919
DEFERRED_TASKS = 7
2020
DYNAMIC_SERVICES = 8
21+
JOB_SCHEDULER = 9
2122

2223

2324
class RedisSettings(BaseCustomSettings):

services/docker-compose-ops.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ services:
9494
announcements:${REDIS_HOST}:${REDIS_PORT}:5:${REDIS_PASSWORD},
9595
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6:${REDIS_PASSWORD},
9696
deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD},
97-
dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD}
97+
dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD},
98+
job_scheduler:${REDIS_HOST}:${REDIS_PORT}:9:${REDIS_PASSWORD},
9899
# If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml
99100
ports:
100101
- "18081:8081"

0 commit comments

Comments
 (0)