Skip to content

Commit 2799012

Browse files
committed
ongoing
1 parent 73058af commit 2799012

File tree

8 files changed

+76
-121
lines changed

8 files changed

+76
-121
lines changed

services/storage/src/simcore_service_storage/api/rest/health.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@
99
from aiohttp import web
1010
from aws_library.s3 import S3AccessError
1111
from common_library.json_serialization import json_dumps
12+
from fastapi import Request
1213
from models_library.api_schemas_storage import HealthCheck, S3BucketName
1314
from models_library.app_diagnostics import AppStatusCheck
1415
from pydantic import TypeAdapter
16+
from servicelib.db_asyncpg_utils import check_postgres_liveness
17+
from servicelib.fastapi.db_asyncpg_engine import get_engine
1518
from servicelib.rest_constants import RESPONSE_MODEL_POLICY
19+
from simcore_postgres_database.utils_aiosqlalchemy import get_pg_engine_stateinfo
1620

1721
from ..._meta import API_VERSION, API_VTAG, PROJECT_NAME, VERSION
18-
from ...constants import APP_CONFIG_KEY
19-
from ...core.settings import ApplicationSettings
20-
from ...modules.db.db import get_engine_state
21-
from ...modules.db.db import is_service_responsive as is_pg_responsive
22+
from ...core.settings import get_application_settings
2223
from ...modules.s3 import get_s3_client
2324

2425
_logger = logging.getLogger(__name__)
@@ -43,10 +44,10 @@ async def get_health(request: web.Request) -> web.Response:
4344

4445

4546
@routes.get(f"/{API_VTAG}/status", name="get_status")
46-
async def get_status(request: web.Request) -> web.Response:
47+
async def get_status(request: Request) -> web.Response:
4748
# NOTE: all calls here must NOT raise
4849
assert request.app # nosec
49-
app_settings: ApplicationSettings = request.app[APP_CONFIG_KEY]
50+
app_settings = get_application_settings(request.app)
5051
s3_state = "disabled"
5152
if app_settings.STORAGE_S3:
5253
try:
@@ -63,9 +64,12 @@ async def get_status(request: web.Request) -> web.Response:
6364
s3_state = "failed"
6465

6566
postgres_state = "disabled"
67+
6668
if app_settings.STORAGE_POSTGRES:
6769
postgres_state = (
68-
"connected" if await is_pg_responsive(request.app) else "failed"
70+
"connected"
71+
if await check_postgres_liveness(get_engine(request.app))
72+
else "failed"
6973
)
7074

7175
status = AppStatusCheck.model_validate(
@@ -75,7 +79,7 @@ async def get_status(request: web.Request) -> web.Response:
7579
"services": {
7680
"postgres": {
7781
"healthy": postgres_state,
78-
"pool": get_engine_state(request.app),
82+
"pool": get_pg_engine_stateinfo(get_engine(request.app)),
7983
},
8084
"s3": {"healthy": s3_state},
8185
},

services/storage/src/simcore_service_storage/core/application.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
APP_NAME,
2525
APP_STARTED_BANNER_MSG,
2626
)
27-
from ..api.rest.utils import dsm_exception_handler
2827
from ..dsm import setup_dsm
2928
from ..dsm_cleaner import setup_dsm_cleaner
3029
from ..modules.db.db import setup_db
@@ -83,7 +82,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
8382
setup_redis(app)
8483
setup_dsm_cleaner(app)
8584

86-
app.middlewares.append(dsm_exception_handler)
85+
# app.middlewares.append(dsm_exception_handler)
8786

8887
if settings.STORAGE_PROFILING:
8988
app.add_middleware(ProfilerMiddleware)

services/storage/src/simcore_service_storage/core/settings.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Annotated, Self
22

3+
from fastapi import FastAPI
34
from pydantic import (
45
AliasChoices,
56
Field,
@@ -108,3 +109,8 @@ def _ensure_settings_consistency(self) -> Self:
108109
)
109110
raise ValueError(msg)
110111
return self
112+
113+
114+
def get_application_settings(app: FastAPI) -> ApplicationSettings:
115+
assert isinstance(app.state.settings, ApplicationSettings) # nosec
116+
return app.state.settings
Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
import logging
2+
from typing import cast
23

34
from fastapi import FastAPI
45

5-
from .constants import APP_DSM_KEY
66
from .datcore_dsm import DatCoreDataManager, create_datcore_data_manager
77
from .dsm_factory import DataManagerProvider
8+
from .exceptions.errors import ConfigurationError
89
from .simcore_s3_dsm import SimcoreS3DataManager, create_simcore_s3_data_manager
910

1011
logger = logging.getLogger(__name__)
1112

1213

1314
def setup_dsm(app: FastAPI):
14-
async def _cleanup_context(app: FastAPI):
15+
async def _on_startup(app: FastAPI) -> None:
1516
dsm_provider = DataManagerProvider(app)
1617
dsm_provider.register_builder(
1718
SimcoreS3DataManager.get_location_id(),
@@ -23,17 +24,22 @@ async def _cleanup_context(app: FastAPI):
2324
create_datcore_data_manager,
2425
DatCoreDataManager,
2526
)
26-
app[APP_DSM_KEY] = dsm_provider
27+
app.state.dsm_provider = dsm_provider
2728

28-
yield
29-
30-
logger.info("Shuting down %s", f"{dsm_provider=}")
29+
async def _on_shutdown() -> None:
30+
if app.state.dsm_provider:
31+
# nothing to do
32+
...
3133

3234
# ------
3335

34-
app.cleanup_ctx.append(_cleanup_context)
36+
app.add_event_handler("startup", _on_startup)
37+
app.add_event_handler("shutdown", _on_shutdown)
3538

3639

3740
def get_dsm_provider(app: FastAPI) -> DataManagerProvider:
38-
dsm_provider: DataManagerProvider = app[APP_DSM_KEY]
39-
return dsm_provider
41+
if not app.state.dsm_provider:
42+
raise ConfigurationError(
43+
msg="DSM provider not available. Please check the configuration."
44+
)
45+
return cast(DataManagerProvider, app.state.dsm_provider)

services/storage/src/simcore_service_storage/exceptions/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ class StorageRuntimeError(OsparcErrorMixin, RuntimeError):
55
...
66

77

8+
class ConfigurationError(StorageRuntimeError):
9+
msg_template: str = "Application misconfiguration: {msg}"
10+
11+
812
class DatabaseAccessError(StorageRuntimeError):
913
msg_template: str = "Unexpected error while accessing database backend"
1014

Lines changed: 11 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,23 @@
11
import logging
2-
from typing import Any
32

4-
from aiopg.sa.engine import Engine
53
from fastapi import FastAPI
6-
from servicelib.aiohttp.aiopg_utils import is_pg_responsive
7-
from servicelib.common_aiopg_utils import DataSourceName, create_pg_engine
4+
from servicelib.db_async_engine import close_db_connection
5+
from servicelib.fastapi.db_asyncpg_engine import connect_to_db
86
from servicelib.retry_policies import PostgresRetryPolicyUponInitialization
9-
from settings_library.postgres import PostgresSettings
10-
from simcore_postgres_database.utils_aiopg import (
11-
get_pg_engine_stateinfo,
12-
raise_if_migration_not_ready,
13-
)
147
from tenacity import retry
158

16-
from ...constants import APP_AIOPG_ENGINE_KEY
9+
from ...core.settings import get_application_settings
1710

1811
_logger = logging.getLogger(__name__)
1912

2013

21-
@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
22-
async def _ensure_pg_ready(dsn: DataSourceName, min_size: int, max_size: int) -> None:
23-
_logger.info("Checking pg is ready %s", dsn)
14+
def setup_db(app: FastAPI) -> None:
15+
@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
16+
async def _on_startup(app: FastAPI) -> None:
17+
await connect_to_db(app, get_application_settings(app).STORAGE_POSTGRES)
2418

25-
async with create_pg_engine(dsn, minsize=min_size, maxsize=max_size) as engine:
26-
await raise_if_migration_not_ready(engine)
19+
async def _on_shutdown() -> None:
20+
await close_db_connection(app)
2721

28-
29-
async def postgres_cleanup_ctx(app: FastAPI):
30-
pg_cfg: PostgresSettings = app[APP_CONFIG_KEY].STORAGE_POSTGRES
31-
dsn = DataSourceName(
32-
application_name=f"{__name__}_{id(app)}",
33-
database=pg_cfg.POSTGRES_DB,
34-
user=pg_cfg.POSTGRES_USER,
35-
password=pg_cfg.POSTGRES_PASSWORD.get_secret_value(),
36-
host=pg_cfg.POSTGRES_HOST,
37-
port=pg_cfg.POSTGRES_PORT,
38-
)
39-
40-
await _ensure_pg_ready(
41-
dsn, min_size=pg_cfg.POSTGRES_MINSIZE, max_size=pg_cfg.POSTGRES_MAXSIZE
42-
)
43-
_logger.info("Creating pg engine for %s", dsn)
44-
async with create_pg_engine(
45-
dsn, minsize=pg_cfg.POSTGRES_MINSIZE, maxsize=pg_cfg.POSTGRES_MAXSIZE
46-
) as engine:
47-
assert engine # nosec
48-
app[APP_AIOPG_ENGINE_KEY] = engine
49-
50-
_logger.info("Created pg engine for %s", dsn)
51-
yield # ----------
52-
_logger.info("Deleting pg engine for %s", dsn)
53-
_logger.info("Deleted pg engine for %s", dsn)
54-
55-
56-
async def is_service_responsive(app: FastAPI) -> bool:
57-
"""Returns true if the app can connect to db service"""
58-
return await is_pg_responsive(engine=app[APP_AIOPG_ENGINE_KEY])
59-
60-
61-
def get_engine_state(app: FastAPI) -> dict[str, Any]:
62-
engine: Engine | None = app.get(APP_AIOPG_ENGINE_KEY)
63-
if engine:
64-
engine_info: dict[str, Any] = get_pg_engine_stateinfo(engine)
65-
return engine_info
66-
return {}
67-
68-
69-
def setup_db(app: FastAPI):
70-
app[APP_AIOPG_ENGINE_KEY] = None
71-
72-
# app is created at this point but not yet started
73-
_logger.debug("Setting up %s [service: %s] ...", __name__, "postgres")
74-
75-
# async connection to db
76-
app.cleanup_ctx.append(postgres_cleanup_ctx)
22+
app.add_event_handler("startup", _on_startup)
23+
app.add_event_handler("shutdown", _on_shutdown)
Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,27 @@
11
"""Module to access s3 service"""
22

33
import logging
4-
from collections.abc import AsyncGenerator
54
from typing import cast
65

76
from aws_library.s3 import SimcoreS3API
87
from common_library.json_serialization import json_dumps
8+
from fastapi import FastAPI
99
from servicelib.logging_utils import log_context
1010
from tenacity.asyncio import AsyncRetrying
1111
from tenacity.before_sleep import before_sleep_log
1212
from tenacity.wait import wait_fixed
1313

14-
from ..constants import APP_CONFIG_KEY, APP_S3_KEY, RETRY_WAIT_SECS
15-
from ..core.settings import ApplicationSettings
14+
from ..constants import RETRY_WAIT_SECS
15+
from ..core.settings import get_application_settings
16+
from ..exceptions.errors import ConfigurationError
1617

1718
_logger = logging.getLogger(__name__)
1819

1920

20-
async def setup_s3_client(app) -> AsyncGenerator[None, None]:
21-
client = None
22-
23-
with log_context(_logger, logging.DEBUG, msg="setup.s3_client.cleanup_ctx"):
24-
storage_settings: ApplicationSettings = app[APP_CONFIG_KEY]
25-
storage_s3_settings = storage_settings.STORAGE_S3
26-
assert storage_s3_settings # nosec
21+
def setup_s3(app: FastAPI):
22+
async def _on_startup(app: FastAPI) -> None:
23+
app.state.s3_client = None
24+
settings = get_application_settings(app)
2725

2826
async for attempt in AsyncRetrying(
2927
wait=wait_fixed(RETRY_WAIT_SECS),
@@ -32,43 +30,34 @@ async def setup_s3_client(app) -> AsyncGenerator[None, None]:
3230
):
3331
with attempt:
3432
client = await SimcoreS3API.create(
35-
storage_s3_settings,
36-
storage_settings.STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY,
33+
settings.STORAGE_S3,
34+
settings.STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY,
3735
)
3836
_logger.info(
3937
"S3 client %s successfully created [%s]",
4038
f"{client=}",
4139
json_dumps(attempt.retry_state.retry_object.statistics),
4240
)
4341
assert client # nosec
44-
app[APP_S3_KEY] = client
45-
46-
yield
47-
48-
with log_context(_logger, logging.DEBUG, msg="teardown.s3_client.cleanup_ctx"):
49-
if client:
50-
await client.close()
42+
app.state.s3_client = client
5143

44+
with log_context(_logger, logging.DEBUG, msg="setup.s3_bucket.cleanup_ctx"):
45+
await client.create_bucket(
46+
bucket=settings.STORAGE_S3.S3_BUCKET_NAME,
47+
region=settings.STORAGE_S3.S3_REGION,
48+
)
5249

53-
async def setup_s3_bucket(app: FastAPI):
54-
with log_context(_logger, logging.DEBUG, msg="setup.s3_bucket.cleanup_ctx"):
55-
storage_s3_settings = app[APP_CONFIG_KEY].STORAGE_S3
56-
client = get_s3_client(app)
57-
await client.create_bucket(
58-
bucket=storage_s3_settings.S3_BUCKET_NAME,
59-
region=storage_s3_settings.S3_REGION,
60-
)
61-
yield
62-
50+
async def _on_shutdown() -> None:
51+
if app.state.s3_client:
52+
await cast(SimcoreS3API, app.state.s3_client).close()
6353

64-
def setup_s3(app: FastAPI):
65-
if setup_s3_client not in app.cleanup_ctx:
66-
app.cleanup_ctx.append(setup_s3_client)
67-
if setup_s3_bucket not in app.cleanup_ctx:
68-
app.cleanup_ctx.append(setup_s3_bucket)
54+
app.add_event_handler("startup", _on_startup)
55+
app.add_event_handler("shutdown", _on_shutdown)
6956

7057

7158
def get_s3_client(app: FastAPI) -> SimcoreS3API:
72-
assert app[APP_S3_KEY] # nosec
73-
assert isinstance(app[APP_S3_KEY], SimcoreS3API) # nosec
74-
return cast(SimcoreS3API, app[APP_S3_KEY])
59+
if not app.state.s3_client:
60+
raise ConfigurationError(
61+
msg="S3 client is not available. Please check the configuration."
62+
)
63+
return cast(SimcoreS3API, app.state.s3_client)

services/storage/src/simcore_service_storage/routes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from pathlib import Path
33

4-
from aiohttp_swagger import setup_swagger # type: ignore[import-untyped]
4+
from fastapi import FastAPI
55
from servicelib.aiohttp.rest_middlewares import append_rest_middlewares
66
from servicelib.aiohttp.rest_utils import (
77
get_named_routes_as_message,

0 commit comments

Comments
 (0)