Skip to content

Commit 35254e9

Browse files
committed
merge giancarloromeo/add-distributed-task-queue
2 parents 045aa49 + 66e51d9 commit 35254e9

File tree

35 files changed

+1278
-171
lines changed

35 files changed

+1278
-171
lines changed

.env-devel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ CATALOG_SERVICES_DEFAULT_RESOURCES='{"CPU": {"limit": 0.1, "reservation": 0.1},
4949
CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}'
5050
CATALOG_TRACING=null
5151

52+
CELERY_RESULT_EXPIRES=P7D
53+
5254
CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH='{"type":"tls","tls_ca_file":"/home/scu/.dask/dask-crt.pem","tls_client_cert":"/home/scu/.dask/dask-crt.pem","tls_client_key":"/home/scu/.dask/dask-key.pem"}'
5355
CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest
5456
CLUSTERS_KEEPER_DASK_NTHREADS=0
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import asyncio
2+
import functools
3+
from collections.abc import Awaitable, Callable
4+
from concurrent.futures import Executor
5+
from typing import ParamSpec, TypeVar
6+
7+
R = TypeVar("R")
8+
P = ParamSpec("P")
9+
10+
11+
def make_async(
12+
executor: Executor | None = None,
13+
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
14+
def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
15+
@functools.wraps(func)
16+
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
17+
loop = asyncio.get_running_loop()
18+
return await loop.run_in_executor(
19+
executor, functools.partial(func, *args, **kwargs)
20+
)
21+
22+
return wrapper
23+
24+
return decorator
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
3+
4+
import pytest
5+
from common_library.async_tools import make_async
6+
7+
8+
@make_async()
def sync_function(x: int, y: int) -> int:
    """Blocking addition fixture; awaiting the wrapper yields x + y."""
    return y + x
11+
12+
13+
@make_async()
def sync_function_with_exception() -> None:
    """Fixture that always fails; used to check errors cross the executor boundary."""
    error = ValueError("This is an error!")
    raise error
16+
17+
18+
@pytest.mark.asyncio
async def test_make_async_returns_coroutine():
    # Calling the wrapped function must not execute it eagerly; it
    # hands back a coroutine to be awaited later.
    pending = sync_function(2, 3)
    assert asyncio.iscoroutine(pending), "Function should return a coroutine"
22+
23+
24+
@pytest.mark.asyncio
async def test_make_async_execution():
    # Awaiting the wrapper runs the sync code in an executor and
    # hands back its return value.
    computed = await sync_function(2, 3)
    assert computed == 5, "Function should return 5"
28+
29+
30+
@pytest.mark.asyncio
async def test_make_async_exception():
    # Exceptions raised inside the executor must propagate to the awaiter.
    with pytest.raises(ValueError, match="This is an error!") as exc_info:
        await sync_function_with_exception()
    assert "This is an error!" in f"{exc_info.value}"
34+
35+
36+
@pytest.mark.asyncio
async def test_make_async_with_executor():
    """A caller-supplied executor is used and cleaned up deterministically."""
    # Fix: the original created ThreadPoolExecutor() without ever calling
    # shutdown(), leaking worker threads past the test. The context manager
    # guarantees shutdown even if the assertion fails.
    with ThreadPoolExecutor() as executor:

        @make_async(executor)
        def heavy_computation(x: int) -> int:
            return x * x

        result = await heavy_computation(4)
        assert result == 16, "Function should return 16"

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
from datetime import datetime
21
from typing import Any, TypeAlias
32
from uuid import UUID
43

54
from models_library.users import UserID
6-
from pydantic import BaseModel, model_validator
7-
from typing_extensions import Self
5+
from pydantic import BaseModel
86

97
from ..progress_bar import ProgressReport
108

@@ -15,18 +13,6 @@ class AsyncJobStatus(BaseModel):
1513
job_id: AsyncJobId
1614
progress: ProgressReport
1715
done: bool
18-
started: datetime
19-
stopped: datetime | None
20-
21-
@model_validator(mode="after")
22-
def _check_consistency(self) -> Self:
23-
is_done = self.done
24-
is_stopped = self.stopped is not None
25-
26-
if is_done != is_stopped:
27-
msg = f"Inconsistent data: {self.done=}, {self.stopped=}"
28-
raise ValueError(msg)
29-
return self
3016

3117

3218
class AsyncJobResult(BaseModel):

packages/models-library/src/models_library/api_schemas_webserver/storage.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from datetime import datetime
21
from pathlib import Path
32
from typing import Annotated, Any
43

@@ -85,8 +84,6 @@ class StorageAsyncJobStatus(OutputSchema):
8584
job_id: AsyncJobId
8685
progress: ProgressReport
8786
done: bool
88-
started: datetime
89-
stopped: datetime | None
9087
links: AsyncJobLinks
9188

9289
@classmethod
@@ -97,8 +94,6 @@ def from_rpc_schema(
9794
job_id=async_job_rpc_status.job_id,
9895
progress=async_job_rpc_status.progress,
9996
done=async_job_rpc_status.done,
100-
started=async_job_rpc_status.started,
101-
stopped=async_job_rpc_status.stopped,
10297
links=AsyncJobLinks.from_job_id(
10398
app=app, job_id=f"{async_job_rpc_status.job_id}"
10499
),
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from datetime import timedelta
2+
from typing import Annotated
3+
4+
from pydantic import Field
5+
from pydantic_settings import SettingsConfigDict
6+
from settings_library.rabbit import RabbitSettings
7+
from settings_library.redis import RedisSettings
8+
9+
from .base import BaseCustomSettings
10+
11+
12+
class CelerySettings(BaseCustomSettings):
    """Settings for the Celery distributed task queue.

    RabbitMQ serves as the message broker and Redis as the result backend.
    """

    # Broker connection; auto-populated from RABBIT_* environment variables.
    CELERY_RABBIT_BROKER: Annotated[
        RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True})
    ]
    # Result backend connection; auto-populated from REDIS_* environment variables.
    CELERY_REDIS_RESULT_BACKEND: Annotated[
        RedisSettings, Field(json_schema_extra={"auto_default_from_env": True})
    ]
    # Fix: corrected typos in the user-visible description
    # ("default to" -> "defaults to", "formating" -> "formatting").
    CELERY_RESULT_EXPIRES: Annotated[
        timedelta,
        Field(
            description="Time after which task results will be deleted (defaults to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)."
        ),
    ] = timedelta(days=7)
    CELERY_RESULT_PERSISTENT: Annotated[
        bool,
        Field(
            description="If set to True, result messages will be persistent (after a broker restart)."
        ),
    ] = True

    model_config = SettingsConfigDict(
        json_schema_extra={
            "examples": [
                {
                    "CELERY_RABBIT_BROKER": {
                        "RABBIT_USER": "guest",
                        "RABBIT_SECURE": False,
                        "RABBIT_PASSWORD": "guest",
                        "RABBIT_HOST": "localhost",
                        "RABBIT_PORT": 5672,
                    },
                    "CELERY_REDIS_RESULT_BACKEND": {
                        "REDIS_HOST": "localhost",
                        "REDIS_PORT": 6379,
                    },
                    "CELERY_RESULT_EXPIRES": timedelta(days=1),  # type: ignore[dict-item]
                    "CELERY_RESULT_PERSISTENT": True,
                }
            ],
        }
    )

packages/settings-library/src/settings_library/redis.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from enum import IntEnum
22

3-
from pydantic import TypeAdapter
43
from pydantic.networks import RedisDsn
54
from pydantic.types import SecretStr
65

@@ -18,13 +17,14 @@ class RedisDatabase(IntEnum):
1817
DISTRIBUTED_IDENTIFIERS = 6
1918
DEFERRED_TASKS = 7
2019
DYNAMIC_SERVICES = 8
20+
CELERY_TASKS = 9
2121

2222

2323
class RedisSettings(BaseCustomSettings):
2424
# host
2525
REDIS_SECURE: bool = False
2626
REDIS_HOST: str = "redis"
27-
REDIS_PORT: PortInt = TypeAdapter(PortInt).validate_python(6789)
27+
REDIS_PORT: PortInt = 6789
2828

2929
# auth
3030
REDIS_USER: str | None = None

services/docker-compose-ops.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ services:
9595
distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6:${REDIS_PASSWORD},
9696
deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD},
9797
dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD},
98+
celery_tasks:${REDIS_HOST}:${REDIS_PORT}:9:${REDIS_PASSWORD}
9899
# If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml
99100
ports:
100101
- "18081:8081"

services/docker-compose.yml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,7 +1152,7 @@ services:
11521152
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-latest}
11531153
init: true
11541154
hostname: "sto-{{.Node.Hostname}}-{{.Task.Slot}}"
1155-
environment:
1155+
environment: &storage_environment
11561156
BF_API_KEY: ${BF_API_KEY}
11571157
BF_API_SECRET: ${BF_API_SECRET}
11581158
DATCORE_ADAPTER_HOST: ${DATCORE_ADAPTER_HOST:-datcore-adapter}
@@ -1179,17 +1179,28 @@ services:
11791179
S3_ENDPOINT: ${S3_ENDPOINT}
11801180
S3_REGION: ${S3_REGION}
11811181
S3_SECRET_KEY: ${S3_SECRET_KEY}
1182+
STORAGE_WORKER_MODE: "false"
11821183
STORAGE_LOGLEVEL: ${STORAGE_LOGLEVEL}
11831184
STORAGE_MONITORING_ENABLED: 1
11841185
STORAGE_PROFILING: ${STORAGE_PROFILING}
11851186
STORAGE_PORT: ${STORAGE_PORT}
11861187
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
11871188
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
1188-
networks:
1189+
networks: &storage_networks
11891190
- default
11901191
- interactive_services_subnet
11911192
- storage_subnet
11921193

1194+
sto-worker:
1195+
image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest}
1196+
init: true
1197+
hostname: "sto-worker-{{.Node.Hostname}}-{{.Task.Slot}}"
1198+
environment:
1199+
<<: *storage_environment
1200+
STORAGE_WORKER_MODE: "true"
1201+
CELERY_CONCURRENCY: 1
1202+
networks: *storage_networks
1203+
11931204
rabbit:
11941205
image: itisfoundation/rabbitmq:3.11.2-management
11951206
init: true
@@ -1274,7 +1285,7 @@ services:
12741285
"--loglevel",
12751286
"verbose",
12761287
"--databases",
1277-
"9",
1288+
"10",
12781289
"--appendonly",
12791290
"yes",
12801291
"--requirepass",

services/storage/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ openapi.json: .env-ignore
1515
@set -o allexport; \
1616
source $<; \
1717
set +o allexport; \
18-
python3 -c "import json; from $(APP_PACKAGE_NAME).main import *; print( json.dumps(the_app.openapi(), indent=2) )" > $@
18+
python3 -c "import json; from $(APP_PACKAGE_NAME).main import *; print( json.dumps(app.openapi(), indent=2) )" > $@
1919

2020
# validates OAS file: $@
2121
$(call validate_openapi_specs,$@)

0 commit comments

Comments
 (0)