Skip to content

Commit f5da4f2

Browse files
committed
splits aiopg and asyncpg contributions
1 parent cf49d8f commit f5da4f2

File tree

3 files changed

+158
-83
lines changed

3 files changed

+158
-83
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""
2+
Helpers on aiopg
3+
4+
SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529
5+
"""
6+
7+
import logging
8+
from collections.abc import AsyncIterator
9+
from typing import Any, cast
10+
11+
from aiohttp import web
12+
from aiopg.sa import Engine, create_engine
13+
from models_library.utils.json_serialization import json_dumps
14+
from servicelib.aiohttp.aiopg_utils import is_pg_responsive
15+
from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY
16+
from servicelib.logging_utils import log_context
17+
from servicelib.retry_policies import PostgresRetryPolicyUponInitialization
18+
from simcore_postgres_database.errors import DBAPIError
19+
from simcore_postgres_database.utils_aiopg import (
20+
DBMigrationError,
21+
close_engine,
22+
get_pg_engine_stateinfo,
23+
raise_if_migration_not_ready,
24+
)
25+
from tenacity import retry
26+
27+
from .settings import PostgresSettings, get_plugin_settings
28+
29+
_logger = logging.getLogger(__name__)
30+
31+
32+
@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
33+
async def _ensure_pg_ready(settings: PostgresSettings) -> Engine:
34+
engine: Engine = await create_engine(
35+
settings.dsn,
36+
application_name=settings.POSTGRES_CLIENT_NAME,
37+
minsize=settings.POSTGRES_MINSIZE,
38+
maxsize=settings.POSTGRES_MAXSIZE,
39+
)
40+
41+
try:
42+
await raise_if_migration_not_ready(engine)
43+
except (DBMigrationError, DBAPIError):
44+
await close_engine(engine)
45+
raise
46+
47+
return engine # tenacity rules guarantee exit with exc
48+
49+
50+
async def postgres_cleanup_ctx(app: web.Application) -> AsyncIterator[None]:
51+
52+
settings = get_plugin_settings(app)
53+
54+
with log_context(
55+
_logger,
56+
logging.INFO,
57+
"Connecting app[APP_AIOPG_ENGINE_KEY] to postgres with %s",
58+
f"{settings=}",
59+
):
60+
aiopg_engine = await _ensure_pg_ready(settings)
61+
app[APP_AIOPG_ENGINE_KEY] = aiopg_engine
62+
63+
_logger.info(
64+
"app[APP_AIOPG_ENGINE_KEY] created %s",
65+
json_dumps(get_engine_state(app), indent=1),
66+
)
67+
68+
yield # -------------------
69+
70+
if aiopg_engine is not app.get(APP_AIOPG_ENGINE_KEY):
71+
_logger.critical(
72+
"app[APP_AIOPG_ENGINE_KEY] does not hold right db engine. Somebody has changed it??"
73+
)
74+
75+
await close_engine(aiopg_engine)
76+
77+
_logger.debug(
78+
"app[APP_AIOPG_ENGINE_KEY] after shutdown %s (closed=%s): %s",
79+
aiopg_engine.dsn,
80+
aiopg_engine.closed,
81+
json_dumps(get_engine_state(app), indent=1),
82+
)
83+
84+
85+
def is_service_enabled(app: web.Application):
86+
return app.get(APP_AIOPG_ENGINE_KEY) is not None
87+
88+
89+
async def is_service_responsive(app: web.Application):
90+
"""Returns true if the app can connect to db service"""
91+
if not is_service_enabled(app):
92+
return False
93+
return await is_pg_responsive(engine=app[APP_AIOPG_ENGINE_KEY])
94+
95+
96+
def get_engine_state(app: web.Application) -> dict[str, Any]:
97+
engine: Engine | None = app.get(APP_AIOPG_ENGINE_KEY)
98+
if engine:
99+
pg_engine_stateinfo: dict[str, Any] = get_pg_engine_stateinfo(engine)
100+
return pg_engine_stateinfo
101+
return {}
102+
103+
104+
def get_database_engine(app: web.Application) -> Engine:
105+
return cast(Engine, app[APP_AIOPG_ENGINE_KEY])
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""
2+
Helpers on asyncpg
3+
4+
SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529
5+
"""
6+
7+
import logging
8+
from collections.abc import AsyncIterator
9+
10+
from aiohttp import web
11+
from servicelib.aiohttp.db_asyncpg_engine import (
12+
close_db_connection,
13+
connect_to_db,
14+
get_async_engine,
15+
)
16+
from sqlalchemy.ext.asyncio import AsyncEngine
17+
18+
from .settings import PostgresSettings, get_plugin_settings
19+
20+
_logger = logging.getLogger(__name__)
21+
22+
23+
async def postgres_cleanup_ctx(app: web.Application) -> AsyncIterator[None]:
24+
settings: PostgresSettings = get_plugin_settings(app)
25+
await connect_to_db(app, settings)
26+
27+
assert get_async_engine(app) # nosec
28+
assert isinstance(get_async_engine(app), AsyncEngine) # nosec
29+
30+
yield
31+
32+
await close_db_connection(app)
33+
34+
35+
__all__: tuple[str, ...] = (
36+
"get_async_engine",
37+
"postgres_cleanup_ctx",
38+
)

services/web/server/src/simcore_service_webserver/db/plugin.py

Lines changed: 15 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -3,97 +3,16 @@
33
"""
44

55
import logging
6-
from collections.abc import AsyncIterator
7-
from typing import Any, cast
86

97
from aiohttp import web
10-
from aiopg.sa import Engine, create_engine
11-
from models_library.utils.json_serialization import json_dumps
12-
from servicelib.aiohttp.aiopg_utils import is_pg_responsive
138
from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY
149
from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup
15-
from servicelib.retry_policies import PostgresRetryPolicyUponInitialization
16-
from simcore_postgres_database.errors import DBAPIError
17-
from simcore_postgres_database.utils_aiopg import (
18-
DBMigrationError,
19-
close_engine,
20-
get_pg_engine_stateinfo,
21-
raise_if_migration_not_ready,
22-
)
23-
from tenacity import retry
2410

25-
from .settings import PostgresSettings, get_plugin_settings
11+
from . import _aiopg, _asyncpg
2612

2713
_logger = logging.getLogger(__name__)
2814

2915

30-
@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
31-
async def _ensure_pg_ready(settings: PostgresSettings) -> Engine:
32-
33-
_logger.info("Connecting to postgres with %s", f"{settings=}")
34-
engine: Engine = await create_engine(
35-
settings.dsn,
36-
application_name=settings.POSTGRES_CLIENT_NAME,
37-
minsize=settings.POSTGRES_MINSIZE,
38-
maxsize=settings.POSTGRES_MAXSIZE,
39-
)
40-
41-
try:
42-
await raise_if_migration_not_ready(engine)
43-
except (DBMigrationError, DBAPIError):
44-
await close_engine(engine)
45-
raise
46-
47-
_logger.info("Connection to postgres with %s succeeded", f"{settings=}")
48-
return engine # tenacity rules guarantee exit with exc
49-
50-
51-
async def postgres_cleanup_ctx(app: web.Application) -> AsyncIterator[None]:
52-
53-
settings = get_plugin_settings(app)
54-
aiopg_engine = await _ensure_pg_ready(settings)
55-
app[APP_AIOPG_ENGINE_KEY] = aiopg_engine
56-
57-
_logger.info("pg engine created %s", json_dumps(get_engine_state(app), indent=1))
58-
59-
yield # -------------------
60-
61-
if aiopg_engine is not app.get(APP_AIOPG_ENGINE_KEY):
62-
_logger.critical("app does not hold right db engine. Somebody has changed it??")
63-
64-
await close_engine(aiopg_engine)
65-
66-
_logger.debug(
67-
"pg engine created after shutdown %s (closed=%s): %s",
68-
aiopg_engine.dsn,
69-
aiopg_engine.closed,
70-
json_dumps(get_engine_state(app), indent=1),
71-
)
72-
73-
74-
def is_service_enabled(app: web.Application):
75-
return app.get(APP_AIOPG_ENGINE_KEY) is not None
76-
77-
78-
async def is_service_responsive(app: web.Application):
79-
"""Returns true if the app can connect to db service"""
80-
if not is_service_enabled(app):
81-
return False
82-
return await is_pg_responsive(engine=app[APP_AIOPG_ENGINE_KEY])
83-
84-
85-
def get_engine_state(app: web.Application) -> dict[str, Any]:
86-
engine: Engine | None = app.get(APP_AIOPG_ENGINE_KEY)
87-
if engine:
88-
pg_engine_stateinfo: dict[str, Any] = get_pg_engine_stateinfo(engine)
89-
return pg_engine_stateinfo
90-
return {}
91-
92-
93-
def get_aiopg_engine(app: web.Application) -> Engine:
94-
return cast(Engine, app[APP_AIOPG_ENGINE_KEY])
95-
96-
9716
@app_module_setup(
9817
"simcore_service_webserver.db",
9918
ModuleCategory.ADDON,
@@ -106,4 +25,17 @@ def setup_db(app: web.Application):
10625
app[APP_AIOPG_ENGINE_KEY] = None
10726

10827
# async connection to db
109-
app.cleanup_ctx.append(postgres_cleanup_ctx)
28+
app.cleanup_ctx.append(_aiopg.postgres_cleanup_ctx)
29+
app.cleanup_ctx.append(_asyncpg.postgres_cleanup_ctx)
30+
31+
32+
# API
33+
34+
# aiopg helpers
35+
get_aiopg_engine = _aiopg.get_database_engine
36+
get_aiopg_engine_state = _aiopg.get_engine_state
37+
is_service_responsive = _aiopg.is_service_responsive
38+
is_service_enabled = _aiopg.is_service_enabled
39+
40+
# asyncpg helpers
41+
get_asyncpg_engine = _asyncpg.get_async_engine

0 commit comments

Comments
 (0)