Skip to content

Commit ba6829a

Browse files
authored
♻️ Preparations in webserver to integrate asyncpg engine (#6466)
1 parent 52771a9 commit ba6829a

File tree

36 files changed

+543
-189
lines changed

36 files changed

+543
-189
lines changed

packages/service-library/src/servicelib/aiohttp/application_keys.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
APP_CONFIG_KEY: Final[str] = f"{__name__ }.config"
2222
APP_SETTINGS_KEY: Final[str] = f"{__name__ }.settings"
2323

24-
APP_DB_ENGINE_KEY: Final[str] = f"{__name__ }.db_engine"
24+
APP_AIOPG_ENGINE_KEY: Final[str] = f"{__name__ }.aiopg_engine"
2525

2626
APP_CLIENT_SESSION_KEY: Final[str] = f"{__name__ }.session"
2727

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""
2+
Helpers on asyncpg specific for aiohttp
3+
4+
SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529
5+
"""
6+
7+
8+
import logging
9+
from typing import Final
10+
11+
from aiohttp import web
12+
from servicelib.logging_utils import log_context
13+
from settings_library.postgres import PostgresSettings
14+
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
15+
get_pg_engine_stateinfo,
16+
)
17+
from sqlalchemy.ext.asyncio import AsyncEngine
18+
19+
from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready
20+
from ..logging_utils import log_context
21+
22+
APP_DB_ASYNC_ENGINE_KEY: Final[str] = f"{__name__ }.AsyncEngine"
23+
24+
25+
_logger = logging.getLogger(__name__)
26+
27+
28+
def _set_async_engine_to_app_state(app: web.Application, engine: AsyncEngine):
29+
if exists := app.get(APP_DB_ASYNC_ENGINE_KEY, None):
30+
msg = f"An instance of {type(exists)} already in app[{APP_DB_ASYNC_ENGINE_KEY}]={exists}"
31+
raise ValueError(msg)
32+
33+
app[APP_DB_ASYNC_ENGINE_KEY] = engine
34+
return get_async_engine(app)
35+
36+
37+
def get_async_engine(app: web.Application) -> AsyncEngine:
38+
engine: AsyncEngine = app[APP_DB_ASYNC_ENGINE_KEY]
39+
assert engine # nosec
40+
return engine
41+
42+
43+
async def connect_to_db(app: web.Application, settings: PostgresSettings) -> None:
44+
"""
45+
- db services up, data migrated and ready to use
46+
- sets an engine in app state (use `get_async_engine(app)` to retrieve)
47+
"""
48+
if settings.POSTGRES_CLIENT_NAME:
49+
settings = settings.copy(
50+
update={"POSTGRES_CLIENT_NAME": settings.POSTGRES_CLIENT_NAME + "-asyncpg"}
51+
)
52+
53+
with log_context(
54+
_logger,
55+
logging.INFO,
56+
"Connecting app[APP_DB_ASYNC_ENGINE_KEY] to postgres with %s",
57+
f"{settings=}",
58+
):
59+
engine = await create_async_engine_and_pg_database_ready(settings)
60+
_set_async_engine_to_app_state(app, engine)
61+
62+
_logger.info(
63+
"app[APP_DB_ASYNC_ENGINE_KEY] ready : %s",
64+
await get_pg_engine_stateinfo(engine),
65+
)
66+
67+
68+
async def close_db_connection(app: web.Application) -> None:
69+
engine = get_async_engine(app)
70+
with log_context(
71+
_logger, logging.DEBUG, f"app[APP_DB_ASYNC_ENGINE_KEY] disconnect of {engine}"
72+
):
73+
if engine:
74+
await engine.dispose()
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import logging
2+
import time
3+
from datetime import timedelta
4+
5+
from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult
6+
from settings_library.postgres import PostgresSettings
7+
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
8+
raise_if_migration_not_ready,
9+
)
10+
from sqlalchemy.exc import SQLAlchemyError
11+
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
12+
from tenacity import retry
13+
14+
from .retry_policies import PostgresRetryPolicyUponInitialization
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
20+
async def create_async_engine_and_pg_database_ready(
21+
settings: PostgresSettings,
22+
) -> AsyncEngine:
23+
"""
24+
- creates asyncio engine
25+
- waits until db service is up
26+
- waits until db data is migrated (i.e. ready to use)
27+
- returns engine
28+
"""
29+
server_settings = None
30+
if settings.POSTGRES_CLIENT_NAME:
31+
server_settings = {
32+
"application_name": settings.POSTGRES_CLIENT_NAME,
33+
}
34+
35+
engine: AsyncEngine = create_async_engine(
36+
settings.dsn_with_async_sqlalchemy,
37+
pool_size=settings.POSTGRES_MINSIZE,
38+
max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE,
39+
connect_args={"server_settings": server_settings},
40+
pool_pre_ping=True, # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects
41+
future=True, # this uses sqlalchemy 2.0 API, shall be removed when sqlalchemy 2.0 is released
42+
)
43+
44+
try:
45+
await raise_if_migration_not_ready(engine)
46+
except Exception:
47+
# NOTE: engine must be closed because retry will create a new engine
48+
await engine.dispose()
49+
raise
50+
51+
return engine
52+
53+
54+
async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult:
55+
try:
56+
tic = time.time()
57+
# test
58+
async with engine.connect():
59+
...
60+
elapsed_time = time.time() - tic
61+
return IsResponsive(elapsed=timedelta(seconds=elapsed_time))
62+
except SQLAlchemyError as err:
63+
return IsNonResponsive(reason=f"{err}")
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import logging
2+
3+
from fastapi import FastAPI
4+
from settings_library.postgres import PostgresSettings
5+
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
6+
get_pg_engine_stateinfo,
7+
)
8+
9+
from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready
10+
from ..logging_utils import log_context
11+
12+
_logger = logging.getLogger(__name__)
13+
14+
15+
async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
16+
with log_context(
17+
_logger,
18+
logging.DEBUG,
19+
f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}",
20+
):
21+
engine = await create_async_engine_and_pg_database_ready(settings)
22+
23+
app.state.engine = engine
24+
_logger.debug(
25+
"Setup engine: %s",
26+
await get_pg_engine_stateinfo(engine),
27+
)
28+
29+
30+
async def close_db_connection(app: FastAPI) -> None:
31+
with log_context(_logger, logging.DEBUG, f"db disconnect of {app.state.engine}"):
32+
if engine := app.state.engine:
33+
await engine.dispose()

packages/settings-library/src/settings_library/postgres.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import urllib.parse
21
from functools import cached_property
32
from typing import Any, ClassVar
3+
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
44

55
from pydantic import Field, PostgresDsn, SecretStr, validator
66

@@ -75,11 +75,23 @@ def dsn_with_async_sqlalchemy(self) -> str:
7575
def dsn_with_query(self) -> str:
7676
"""Some clients do not support queries in the dsn"""
7777
dsn = self.dsn
78+
return self._update_query(dsn)
79+
80+
def _update_query(self, uri: str) -> str:
81+
# SEE https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
82+
new_params: dict[str, str] = {}
7883
if self.POSTGRES_CLIENT_NAME:
79-
dsn += "?" + urllib.parse.urlencode(
80-
{"application_name": self.POSTGRES_CLIENT_NAME}
81-
)
82-
return dsn
84+
new_params = {
85+
"application_name": self.POSTGRES_CLIENT_NAME,
86+
}
87+
88+
if new_params:
89+
parsed_uri = urlparse(uri)
90+
query = dict(parse_qsl(parsed_uri.query))
91+
query.update(new_params)
92+
updated_query = urlencode(query)
93+
return urlunparse(parsed_uri._replace(query=updated_query))
94+
return uri
8395

8496
class Config(BaseCustomSettings.Config):
8597
schema_extra: ClassVar[dict[str, Any]] = { # type: ignore[misc]

packages/settings-library/tests/test__pydantic_settings.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from pydantic import BaseSettings, validator
1717
from pydantic.fields import ModelField, Undefined
18+
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
1819

1920

2021
def assert_field_specs(
@@ -48,11 +49,10 @@ class Settings(BaseSettings):
4849

4950
@validator("*", pre=True)
5051
@classmethod
51-
def parse_none(cls, v, values, field: ModelField):
52+
def _parse_none(cls, v, values, field: ModelField):
5253
# WARNING: In nullable fields, envs equal to null or none are parsed as None !!
53-
if field.allow_none:
54-
if isinstance(v, str) and v.lower() in ("null", "none"):
55-
return None
54+
if field.allow_none and isinstance(v, str) and v.lower() in ("null", "none"):
55+
return None
5656
return v
5757

5858

@@ -132,15 +132,21 @@ def test_fields_declarations():
132132
def test_construct(monkeypatch):
133133
# from __init__
134134
settings_from_init = Settings(
135-
VALUE=1, VALUE_ALSO_REQUIRED=10, VALUE_NULLABLE_REQUIRED=None
135+
VALUE=1,
136+
VALUE_ALSO_REQUIRED=10,
137+
VALUE_NULLABLE_REQUIRED=None,
136138
)
139+
137140
print(settings_from_init.json(exclude_unset=True, indent=1))
138141

139142
# from env vars
140-
monkeypatch.setenv("VALUE", "1")
141-
monkeypatch.setenv("VALUE_ALSO_REQUIRED", "10")
142-
monkeypatch.setenv(
143-
"VALUE_NULLABLE_REQUIRED", "null"
143+
setenvs_from_dict(
144+
monkeypatch,
145+
{
146+
"VALUE": "1",
147+
"VALUE_ALSO_REQUIRED": "10",
148+
"VALUE_NULLABLE_REQUIRED": "null",
149+
},
144150
) # WARNING: set this env to None would not work w/o ``parse_none`` validator! bug???
145151

146152
settings_from_env = Settings()

packages/settings-library/tests/test_postgres.py

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
# pylint: disable=unused-variable
44

55

6+
from urllib.parse import urlparse
7+
68
import pytest
9+
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
10+
from pytest_simcore.helpers.typing_env import EnvVarsDict
711
from settings_library.postgres import PostgresSettings
812

913

@@ -12,9 +16,16 @@ def env_file():
1216
return ".env-sample"
1317

1418

15-
def test_cached_property_dsn(mock_environment: dict):
19+
@pytest.fixture
20+
def mock_environment(mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
21+
return mock_environment | setenvs_from_dict(
22+
monkeypatch, {"POSTGRES_CLIENT_NAME": "Some &43 funky name"}
23+
)
1624

17-
settings = PostgresSettings()
25+
26+
def test_cached_property_dsn(mock_environment: EnvVarsDict):
27+
28+
settings = PostgresSettings.create_from_envs()
1829

1930
# all are upper-case
2031
assert all(key == key.upper() for key in settings.dict())
@@ -28,20 +39,30 @@ def test_cached_property_dsn(mock_environment: dict):
2839
assert "dsn" in settings.dict()
2940

3041

31-
def test_dsn_with_query(mock_environment: dict, monkeypatch):
32-
42+
def test_dsn_with_query(mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
3343
settings = PostgresSettings()
3444

35-
assert not settings.POSTGRES_CLIENT_NAME
45+
assert settings.POSTGRES_CLIENT_NAME
3646
assert settings.dsn == "postgresql://foo:secret@localhost:5432/foodb"
37-
38-
# now with app
39-
monkeypatch.setenv("POSTGRES_CLIENT_NAME", "Some &43 funky name")
40-
41-
settings_with_app = PostgresSettings()
42-
43-
assert settings_with_app.POSTGRES_CLIENT_NAME
4447
assert (
45-
settings_with_app.dsn_with_query
48+
settings.dsn_with_query
4649
== "postgresql://foo:secret@localhost:5432/foodb?application_name=Some+%2643+funky+name"
4750
)
51+
52+
with monkeypatch.context() as patch:
53+
patch.delenv("POSTGRES_CLIENT_NAME")
54+
settings = PostgresSettings()
55+
56+
assert not settings.POSTGRES_CLIENT_NAME
57+
assert settings.dsn == settings.dsn_with_query
58+
59+
60+
def test_dsn_with_async_sqlalchemy_has_query(
61+
mock_environment: EnvVarsDict, monkeypatch
62+
):
63+
settings = PostgresSettings()
64+
65+
parsed_url = urlparse(settings.dsn_with_async_sqlalchemy)
66+
assert parsed_url.scheme.split("+") == ["postgresql", "asyncpg"]
67+
68+
assert not parsed_url.query

services/catalog/src/simcore_service_catalog/core/events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import TypeAlias
44

55
from fastapi import FastAPI
6-
from servicelib.db_async_engine import close_db_connection, connect_to_db
6+
from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db
77
from servicelib.logging_utils import log_context
88

99
from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG

services/payments/src/simcore_service_payments/services/healthchecks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
import logging
33

44
from models_library.healthchecks import LivenessResult
5+
from servicelib.db_asyncpg_utils import check_postgres_liveness
56
from sqlalchemy.ext.asyncio import AsyncEngine
67

78
from .payments_gateway import PaymentsGatewayApi
8-
from .postgres import check_postgres_liveness
99
from .resource_usage_tracker import ResourceUsageTrackerApi
1010

1111
_logger = logging.getLogger(__name__)

services/payments/src/simcore_service_payments/services/postgres.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
1-
import time
2-
from datetime import timedelta
3-
41
from fastapi import FastAPI
5-
from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult
6-
from servicelib.db_async_engine import close_db_connection, connect_to_db
7-
from sqlalchemy.exc import SQLAlchemyError
2+
from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db
83
from sqlalchemy.ext.asyncio import AsyncEngine
94

105
from ..core.settings import ApplicationSettings
@@ -16,18 +11,6 @@ def get_engine(app: FastAPI) -> AsyncEngine:
1611
return engine
1712

1813

19-
async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult:
20-
try:
21-
tic = time.time()
22-
# test
23-
async with engine.connect():
24-
...
25-
elapsed_time = time.time() - tic
26-
return IsResponsive(elapsed=timedelta(seconds=elapsed_time))
27-
except SQLAlchemyError as err:
28-
return IsNonResponsive(reason=f"{err}")
29-
30-
3114
def setup_postgres(app: FastAPI):
3215
app.state.engine = None
3316

0 commit comments

Comments
 (0)