Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
APP_CONFIG_KEY: Final[str] = f"{__name__ }.config"
APP_SETTINGS_KEY: Final[str] = f"{__name__ }.settings"

APP_DB_ENGINE_KEY: Final[str] = f"{__name__ }.db_engine"
APP_AIOPG_ENGINE_KEY: Final[str] = f"{__name__ }.aiopg_engine"

APP_CLIENT_SESSION_KEY: Final[str] = f"{__name__ }.session"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
Helpers on asyncpg specific for aiohttp
SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529
"""


import logging
from typing import Final

from aiohttp import web
from servicelib.logging_utils import log_context
from settings_library.postgres import PostgresSettings
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
get_pg_engine_stateinfo,
)
from sqlalchemy.ext.asyncio import AsyncEngine

from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready
from ..logging_utils import log_context

APP_DB_ASYNC_ENGINE_KEY: Final[str] = f"{__name__ }.AsyncEngine"


_logger = logging.getLogger(__name__)


def _set_async_engine_to_app_state(app: web.Application, engine: AsyncEngine):
if exists := app.get(APP_DB_ASYNC_ENGINE_KEY, None):
msg = f"An instance of {type(exists)} already in app[{APP_DB_ASYNC_ENGINE_KEY}]={exists}"
raise ValueError(msg)

app[APP_DB_ASYNC_ENGINE_KEY] = engine
return get_async_engine(app)


def get_async_engine(app: web.Application) -> AsyncEngine:
engine: AsyncEngine = app[APP_DB_ASYNC_ENGINE_KEY]
assert engine # nosec
return engine


async def connect_to_db(app: web.Application, settings: PostgresSettings) -> None:
"""
- db services up, data migrated and ready to use
- sets an engine in app state (use `get_async_engine(app)` to retrieve)
"""
if settings.POSTGRES_CLIENT_NAME:
settings = settings.copy(
update={"POSTGRES_CLIENT_NAME": settings.POSTGRES_CLIENT_NAME + "-asyncpg"}
)

with log_context(
_logger,
logging.INFO,
"Connecting app[APP_DB_ASYNC_ENGINE_KEY] to postgres with %s",
f"{settings=}",
):
engine = await create_async_engine_and_pg_database_ready(settings)
_set_async_engine_to_app_state(app, engine)

_logger.info(
"app[APP_DB_ASYNC_ENGINE_KEY] ready : %s",
await get_pg_engine_stateinfo(engine),
)


async def close_db_connection(app: web.Application) -> None:
engine = get_async_engine(app)
with log_context(
_logger, logging.DEBUG, f"app[APP_DB_ASYNC_ENGINE_KEY] disconnect of {engine}"
):
if engine:
await engine.dispose()
63 changes: 63 additions & 0 deletions packages/service-library/src/servicelib/db_asyncpg_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
import time
from datetime import timedelta

from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult
from settings_library.postgres import PostgresSettings
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
raise_if_migration_not_ready,
)
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from tenacity import retry

from .retry_policies import PostgresRetryPolicyUponInitialization

_logger = logging.getLogger(__name__)


@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
async def create_async_engine_and_pg_database_ready(
settings: PostgresSettings,
) -> AsyncEngine:
"""
- creates asyncio engine
- waits until db service is up
- waits until db data is migrated (i.e. ready to use)
- returns engine
"""
server_settings = None
if settings.POSTGRES_CLIENT_NAME:
server_settings = {
"application_name": settings.POSTGRES_CLIENT_NAME,
}

engine: AsyncEngine = create_async_engine(
settings.dsn_with_async_sqlalchemy,
pool_size=settings.POSTGRES_MINSIZE,
max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE,
connect_args={"server_settings": server_settings},
pool_pre_ping=True, # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects
future=True, # this uses sqlalchemy 2.0 API, shall be removed when sqlalchemy 2.0 is released
)

try:
await raise_if_migration_not_ready(engine)
except Exception:
# NOTE: engine must be closed because retry will create a new engine
await engine.dispose()
raise

return engine


async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult:
try:
tic = time.time()
# test
async with engine.connect():
...
elapsed_time = time.time() - tic
return IsResponsive(elapsed=timedelta(seconds=elapsed_time))
except SQLAlchemyError as err:
return IsNonResponsive(reason=f"{err}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging

from fastapi import FastAPI
from settings_library.postgres import PostgresSettings
from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear
get_pg_engine_stateinfo,
)

from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready
from ..logging_utils import log_context

_logger = logging.getLogger(__name__)


async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
with log_context(
_logger,
logging.DEBUG,
f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}",
):
engine = await create_async_engine_and_pg_database_ready(settings)

app.state.engine = engine
_logger.debug(
"Setup engine: %s",
await get_pg_engine_stateinfo(engine),
)


async def close_db_connection(app: FastAPI) -> None:
with log_context(_logger, logging.DEBUG, f"db disconnect of {app.state.engine}"):
if engine := app.state.engine:
await engine.dispose()
22 changes: 17 additions & 5 deletions packages/settings-library/src/settings_library/postgres.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import urllib.parse
from functools import cached_property
from typing import Any, ClassVar
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

from pydantic import Field, PostgresDsn, SecretStr, validator

Expand Down Expand Up @@ -75,11 +75,23 @@ def dsn_with_async_sqlalchemy(self) -> str:
def dsn_with_query(self) -> str:
"""Some clients do not support queries in the dsn"""
dsn = self.dsn
return self._update_query(dsn)

def _update_query(self, uri: str) -> str:
# SEE https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
new_params: dict[str, str] = {}
if self.POSTGRES_CLIENT_NAME:
dsn += "?" + urllib.parse.urlencode(
{"application_name": self.POSTGRES_CLIENT_NAME}
)
return dsn
new_params = {
"application_name": self.POSTGRES_CLIENT_NAME,
}

if new_params:
parsed_uri = urlparse(uri)
query = dict(parse_qsl(parsed_uri.query))
query.update(new_params)
updated_query = urlencode(query)
return urlunparse(parsed_uri._replace(query=updated_query))
return uri

class Config(BaseCustomSettings.Config):
schema_extra: ClassVar[dict[str, Any]] = { # type: ignore[misc]
Expand Down
24 changes: 15 additions & 9 deletions packages/settings-library/tests/test__pydantic_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pydantic import BaseSettings, validator
from pydantic.fields import ModelField, Undefined
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict


def assert_field_specs(
Expand Down Expand Up @@ -48,11 +49,10 @@ class Settings(BaseSettings):

@validator("*", pre=True)
@classmethod
def parse_none(cls, v, values, field: ModelField):
def _parse_none(cls, v, values, field: ModelField):
# WARNING: In nullable fields, envs equal to null or none are parsed as None !!
if field.allow_none:
if isinstance(v, str) and v.lower() in ("null", "none"):
return None
if field.allow_none and isinstance(v, str) and v.lower() in ("null", "none"):
return None
return v


Expand Down Expand Up @@ -132,15 +132,21 @@ def test_fields_declarations():
def test_construct(monkeypatch):
# from __init__
settings_from_init = Settings(
VALUE=1, VALUE_ALSO_REQUIRED=10, VALUE_NULLABLE_REQUIRED=None
VALUE=1,
VALUE_ALSO_REQUIRED=10,
VALUE_NULLABLE_REQUIRED=None,
)

print(settings_from_init.json(exclude_unset=True, indent=1))

# from env vars
monkeypatch.setenv("VALUE", "1")
monkeypatch.setenv("VALUE_ALSO_REQUIRED", "10")
monkeypatch.setenv(
"VALUE_NULLABLE_REQUIRED", "null"
setenvs_from_dict(
monkeypatch,
{
"VALUE": "1",
"VALUE_ALSO_REQUIRED": "10",
"VALUE_NULLABLE_REQUIRED": "null",
},
) # WARNING: set this env to None would not work w/o ``parse_none`` validator! bug???

settings_from_env = Settings()
Expand Down
47 changes: 34 additions & 13 deletions packages/settings-library/tests/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
# pylint: disable=unused-variable


from urllib.parse import urlparse

import pytest
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.postgres import PostgresSettings


Expand All @@ -12,9 +16,16 @@ def env_file():
return ".env-sample"


def test_cached_property_dsn(mock_environment: dict):
@pytest.fixture
def mock_environment(mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
return mock_environment | setenvs_from_dict(
monkeypatch, {"POSTGRES_CLIENT_NAME": "Some &43 funky name"}
)

settings = PostgresSettings()

def test_cached_property_dsn(mock_environment: EnvVarsDict):

settings = PostgresSettings.create_from_envs()

# all are upper-case
assert all(key == key.upper() for key in settings.dict())
Expand All @@ -28,20 +39,30 @@ def test_cached_property_dsn(mock_environment: dict):
assert "dsn" in settings.dict()


def test_dsn_with_query(mock_environment: dict, monkeypatch):

def test_dsn_with_query(mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch):
settings = PostgresSettings()

assert not settings.POSTGRES_CLIENT_NAME
assert settings.POSTGRES_CLIENT_NAME
assert settings.dsn == "postgresql://foo:secret@localhost:5432/foodb"

# now with app
monkeypatch.setenv("POSTGRES_CLIENT_NAME", "Some &43 funky name")

settings_with_app = PostgresSettings()

assert settings_with_app.POSTGRES_CLIENT_NAME
assert (
settings_with_app.dsn_with_query
settings.dsn_with_query
== "postgresql://foo:secret@localhost:5432/foodb?application_name=Some+%2643+funky+name"
)

with monkeypatch.context() as patch:
patch.delenv("POSTGRES_CLIENT_NAME")
settings = PostgresSettings()

assert not settings.POSTGRES_CLIENT_NAME
assert settings.dsn == settings.dsn_with_query


def test_dsn_with_async_sqlalchemy_has_query(
mock_environment: EnvVarsDict, monkeypatch
):
settings = PostgresSettings()

parsed_url = urlparse(settings.dsn_with_async_sqlalchemy)
assert parsed_url.scheme.split("+") == ["postgresql", "asyncpg"]

assert not parsed_url.query
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import TypeAlias

from fastapi import FastAPI
from servicelib.db_async_engine import close_db_connection, connect_to_db
from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db
from servicelib.logging_utils import log_context

from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import logging

from models_library.healthchecks import LivenessResult
from servicelib.db_asyncpg_utils import check_postgres_liveness
from sqlalchemy.ext.asyncio import AsyncEngine

from .payments_gateway import PaymentsGatewayApi
from .postgres import check_postgres_liveness
from .resource_usage_tracker import ResourceUsageTrackerApi

_logger = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import time
from datetime import timedelta

from fastapi import FastAPI
from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult
from servicelib.db_async_engine import close_db_connection, connect_to_db
from sqlalchemy.exc import SQLAlchemyError
from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db
from sqlalchemy.ext.asyncio import AsyncEngine

from ..core.settings import ApplicationSettings
Expand All @@ -16,18 +11,6 @@ def get_engine(app: FastAPI) -> AsyncEngine:
return engine


async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult:
try:
tic = time.time()
# test
async with engine.connect():
...
elapsed_time = time.time() - tic
return IsResponsive(elapsed=timedelta(seconds=elapsed_time))
except SQLAlchemyError as err:
return IsNonResponsive(reason=f"{err}")


def setup_postgres(app: FastAPI):
app.state.engine = None

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from fastapi import FastAPI
from servicelib.db_async_engine import close_db_connection, connect_to_db
from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db


def setup(app: FastAPI):
Expand Down
Loading
Loading