diff --git a/.env-devel b/.env-devel index e90466c32f0f..d5df4deab06b 100644 --- a/.env-devel +++ b/.env-devel @@ -49,6 +49,8 @@ CATALOG_SERVICES_DEFAULT_RESOURCES='{"CPU": {"limit": 0.1, "reservation": 0.1}, CATALOG_SERVICES_DEFAULT_SPECIFICATIONS='{}' CATALOG_TRACING=null +CELERY_RESULT_EXPIRES=P7D + CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH='{"type":"tls","tls_ca_file":"/home/scu/.dask/dask-crt.pem","tls_client_cert":"/home/scu/.dask/dask-crt.pem","tls_client_key":"/home/scu/.dask/dask-key.pem"}' CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest CLUSTERS_KEEPER_DASK_NTHREADS=0 diff --git a/api/specs/web-server/_storage.py b/api/specs/web-server/_storage.py index fac4c4fde874..848fac10a7b0 100644 --- a/api/specs/web-server/_storage.py +++ b/api/specs/web-server/_storage.py @@ -210,7 +210,7 @@ async def export_data(data_export: DataExportPost, location_id: LocationID): response_model=Envelope[StorageAsyncJobStatus], name="get_async_job_status", ) -async def get_async_job_status(storage_async_job_get: StorageAsyncJobGet, job_id: UUID): +async def get_async_job_status(job_id: UUID): """Get async job status""" @@ -218,7 +218,7 @@ async def get_async_job_status(storage_async_job_get: StorageAsyncJobGet, job_id "/storage/async-jobs/{job_id}:abort", name="abort_async_job", ) -async def abort_async_job(storage_async_job_get: StorageAsyncJobGet, job_id: UUID): +async def abort_async_job(job_id: UUID): """aborts execution of an async job""" @@ -227,7 +227,7 @@ async def abort_async_job(storage_async_job_get: StorageAsyncJobGet, job_id: UUI response_model=Envelope[StorageAsyncJobResult], name="get_async_job_result", ) -async def get_async_job_result(storage_async_job_get: StorageAsyncJobGet, job_id: UUID): +async def get_async_job_result(job_id: UUID): """Get the result of the async job""" diff --git a/packages/common-library/src/common_library/async_tools.py b/packages/common-library/src/common_library/async_tools.py new file mode 100644 index 000000000000..d92944299e79 --- /dev/null +++ b/packages/common-library/src/common_library/async_tools.py @@ -0,0 +1,24 @@ +import asyncio +import functools +from collections.abc import Awaitable, Callable +from concurrent.futures import Executor +from typing import ParamSpec, TypeVar + +R = TypeVar("R") +P = ParamSpec("P") + + +def make_async( + executor: Executor | None = None, +) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]: + def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]: + @functools.wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + executor, functools.partial(func, *args, **kwargs) + ) + + return wrapper + + return decorator diff --git a/packages/common-library/tests/test_async_tools.py b/packages/common-library/tests/test_async_tools.py new file mode 100644 index 000000000000..961bf6f9fde4 --- /dev/null +++ b/packages/common-library/tests/test_async_tools.py @@ -0,0 +1,45 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor + +import pytest +from common_library.async_tools import make_async + + +@make_async() +def sync_function(x: int, y: int) -> int: + return x + y + + +@make_async() +def sync_function_with_exception() -> None: + raise ValueError("This is an error!") + + +@pytest.mark.asyncio +async def test_make_async_returns_coroutine(): + result = sync_function(2, 3) + assert asyncio.iscoroutine(result), "Function should return a coroutine" + + 
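The new `make_async` decorator introduced above wraps a synchronous callable so it runs in an executor and can be awaited from async code. As a quick illustration of the intended usage outside the test suite, here is a minimal sketch; the `count_bytes` helper and the dedicated `ThreadPoolExecutor` are illustrative assumptions, not part of this change:

```python
# Minimal usage sketch for make_async (illustrative only, not part of this diff).
import asyncio
from concurrent.futures import ThreadPoolExecutor

from common_library.async_tools import make_async

_io_pool = ThreadPoolExecutor(max_workers=2)  # hypothetical pool dedicated to blocking I/O


@make_async(_io_pool)
def count_bytes(path: str) -> int:
    # the blocking file read executes inside the thread pool, not on the event loop
    with open(path, "rb") as fh:
        return len(fh.read())


async def main() -> None:
    # the decorated function is awaited like any coroutine
    size = await count_bytes(__file__)
    print(f"{size=}")


if __name__ == "__main__":
    asyncio.run(main())
```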
+@pytest.mark.asyncio +async def test_make_async_execution(): + result = await sync_function(2, 3) + assert result == 5, "Function should return 5" + + +@pytest.mark.asyncio +async def test_make_async_exception(): + with pytest.raises(ValueError, match="This is an error!"): + await sync_function_with_exception() + + +@pytest.mark.asyncio +async def test_make_async_with_executor(): + executor = ThreadPoolExecutor() + + @make_async(executor) + def heavy_computation(x: int) -> int: + return x * x + + result = await heavy_computation(4) + assert result == 16, "Function should return 16" diff --git a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py index 8901e66a7ad5..953cd1f819cc 100644 --- a/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py +++ b/packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py @@ -1,10 +1,8 @@ -from datetime import datetime from typing import Any, TypeAlias from uuid import UUID from models_library.users import UserID -from pydantic import BaseModel, model_validator -from typing_extensions import Self +from pydantic import BaseModel from ..progress_bar import ProgressReport @@ -15,18 +13,6 @@ class AsyncJobStatus(BaseModel): job_id: AsyncJobId progress: ProgressReport done: bool - started: datetime - stopped: datetime | None - - @model_validator(mode="after") - def _check_consistency(self) -> Self: - is_done = self.done - is_stopped = self.stopped is not None - - if is_done != is_stopped: - msg = f"Inconsistent data: {self.done=}, {self.stopped=}" - raise ValueError(msg) - return self class AsyncJobResult(BaseModel): diff --git a/packages/models-library/src/models_library/api_schemas_webserver/storage.py b/packages/models-library/src/models_library/api_schemas_webserver/storage.py index 57ccf1f88b03..f192ba51355e 100644 --- a/packages/models-library/src/models_library/api_schemas_webserver/storage.py +++ b/packages/models-library/src/models_library/api_schemas_webserver/storage.py @@ -1,4 +1,3 @@ -from datetime import datetime from pathlib import Path from typing import Annotated, Any @@ -62,8 +61,6 @@ class StorageAsyncJobStatus(OutputSchema): job_id: AsyncJobId progress: ProgressReport done: bool - started: datetime - stopped: datetime | None @classmethod def from_rpc_schema( @@ -73,8 +70,6 @@ def from_rpc_schema( job_id=async_job_rpc_status.job_id, progress=async_job_rpc_status.progress, done=async_job_rpc_status.done, - started=async_job_rpc_status.started, - stopped=async_job_rpc_status.stopped, ) diff --git a/packages/pytest-simcore/src/pytest_simcore/simcore_services.py b/packages/pytest-simcore/src/pytest_simcore/simcore_services.py index 88dba2e4543c..749bbc042300 100644 --- a/packages/pytest-simcore/src/pytest_simcore/simcore_services.py +++ b/packages/pytest-simcore/src/pytest_simcore/simcore_services.py @@ -37,6 +37,7 @@ "static-webserver", "traefik", "whoami", + "sto-worker", } # TODO: unify healthcheck policies see https://github.com/ITISFoundation/osparc-simcore/pull/2281 SERVICE_PUBLISHED_PORT = {} diff --git a/packages/settings-library/src/settings_library/celery.py b/packages/settings-library/src/settings_library/celery.py new file mode 100644 index 000000000000..9259b574047b --- /dev/null +++ b/packages/settings-library/src/settings_library/celery.py @@ -0,0 +1,52 @@ +from datetime import timedelta +from typing import Annotated + +from pydantic import Field +from 
pydantic_settings import SettingsConfigDict +from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings + +from .base import BaseCustomSettings + + +class CelerySettings(BaseCustomSettings): + CELERY_RABBIT_BROKER: Annotated[ + RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True}) + ] + CELERY_REDIS_RESULT_BACKEND: Annotated[ + RedisSettings, Field(json_schema_extra={"auto_default_from_env": True}) + ] + CELERY_RESULT_EXPIRES: Annotated[ + timedelta, + Field( + description="Time after which task results will be deleted (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)." + ), + ] = timedelta(days=7) + CELERY_RESULT_PERSISTENT: Annotated[ + bool, + Field( + description="If set to True, result messages will be persistent (after a broker restart)." + ), + ] = True + + model_config = SettingsConfigDict( + json_schema_extra={ + "examples": [ + { + "CELERY_RABBIT_BROKER": { + "RABBIT_USER": "guest", + "RABBIT_SECURE": False, + "RABBIT_PASSWORD": "guest", + "RABBIT_HOST": "localhost", + "RABBIT_PORT": 5672, + }, + "CELERY_REDIS_RESULT_BACKEND": { + "REDIS_HOST": "localhost", + "REDIS_PORT": 6379, + }, + "CELERY_RESULT_EXPIRES": timedelta(days=1), # type: ignore[dict-item] + "CELERY_RESULT_PERSISTENT": True, + } + ], + } + ) diff --git a/packages/settings-library/src/settings_library/redis.py b/packages/settings-library/src/settings_library/redis.py index 7e3b0e7b6931..322991e9c533 100644 --- a/packages/settings-library/src/settings_library/redis.py +++ b/packages/settings-library/src/settings_library/redis.py @@ -1,6 +1,5 @@ from enum import IntEnum -from pydantic import TypeAdapter from pydantic.networks import RedisDsn from pydantic.types import SecretStr @@ -18,13 +17,14 @@ class RedisDatabase(IntEnum): DISTRIBUTED_IDENTIFIERS = 6 DEFERRED_TASKS = 7 DYNAMIC_SERVICES = 8 + CELERY_TASKS = 9 class RedisSettings(BaseCustomSettings): # host REDIS_SECURE: bool = False REDIS_HOST: str = "redis" - REDIS_PORT: PortInt = TypeAdapter(PortInt).validate_python(6789) + REDIS_PORT: PortInt = 6789 # auth REDIS_USER: str | None = None diff --git a/services/docker-compose-ops.yml b/services/docker-compose-ops.yml index c80befe23164..b244d224059b 100644 --- a/services/docker-compose-ops.yml +++ b/services/docker-compose-ops.yml @@ -95,6 +95,7 @@ services: distributed_identifiers:${REDIS_HOST}:${REDIS_PORT}:6:${REDIS_PASSWORD}, deferred_tasks:${REDIS_HOST}:${REDIS_PORT}:7:${REDIS_PASSWORD}, dynamic_services:${REDIS_HOST}:${REDIS_PORT}:8:${REDIS_PASSWORD} + celery_tasks:${REDIS_HOST}:${REDIS_PORT}:9:${REDIS_PASSWORD} # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml ports: - "18081:8081" diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 03bb9a865934..77520234af41 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -1152,7 +1152,7 @@ services: image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-latest} init: true hostname: "sto-{{.Node.Hostname}}-{{.Task.Slot}}" - environment: + environment: &storage_environment DATCORE_ADAPTER_HOST: ${DATCORE_ADAPTER_HOST:-datcore-adapter} LOG_FORMAT_LOCAL_DEV_ENABLED: ${LOG_FORMAT_LOCAL_DEV_ENABLED} LOG_FILTER_MAPPING : ${LOG_FILTER_MAPPING} @@ -1177,17 +1177,28 @@ services: S3_ENDPOINT: ${S3_ENDPOINT} S3_REGION: ${S3_REGION} S3_SECRET_KEY: ${S3_SECRET_KEY} + STORAGE_WORKER_MODE: "false" STORAGE_LOGLEVEL: ${STORAGE_LOGLEVEL} 
STORAGE_MONITORING_ENABLED: 1 STORAGE_PROFILING: ${STORAGE_PROFILING} STORAGE_PORT: ${STORAGE_PORT} TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT} TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT} - networks: + networks: &storage_networks - default - interactive_services_subnet - storage_subnet + sto-worker: + image: ${DOCKER_REGISTRY:-itisfoundation}/storage:${DOCKER_IMAGE_TAG:-master-github-latest} + init: true + hostname: "sto-worker-{{.Node.Hostname}}-{{.Task.Slot}}" + environment: + <<: *storage_environment + STORAGE_WORKER_MODE: "true" + CELERY_CONCURRENCY: 1 + networks: *storage_networks + rabbit: image: itisfoundation/rabbitmq:3.11.2-management init: true @@ -1272,7 +1283,7 @@ services: "--loglevel", "verbose", "--databases", - "9", + "10", "--appendonly", "yes", "--requirepass", diff --git a/services/storage/.env-devel b/services/storage/.env-devel new file mode 100644 index 000000000000..96d465822d39 --- /dev/null +++ b/services/storage/.env-devel @@ -0,0 +1,20 @@ +CELERY_RESULT_EXPIRES=P7D + +RABBIT_HOST=rabbit +RABBIT_PASSWORD=adminadmin +RABBIT_PORT=5672 +RABBIT_SECURE=false +RABBIT_USER=admin + +REDIS_HOST=redis +REDIS_PORT=6379 +REDIS_PASSWORD=adminadmin +REDIS_SECURE=false +REDIS_USER=null + +STORAGE_ENDPOINT=storage:8080 +STORAGE_HOST=storage +STORAGE_LOGLEVEL=INFO +STORAGE_PORT=8080 +STORAGE_PROFILING=1 +STORAGE_TRACING=null diff --git a/services/storage/Makefile b/services/storage/Makefile index 4bbab60c8a52..ef350cae0917 100644 --- a/services/storage/Makefile +++ b/services/storage/Makefile @@ -5,17 +5,14 @@ include ../../scripts/common.Makefile include ../../scripts/common-service.Makefile -.env-ignore: - $(APP_CLI_NAME) echo-dotenv > $@ - .PHONY: openapi.json openapi-specs: openapi.json -openapi.json: .env-ignore +openapi.json: .env # generating openapi specs file (need to have the environment set for this) @set -o allexport; \ source $<; \ set +o allexport; \ - python3 -c "import json; from $(APP_PACKAGE_NAME).main import *; print( json.dumps(the_app.openapi(), indent=2) )" > $@ + python3 -c "import json; from $(APP_PACKAGE_NAME).main import *; print( json.dumps(app.openapi(), indent=2) )" > $@ # validates OAS file: $@ $(call validate_openapi_specs,$@) diff --git a/services/storage/docker/boot.sh b/services/storage/docker/boot.sh index efe437aa521a..0fd1d2b4edcf 100755 --- a/services/storage/docker/boot.sh +++ b/services/storage/docker/boot.sh @@ -56,8 +56,17 @@ if [ "${SC_BOOT_MODE}" = "debug" ]; then --log-level \"${SERVER_LOG_LEVEL}\" " else - exec uvicorn simcore_service_storage.main:the_app \ - --host 0.0.0.0 \ - --port ${STORAGE_PORT} \ - --log-level "${SERVER_LOG_LEVEL}" + if [ "${STORAGE_WORKER_MODE}" = "true" ]; then + exec celery \ + --app=simcore_service_storage.modules.celery.worker_main:app \ + worker --pool=threads \ + --loglevel="${SERVER_LOG_LEVEL}" \ + --hostname="${HOSTNAME}" \ + --concurrency="${CELERY_CONCURRENCY}" + else + exec uvicorn simcore_service_storage.main:app \ + --host 0.0.0.0 \ + --port ${STORAGE_PORT} \ + --log-level "${SERVER_LOG_LEVEL}" + fi fi diff --git a/services/storage/docker/healthcheck.py b/services/storage/docker/healthcheck.py old mode 100644 new mode 100755 index b6711cd55eb6..d938c860dabf --- a/services/storage/docker/healthcheck.py +++ b/services/storage/docker/healthcheck.py @@ -20,18 +20,50 @@ import os +import socket +import subprocess import sys from urllib.request import urlopen +from simcore_service_storage.core.settings import ApplicationSettings + 
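For worker containers, the health check below shells out to `celery inspect ping`. The same liveness probe can also be expressed directly against Celery's control API; a minimal sketch, assuming a RabbitMQ broker reachable with the development credentials from `services/storage/.env-devel` (URL and timeout are illustrative):

```python
# Illustrative sketch of the worker liveness probe via the Celery control API,
# equivalent in spirit to the `celery inspect ping` CLI call used below.
# The broker URL is an assumption for local development.
from celery import Celery

app = Celery(broker="amqp://admin:adminadmin@rabbit:5672")

# ping() returns replies such as {"celery@<hostname>": {"ok": "pong"}} when a worker
# answers within the timeout, or nothing when no worker responded.
replies = app.control.inspect(timeout=2.0).ping()
print("healthy" if replies else "unhealthy")
```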
SUCCESS, UNHEALTHY = 0, 1 # Disabled if boots with debugger -ok = os.environ.get("SC_BOOT_MODE", "").lower() == "debug" +ok = os.getenv("SC_BOOT_MODE", "").lower() == "debug" # Queries host # pylint: disable=consider-using-with + +app_settings = ApplicationSettings.create_from_envs() + +def _is_celery_worker_healthy(): + assert app_settings.STORAGE_CELERY + broker_url = app_settings.STORAGE_CELERY.CELERY_RABBIT_BROKER.dsn + + try: + result = subprocess.run( + [ + "celery", + "--broker", + broker_url, + "inspect", + "ping", + "--destination", + "celery@" + socket.gethostname(), + ], + capture_output=True, + text=True, + check=True, + ) + return "pong" in result.stdout + except subprocess.CalledProcessError: + return False + + ok = ( ok + or (app_settings.STORAGE_WORKER_MODE and _is_celery_worker_healthy()) or urlopen( "{host}{baseurl}".format( host=sys.argv[1], baseurl=os.environ.get("SIMCORE_NODE_BASEPATH", "") diff --git a/services/storage/requirements/_base.in b/services/storage/requirements/_base.in index 0dda4d60e5a5..44da01ea7897 100644 --- a/services/storage/requirements/_base.in +++ b/services/storage/requirements/_base.in @@ -16,7 +16,9 @@ aioboto3 # s3 storage aiofiles # i/o +asgi_lifespan asyncpg # database +celery[redis] httpx opentelemetry-instrumentation-botocore packaging diff --git a/services/storage/requirements/_base.txt b/services/storage/requirements/_base.txt index b6e426f43e09..0e84c66d32ec 100644 --- a/services/storage/requirements/_base.txt +++ b/services/storage/requirements/_base.txt @@ -67,6 +67,8 @@ aiosignal==1.3.2 # via aiohttp alembic==1.14.1 # via -r requirements/../../../packages/postgres-database/requirements/_base.in +amqp==5.3.1 + # via kombu annotated-types==0.7.0 # via pydantic anyio==4.8.0 @@ -85,6 +87,8 @@ arrow==1.3.0 # -r requirements/../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/_base.in +asgi-lifespan==2.1.0 + # via -r requirements/_base.in asgiref==3.8.1 # via opentelemetry-instrumentation-asgi asyncpg==0.30.0 @@ -96,6 +100,8 @@ attrs==25.1.0 # aiohttp # jsonschema # referencing +billiard==4.2.1 + # via celery boto3==1.35.81 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt @@ -132,6 +138,8 @@ botocore==1.35.81 # s3transfer botocore-stubs==1.36.17 # via types-aiobotocore +celery==5.4.0 + # via -r requirements/_base.in certifi==2025.1.31 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt @@ -167,9 +175,19 @@ charset-normalizer==3.4.1 # via requests click==8.1.8 # via + # celery + # click-didyoumean + # click-plugins + # click-repl # rich-toolkit # typer # uvicorn +click-didyoumean==0.3.1 + # via celery +click-plugins==1.1.1 + # via celery +click-repl==0.3.0 + # via celery deprecated==1.2.18 # via # opentelemetry-api @@ -304,6 +322,8 @@ jsonschema==4.23.0 # -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in jsonschema-specifications==2024.10.1 # via jsonschema +kombu==5.4.2 + # via celery mako==1.3.9 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt @@ -498,6 +518,8 @@ prometheus-client==0.21.1 # 
prometheus-fastapi-instrumentator prometheus-fastapi-instrumentator==7.0.2 # via -r requirements/../../../packages/service-library/requirements/_fastapi.in +prompt-toolkit==3.0.50 + # via click-repl propcache==0.2.1 # via # aiohttp @@ -642,6 +664,7 @@ python-dateutil==2.9.0.post0 # via # arrow # botocore + # celery python-dotenv==1.0.1 # via # pydantic-settings @@ -710,6 +733,7 @@ redis==5.2.1 # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/aws-library/requirements/../../../packages/service-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/_base.in + # celery referencing==0.35.1 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt @@ -765,7 +789,9 @@ shellingham==1.5.4 six==1.17.0 # via python-dateutil sniffio==1.3.1 - # via anyio + # via + # anyio + # asgi-lifespan sqlalchemy==1.4.54 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt @@ -883,6 +909,10 @@ typing-extensions==4.12.2 # types-aiobotocore-ec2 # types-aiobotocore-s3 # types-aiobotocore-ssm +tzdata==2025.1 + # via + # celery + # kombu ujson==5.10.0 # via # -c requirements/../../../packages/aws-library/requirements/../../../packages/common-library/requirements/../../../requirements/constraints.txt @@ -949,8 +979,15 @@ uvicorn==0.34.0 # fastapi-cli uvloop==0.21.0 # via uvicorn +vine==5.1.0 + # via + # amqp + # celery + # kombu watchfiles==1.0.4 # via uvicorn +wcwidth==0.2.13 + # via prompt-toolkit websockets==14.2 # via uvicorn wrapt==1.17.2 diff --git a/services/storage/requirements/_test.in b/services/storage/requirements/_test.in index 09a057e4a082..0e2518897773 100644 --- a/services/storage/requirements/_test.in +++ b/services/storage/requirements/_test.in @@ -6,7 +6,6 @@ --constraint _base.txt -asgi_lifespan asyncpg-stubs coverage docker @@ -17,6 +16,7 @@ moto[server] pandas pytest pytest-asyncio +pytest-celery pytest-cov pytest-icdiff pytest-instafail diff --git a/services/storage/requirements/_test.txt b/services/storage/requirements/_test.txt index 30a299db7af7..962a86a4d564 100644 --- a/services/storage/requirements/_test.txt +++ b/services/storage/requirements/_test.txt @@ -11,6 +11,10 @@ aiosignal==1.3.2 # via # -c requirements/_base.txt # aiohttp +amqp==5.3.1 + # via + # -c requirements/_base.txt + # kombu annotated-types==0.7.0 # via # -c requirements/_base.txt @@ -21,8 +25,6 @@ anyio==4.8.0 # via # -c requirements/_base.txt # httpx -asgi-lifespan==2.1.0 - # via -r requirements/_test.in asyncpg==0.30.0 # via # -c requirements/_base.txt @@ -39,6 +41,10 @@ aws-sam-translator==1.95.0 # via cfn-lint aws-xray-sdk==2.14.0 # via moto +billiard==4.2.1 + # via + # -c requirements/_base.txt + # celery blinker==1.9.0 # via flask boto3==1.35.81 @@ -54,6 +60,10 @@ botocore==1.35.81 # boto3 # moto # s3transfer +celery==5.4.0 + # via + # -c requirements/_base.txt + # pytest-celery certifi==2025.1.31 # via # -c requirements/../../../requirements/constraints.txt @@ -73,7 +83,23 @@ charset-normalizer==3.4.1 click==8.1.8 # via # -c requirements/_base.txt + # celery + # click-didyoumean + # click-plugins + # click-repl # flask +click-didyoumean==0.3.1 + # via + # -c requirements/_base.txt + # celery +click-plugins==1.1.1 + # via + # -c requirements/_base.txt + # celery +click-repl==0.3.0 + # via + # -c requirements/_base.txt + # celery coverage==7.6.12 # 
via # -r requirements/_test.in @@ -83,10 +109,14 @@ cryptography==44.0.2 # -c requirements/../../../requirements/constraints.txt # joserfc # moto +debugpy==1.8.12 + # via pytest-celery docker==7.1.0 # via # -r requirements/_test.in # moto + # pytest-celery + # pytest-docker-tools faker==36.1.1 # via -r requirements/_test.in fakeredis==2.27.0 @@ -172,6 +202,10 @@ jsonschema-specifications==2024.10.1 # -c requirements/_base.txt # jsonschema # openapi-schema-validator +kombu==5.4.2 + # via + # -c requirements/_base.txt + # celery lazy-object-proxy==1.10.0 # via openapi-spec-validator lupa==2.4 @@ -219,11 +253,19 @@ ply==3.11 # via jsonpath-ng pprintpp==0.4.0 # via pytest-icdiff +prompt-toolkit==3.0.50 + # via + # -c requirements/_base.txt + # click-repl propcache==0.2.1 # via # -c requirements/_base.txt # aiohttp # yarl +psutil==6.1.1 + # via + # -c requirements/_base.txt + # pytest-celery py-partiql-parser==0.5.6 # via moto pycparser==2.22 @@ -244,6 +286,7 @@ pytest==8.3.5 # -r requirements/_test.in # pytest-asyncio # pytest-cov + # pytest-docker-tools # pytest-icdiff # pytest-instafail # pytest-mock @@ -252,8 +295,12 @@ pytest-asyncio==0.23.8 # via # -c requirements/../../../requirements/constraints.txt # -r requirements/_test.in +pytest-celery==1.1.3 + # via -r requirements/_test.in pytest-cov==6.0.0 # via -r requirements/_test.in +pytest-docker-tools==3.1.3 + # via pytest-celery pytest-icdiff==0.9 # via -r requirements/_test.in pytest-instafail==0.5.0 @@ -268,6 +315,7 @@ python-dateutil==2.9.0.post0 # via # -c requirements/_base.txt # botocore + # celery # moto # pandas # simcore-service-storage-sdk @@ -323,6 +371,9 @@ s3transfer==0.10.4 # -c requirements/_base.txt # boto3 setuptools==75.8.2 + # via + # moto + # pytest-celery # via moto simcore-service-storage-sdk @ git+https://github.com/ITISFoundation/osparc-simcore.git@cfdf4f86d844ebb362f4f39e9c6571d561b72897#subdirectory=services/storage/client-sdk/python # via -r requirements/_test.in @@ -336,7 +387,6 @@ sniffio==1.3.1 # via # -c requirements/_base.txt # anyio - # asgi-lifespan sortedcontainers==2.4.0 # via fakeredis sqlalchemy==1.4.54 @@ -348,6 +398,10 @@ sqlalchemy2-stubs==0.0.2a38 # via sqlalchemy sympy==1.13.3 # via cfn-lint +tenacity==9.0.0 + # via + # -c requirements/_base.txt + # pytest-celery termcolor==2.5.0 # via pytest-sugar types-aiofiles==24.1.0.20241221 @@ -365,7 +419,10 @@ typing-extensions==4.12.2 # sqlalchemy2-stubs tzdata==2025.1 # via + # -c requirements/_base.txt + # celery # faker + # kombu # pandas urllib3==2.3.0 # via @@ -376,6 +433,16 @@ urllib3==2.3.0 # requests # responses # simcore-service-storage-sdk +vine==5.1.0 + # via + # -c requirements/_base.txt + # amqp + # celery + # kombu +wcwidth==0.2.13 + # via + # -c requirements/_base.txt + # prompt-toolkit werkzeug==3.1.3 # via # flask diff --git a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py index c4aed1aa65d8..f901502dda8d 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_async_jobs.py @@ -1,6 +1,4 @@ # pylint: disable=unused-argument -from datetime import datetime -from uuid import uuid4 from fastapi import FastAPI from models_library.api_schemas_rpc_async_jobs.async_jobs import ( @@ -15,9 +13,11 @@ ResultError, StatusError, ) -from models_library.progress_bar import ProgressReport from servicelib.rabbitmq import RPCRouter +from ...modules.celery import get_celery_client +from 
...modules.celery.models import TaskStatus + router = RPCRouter() @@ -36,13 +36,15 @@ async def get_status( ) -> AsyncJobStatus: assert app # nosec assert job_id_data # nosec - progress_report = ProgressReport(actual_value=0.5, total=1.0, attempt=1) + + task_status: TaskStatus = await get_celery_client(app).get_task_status( + task_context=job_id_data.model_dump(), + task_uuid=job_id, + ) return AsyncJobStatus( job_id=job_id, - progress=progress_report, - done=False, - started=datetime.now(), - stopped=None, + progress=task_status.progress_report, + done=task_status.is_done, ) @@ -53,7 +55,13 @@ async def get_result( assert app # nosec assert job_id # nosec assert job_id_data # nosec - return AsyncJobResult(result="Here's your result.", error=None) + + result = await get_celery_client(app).get_task_result( + task_context=job_id_data.model_dump(), + task_uuid=job_id, + ) + + return AsyncJobResult(result=result, error=None) @router.expose() @@ -61,4 +69,9 @@ async def list_jobs( app: FastAPI, filter_: str, job_id_data: AsyncJobNameData ) -> list[AsyncJobGet]: assert app # nosec - return [AsyncJobGet(job_id=AsyncJobId(f"{uuid4()}"))] + + task_uuids = await get_celery_client(app).get_task_uuids( + task_context=job_id_data.model_dump(), + ) + + return [AsyncJobGet(job_id=task_uuid) for task_uuid in task_uuids] diff --git a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py index 7e31bddce38d..424fbc2f0d0d 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/_data_export.py +++ b/services/storage/src/simcore_service_storage/api/rpc/_data_export.py @@ -1,9 +1,6 @@ -from uuid import uuid4 - from fastapi import FastAPI from models_library.api_schemas_rpc_async_jobs.async_jobs import ( AsyncJobGet, - AsyncJobId, AsyncJobNameData, ) from models_library.api_schemas_storage.data_export_async_jobs import ( @@ -17,6 +14,7 @@ from ...datcore_dsm import DatCoreDataManager from ...dsm import get_dsm_provider from ...exceptions.errors import FileAccessRightError +from ...modules.celery import get_celery_client from ...modules.datcore_adapter.datcore_adapter_exceptions import DatcoreAdapterError from ...simcore_s3_dsm import SimcoreS3DataManager @@ -53,6 +51,12 @@ async def start_data_export( location_id=data_export_start.location_id, ) from err + task_uuid = await get_celery_client(app).send_task( + "export_data_with_error", + task_context=job_id_data.model_dump(), + files=data_export_start.file_and_folder_ids, # ANE: adapt here your signature + ) + return AsyncJobGet( - job_id=AsyncJobId(f"{uuid4()}"), + job_id=task_uuid, ) diff --git a/services/storage/src/simcore_service_storage/core/application.py b/services/storage/src/simcore_service_storage/core/application.py index 265e357d6054..13082a00fb06 100644 --- a/services/storage/src/simcore_service_storage/core/application.py +++ b/services/storage/src/simcore_service_storage/core/application.py @@ -83,14 +83,16 @@ def create_app(settings: ApplicationSettings) -> FastAPI: setup_client_session(app) setup_rabbitmq(app) - setup_rpc_api_routes(app) + if not settings.STORAGE_WORKER_MODE: + setup_rpc_api_routes(app) setup_rest_api_long_running_tasks_for_uploads(app) setup_rest_api_routes(app, API_VTAG) set_exception_handlers(app) + setup_redis(app) + setup_dsm(app) - if settings.STORAGE_CLEANER_INTERVAL_S: - setup_redis(app) + if settings.STORAGE_CLEANER_INTERVAL_S and not settings.STORAGE_WORKER_MODE: setup_dsm_cleaner(app) if settings.STORAGE_PROFILING: diff 
--git a/services/storage/src/simcore_service_storage/core/settings.py b/services/storage/src/simcore_service_storage/core/settings.py index f2a5c62d16ea..66142a82b3ee 100644 --- a/services/storage/src/simcore_service_storage/core/settings.py +++ b/services/storage/src/simcore_service_storage/core/settings.py @@ -1,17 +1,11 @@ from typing import Annotated, Self from fastapi import FastAPI -from pydantic import ( - AliasChoices, - Field, - PositiveInt, - TypeAdapter, - field_validator, - model_validator, -) +from pydantic import AliasChoices, Field, PositiveInt, field_validator, model_validator from servicelib.logging_utils_filtering import LoggerName, MessageSubstring from settings_library.application import BaseApplicationSettings from settings_library.basic_types import LogLevel, PortInt +from settings_library.celery import CelerySettings from settings_library.postgres import PostgresSettings from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisSettings @@ -24,7 +18,7 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): STORAGE_HOST: str = "0.0.0.0" # nosec - STORAGE_PORT: PortInt = TypeAdapter(PortInt).validate_python(8080) + STORAGE_PORT: PortInt = 8080 LOG_LEVEL: Annotated[ LogLevel, @@ -36,63 +30,91 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): STORAGE_MONITORING_ENABLED: bool = False STORAGE_PROFILING: bool = False - STORAGE_POSTGRES: PostgresSettings | None = Field( - json_schema_extra={"auto_default_from_env": True} - ) - - STORAGE_REDIS: RedisSettings | None = Field( - json_schema_extra={"auto_default_from_env": True} - ) - - STORAGE_S3: S3Settings | None = Field( - json_schema_extra={"auto_default_from_env": True} - ) - - STORAGE_TRACING: TracingSettings | None = Field( - json_schema_extra={"auto_default_from_env": True} - ) - - DATCORE_ADAPTER: DatcoreAdapterSettings = Field( - json_schema_extra={"auto_default_from_env": True} - ) - - STORAGE_SYNC_METADATA_TIMEOUT: PositiveInt = Field( - 180, description="Timeout (seconds) for metadata sync task" - ) - - STORAGE_DEFAULT_PRESIGNED_LINK_EXPIRATION_SECONDS: int = Field( - 3600, description="Default expiration time in seconds for presigned links" - ) - - STORAGE_CLEANER_INTERVAL_S: int | None = Field( - 30, - description="Interval in seconds when task cleaning pending uploads runs. 
setting to NULL disables the cleaner.", - ) - - STORAGE_RABBITMQ: RabbitSettings | None = Field( - json_schema_extra={"auto_default_from_env": True}, - ) - - STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY: int = Field( - 4, - description="Maximal amount of threads used by underlying S3 client to transfer data to S3 backend", - ) - - STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field( - default=False, - validation_alias=AliasChoices( - "STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED", - "LOG_FORMAT_LOCAL_DEV_ENABLED", + STORAGE_POSTGRES: Annotated[ + PostgresSettings | None, + Field(json_schema_extra={"auto_default_from_env": True}), + ] + + STORAGE_REDIS: Annotated[ + RedisSettings | None, Field(json_schema_extra={"auto_default_from_env": True}) + ] + + STORAGE_S3: Annotated[ + S3Settings | None, Field(json_schema_extra={"auto_default_from_env": True}) + ] + + STORAGE_CELERY: Annotated[ + CelerySettings | None, Field(json_schema_extra={"auto_default_from_env": True}) + ] + + STORAGE_TRACING: Annotated[ + TracingSettings | None, Field(json_schema_extra={"auto_default_from_env": True}) + ] + + DATCORE_ADAPTER: Annotated[ + DatcoreAdapterSettings, Field(json_schema_extra={"auto_default_from_env": True}) + ] + + STORAGE_SYNC_METADATA_TIMEOUT: Annotated[ + PositiveInt, Field(180, description="Timeout (seconds) for metadata sync task") + ] + + STORAGE_DEFAULT_PRESIGNED_LINK_EXPIRATION_SECONDS: Annotated[ + int, + Field( + 3600, description="Default expiration time in seconds for presigned links" + ), + ] + + STORAGE_CLEANER_INTERVAL_S: Annotated[ + int | None, + Field( + 30, + description="Interval in seconds when task cleaning pending uploads runs. setting to NULL disables the cleaner.", ), - description="Enables local development _logger format. WARNING: make sure it is disabled if you want to have structured logs!", - ) - STORAGE_LOG_FILTER_MAPPING: dict[LoggerName, list[MessageSubstring]] = Field( - default_factory=dict, - validation_alias=AliasChoices( - "STORAGE_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING" + ] + + STORAGE_RABBITMQ: Annotated[ + RabbitSettings | None, + Field( + json_schema_extra={"auto_default_from_env": True}, + ) + ] + + STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY: Annotated[ + int, + Field( + 4, + description="Maximal amount of threads used by underlying S3 client to transfer data to S3 backend", ), - description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of _logger message patterns that should be filtered out.", - ) + ] + + STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED: Annotated[ + bool, + Field( + default=False, + validation_alias=AliasChoices( + "STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED", + "LOG_FORMAT_LOCAL_DEV_ENABLED", + ), + description="Enables local development _logger format. 
WARNING: make sure it is disabled if you want to have structured logs!", + ), + ] + + STORAGE_LOG_FILTER_MAPPING: Annotated[ + dict[LoggerName, list[MessageSubstring]], + Field( + default_factory=dict, + validation_alias=AliasChoices( + "STORAGE_LOG_FILTER_MAPPING", "LOG_FILTER_MAPPING" + ), + description="is a dictionary that maps specific loggers (such as 'uvicorn.access' or 'gunicorn.access') to a list of _logger message patterns that should be filtered out.", + ), + ] + + STORAGE_WORKER_MODE: Annotated[ + bool, Field(description="If True, run as a worker") + ] = False @field_validator("LOG_LEVEL", mode="before") @classmethod diff --git a/services/storage/src/simcore_service_storage/main.py b/services/storage/src/simcore_service_storage/main.py index 7af3c3605d2f..f0639c753685 100644 --- a/services/storage/src/simcore_service_storage/main.py +++ b/services/storage/src/simcore_service_storage/main.py @@ -2,22 +2,26 @@ import logging -from fastapi import FastAPI from servicelib.logging_utils import config_all_loggers -from simcore_service_storage.core.application import create_app -from simcore_service_storage.core.settings import ApplicationSettings +from simcore_service_storage.modules.celery import setup_celery -_the_settings = ApplicationSettings.create_from_envs() +from .core.application import create_app +from .core.settings import ApplicationSettings + +_settings = ApplicationSettings.create_from_envs() # SEE https://github.com/ITISFoundation/osparc-simcore/issues/3148 -logging.basicConfig(level=_the_settings.log_level) # NOSONAR -logging.root.setLevel(_the_settings.log_level) +logging.basicConfig(level=_settings.log_level) # NOSONAR +logging.root.setLevel(_settings.log_level) config_all_loggers( - log_format_local_dev_enabled=_the_settings.STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED, - logger_filter_mapping=_the_settings.STORAGE_LOG_FILTER_MAPPING, - tracing_settings=_the_settings.STORAGE_TRACING, + log_format_local_dev_enabled=_settings.STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED, + logger_filter_mapping=_settings.STORAGE_LOG_FILTER_MAPPING, + tracing_settings=_settings.STORAGE_TRACING, ) +_logger = logging.getLogger(__name__) + +fastapi_app = create_app(_settings) +setup_celery(fastapi_app) -# SINGLETON FastAPI app -the_app: FastAPI = create_app(_the_settings) +app = fastapi_app diff --git a/services/storage/src/simcore_service_storage/modules/celery/__init__.py b/services/storage/src/simcore_service_storage/modules/celery/__init__.py new file mode 100644 index 000000000000..2852b74eab40 --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/__init__.py @@ -0,0 +1,36 @@ +import logging +from asyncio import AbstractEventLoop + +from fastapi import FastAPI +from simcore_service_storage.modules.celery._common import create_app +from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient + +from ...core.settings import get_application_settings + +_logger = logging.getLogger(__name__) + + +def setup_celery(app: FastAPI) -> None: + async def on_startup() -> None: + celery_settings = get_application_settings(app).STORAGE_CELERY + assert celery_settings # nosec + celery_app = create_app(celery_settings) + app.state.celery_client = CeleryTaskQueueClient(celery_app) + + app.add_event_handler("startup", on_startup) + + +def get_celery_client(app: FastAPI) -> CeleryTaskQueueClient: + celery_client = app.state.celery_client + assert isinstance(celery_client, CeleryTaskQueueClient) + return celery_client + + +def get_event_loop(app: FastAPI) -> 
AbstractEventLoop: + event_loop = app.state.event_loop + assert isinstance(event_loop, AbstractEventLoop) + return event_loop + + +def set_event_loop(app: FastAPI, event_loop: AbstractEventLoop) -> None: + app.state.event_loop = event_loop diff --git a/services/storage/src/simcore_service_storage/modules/celery/_common.py b/services/storage/src/simcore_service_storage/modules/celery/_common.py new file mode 100644 index 000000000000..ae5979d4f1fb --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/_common.py @@ -0,0 +1,67 @@ +from collections.abc import Callable +from functools import wraps +import logging +import traceback +from typing import Any + +from celery import Celery, Task # type: ignore[import-untyped] +from celery.exceptions import Ignore # type: ignore[import-untyped] +from celery.contrib.abortable import AbortableTask # type: ignore[import-untyped] +from settings_library.celery import CelerySettings +from settings_library.redis import RedisDatabase + +from .models import TaskError, TaskState + +_logger = logging.getLogger(__name__) + + +def create_app(celery_settings: CelerySettings) -> Celery: + assert celery_settings + + app = Celery( + broker=celery_settings.CELERY_RABBIT_BROKER.dsn, + backend=celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( + RedisDatabase.CELERY_TASKS, + ), + ) + app.conf.result_expires = celery_settings.CELERY_RESULT_EXPIRES + app.conf.result_extended = True # original args are included in the results + app.conf.result_serializer = "json" + app.conf.task_track_started = True + return app + + +def error_handling(func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + def wrapper(task: Task, *args: Any, **kwargs: Any) -> Any: + try: + return func(task, *args, **kwargs) + except Exception as exc: + exc_type = type(exc).__name__ + exc_message = f"{exc}" + exc_traceback = traceback.format_exc().split('\n') + + _logger.exception( + "Task %s failed with exception: %s", + task.request.id, + exc_message, + ) + + task.update_state( + state=TaskState.ERROR.upper(), + meta=TaskError( + exc_type=exc_type, + exc_msg=exc_message, + ).model_dump(mode="json"), + traceback=exc_traceback + ) + raise Ignore from exc # ignore doing state updates + return wrapper + + +def define_task(app: Celery, fn: Callable, task_name: str | None = None): + app.task( + name=task_name or fn.__name__, + bind=True, + base=AbortableTask, + )(error_handling(fn)) diff --git a/services/storage/src/simcore_service_storage/modules/celery/client.py b/services/storage/src/simcore_service_storage/modules/celery/client.py new file mode 100644 index 000000000000..300aadb66b2c --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/client.py @@ -0,0 +1,143 @@ +import contextlib +import logging +from typing import Any, Final +from uuid import uuid4 + +from celery import Celery # type: ignore[import-untyped] +from celery.contrib.abortable import AbortableAsyncResult # type: ignore[import-untyped] +from common_library.async_tools import make_async +from models_library.progress_bar import ProgressReport +from pydantic import ValidationError +from servicelib.logging_utils import log_context + +from .models import TaskContext, TaskID, TaskState, TaskStatus, TaskUUID + +_logger = logging.getLogger(__name__) + +_CELERY_INSPECT_TASK_STATUSES: Final[tuple[str, ...]] = ( + "active", + "registered", + "scheduled", + "revoked", +) +_CELERY_TASK_META_PREFIX: Final[str] = "celery-task-meta-" +_CELERY_STATES_MAPPING: Final[dict[str, TaskState]] = { 
+ "PENDING": TaskState.PENDING, + "STARTED": TaskState.PENDING, + "RETRY": TaskState.PENDING, + "RUNNING": TaskState.RUNNING, + "SUCCESS": TaskState.SUCCESS, + "ABORTED": TaskState.ABORTED, + "FAILURE": TaskState.ERROR, + "ERROR": TaskState.ERROR, +} +_CELERY_TASK_ID_KEY_SEPARATOR: Final[str] = ":" +_CELERY_TASK_ID_KEY_ENCODING = "utf-8" + +_MIN_PROGRESS_VALUE = 0.0 +_MAX_PROGRESS_VALUE = 100.0 + + +def _build_context_prefix(task_context: TaskContext) -> list[str]: + return [f"{task_context[key]}" for key in sorted(task_context)] + + +def _build_task_id_prefix(task_context: TaskContext) -> str: + return _CELERY_TASK_ID_KEY_SEPARATOR.join(_build_context_prefix(task_context)) + + +def _build_task_id(task_context: TaskContext, task_uuid: TaskUUID) -> TaskID: + return _CELERY_TASK_ID_KEY_SEPARATOR.join([_build_task_id_prefix(task_context), f"{task_uuid}"]) + + +class CeleryTaskQueueClient: + def __init__(self, celery_app: Celery): + self._celery_app = celery_app + + @make_async() + def send_task( + self, task_name: str, *, task_context: TaskContext, **task_params + ) -> TaskUUID: + task_uuid = uuid4() + task_id = _build_task_id(task_context, task_uuid) + with log_context( + _logger, + logging.DEBUG, + msg=f"Submitting task {task_name}: {task_id=} {task_params=}", + ): + self._celery_app.send_task(task_name, task_id=task_id, kwargs=task_params) + return task_uuid + + @make_async() + def abort_task( # pylint: disable=R6301 + self, task_context: TaskContext, task_uuid: TaskUUID + ) -> None: + task_id = _build_task_id(task_context, task_uuid) + _logger.info("Aborting task %s", task_id) + AbortableAsyncResult(task_id).abort() + + @make_async() + def get_task_result(self, task_context: TaskContext, task_uuid: TaskUUID) -> Any: + task_id = _build_task_id(task_context, task_uuid) + return self._celery_app.AsyncResult(task_id).result + + def _get_progress_report( + self, task_context: TaskContext, task_uuid: TaskUUID + ) -> ProgressReport: + task_id = _build_task_id(task_context, task_uuid) + result = self._celery_app.AsyncResult(task_id).result + state = self._get_state(task_context, task_uuid) + if result and state == TaskState.RUNNING: + with contextlib.suppress(ValidationError): + # avoids exception if result is not a ProgressReport (or overwritten by a Celery's state update) + return ProgressReport.model_validate(result) + if state in ( + TaskState.ABORTED, + TaskState.ERROR, + TaskState.SUCCESS, + ): + return ProgressReport(actual_value=_MAX_PROGRESS_VALUE) + return ProgressReport(actual_value=_MIN_PROGRESS_VALUE) + + def _get_state(self, task_context: TaskContext, task_uuid: TaskUUID) -> TaskState: + task_id = _build_task_id(task_context, task_uuid) + return _CELERY_STATES_MAPPING[self._celery_app.AsyncResult(task_id).state] + + @make_async() + def get_task_status( + self, task_context: TaskContext, task_uuid: TaskUUID + ) -> TaskStatus: + return TaskStatus( + task_uuid=task_uuid, + task_state=self._get_state(task_context, task_uuid), + progress_report=self._get_progress_report(task_context, task_uuid), + ) + + def _get_completed_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]: + search_key = ( + _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context) + ) + redis = self._celery_app.backend.client + if hasattr(redis, "keys") and (keys := redis.keys(search_key + "*")): + return { + TaskUUID(f"{key.decode(_CELERY_TASK_ID_KEY_ENCODING).removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR)}") + for key in keys + } + return set() + + @make_async() + def get_task_uuids(self, 
task_context: TaskContext) -> set[TaskUUID]: + all_task_ids = self._get_completed_task_uuids(task_context) + + search_key = ( + _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context) + ) + for task_inspect_status in _CELERY_INSPECT_TASK_STATUSES: + if task_ids := getattr( + self._celery_app.control.inspect(), task_inspect_status + )(): + for values in task_ids.values(): + for value in values: + all_task_ids.add(TaskUUID(value.removeprefix(search_key + _CELERY_TASK_ID_KEY_SEPARATOR))) + + return all_task_ids diff --git a/services/storage/src/simcore_service_storage/modules/celery/models.py b/services/storage/src/simcore_service_storage/modules/celery/models.py new file mode 100644 index 000000000000..2f04c5b81329 --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/models.py @@ -0,0 +1,57 @@ +from enum import StrEnum, auto +from typing import Any, Final, Self, TypeAlias +from uuid import UUID + +from models_library.progress_bar import ProgressReport +from pydantic import BaseModel, model_validator + +TaskContext: TypeAlias = dict[str, Any] +TaskID: TypeAlias = str +TaskUUID: TypeAlias = UUID + +_MIN_PROGRESS: Final[float] = 0.0 +_MAX_PROGRESS: Final[float] = 100.0 + + +class TaskState(StrEnum): + PENDING = auto() + RUNNING = auto() + SUCCESS = auto() + ERROR = auto() + ABORTED = auto() + + +_TASK_DONE = {TaskState.SUCCESS, TaskState.ERROR, TaskState.ABORTED} + + +class TaskStatus(BaseModel): + task_uuid: TaskUUID + task_state: TaskState + progress_report: ProgressReport + + @property + def is_done(self) -> bool: + return self.task_state in _TASK_DONE + + @model_validator(mode="after") + def _check_consistency(self) -> Self: + value = self.progress_report.actual_value + + valid_states = { + TaskState.PENDING: value == _MIN_PROGRESS, + TaskState.RUNNING: _MIN_PROGRESS <= value <= _MAX_PROGRESS, + TaskState.SUCCESS: value == _MAX_PROGRESS, + TaskState.ABORTED: value == _MAX_PROGRESS, + TaskState.ERROR: value == _MAX_PROGRESS, + } + + if not valid_states.get(self.task_state, True): + msg = f"Inconsistent progress actual value for state={self.task_state}: {value}" + raise ValueError(msg) + + return self + + +class TaskError(BaseModel): + exc_type: str + exc_msg: str diff --git a/services/storage/src/simcore_service_storage/modules/celery/signals.py b/services/storage/src/simcore_service_storage/modules/celery/signals.py new file mode 100644 index 000000000000..f2186c519729 --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/signals.py @@ -0,0 +1,70 @@ +import asyncio +import logging +import threading +from typing import Final + +from asgi_lifespan import LifespanManager +from celery import Celery # type: ignore[import-untyped] +from fastapi import FastAPI +from servicelib.async_utils import cancel_wait_task +from ...core.application import create_app +from ...core.settings import ApplicationSettings +from ...modules.celery import get_event_loop, set_event_loop +from ...modules.celery.utils import ( + get_fastapi_app, + set_celery_worker, + set_fastapi_app, +) +from ...modules.celery.worker import CeleryTaskQueueWorker + +_logger = logging.getLogger(__name__) + +_LIFESPAN_TIMEOUT: Final[int] = 10 + + +def on_worker_init(sender, **_kwargs): + def _init_fastapi(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + shutdown_event = asyncio.Event() + + fastapi_app = create_app(ApplicationSettings.create_from_envs()) + + async def lifespan(): + async with LifespanManager( + fastapi_app, + startup_timeout=_LIFESPAN_TIMEOUT, 
+ shutdown_timeout=_LIFESPAN_TIMEOUT, + ): + try: + await shutdown_event.wait() + except asyncio.CancelledError: + _logger.warning("Lifespan task cancelled") + + lifespan_task = loop.create_task(lifespan()) + fastapi_app.state.lifespan_task = lifespan_task + fastapi_app.state.shutdown_event = shutdown_event + set_event_loop(fastapi_app, loop) + + set_fastapi_app(sender.app, fastapi_app) + set_celery_worker(sender.app, CeleryTaskQueueWorker(sender.app)) + + loop.run_forever() + + thread = threading.Thread(target=_init_fastapi, daemon=True) + thread.start() + + +def on_worker_shutdown(sender, **_kwargs): + assert isinstance(sender.app, Celery) + + fastapi_app = get_fastapi_app(sender.app) + assert isinstance(fastapi_app, FastAPI) + event_loop = get_event_loop(fastapi_app) + + async def shutdown(): + fastapi_app.state.shutdown_event.set() + + await cancel_wait_task(fastapi_app.state.lifespan_task, max_delay=5) + + asyncio.run_coroutine_threadsafe(shutdown(), event_loop) diff --git a/services/storage/src/simcore_service_storage/modules/celery/tasks.py b/services/storage/src/simcore_service_storage/modules/celery/tasks.py new file mode 100644 index 000000000000..b58a09a69361 --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/tasks.py @@ -0,0 +1,30 @@ +import logging +import time + + +from celery import Task # type: ignore[import-untyped] +from models_library.progress_bar import ProgressReport +from models_library.projects_nodes_io import StorageFileID +from servicelib.logging_utils import log_context + +from .utils import get_celery_worker + +_logger = logging.getLogger(__name__) + + +def export_data(task: Task, files: list[StorageFileID]): + _logger.info("Exporting files: %s", files) + for n, file in enumerate(files, start=1): + with log_context( + _logger, + logging.INFO, + msg=f"Exporting {file=} ({n}/{len(files)})", + ): + assert task.name + get_celery_worker(task.app).set_task_progress( + task_name=task.name, + task_id=task.request.id, + report=ProgressReport(actual_value=n / len(files) * 100), + ) + time.sleep(10) + return "done" diff --git a/services/storage/src/simcore_service_storage/modules/celery/utils.py b/services/storage/src/simcore_service_storage/modules/celery/utils.py new file mode 100644 index 000000000000..5f5186548204 --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/utils.py @@ -0,0 +1,27 @@ +from celery import Celery # type: ignore[import-untyped] +from fastapi import FastAPI + +from .worker import CeleryTaskQueueWorker + +_WORKER_KEY = "celery_worker" +_FASTAPI_APP_KEY = "fastapi_app" + + +def get_celery_worker(celery_app: Celery) -> CeleryTaskQueueWorker: + worker = celery_app.conf[_WORKER_KEY] + assert isinstance(worker, CeleryTaskQueueWorker) + return worker + + +def get_fastapi_app(celery_app: Celery) -> FastAPI: + fastapi_app = celery_app.conf[_FASTAPI_APP_KEY] + assert isinstance(fastapi_app, FastAPI) + return fastapi_app + + +def set_celery_worker(celery_app: Celery, worker: CeleryTaskQueueWorker) -> None: + celery_app.conf[_WORKER_KEY] = worker + + +def set_fastapi_app(celery_app: Celery, fastapi_app: FastAPI) -> None: + celery_app.conf[_FASTAPI_APP_KEY] = fastapi_app diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker.py b/services/storage/src/simcore_service_storage/modules/celery/worker.py new file mode 100644 index 000000000000..36456d887b5a --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/worker.py @@ -0,0 +1,28 @@ +import logging + +from 
celery import Celery # type: ignore[import-untyped] +from models_library.progress_bar import ProgressReport +from servicelib.logging_utils import log_context + +from .models import TaskID + +_logger = logging.getLogger(__name__) + + +class CeleryTaskQueueWorker: + def __init__(self, celery_app: Celery) -> None: + self.celery_app = celery_app + + def set_task_progress( + self, task_name: str, task_id: TaskID, report: ProgressReport + ) -> None: + with log_context( + _logger, + logging.DEBUG, + msg=f"Setting progress for {task_name}: {report.model_dump_json()}", + ): + self.celery_app.tasks[task_name].update_state( + task_id=task_id, + state="RUNNING", + meta=report.model_dump(mode="json"), + ) diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py new file mode 100644 index 000000000000..16bc216b74ad --- /dev/null +++ b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py @@ -0,0 +1,33 @@ +"""Main application to be deployed in for example uvicorn.""" + +import logging + +from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped] +from servicelib.logging_utils import config_all_loggers +from simcore_service_storage.modules.celery.signals import ( + on_worker_init, + on_worker_shutdown, +) + +from ...core.settings import ApplicationSettings +from ._common import create_app as create_celery_app, define_task +from .tasks import export_data + +_settings = ApplicationSettings.create_from_envs() + +logging.basicConfig(level=_settings.log_level) # NOSONAR +logging.root.setLevel(_settings.log_level) +config_all_loggers( + log_format_local_dev_enabled=_settings.STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED, + logger_filter_mapping=_settings.STORAGE_LOG_FILTER_MAPPING, + tracing_settings=_settings.STORAGE_TRACING, +) + +_logger = logging.getLogger(__name__) + +assert _settings.STORAGE_CELERY +app = create_celery_app(_settings.STORAGE_CELERY) +worker_init.connect(on_worker_init) +worker_shutdown.connect(on_worker_shutdown) + +define_task(app, export_data) diff --git a/services/storage/tests/unit/modules/celery/conftest.py b/services/storage/tests/unit/modules/celery/conftest.py new file mode 100644 index 000000000000..3cd06195b286 --- /dev/null +++ b/services/storage/tests/unit/modules/celery/conftest.py @@ -0,0 +1,93 @@ +from collections.abc import Callable, Iterable +from datetime import timedelta +from typing import Any + +import pytest +from celery import Celery +from celery.contrib.testing.worker import TestWorkController, start_worker +from celery.signals import worker_init, worker_shutdown +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict +from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient +from simcore_service_storage.modules.celery.signals import ( + on_worker_init, + on_worker_shutdown, +) +from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker + + +@pytest.fixture +def app_environment( + monkeypatch: pytest.MonkeyPatch, + app_environment: EnvVarsDict, +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + **app_environment, + "SC_BOOT_MODE": "local-development", + "RABBIT_HOST": "localhost", + "RABBIT_PORT": "5672", + "RABBIT_USER": "mock", + "RABBIT_SECURE": True, + "RABBIT_PASSWORD": "", + }, + ) + + +@pytest.fixture +def celery_conf() -> dict[str, Any]: + return { + "broker_url": "memory://", + 
"result_backend": "cache+memory://", + "result_expires": timedelta(days=7), + "result_extended": True, + "pool": "threads", + } + + +@pytest.fixture +def celery_app(celery_conf: dict[str, Any]): + return Celery(**celery_conf) + + +@pytest.fixture +def register_celery_tasks() -> Callable[[Celery], None]: + msg = "please define a callback that registers the tasks" + raise NotImplementedError(msg) + + +@pytest.fixture +def celery_client( + app_environment: EnvVarsDict, celery_app: Celery +) -> CeleryTaskQueueClient: + return CeleryTaskQueueClient(celery_app) + + +@pytest.fixture +def celery_worker_controller( + app_environment: EnvVarsDict, + register_celery_tasks: Callable[[Celery], None], + celery_app: Celery, +) -> Iterable[TestWorkController]: + + # Signals must be explicitily connected + worker_init.connect(on_worker_init) + worker_shutdown.connect(on_worker_shutdown) + + register_celery_tasks(celery_app) + + with start_worker(celery_app, loglevel="info", perform_ping_check=False) as worker: + worker_init.send(sender=worker) + + yield worker + + worker_shutdown.send(sender=worker) + + +@pytest.fixture +def celery_worker( + celery_worker_controller: TestWorkController, +) -> CeleryTaskQueueWorker: + assert isinstance(celery_worker_controller.app, Celery) + return CeleryTaskQueueWorker(celery_worker_controller.app) diff --git a/services/storage/tests/unit/modules/celery/test_celery.py b/services/storage/tests/unit/modules/celery/test_celery.py new file mode 100644 index 000000000000..99c3cc34263a --- /dev/null +++ b/services/storage/tests/unit/modules/celery/test_celery.py @@ -0,0 +1,165 @@ +import asyncio +import logging +import time +from collections.abc import Callable +from random import randint + +from pydantic import TypeAdapter, ValidationError +import pytest +from celery import Celery, Task +from celery.contrib.abortable import AbortableTask +from common_library.errors_classes import OsparcErrorMixin +from models_library.progress_bar import ProgressReport +from servicelib.logging_utils import log_context +from simcore_service_storage.modules.celery import get_event_loop +from simcore_service_storage.modules.celery._common import define_task +from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient +from simcore_service_storage.modules.celery.models import TaskContext, TaskError, TaskState +from simcore_service_storage.modules.celery.utils import ( + get_celery_worker, + get_fastapi_app, +) +from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed + +_logger = logging.getLogger(__name__) + + +async def _async_archive( + celery_app: Celery, task_name: str, task_id: str, files: list[str] +) -> str: + worker = get_celery_worker(celery_app) + + def sleep_for(seconds: float) -> None: + time.sleep(seconds) + + for n, file in enumerate(files, start=1): + with log_context(_logger, logging.INFO, msg=f"Processing file {file}"): + worker.set_task_progress( + task_name=task_name, + task_id=task_id, + report=ProgressReport(actual_value=n / len(files) * 10), + ) + await asyncio.get_event_loop().run_in_executor(None, sleep_for, 1) + + return "archive.zip" + + +def sync_archive(task: Task, files: list[str]) -> str: + assert task.name + _logger.info("Calling async_archive") + return asyncio.run_coroutine_threadsafe( + _async_archive(task.app, task.name, task.request.id, files), + get_event_loop(get_fastapi_app(task.app)), + ).result() + + +class MyError(OsparcErrorMixin, Exception): + msg_template = "Something strange happened: {msg}" + + +def 
+def failure_task(task: Task):
+    msg = "BOOM!"
+    raise MyError(msg=msg)
+
+
+def dreamer_task(task: AbortableTask) -> list[int]:
+    numbers = []
+    for _ in range(30):
+        if task.is_aborted():
+            _logger.warning("Alarm clock")
+            return numbers
+        numbers.append(randint(1, 90))
+        time.sleep(1)
+    return numbers
+
+
+@pytest.fixture
+def register_celery_tasks() -> Callable[[Celery], None]:
+    def _(celery_app: Celery) -> None:
+        define_task(celery_app, sync_archive)
+        define_task(celery_app, failure_task)
+        define_task(celery_app, dreamer_task)
+
+    return _
+
+
+@pytest.mark.usefixtures("celery_worker")
+async def test_submitting_task_calling_async_function_results_with_success_state(
+    celery_client: CeleryTaskQueueClient,
+):
+    task_context = TaskContext(user_id=42)
+
+    task_uuid = await celery_client.send_task(
+        "sync_archive",
+        task_context=task_context,
+        files=[f"file{n}" for n in range(5)],
+    )
+
+    for attempt in Retrying(
+        retry=retry_if_exception_type(AssertionError),
+        wait=wait_fixed(1),
+        stop=stop_after_delay(30),
+    ):
+        with attempt:
+            status = await celery_client.get_task_status(task_context, task_uuid)
+            assert status.task_state == TaskState.SUCCESS
+
+    assert (
+        await celery_client.get_task_status(task_context, task_uuid)
+    ).task_state == TaskState.SUCCESS
+    assert (
+        await celery_client.get_task_result(task_context, task_uuid)
+    ) == "archive.zip"
+
+
+@pytest.mark.usefixtures("celery_worker")
+async def test_submitting_task_with_failure_results_with_error(
+    celery_client: CeleryTaskQueueClient,
+):
+    task_context = TaskContext(user_id=42)
+
+    task_uuid = await celery_client.send_task("failure_task", task_context=task_context)
+
+    for attempt in Retrying(
+        retry=retry_if_exception_type((AssertionError, ValidationError)),
+        wait=wait_fixed(1),
+        stop=stop_after_delay(30),
+    ):
+        with attempt:
+            raw_result = await celery_client.get_task_result(task_context, task_uuid)
+            result = TypeAdapter(TaskError).validate_python(raw_result)
+            assert isinstance(result, TaskError)
+
+    assert (
+        await celery_client.get_task_status(task_context, task_uuid)
+    ).task_state == TaskState.ERROR
+    raw_result = await celery_client.get_task_result(task_context, task_uuid)
+    result = TypeAdapter(TaskError).validate_python(raw_result)
+    assert f"{result.exc_msg}" == "Something strange happened: BOOM!"
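For orientation, the following is a minimal sketch (not part of the diff itself) of the submit/poll/collect pattern that the tests above exercise against CeleryTaskQueueClient. The task name, TaskContext contents and polling interval are illustrative assumptions only.

# Hedged sketch only: assumes a configured CeleryTaskQueueClient and a task
# registered as "sync_archive" (as in the tests above); names and timings are
# placeholders, not part of the actual change.
import asyncio

from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient
from simcore_service_storage.modules.celery.models import TaskContext, TaskState


async def run_archive_job(client: CeleryTaskQueueClient):
    task_context = TaskContext(user_id=42)  # hypothetical caller identity
    task_uuid = await client.send_task(
        "sync_archive", task_context=task_context, files=["file0", "file1"]
    )
    # Poll the status until the task reaches a terminal state (1s is arbitrary).
    while True:
        status = await client.get_task_status(task_context, task_uuid)
        if status.task_state in (
            TaskState.SUCCESS,
            TaskState.ERROR,
            TaskState.ABORTED,
        ):
            break
        await asyncio.sleep(1)
    return await client.get_task_result(task_context, task_uuid)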
+
+
+@pytest.mark.usefixtures("celery_worker")
+async def test_aborting_task_results_with_aborted_state(
+    celery_client: CeleryTaskQueueClient,
+):
+    task_context = TaskContext(user_id=42)
+
+    task_uuid = await celery_client.send_task(
+        "dreamer_task",
+        task_context=task_context,
+    )
+
+    await celery_client.abort_task(task_context, task_uuid)
+
+    for attempt in Retrying(
+        retry=retry_if_exception_type(AssertionError),
+        wait=wait_fixed(1),
+        stop=stop_after_delay(30),
+    ):
+        with attempt:
+            progress = await celery_client.get_task_status(task_context, task_uuid)
+            assert progress.task_state == TaskState.ABORTED
+
+    assert (
+        await celery_client.get_task_status(task_context, task_uuid)
+    ).task_state == TaskState.ABORTED
diff --git a/services/storage/tests/unit/test_db_data_export.py b/services/storage/tests/unit/test_db_data_export.py
index 717231787499..0bf9db1a2edb 100644
--- a/services/storage/tests/unit/test_db_data_export.py
+++ b/services/storage/tests/unit/test_db_data_export.py
@@ -1,5 +1,6 @@
 # pylint: disable=W0621
 # pylint: disable=W0613
+# pylint: disable=R6301
 from collections.abc import Awaitable, Callable
 from pathlib import Path
 from typing import Any, Literal, NamedTuple
@@ -18,6 +19,7 @@
 from models_library.api_schemas_storage.data_export_async_jobs import (
     DataExportTaskStartInput,
 )
+from models_library.progress_bar import ProgressReport
 from models_library.projects_nodes_io import NodeID, SimcoreS3FileID
 from models_library.users import UserID
 from pydantic import ByteSize, TypeAdapter
@@ -28,9 +30,11 @@
 from servicelib.rabbitmq import RabbitMQRPCClient
 from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs
 from settings_library.rabbit import RabbitSettings
-from simcore_service_storage.api.rpc._async_jobs import AsyncJobNameData
+from simcore_service_storage.api.rpc._async_jobs import AsyncJobNameData, TaskStatus
 from simcore_service_storage.api.rpc._data_export import AccessRightError
 from simcore_service_storage.core.settings import ApplicationSettings
+from simcore_service_storage.modules.celery.client import TaskUUID
+from simcore_service_storage.modules.celery.models import TaskState
 from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager
 
 pytest_plugins = [
@@ -43,6 +47,8 @@
     "postgres",
 ]
 
+_faker = Faker()
+
 
 @pytest.fixture
 async def mock_rabbit_setup(mocker: MockerFixture):
@@ -50,6 +56,38 @@
         pass
 
 
+class _MockCeleryClient:
+    async def send_task(self, *args, **kwargs) -> TaskUUID:
+        return _faker.uuid4()
+
+    async def get_task_status(self, *args, **kwargs) -> TaskStatus:
+        return TaskStatus(
+            task_uuid=_faker.uuid4(),
+            task_state=TaskState.RUNNING,
+            progress_report=ProgressReport(actual_value=42.0),
+        )
+
+    async def get_task_result(self, *args, **kwargs) -> Any:
+        return {}
+
+    async def get_task_uuids(self, *args, **kwargs) -> set[TaskUUID]:
+        return {_faker.uuid4()}
+
+
+@pytest.fixture
+async def mock_celery_client(mocker: MockerFixture) -> MockerFixture:
+    _celery_client = _MockCeleryClient()
+    mocker.patch(
+        "simcore_service_storage.api.rpc._async_jobs.get_celery_client",
+        return_value=_celery_client,
+    )
+    mocker.patch(
+        "simcore_service_storage.api.rpc._data_export.get_celery_client",
+        return_value=_celery_client,
+    )
+    return mocker
+
+
 @pytest.fixture
 async def app_environment(
     app_environment: EnvVarsDict,
@@ -117,6 +155,7 @@
 )
 async def test_start_data_export_success(
     rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
     with_random_project_with_files: tuple[
         dict[str, Any],
         dict[NodeID, dict[SimcoreS3FileID, FileIDDict]],
@@ -156,7 +195,10 @@
 
 
 async def test_start_data_export_fail(
-    rpc_client: RabbitMQRPCClient, user_id: UserID, faker: Faker
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
+    user_id: UserID,
+    faker: Faker,
 ):
     with pytest.raises(AccessRightError):
         _ = await async_jobs.submit_job(
@@ -173,13 +215,16 @@
 )
 
 
-async def test_abort_data_export(rpc_client: RabbitMQRPCClient, faker: Faker):
-    _job_id = AsyncJobId(faker.uuid4())
+async def test_abort_data_export(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
+):
+    _job_id = AsyncJobId(_faker.uuid4())
     result = await async_jobs.abort(
         rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
         job_id_data=AsyncJobNameData(
-            user_id=faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
         ),
         job_id=_job_id,
     )
@@ -187,39 +232,48 @@
     assert result.job_id == _job_id
 
 
-async def test_get_data_export_status(rpc_client: RabbitMQRPCClient, faker: Faker):
-    _job_id = AsyncJobId(faker.uuid4())
+async def test_get_data_export_status(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
+):
+    _job_id = AsyncJobId(_faker.uuid4())
     result = await async_jobs.get_status(
         rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
         job_id=_job_id,
         job_id_data=AsyncJobNameData(
-            user_id=faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
         ),
     )
     assert isinstance(result, AsyncJobStatus)
     assert result.job_id == _job_id
 
 
-async def test_get_data_export_result(rpc_client: RabbitMQRPCClient, faker: Faker):
-    _job_id = AsyncJobId(faker.uuid4())
+async def test_get_data_export_result(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
+):
+    _job_id = AsyncJobId(_faker.uuid4())
     result = await async_jobs.get_result(
         rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
         job_id=_job_id,
         job_id_data=AsyncJobNameData(
-            user_id=faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
        ),
     )
     assert isinstance(result, AsyncJobResult)
 
 
-async def test_list_jobs(rpc_client: RabbitMQRPCClient, faker: Faker):
+async def test_list_jobs(
+    rpc_client: RabbitMQRPCClient,
+    mock_celery_client: MockerFixture,
+):
     result = await async_jobs.list_jobs(
         rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
         job_id_data=AsyncJobNameData(
-            user_id=faker.pyint(min_value=1, max_value=100), product_name="osparc"
+            user_id=_faker.pyint(min_value=1, max_value=100), product_name="osparc"
         ),
         filter_="",
     )
diff --git a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
index 1fc7d0cad538..a64a214e1f0e 100644
--- a/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
+++ b/services/web/server/src/simcore_service_webserver/api/v0/openapi.yaml
@@ -6427,12 +6427,6 @@ paths:
             type: string
             format: uuid
           title: Job Id
-      requestBody:
-        required: true
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/StorageAsyncJobGet'
       responses:
         '200':
           description: Successful Response
@@ -6455,12 +6449,6 @@
             type: string
             format: uuid
           title: Job Id
-      requestBody:
-        required: true
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/StorageAsyncJobGet'
       responses:
         '200':
           description: Successful Response
@@ -6482,12 +6470,6 @@
             type: string
             format: uuid
           title: Job Id
-      requestBody:
-        required: true
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/StorageAsyncJobGet'
       responses:
         '200':
           description: Successful Response
@@ -14923,23 +14905,11 @@
       done:
         type: boolean
        title: Done
-      started:
-        type: string
-        format: date-time
-        title: Started
-      stopped:
-        anyOf:
-          - type: string
-            format: date-time
-          - type: 'null'
-        title: Stopped
       type: object
       required:
        - jobId
        - progress
        - done
-        - started
-        - stopped
       title: StorageAsyncJobStatus
     Structure:
       properties:
diff --git a/services/web/server/src/simcore_service_webserver/storage/_rest.py b/services/web/server/src/simcore_service_webserver/storage/_rest.py
index a3839db9d550..669bad63f218 100644
--- a/services/web/server/src/simcore_service_webserver/storage/_rest.py
+++ b/services/web/server/src/simcore_service_webserver/storage/_rest.py
@@ -7,6 +7,7 @@
 import urllib.parse
 from typing import Any, Final, NamedTuple
 from urllib.parse import quote, unquote
+from uuid import UUID
 
 from aiohttp import ClientTimeout, web
 from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData
@@ -471,10 +472,14 @@ async def get_async_jobs(request: web.Request) -> web.Response:
 @permission_required("storage.files.*")
 @handle_data_export_exceptions
 async def get_async_job_status(request: web.Request) -> web.Response:
+
+    class _PathParams(BaseModel):
+        job_id: UUID
+
     _req_ctx = RequestContext.model_validate(request)
     rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
 
-    async_job_get = parse_request_path_parameters_as(StorageAsyncJobGet, request)
+    async_job_get = parse_request_path_parameters_as(_PathParams, request)
     async_job_rpc_status = await get_status(
         rabbitmq_rpc_client=rabbitmq_rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
@@ -497,10 +502,13 @@
 @permission_required("storage.files.*")
 @handle_data_export_exceptions
 async def abort_async_job(request: web.Request) -> web.Response:
+    class _PathParams(BaseModel):
+        job_id: UUID
+
     _req_ctx = RequestContext.model_validate(request)
     rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
 
-    async_job_get = parse_request_path_parameters_as(StorageAsyncJobGet, request)
+    async_job_get = parse_request_path_parameters_as(_PathParams, request)
     async_job_rpc_abort = await abort(
         rabbitmq_rpc_client=rabbitmq_rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
@@ -510,9 +518,11 @@
         ),
     )
     return web.Response(
-        status=status.HTTP_200_OK
-        if async_job_rpc_abort.result
-        else status.HTTP_500_INTERNAL_SERVER_ERROR
+        status=(
+            status.HTTP_200_OK
+            if async_job_rpc_abort.result
+            else status.HTTP_500_INTERNAL_SERVER_ERROR
+        )
     )
 
 
@@ -524,10 +534,13 @@
 @permission_required("storage.files.*")
 @handle_data_export_exceptions
 async def get_async_job_result(request: web.Request) -> web.Response:
+    class _PathParams(BaseModel):
+        job_id: UUID
+
     _req_ctx = RequestContext.model_validate(request)
     rabbitmq_rpc_client = get_rabbitmq_rpc_client(request.app)
 
-    async_job_get = parse_request_path_parameters_as(StorageAsyncJobGet, request)
+    async_job_get = parse_request_path_parameters_as(_PathParams, request)
     async_job_rpc_result = await get_result(
         rabbitmq_rpc_client=rabbitmq_rpc_client,
         rpc_namespace=STORAGE_RPC_NAMESPACE,
diff --git a/services/web/server/tests/integration/conftest.py b/services/web/server/tests/integration/conftest.py
index c6575d80e21f..a66e1e4bec65 100644
--- a/services/web/server/tests/integration/conftest.py
+++ b/services/web/server/tests/integration/conftest.py
@@ -62,7 +62,7 @@ def webserver_environ(
     # version tha loads only the subsystems under test. For that reason,
     # the test webserver is built-up in webserver_service fixture that runs
     # on the host.
-    EXCLUDED_SERVICES = ["dask-scheduler", "director"]
+    EXCLUDED_SERVICES = ["dask-scheduler", "director", "sto-worker"]
     services_with_published_ports = [
         name
         for name in core_services