Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Annotated

from fastapi import Depends, FastAPI
from servicelib.fastapi.dependencies import get_app

from ....modules.celery import get_celery_client as _get_celery_client_from_app
from ....modules.celery.client import CeleryTaskQueueClient


def get_celery_client(
    app: Annotated[FastAPI, Depends(get_app)],
) -> CeleryTaskQueueClient:
    """FastAPI dependency: return the Celery client attached to the app."""
    client = _get_celery_client_from_app(app)
    return client
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ async def compute_path_size(
location_id: LocationID,
path: Path,
) -> AsyncJobGet:
assert app # nosec
task_uuid = await get_celery_client(app).send_task(
remote_compute_path_size.__name__,
task_context=job_id_data.model_dump(),
Expand Down
4 changes: 2 additions & 2 deletions services/storage/src/simcore_service_storage/dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

def setup_dsm(app: FastAPI) -> None:
async def _on_startup() -> None:
dsm_provider = DataManagerProvider(app)
dsm_provider = DataManagerProvider(app=app)
dsm_provider.register_builder(
SimcoreS3DataManager.get_location_id(),
create_simcore_s3_data_manager,
Expand All @@ -38,7 +38,7 @@ async def _on_shutdown() -> None:


def get_dsm_provider(app: FastAPI) -> DataManagerProvider:
if not app.state.dsm_provider:
if not hasattr(app.state, "dsm_provider"):
raise ConfigurationError(
msg="DSM provider not available. Please check the configuration."
)
Expand Down
2 changes: 0 additions & 2 deletions services/storage/src/simcore_service_storage/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,4 @@
tracing_settings=_settings.STORAGE_TRACING,
)

_logger = logging.getLogger(__name__)

app = create_app(_settings)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import ssl
from typing import Any

from celery import Celery # type: ignore[import-untyped]
from settings_library.celery import CelerySettings
Expand All @@ -8,24 +9,28 @@
_logger = logging.getLogger(__name__)


def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
    """Build the Celery configuration mapping from the application settings."""
    config: dict[str, Any] = dict(
        broker_connection_retry_on_startup=True,
        result_expires=celery_settings.CELERY_RESULT_EXPIRES,
        result_extended=True,  # keep original task args in results
        result_serializer="json",
        task_send_sent_event=True,
        task_track_started=True,
        worker_send_task_events=True,  # enable task event monitoring
    )
    # NOTE(review): ssl_cert_reqs=CERT_NONE disables certificate validation
    # for the secure Redis backend — confirm this is intentional
    if celery_settings.CELERY_REDIS_RESULT_BACKEND.REDIS_SECURE:
        config["redis_backend_use_ssl"] = {"ssl_cert_reqs": ssl.CERT_NONE}
    return config


def create_app(celery_settings: CelerySettings) -> Celery:
assert celery_settings

app = Celery(
return Celery(
broker=celery_settings.CELERY_RABBIT_BROKER.dsn,
backend=celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
RedisDatabase.CELERY_TASKS,
),
**_celery_configure(celery_settings),
)
app.conf.broker_connection_retry_on_startup = True
# NOTE: disable SSL cert validation (https://github.com/ITISFoundation/osparc-simcore/pull/7407)
if celery_settings.CELERY_REDIS_RESULT_BACKEND.REDIS_SECURE:
app.conf.redis_backend_use_ssl = {"ssl_cert_reqs": ssl.CERT_NONE}
app.conf.result_expires = celery_settings.CELERY_RESULT_EXPIRES
app.conf.result_extended = True # original args are included in the results
app.conf.result_serializer = "json"
app.conf.task_send_sent_event = True
app.conf.task_track_started = True
app.conf.worker_send_task_events = True # enable tasks monitoring

return app
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
tracing_settings=_settings.STORAGE_TRACING,
)

_logger = logging.getLogger(__name__)

assert _settings.STORAGE_CELERY
app = create_celery_app(_settings.STORAGE_CELERY)
Expand Down
137 changes: 124 additions & 13 deletions services/storage/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@


import asyncio
import datetime
import logging
import random
import sys
Expand All @@ -20,6 +21,9 @@
import simcore_service_storage
from asgi_lifespan import LifespanManager
from aws_library.s3 import SimcoreS3API
from celery import Celery
from celery.contrib.testing.worker import TestWorkController, start_worker
from celery.signals import worker_init, worker_shutdown
from faker import Faker
from fakeredis.aioredis import FakeRedis
from fastapi import FastAPI
Expand Down Expand Up @@ -56,15 +60,22 @@
)
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.aiohttp import status
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
from servicelib.utils import limited_gather
from settings_library.rabbit import RabbitSettings
from simcore_postgres_database.models.tokens import tokens
from simcore_postgres_database.storage_models import file_meta_data, projects, users
from simcore_service_storage.api._worker_tasks.tasks import setup_worker_tasks
from simcore_service_storage.core.application import create_app
from simcore_service_storage.core.settings import ApplicationSettings
from simcore_service_storage.datcore_dsm import DatCoreDataManager
from simcore_service_storage.dsm import get_dsm_provider
from simcore_service_storage.models import FileMetaData, FileMetaDataAtDB, S3BucketName
from simcore_service_storage.modules.celery.signals import (
on_worker_init,
on_worker_shutdown,
)
from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker
from simcore_service_storage.modules.long_running_tasks import (
get_completed_upload_tasks,
)
Expand All @@ -89,7 +100,6 @@
"pytest_simcore.environment_configs",
"pytest_simcore.file_extra",
"pytest_simcore.httpbin_service",
"pytest_simcore.minio_service",
"pytest_simcore.openapi_specs",
"pytest_simcore.postgres_service",
"pytest_simcore.pytest_global_environs",
Expand Down Expand Up @@ -188,6 +198,12 @@ def enabled_rabbitmq(
return rabbit_service


@pytest.fixture
async def mocked_redis_server(mocker: MockerFixture) -> None:
    """Patch ``redis.asyncio.from_url`` to return an in-memory FakeRedis."""
    mocker.patch("redis.asyncio.from_url", return_value=FakeRedis())


@pytest.fixture
def app_settings(
app_environment: EnvVarsDict,
Expand All @@ -196,26 +212,22 @@ def app_settings(
postgres_host_config: dict[str, str],
mocked_s3_server_envs: EnvVarsDict,
datcore_adapter_service_mock: respx.MockRouter,
mocked_redis_server,
mocked_redis_server: None,
) -> ApplicationSettings:
test_app_settings = ApplicationSettings.create_from_envs()
print(f"{test_app_settings.model_dump_json(indent=2)=}")
return test_app_settings


@pytest.fixture
async def mocked_redis_server(mocker: MockerFixture) -> None:
mock_redis = FakeRedis()
mocker.patch("redis.asyncio.from_url", return_value=mock_redis)


_LIFESPAN_TIMEOUT: Final[int] = 10


@pytest.fixture
async def initialized_app(app_settings: ApplicationSettings) -> AsyncIterator[FastAPI]:
settings = ApplicationSettings.create_from_envs()
app = create_app(settings)
async def initialized_app(
mock_celery_app: None,
app_settings: ApplicationSettings,
) -> AsyncIterator[FastAPI]:
app = create_app(app_settings)
# NOTE: the timeout is sometime too small for CI machines, and even larger machines
async with LifespanManager(
app, startup_timeout=_LIFESPAN_TIMEOUT, shutdown_timeout=_LIFESPAN_TIMEOUT
Expand Down Expand Up @@ -349,13 +361,13 @@ def upload_file(
sqlalchemy_async_engine: AsyncEngine,
storage_s3_client: SimcoreS3API,
storage_s3_bucket: S3BucketName,
initialized_app: FastAPI,
client: httpx.AsyncClient,
project_id: ProjectID,
node_id: NodeID,
create_upload_file_link_v2: Callable[..., Awaitable[FileUploadSchema]],
create_file_of_size: Callable[[ByteSize, str | None], Path],
create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID],
with_storage_celery_worker: CeleryTaskQueueWorker,
) -> Callable[
[ByteSize, str, SimcoreS3FileID | None], Awaitable[tuple[Path, SimcoreS3FileID]]
]:
Expand Down Expand Up @@ -893,7 +905,9 @@ async def output_file(
bucket=TypeAdapter(S3BucketName).validate_python("master-simcore"),
location_id=SimcoreS3DataManager.get_location_id(),
location_name=SimcoreS3DataManager.get_location_name(),
sha256_checksum=faker.sha256(),
sha256_checksum=TypeAdapter(SHA256Str).validate_python(
faker.sha256(raw_output=False)
),
)
file.entity_tag = "df9d868b94e53d18009066ca5cd90e9f"
file.file_size = ByteSize(12)
Expand Down Expand Up @@ -945,3 +959,100 @@ async def fake_datcore_tokens(
await conn.execute(
tokens.delete().where(tokens.c.token_id.in_(created_token_ids))
)


@pytest.fixture(scope="session")
def celery_config() -> dict[str, Any]:
    """Session-wide Celery test configuration with in-memory broker/backend."""
    return dict(
        broker_connection_retry_on_startup=True,
        broker_url="memory://localhost//",
        result_backend="cache+memory://localhost//",
        result_expires=datetime.timedelta(days=7),
        result_extended=True,  # keep original task args in results
        pool="threads",
        worker_send_task_events=True,
        task_track_started=True,
        task_send_sent_event=True,
    )


@pytest.fixture
def mock_celery_app(mocker: MockerFixture, celery_config: dict[str, Any]) -> Celery:
    """Create an in-memory Celery app and patch the factories to return it."""
    celery_app = Celery(**celery_config)

    patch_targets = (
        "simcore_service_storage.modules.celery._common.create_app",
        "simcore_service_storage.modules.celery.create_app",
    )
    for target in patch_targets:
        mocker.patch(target, return_value=celery_app)

    return celery_app


@pytest.fixture
def register_celery_tasks() -> Callable[[Celery], None]:
    """override if tasks are needed"""

    def _noop(celery_app: Celery) -> None:
        # by default no extra tasks are registered
        return None

    return _noop


@pytest.fixture
async def with_storage_celery_worker_controller(
    app_environment: EnvVarsDict,
    celery_app: Celery,
    monkeypatch: pytest.MonkeyPatch,
    register_celery_tasks: Callable[[Celery], None],
) -> AsyncIterator[TestWorkController]:
    """Start an embedded Celery test worker with the storage tasks registered.

    Yields the running ``TestWorkController``; the worker is shut down on teardown.
    """
    # Signals must be explicitly connected: the test worker does not trigger
    # the app's init/shutdown hooks on its own
    worker_init.connect(on_worker_init)
    worker_shutdown.connect(on_worker_shutdown)

    # register the production worker tasks plus any test-specific ones
    setup_worker_tasks(celery_app)
    register_celery_tasks(celery_app)

    monkeypatch.setenv("STORAGE_WORKER_MODE", "true")
    with start_worker(
        celery_app,
        pool="threads",
        concurrency=1,
        loglevel="info",
        perform_ping_check=False,
        worker_kwargs={"hostname": "celery@worker1"},
    ) as worker:
        # fire the init signal manually so on_worker_init runs for this worker
        worker_init.send(sender=worker)

        # NOTE: wait for worker to be ready (sic)
        await asyncio.sleep(1)
        yield worker

        worker_shutdown.send(sender=worker)


@pytest.fixture
def with_storage_celery_worker(
    with_storage_celery_worker_controller: TestWorkController,
) -> CeleryTaskQueueWorker:
    """Wrap the running test worker's Celery app in a CeleryTaskQueueWorker."""
    celery_app = with_storage_celery_worker_controller.app
    assert isinstance(celery_app, Celery)
    return CeleryTaskQueueWorker(celery_app)


@pytest.fixture
async def storage_rabbitmq_rpc_client(
    rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
) -> RabbitMQRPCClient:
    """Provide an RPC client dedicated to the storage tests."""
    client = await rabbitmq_rpc_client("pytest_storage_rpc_client")
    assert client
    return client


@pytest.fixture
def product_name(faker: Faker) -> str:
    """Return a fake product name for tests."""
    fake_name: str = faker.name()
    return fake_name


@pytest.fixture
def set_log_levels_for_noisy_libraries() -> None:
    """Raise the 'werkzeug' logger threshold to WARNING to quiet test output."""
    noisy_logger = logging.getLogger("werkzeug")
    noisy_logger.setLevel(logging.WARNING)
Loading
Loading