Skip to content

Commit 1b906ef

Browse files
committed
add rabbitmq to storage
1 parent 3bb98a6 commit 1b906ef

File tree

2 files changed

+69
-0
lines changed

2 files changed

+69
-0
lines changed

services/storage/src/simcore_service_storage/core/settings.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from settings_library.application import BaseApplicationSettings
1414
from settings_library.basic_types import LogLevel, PortInt
1515
from settings_library.postgres import PostgresSettings
16+
from settings_library.rabbit import RabbitSettings
1617
from settings_library.redis import RedisSettings
1718
from settings_library.s3 import S3Settings
1819
from settings_library.tracing import TracingSettings
@@ -75,6 +76,10 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
7576
description="Interval in seconds when task cleaning pending uploads runs. setting to NULL disables the cleaner.",
7677
)
7778

79+
STORAGE_RABBITMQ: RabbitSettings | None = Field(
80+
json_schema_extra={"auto_default_from_env": True},
81+
)
82+
7883
STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY: int = Field(
7984
4,
8085
description="Maximal amount of threads used by underlying S3 client to transfer data to S3 backend",
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import logging
2+
from typing import cast
3+
4+
from fastapi import FastAPI
5+
from servicelib.logging_utils import log_context
6+
from servicelib.rabbitmq import (
7+
RabbitMQClient,
8+
RabbitMQRPCClient,
9+
wait_till_rabbitmq_responsive,
10+
)
11+
from settings_library.rabbit import RabbitSettings
12+
13+
from ..exceptions.errors import ConfigurationError
14+
15+
_logger = logging.getLogger(__name__)
16+
17+
18+
def setup(app: FastAPI) -> None:
19+
async def on_startup() -> None:
20+
with log_context(
21+
_logger,
22+
logging.INFO,
23+
msg="Storage startup Rabbitmq",
24+
):
25+
app.state.rabbitmq_client = None
26+
rabbit_settings: RabbitSettings | None = app.state.settings.STORAGE_RABBITMQ
27+
if not rabbit_settings:
28+
raise ConfigurationError(
29+
msg="RabbitMQ client is de-activated in the settings"
30+
)
31+
await wait_till_rabbitmq_responsive(rabbit_settings.dsn)
32+
app.state.rabbitmq_client = RabbitMQClient(
33+
client_name="storage", settings=rabbit_settings
34+
)
35+
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
36+
client_name="storage_rpc_server", settings=rabbit_settings
37+
)
38+
39+
async def on_shutdown() -> None:
40+
with log_context(
41+
_logger,
42+
logging.INFO,
43+
msg="Storage shutdown Rabbitmq",
44+
):
45+
if app.state.rabbitmq_client:
46+
await app.state.rabbitmq_client.close()
47+
if app.state.rabbitmq_rpc_server:
48+
await app.state.rabbitmq_rpc_server.close()
49+
50+
app.add_event_handler("startup", on_startup)
51+
app.add_event_handler("shutdown", on_shutdown)
52+
53+
54+
def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
55+
if not app.state.rabbitmq_client:
56+
raise ConfigurationError(
57+
msg="RabbitMQ client is not available. Please check the configuration."
58+
)
59+
return cast(RabbitMQClient, app.state.rabbitmq_client)
60+
61+
62+
def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
63+
assert app.state.rabbitmq_rpc_server # nosec
64+
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)

0 commit comments

Comments
 (0)