Skip to content

Commit 3682536

Browse files
giancarloromeo and mrnicegyu11
authored and committed
✨ Add a Distributed Task Queue (using Celery) (🏗️ ⚠️DEVOPS) (ITISFoundation#7214)
1 parent 70b0391 commit 3682536

File tree

39 files changed

+1297
-185
lines changed

39 files changed

+1297
-185
lines changed

.env-devel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ CATALOG_SERVICES_DEFAULT_RESOURCES='{"CPU": {"limit": 0.1, "reservation": 0.1},
4949
CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}'
5050
CATALOG_TRACING=null
5151

52+
CELERY_RESULT_EXPIRES=P7D
53+
5254
CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH='{"type":"tls","tls_ca_file":"/home/scu/.dask/dask-crt.pem","tls_client_cert":"/home/scu/.dask/dask-crt.pem","tls_client_key":"/home/scu/.dask/dask-key.pem"}'
5355
CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest
5456
CLUSTERS_KEEPER_DASK_NTHREADS=0

api/specs/web-server/_storage.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,15 @@ async def export_data(data_export: DataExportPost, location_id: LocationID):
210210
response_model=Envelope[StorageAsyncJobStatus],
211211
name="get_async_job_status",
212212
)
213-
async def get_async_job_status(storage_async_job_get: StorageAsyncJobGet, job_id: UUID):
213+
async def get_async_job_status(job_id: UUID):
214214
"""Get async job status"""
215215

216216

217217
@router.post(
218218
"/storage/async-jobs/{job_id}:abort",
219219
name="abort_async_job",
220220
)
221-
async def abort_async_job(storage_async_job_get: StorageAsyncJobGet, job_id: UUID):
221+
async def abort_async_job(job_id: UUID):
222222
"""aborts execution of an async job"""
223223

224224

@@ -227,7 +227,7 @@ async def abort_async_job(storage_async_job_get: StorageAsyncJobGet, job_id: UUI
227227
response_model=Envelope[StorageAsyncJobResult],
228228
name="get_async_job_result",
229229
)
230-
async def get_async_job_result(storage_async_job_get: StorageAsyncJobGet, job_id: UUID):
230+
async def get_async_job_result(job_id: UUID):
231231
"""Get the result of the async job"""
232232

233233

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import asyncio
2+
import functools
3+
from collections.abc import Awaitable, Callable
4+
from concurrent.futures import Executor
5+
from typing import ParamSpec, TypeVar
6+
7+
R = TypeVar("R")
8+
P = ParamSpec("P")
9+
10+
11+
def make_async(
12+
executor: Executor | None = None,
13+
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
14+
def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
15+
@functools.wraps(func)
16+
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
17+
loop = asyncio.get_running_loop()
18+
return await loop.run_in_executor(
19+
executor, functools.partial(func, *args, **kwargs)
20+
)
21+
22+
return wrapper
23+
24+
return decorator
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
3+
4+
import pytest
5+
from common_library.async_tools import make_async
6+
7+
8+
@make_async()
def sync_function(x: int, y: int) -> int:
    """Trivial blocking addition, wrapped so tests can await it."""
    return y + x
11+
12+
13+
@make_async()
def sync_function_with_exception() -> None:
    """Always fails; used to verify exceptions propagate through the wrapper."""
    raise ValueError("This is an error!")
16+
17+
18+
@pytest.mark.asyncio
async def test_make_async_returns_coroutine():
    """Calling a wrapped function yields a coroutine, not a result."""
    coro = sync_function(2, 3)
    assert asyncio.iscoroutine(coro), "Function should return a coroutine"
22+
23+
24+
@pytest.mark.asyncio
async def test_make_async_execution():
    """Awaiting the wrapper returns the underlying function's result."""
    assert await sync_function(2, 3) == 5, "Function should return 5"
28+
29+
30+
@pytest.mark.asyncio
async def test_make_async_exception():
    """An exception raised in the sync body surfaces at the await point."""
    expected_message = "This is an error!"
    with pytest.raises(ValueError, match=expected_message):
        await sync_function_with_exception()
34+
35+
36+
@pytest.mark.asyncio
async def test_make_async_with_executor():
    """make_async honours an explicitly supplied executor."""
    # NOTE: the pool is intentionally not shut down, mirroring ad-hoc test usage.
    pool = ThreadPoolExecutor()

    @make_async(pool)
    def heavy_computation(x: int) -> int:
        return x * x

    squared = await heavy_computation(4)
    assert squared == 16, "Function should return 16"

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
from datetime import datetime
21
from typing import Any, TypeAlias
32
from uuid import UUID
43

54
from models_library.users import UserID
6-
from pydantic import BaseModel, model_validator
7-
from typing_extensions import Self
5+
from pydantic import BaseModel
86

97
from ..progress_bar import ProgressReport
108

@@ -15,18 +13,6 @@ class AsyncJobStatus(BaseModel):
1513
job_id: AsyncJobId
1614
progress: ProgressReport
1715
done: bool
18-
started: datetime
19-
stopped: datetime | None
20-
21-
@model_validator(mode="after")
22-
def _check_consistency(self) -> Self:
23-
is_done = self.done
24-
is_stopped = self.stopped is not None
25-
26-
if is_done != is_stopped:
27-
msg = f"Inconsistent data: {self.done=}, {self.stopped=}"
28-
raise ValueError(msg)
29-
return self
3016

3117

3218
class AsyncJobResult(BaseModel):

packages/models-library/src/models_library/api_schemas_webserver/storage.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from datetime import datetime
21
from pathlib import Path
32
from typing import Annotated, Any
43

@@ -62,8 +61,6 @@ class StorageAsyncJobStatus(OutputSchema):
6261
job_id: AsyncJobId
6362
progress: ProgressReport
6463
done: bool
65-
started: datetime
66-
stopped: datetime | None
6764

6865
@classmethod
6966
def from_rpc_schema(
@@ -73,8 +70,6 @@ def from_rpc_schema(
7370
job_id=async_job_rpc_status.job_id,
7471
progress=async_job_rpc_status.progress,
7572
done=async_job_rpc_status.done,
76-
started=async_job_rpc_status.started,
77-
stopped=async_job_rpc_status.stopped,
7873
)
7974

8075

packages/pytest-simcore/src/pytest_simcore/simcore_services.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"static-webserver",
3838
"traefik",
3939
"whoami",
40+
"sto-worker",
4041
}
4142
# TODO: unify healthcheck policies see https://github.com/ITISFoundation/osparc-simcore/pull/2281
4243
SERVICE_PUBLISHED_PORT = {}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from datetime import timedelta
2+
from typing import Annotated
3+
4+
from pydantic import Field
5+
from pydantic_settings import SettingsConfigDict
6+
from settings_library.rabbit import RabbitSettings
7+
from settings_library.redis import RedisSettings
8+
9+
from .base import BaseCustomSettings
10+
11+
12+
class CelerySettings(BaseCustomSettings):
    """Settings for the Celery distributed task queue.

    RabbitMQ acts as the message broker and Redis as the result backend;
    both sub-settings auto-populate from the environment.
    """

    CELERY_RABBIT_BROKER: Annotated[
        RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True})
    ]
    CELERY_REDIS_RESULT_BACKEND: Annotated[
        RedisSettings, Field(json_schema_extra={"auto_default_from_env": True})
    ]
    # Accepts plain seconds or an ISO 8601 duration string (e.g. "P7D").
    CELERY_RESULT_EXPIRES: Annotated[
        timedelta,
        Field(
            description="Time after which task results will be deleted (default to seconds, or see https://docs.pydantic.dev/latest/concepts/types/#datetime-types for string formatting)."
        ),
    ] = timedelta(days=7)
    CELERY_RESULT_PERSISTENT: Annotated[
        bool,
        Field(
            description="If set to True, result messages will be persistent (after a broker restart)."
        ),
    ] = True

    model_config = SettingsConfigDict(
        json_schema_extra={
            "examples": [
                {
                    "CELERY_RABBIT_BROKER": {
                        "RABBIT_USER": "guest",
                        "RABBIT_SECURE": False,
                        "RABBIT_PASSWORD": "guest",
                        "RABBIT_HOST": "localhost",
                        "RABBIT_PORT": 5672,
                    },
                    "CELERY_REDIS_RESULT_BACKEND": {
                        "REDIS_HOST": "localhost",
                        "REDIS_PORT": 6379,
                    },
                    "CELERY_RESULT_EXPIRES": timedelta(days=1),  # type: ignore[dict-item]
                    "CELERY_RESULT_PERSISTENT": True,
                }
            ],
        }
    )

packages/settings-library/src/settings_library/redis.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from enum import IntEnum
22

3-
from pydantic import TypeAdapter
43
from pydantic.networks import RedisDsn
54
from pydantic.types import SecretStr
65

@@ -18,13 +17,14 @@ class RedisDatabase(IntEnum):
1817
DISTRIBUTED_IDENTIFIERS = 6
1918
DEFERRED_TASKS = 7
2019
DYNAMIC_SERVICES = 8
20+
CELERY_TASKS = 9
2121

2222

2323
class RedisSettings(BaseCustomSettings):
2424
# host
2525
REDIS_SECURE: bool = False
2626
REDIS_HOST: str = "redis"
27-
REDIS_PORT: PortInt = TypeAdapter(PortInt).validate_python(6789)
27+
REDIS_PORT: PortInt = 6789
2828

2929
# auth
3030
REDIS_USER: str | None = None

services/docker-compose-ops.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ services:
9595
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6:${REDIS_PASSWORD},
9696
deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD},
9797
dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD},
98+
celery_tasks:${REDIS_HOST}:${REDIS_PORT}:9:${REDIS_PASSWORD}
9899
# If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml
99100
ports:
100101
- "18081:8081"

0 commit comments

Comments
 (0)