Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Any

from ._models import JobName, JobStatus, ScheduleID, ScheduleResult


class ClientInterface:
async def schedule(self, job_name: JobName, **kwargs: Any) -> ScheduleID:
pass

async def cancel(self, schedule_id: ScheduleID) -> None:
pass

async def status(self, schedule_id: ScheduleID) -> JobStatus:
pass

async def result(self, schedule_id: ScheduleID) -> ScheduleResult:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from datetime import datetime, timezone
from typing import Any

from servicelib.redis._client import RedisClientSDK

from ._client import JobName
from ._models import Priority, ScheduleID, WorkerID
from ._redis_job_scheduler_repository import RedisJobSchedulerRepository


class JobScheduler:
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
self._repository = RedisJobSchedulerRepository(redis_client_sdk)

async def schedule(
self, job_name: JobName, job_params: dict[str, Any], priority: Priority
) -> ScheduleID:
pass

async def dequeue(self, worker_id: WorkerID) -> ScheduleID | None:
"""
- ho un worker morto heartbeat > 10 x 5 sec
- si: cerco tutte le schedule che ha attivo questo worker doev lui owner
- asseggno il nuovo worker_id
- restituisco il schedule_id per il worker
- no: non faccio piu nulla
- leggo le priotity queus
- sort dellle key prento la piu importante
- vedo se ho elementi in quella coda
- si restituisco primo elemnto
- no passo alla prossima coda meno imoportante e ripeto

- se non trovo nulla restuitsco None (nulla da fare)
"""

pass

async def is_worker_alive(self, worker_id: WorkerID, max_inactive_time=60):
last_heartbeat = await self._repository.get_worker_heartbeat(worker_id)
if last_heartbeat:
time_difference = datetime.now(timezone.utc) - last_heartbeat
if time_difference.total_seconds() > max_inactive_time:
return False
return True
return False

async def setup(self) -> None:
pass

async def shutdown(self) -> None:
pass


class JobSchedulerWorker:
def __init__(self, job_scheduler: JobScheduler, worker_id: WorkerID) -> None:
self._job_scheduler = job_scheduler
self._worker_id = worker_id

# worker4 gira ogni 5 secondi

async def _worker(self) -> None:
"""
1. ho ancura uno slot dispobibile per avviare un job? ( aggiorna il heartbet in redis)
2 no = non faccio nulla
3. si prendo elemnto dalla coda
4. se ho un elemento da avviare lo avvio e scrivo che lo gestisco io
"""

# leggere se posso avviare ancora
schedule_id = await self._job_scheduler.dequeue(self._worker_id)
if schedule_id is None:
return

# avvio il task
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import datetime
from enum import Enum, auto
from typing import Any, TypeAlias

from pydantic import BaseModel, NonNegativeInt

ScheduleID: TypeAlias = str
JobName: TypeAlias = str
Priority: TypeAlias = NonNegativeInt
WorkerID: TypeAlias = str


class JobStatus(Enum):
PENDING = auto()
RUNNING = auto()
COMPLETED = auto()
FAILED = auto()


class JobResult:
pass


class JobSchedule(BaseModel):
schedule_id: ScheduleID | None = None

priority: Priority = 0

job_name: JobName | None = None
job_params: dict[str, Any]
job_status: JobStatus | None = None

worker_id: WorkerID | None = None


class WorkersHeartbeat(BaseModel):
worker_id: WorkerID
last_beat: datetime.datetime # in utc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import datetime, timezone
from typing import Final
from uuid import uuid4

from pydantic import NonNegativeInt

from ..redis._client import RedisClientSDK
from ._models import JobSchedule, ScheduleID, WorkerID

_ASYNC_SCHEDULER_PREFIX: Final[str] = "as::"

_JOB_SCHEDULES_PREFIX: Final[str] = "js:"
_SCHEDULER_WORKERS_PREFIX: Final[str] = "sw:"


def _build_key(*parts) -> str:
return f"{_ASYNC_SCHEDULER_PREFIX}{''.join(parts)}"


class RedisJobSchedulerRepository:
def __init__(self, redis_client_sdk: RedisClientSDK) -> None:
self.redis_client_sdk = redis_client_sdk

async def get_new_unique_identifier(self) -> ScheduleID:
candidate_already_exists = True
while candidate_already_exists:
candidate = f"{uuid4()}"
candidate_already_exists = await self.get_schedule(candidate) is not None
return ScheduleID(candidate)

async def save_schedule(
self, schedule_id: ScheduleID, schedule: JobSchedule
) -> None:
await self.redis_client_sdk.redis.set(
_build_key(_JOB_SCHEDULES_PREFIX, schedule_id), schedule.model_dump_json()
)

async def get_schedule(self, schedule_id: ScheduleID) -> JobSchedule | None:
raw_data = await self.redis_client_sdk.redis.get(
_build_key(_JOB_SCHEDULES_PREFIX, schedule_id)
)
return JobSchedule.model_validate_json(raw_data) if raw_data else None

async def update_worker_heartbeat(
self, worker_id: WorkerID, ttl: NonNegativeInt
) -> None:
await self.redis_client_sdk.redis.set(
_build_key(_SCHEDULER_WORKERS_PREFIX, worker_id, ":heartbeat"),
datetime.now(timezone.utc).isoformat(),
ex=ttl,
)

async def get_worker_heartbeat(self, worker_id: WorkerID):
last_heartbeat = await self.redis_client_sdk.redis.get(
_build_key(_SCHEDULER_WORKERS_PREFIX, worker_id, ":heartbeat"),
)
return datetime.fromisoformat(f"{last_heartbeat}") if last_heartbeat else None
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings

from ..deferred_tasks import BaseDeferredHandler
from ._job_scheduler import JobScheduler


class RpcJobScheduler:
def __init__(
self, rabbit_settings: RabbitSettings, redis_settings: RedisSettings
) -> None:
self._scheduler = JobScheduler(get_redis_client(redis_settings))

async def register_deferred_task(self, deferred_taask: type[BaseDeferredHandler]):
pass

async def setup(self) -> None:
await self._scheduler.setup()

async def shutdown(self) -> None:
await self._scheduler.shutdown()
18 changes: 18 additions & 0 deletions packages/service-library/tests/job_scheduler/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from collections.abc import AsyncIterator, Callable
from contextlib import AbstractAsyncContextManager

import pytest
from servicelib.redis import RedisClientSDK
from settings_library.redis import RedisDatabase


@pytest.fixture
async def redis_client_sdk_job_scheduler(
get_redis_client_sdk: Callable[
[RedisDatabase, bool], AbstractAsyncContextManager[RedisClientSDK]
]
) -> AsyncIterator[RedisClientSDK]:
async with get_redis_client_sdk(
RedisDatabase.JOB_SCHEDULER, decode_response=False
) as client:
yield client
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest
from pydantic import TypeAdapter
from servicelib.job_scheduler._models import JobSchedule, ScheduleID
from servicelib.job_scheduler._redis_job_scheduler_repository import (
RedisJobSchedulerRepository,
)
from servicelib.redis._client import RedisClientSDK

pytest_simcore_core_services_selection = [
"redis",
]


@pytest.fixture
def job_schedule() -> JobSchedule:
return TypeAdapter(JobSchedule).validate_python(
{
# "timeout": timedelta(seconds=1),
# "execution_attempts": 1,
# "class_unique_reference": "mock",
# "start_context": {},
# "state": TaskState.SCHEDULED,
# "result": None,
},
)


async def test_job_scheduler_workflow(
redis_client_sdk_job_scheduler: RedisClientSDK,
job_schedule: JobSchedule,
):
repo = RedisJobSchedulerRepository(redis_client_sdk_job_scheduler)

schedule_id: ScheduleID = await repo.get_new_unique_identifier()

assert await repo.get_schedule(schedule_id) is None
await repo.save_schedule(schedule_id, job_schedule)
assert await repo.get_schedule(schedule_id) == job_schedule
1 change: 1 addition & 0 deletions packages/settings-library/src/settings_library/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class RedisDatabase(IntEnum):
DISTRIBUTED_IDENTIFIERS = 6
DEFERRED_TASKS = 7
DYNAMIC_SERVICES = 8
JOB_SCHEDULER = 9


class RedisSettings(BaseCustomSettings):
Expand Down
3 changes: 2 additions & 1 deletion services/docker-compose-ops.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ services:
announcements:${REDIS_HOST}:${REDIS_PORT}:5:${REDIS_PASSWORD},
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6:${REDIS_PASSWORD},
deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD},
dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD}
dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD},
job_scheduler:${REDIS_HOST}:${REDIS_PORT}:9:${REDIS_PASSWORD},
# If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml
ports:
- "18081:8081"
Expand Down
2 changes: 1 addition & 1 deletion services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ services:
"--loglevel",
"verbose",
"--databases",
"9",
"10",
"--appendonly",
"yes",
"--requirepass",
Expand Down
Loading