|
21 | 21 | import simcore_service_storage |
22 | 22 | from asgi_lifespan import LifespanManager |
23 | 23 | from aws_library.s3 import SimcoreS3API |
| 24 | +from celery import Celery |
| 25 | +from celery.contrib.testing.worker import TestWorkController, start_worker |
| 26 | +from celery.signals import worker_init, worker_shutdown |
24 | 27 | from faker import Faker |
25 | 28 | from fakeredis.aioredis import FakeRedis |
26 | 29 | from fastapi import FastAPI |
|
41 | 44 | from models_library.users import UserID |
42 | 45 | from models_library.utils.fastapi_encoders import jsonable_encoder |
43 | 46 | from pydantic import ByteSize, TypeAdapter |
44 | | -from pytest_mock import MockerFixture |
| 47 | +from pytest_mock import MockerFixture, MockFixture |
45 | 48 | from pytest_simcore.helpers.fastapi import url_from_operation_id |
46 | 49 | from pytest_simcore.helpers.httpx_assert_checks import assert_status |
47 | 50 | from pytest_simcore.helpers.logging_tools import log_context |
|
57 | 60 | ) |
58 | 61 | from pytest_simcore.helpers.typing_env import EnvVarsDict |
59 | 62 | from servicelib.aiohttp import status |
| 63 | +from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient |
60 | 64 | from servicelib.utils import limited_gather |
61 | 65 | from settings_library.rabbit import RabbitSettings |
62 | 66 | from simcore_postgres_database.models.tokens import tokens |
63 | 67 | from simcore_postgres_database.storage_models import file_meta_data, projects, users |
| 68 | +from simcore_service_storage.api._worker_tasks.tasks import setup_worker_tasks |
64 | 69 | from simcore_service_storage.core.application import create_app |
65 | 70 | from simcore_service_storage.core.settings import ApplicationSettings |
66 | 71 | from simcore_service_storage.datcore_dsm import DatCoreDataManager |
67 | 72 | from simcore_service_storage.dsm import get_dsm_provider |
68 | 73 | from simcore_service_storage.models import FileMetaData, FileMetaDataAtDB, S3BucketName |
| 74 | +from simcore_service_storage.modules.celery.signals import ( |
| 75 | + on_worker_init, |
| 76 | + on_worker_shutdown, |
| 77 | +) |
| 78 | +from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker |
69 | 79 | from simcore_service_storage.modules.s3 import get_s3_client |
70 | 80 | from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager |
71 | 81 | from sqlalchemy import literal_column |
@@ -959,3 +969,66 @@ def celery_config() -> dict[str, Any]: |
959 | 969 | "task_track_started": True, |
960 | 970 | "task_send_sent_event": True, |
961 | 971 | } |
| 972 | + |
| 973 | + |
| 974 | +@pytest.fixture |
| 975 | +def mock_celery_app(mocker: MockFixture, celery_config: dict[str, Any]) -> None: |
| 976 | + celery_app = Celery(**celery_config) |
| 977 | + |
| 978 | + mocker.patch( |
| 979 | + "simcore_service_storage.modules.celery.create_app", |
| 980 | + return_value=celery_app, |
| 981 | + ) |
| 982 | + |
| 983 | + |
| 984 | +@pytest.fixture |
| 985 | +async def celery_worker_controller( |
| 986 | + app_environment: EnvVarsDict, |
| 987 | + initialized_app: FastAPI, |
| 988 | + celery_app: Celery, |
| 989 | + monkeypatch: pytest.MonkeyPatch, |
| 990 | +) -> AsyncIterator[TestWorkController]: |
| 991 | + # Signals must be explicitily connected |
| 992 | + worker_init.connect(on_worker_init) |
| 993 | + worker_shutdown.connect(on_worker_shutdown) |
| 994 | + |
| 995 | + setup_worker_tasks(celery_app) |
| 996 | + |
| 997 | + monkeypatch.setenv("STORAGE_WORKER_MODE", "true") |
| 998 | + with start_worker( |
| 999 | + celery_app, |
| 1000 | + pool="threads", |
| 1001 | + concurrency=1, |
| 1002 | + loglevel="info", |
| 1003 | + perform_ping_check=False, |
| 1004 | + worker_kwargs={"hostname": "celery@worker1"}, |
| 1005 | + ) as worker: |
| 1006 | + worker_init.send(sender=worker) |
| 1007 | + |
| 1008 | + # NOTE: wait for worker to be ready (sic) |
| 1009 | + await asyncio.sleep(5) |
| 1010 | + yield worker |
| 1011 | + |
| 1012 | + worker_shutdown.send(sender=worker) |
| 1013 | + |
| 1014 | + |
| 1015 | +@pytest.fixture |
| 1016 | +def celery_worker( |
| 1017 | + celery_worker_controller: TestWorkController, |
| 1018 | +) -> CeleryTaskQueueWorker: |
| 1019 | + assert isinstance(celery_worker_controller.app, Celery) |
| 1020 | + return CeleryTaskQueueWorker(celery_worker_controller.app) |
| 1021 | + |
| 1022 | + |
| 1023 | +@pytest.fixture |
| 1024 | +async def storage_rabbitmq_rpc_client( |
| 1025 | + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], |
| 1026 | +) -> RabbitMQRPCClient: |
| 1027 | + rpc_client = await rabbitmq_rpc_client("pytest_storage_rpc_client") |
| 1028 | + assert rpc_client |
| 1029 | + return rpc_client |
| 1030 | + |
| 1031 | + |
| 1032 | +@pytest.fixture |
| 1033 | +def product_name(faker: Faker) -> str: |
| 1034 | + return faker.name() |
0 commit comments