diff --git a/packages/service-library/src/servicelib/aiohttp/application_keys.py b/packages/service-library/src/servicelib/aiohttp/application_keys.py index f98423bad05c..3958c860cb00 100644 --- a/packages/service-library/src/servicelib/aiohttp/application_keys.py +++ b/packages/service-library/src/servicelib/aiohttp/application_keys.py @@ -21,7 +21,7 @@ APP_CONFIG_KEY: Final[str] = f"{__name__ }.config" APP_SETTINGS_KEY: Final[str] = f"{__name__ }.settings" -APP_DB_ENGINE_KEY: Final[str] = f"{__name__ }.db_engine" +APP_AIOPG_ENGINE_KEY: Final[str] = f"{__name__ }.aiopg_engine" APP_CLIENT_SESSION_KEY: Final[str] = f"{__name__ }.session" diff --git a/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py b/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py new file mode 100644 index 000000000000..2ca9d431075b --- /dev/null +++ b/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py @@ -0,0 +1,74 @@ +""" +Helpers on asyncpg specific for aiohttp + +SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529 +""" + + +import logging +from typing import Final + +from aiohttp import web +from servicelib.logging_utils import log_context +from settings_library.postgres import PostgresSettings +from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear + get_pg_engine_stateinfo, +) +from sqlalchemy.ext.asyncio import AsyncEngine + +from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready +from ..logging_utils import log_context + +APP_DB_ASYNC_ENGINE_KEY: Final[str] = f"{__name__ }.AsyncEngine" + + +_logger = logging.getLogger(__name__) + + +def _set_async_engine_to_app_state(app: web.Application, engine: AsyncEngine): + if exists := app.get(APP_DB_ASYNC_ENGINE_KEY, None): + msg = f"An instance of {type(exists)} already in app[{APP_DB_ASYNC_ENGINE_KEY}]={exists}" + raise ValueError(msg) + + app[APP_DB_ASYNC_ENGINE_KEY] = engine + return get_async_engine(app) + + +def get_async_engine(app: web.Application) -> AsyncEngine: + engine: AsyncEngine = app[APP_DB_ASYNC_ENGINE_KEY] + assert engine # nosec + return engine + + +async def connect_to_db(app: web.Application, settings: PostgresSettings) -> None: + """ + - db services up, data migrated and ready to use + - sets an engine in app state (use `get_async_engine(app)` to retrieve) + """ + if settings.POSTGRES_CLIENT_NAME: + settings = settings.copy( + update={"POSTGRES_CLIENT_NAME": settings.POSTGRES_CLIENT_NAME + "-asyncpg"} + ) + + with log_context( + _logger, + logging.INFO, + "Connecting app[APP_DB_ASYNC_ENGINE_KEY] to postgres with %s", + f"{settings=}", + ): + engine = await create_async_engine_and_pg_database_ready(settings) + _set_async_engine_to_app_state(app, engine) + + _logger.info( + "app[APP_DB_ASYNC_ENGINE_KEY] ready : %s", + await get_pg_engine_stateinfo(engine), + ) + + +async def close_db_connection(app: web.Application) -> None: + engine = get_async_engine(app) + with log_context( + _logger, logging.DEBUG, f"app[APP_DB_ASYNC_ENGINE_KEY] disconnect of {engine}" + ): + if engine: + await engine.dispose() diff --git a/packages/service-library/src/servicelib/db_asyncpg_utils.py b/packages/service-library/src/servicelib/db_asyncpg_utils.py new file mode 100644 index 000000000000..84430916824a --- /dev/null +++ b/packages/service-library/src/servicelib/db_asyncpg_utils.py @@ -0,0 +1,63 @@ +import logging +import time +from datetime import timedelta + +from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult +from settings_library.postgres import PostgresSettings +from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear + raise_if_migration_not_ready, +) +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine +from tenacity import retry + +from .retry_policies import PostgresRetryPolicyUponInitialization + +_logger = logging.getLogger(__name__) + + +@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs) +async def create_async_engine_and_pg_database_ready( + settings: PostgresSettings, +) -> AsyncEngine: + """ + - creates asyncio engine + - waits until db service is up + - waits until db data is migrated (i.e. ready to use) + - returns engine + """ + server_settings = None + if settings.POSTGRES_CLIENT_NAME: + server_settings = { + "application_name": settings.POSTGRES_CLIENT_NAME, + } + + engine: AsyncEngine = create_async_engine( + settings.dsn_with_async_sqlalchemy, + pool_size=settings.POSTGRES_MINSIZE, + max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE, + connect_args={"server_settings": server_settings}, + pool_pre_ping=True, # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects + future=True, # this uses sqlalchemy 2.0 API, shall be removed when sqlalchemy 2.0 is released + ) + + try: + await raise_if_migration_not_ready(engine) + except Exception: + # NOTE: engine must be closed because retry will create a new engine + await engine.dispose() + raise + + return engine + + +async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult: + try: + tic = time.time() + # test + async with engine.connect(): + ... + elapsed_time = time.time() - tic + return IsResponsive(elapsed=timedelta(seconds=elapsed_time)) + except SQLAlchemyError as err: + return IsNonResponsive(reason=f"{err}") diff --git a/packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py b/packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py new file mode 100644 index 000000000000..a45e5dc2145d --- /dev/null +++ b/packages/service-library/src/servicelib/fastapi/db_asyncpg_engine.py @@ -0,0 +1,33 @@ +import logging + +from fastapi import FastAPI +from settings_library.postgres import PostgresSettings +from simcore_postgres_database.utils_aiosqlalchemy import ( # type: ignore[import-not-found] # this on is unclear + get_pg_engine_stateinfo, +) + +from ..db_asyncpg_utils import create_async_engine_and_pg_database_ready +from ..logging_utils import log_context + +_logger = logging.getLogger(__name__) + + +async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None: + with log_context( + _logger, + logging.DEBUG, + f"Connecting and migraging {settings.dsn_with_async_sqlalchemy}", + ): + engine = await create_async_engine_and_pg_database_ready(settings) + + app.state.engine = engine + _logger.debug( + "Setup engine: %s", + await get_pg_engine_stateinfo(engine), + ) + + +async def close_db_connection(app: FastAPI) -> None: + with log_context(_logger, logging.DEBUG, f"db disconnect of {app.state.engine}"): + if engine := app.state.engine: + await engine.dispose() diff --git a/packages/settings-library/src/settings_library/postgres.py b/packages/settings-library/src/settings_library/postgres.py index f8335bbeed2b..7724aba99dc4 100644 --- a/packages/settings-library/src/settings_library/postgres.py +++ b/packages/settings-library/src/settings_library/postgres.py @@ -1,6 +1,6 @@ -import urllib.parse from functools import cached_property from typing import Any, ClassVar +from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse from pydantic import Field, PostgresDsn, SecretStr, validator @@ -75,11 +75,23 @@ def dsn_with_async_sqlalchemy(self) -> str: def dsn_with_query(self) -> str: """Some clients do not support queries in the dsn""" dsn = self.dsn + return self._update_query(dsn) + + def _update_query(self, uri: str) -> str: + # SEE https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS + new_params: dict[str, str] = {} if self.POSTGRES_CLIENT_NAME: - dsn += "?" + urllib.parse.urlencode( - {"application_name": self.POSTGRES_CLIENT_NAME} - ) - return dsn + new_params = { + "application_name": self.POSTGRES_CLIENT_NAME, + } + + if new_params: + parsed_uri = urlparse(uri) + query = dict(parse_qsl(parsed_uri.query)) + query.update(new_params) + updated_query = urlencode(query) + return urlunparse(parsed_uri._replace(query=updated_query)) + return uri class Config(BaseCustomSettings.Config): schema_extra: ClassVar[dict[str, Any]] = { # type: ignore[misc] diff --git a/packages/settings-library/tests/test__pydantic_settings.py b/packages/settings-library/tests/test__pydantic_settings.py index 8cf3eadc30ff..956bf6a35015 100644 --- a/packages/settings-library/tests/test__pydantic_settings.py +++ b/packages/settings-library/tests/test__pydantic_settings.py @@ -15,6 +15,7 @@ from pydantic import BaseSettings, validator from pydantic.fields import ModelField, Undefined +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict def assert_field_specs( @@ -48,11 +49,10 @@ class Settings(BaseSettings): @validator("*", pre=True) @classmethod - def parse_none(cls, v, values, field: ModelField): + def _parse_none(cls, v, values, field: ModelField): # WARNING: In nullable fields, envs equal to null or none are parsed as None !! - if field.allow_none: - if isinstance(v, str) and v.lower() in ("null", "none"): - return None + if field.allow_none and isinstance(v, str) and v.lower() in ("null", "none"): + return None return v @@ -132,15 +132,21 @@ def test_fields_declarations(): def test_construct(monkeypatch): # from __init__ settings_from_init = Settings( - VALUE=1, VALUE_ALSO_REQUIRED=10, VALUE_NULLABLE_REQUIRED=None + VALUE=1, + VALUE_ALSO_REQUIRED=10, + VALUE_NULLABLE_REQUIRED=None, ) + print(settings_from_init.json(exclude_unset=True, indent=1)) # from env vars - monkeypatch.setenv("VALUE", "1") - monkeypatch.setenv("VALUE_ALSO_REQUIRED", "10") - monkeypatch.setenv( - "VALUE_NULLABLE_REQUIRED", "null" + setenvs_from_dict( + monkeypatch, + { + "VALUE": "1", + "VALUE_ALSO_REQUIRED": "10", + "VALUE_NULLABLE_REQUIRED": "null", + }, ) # WARNING: set this env to None would not work w/o ``parse_none`` validator! bug??? settings_from_env = Settings() diff --git a/packages/settings-library/tests/test_postgres.py b/packages/settings-library/tests/test_postgres.py index 1708acc78081..19dbfcf17947 100644 --- a/packages/settings-library/tests/test_postgres.py +++ b/packages/settings-library/tests/test_postgres.py @@ -3,7 +3,11 @@ # pylint: disable=unused-variable +from urllib.parse import urlparse + import pytest +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.postgres import PostgresSettings @@ -12,9 +16,16 @@ def env_file(): return ".env-sample" -def test_cached_property_dsn(mock_environment: dict): +@pytest.fixture +def mock_environment(mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch): + return mock_environment | setenvs_from_dict( + monkeypatch, {"POSTGRES_CLIENT_NAME": "Some &43 funky name"} + ) - settings = PostgresSettings() + +def test_cached_property_dsn(mock_environment: EnvVarsDict): + + settings = PostgresSettings.create_from_envs() # all are upper-case assert all(key == key.upper() for key in settings.dict()) @@ -28,20 +39,30 @@ def test_cached_property_dsn(mock_environment: dict): assert "dsn" in settings.dict() -def test_dsn_with_query(mock_environment: dict, monkeypatch): - +def test_dsn_with_query(mock_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch): settings = PostgresSettings() - assert not settings.POSTGRES_CLIENT_NAME + assert settings.POSTGRES_CLIENT_NAME assert settings.dsn == "postgresql://foo:secret@localhost:5432/foodb" - - # now with app - monkeypatch.setenv("POSTGRES_CLIENT_NAME", "Some &43 funky name") - - settings_with_app = PostgresSettings() - - assert settings_with_app.POSTGRES_CLIENT_NAME assert ( - settings_with_app.dsn_with_query + settings.dsn_with_query == "postgresql://foo:secret@localhost:5432/foodb?application_name=Some+%2643+funky+name" ) + + with monkeypatch.context() as patch: + patch.delenv("POSTGRES_CLIENT_NAME") + settings = PostgresSettings() + + assert not settings.POSTGRES_CLIENT_NAME + assert settings.dsn == settings.dsn_with_query + + +def test_dsn_with_async_sqlalchemy_has_query( + mock_environment: EnvVarsDict, monkeypatch +): + settings = PostgresSettings() + + parsed_url = urlparse(settings.dsn_with_async_sqlalchemy) + assert parsed_url.scheme.split("+") == ["postgresql", "asyncpg"] + + assert not parsed_url.query diff --git a/services/catalog/src/simcore_service_catalog/core/events.py b/services/catalog/src/simcore_service_catalog/core/events.py index fb2329019b52..f22adbba4ece 100644 --- a/services/catalog/src/simcore_service_catalog/core/events.py +++ b/services/catalog/src/simcore_service_catalog/core/events.py @@ -3,7 +3,7 @@ from typing import TypeAlias from fastapi import FastAPI -from servicelib.db_async_engine import close_db_connection, connect_to_db +from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db from servicelib.logging_utils import log_context from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG diff --git a/services/payments/src/simcore_service_payments/services/healthchecks.py b/services/payments/src/simcore_service_payments/services/healthchecks.py index 98774700f44b..be6344c00ef7 100644 --- a/services/payments/src/simcore_service_payments/services/healthchecks.py +++ b/services/payments/src/simcore_service_payments/services/healthchecks.py @@ -2,10 +2,10 @@ import logging from models_library.healthchecks import LivenessResult +from servicelib.db_asyncpg_utils import check_postgres_liveness from sqlalchemy.ext.asyncio import AsyncEngine from .payments_gateway import PaymentsGatewayApi -from .postgres import check_postgres_liveness from .resource_usage_tracker import ResourceUsageTrackerApi _logger = logging.getLogger(__name__) diff --git a/services/payments/src/simcore_service_payments/services/postgres.py b/services/payments/src/simcore_service_payments/services/postgres.py index ba68eae0fac4..fd84fba45ce7 100644 --- a/services/payments/src/simcore_service_payments/services/postgres.py +++ b/services/payments/src/simcore_service_payments/services/postgres.py @@ -1,10 +1,5 @@ -import time -from datetime import timedelta - from fastapi import FastAPI -from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult -from servicelib.db_async_engine import close_db_connection, connect_to_db -from sqlalchemy.exc import SQLAlchemyError +from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db from sqlalchemy.ext.asyncio import AsyncEngine from ..core.settings import ApplicationSettings @@ -16,18 +11,6 @@ def get_engine(app: FastAPI) -> AsyncEngine: return engine -async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult: - try: - tic = time.time() - # test - async with engine.connect(): - ... - elapsed_time = time.time() - tic - return IsResponsive(elapsed=timedelta(seconds=elapsed_time)) - except SQLAlchemyError as err: - return IsNonResponsive(reason=f"{err}") - - def setup_postgres(app: FastAPI): app.state.engine = None diff --git a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/__init__.py b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/__init__.py index bca3083383ce..42062cb30ba6 100644 --- a/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/__init__.py +++ b/services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/__init__.py @@ -1,5 +1,5 @@ from fastapi import FastAPI -from servicelib.db_async_engine import close_db_connection, connect_to_db +from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db def setup(app: FastAPI): diff --git a/services/storage/src/simcore_service_storage/constants.py b/services/storage/src/simcore_service_storage/constants.py index f9c6a36f5c26..498a7c0eebb7 100644 --- a/services/storage/src/simcore_service_storage/constants.py +++ b/services/storage/src/simcore_service_storage/constants.py @@ -39,7 +39,7 @@ MAX_CONCURRENT_REST_CALLS: Final[int] = 10 # DATABASE ---------------------------- -APP_DB_ENGINE_KEY = f"{__name__}.db_engine" +APP_AIOPG_ENGINE_KEY = f"{__name__}.aiopg_engine" MAX_CONCURRENT_DB_TASKS: Final[int] = 2 # DATA STORAGE MANAGER ---------------------------------- diff --git a/services/storage/src/simcore_service_storage/db.py b/services/storage/src/simcore_service_storage/db.py index dd00a861b447..2dbb7dc8704c 100644 --- a/services/storage/src/simcore_service_storage/db.py +++ b/services/storage/src/simcore_service_storage/db.py @@ -2,7 +2,7 @@ from typing import Any from aiohttp import web -from aiopg.sa import Engine +from aiopg.sa.engine import Engine from servicelib.aiohttp.aiopg_utils import is_pg_responsive from servicelib.common_aiopg_utils import DataSourceName, create_pg_engine from servicelib.retry_policies import PostgresRetryPolicyUponInitialization @@ -13,7 +13,7 @@ ) from tenacity import retry -from .constants import APP_CONFIG_KEY, APP_DB_ENGINE_KEY +from .constants import APP_AIOPG_ENGINE_KEY, APP_CONFIG_KEY _logger = logging.getLogger(__name__) @@ -46,7 +46,8 @@ async def postgres_cleanup_ctx(app: web.Application): ) as engine: assert engine # nosec - app[APP_DB_ENGINE_KEY] = engine + app[APP_AIOPG_ENGINE_KEY] = engine + _logger.info("Created pg engine for %s", dsn) yield # ---------- _logger.info("Deleting pg engine for %s", dsn) @@ -55,11 +56,11 @@ async def postgres_cleanup_ctx(app: web.Application): async def is_service_responsive(app: web.Application) -> bool: """Returns true if the app can connect to db service""" - return await is_pg_responsive(engine=app[APP_DB_ENGINE_KEY]) + return await is_pg_responsive(engine=app[APP_AIOPG_ENGINE_KEY]) def get_engine_state(app: web.Application) -> dict[str, Any]: - engine: Engine | None = app.get(APP_DB_ENGINE_KEY) + engine: Engine | None = app.get(APP_AIOPG_ENGINE_KEY) if engine: engine_info: dict[str, Any] = get_pg_engine_stateinfo(engine) return engine_info @@ -67,7 +68,7 @@ def get_engine_state(app: web.Application) -> dict[str, Any]: def setup_db(app: web.Application): - app[APP_DB_ENGINE_KEY] = None + app[APP_AIOPG_ENGINE_KEY] = None # app is created at this point but not yet started _logger.debug("Setting up %s [service: %s] ...", __name__, "postgres") diff --git a/services/storage/src/simcore_service_storage/db_tokens.py b/services/storage/src/simcore_service_storage/db_tokens.py index 456d90366f9a..445a7c220d11 100644 --- a/services/storage/src/simcore_service_storage/db_tokens.py +++ b/services/storage/src/simcore_service_storage/db_tokens.py @@ -7,7 +7,7 @@ from models_library.users import UserID from simcore_postgres_database.storage_models import tokens -from .constants import APP_CONFIG_KEY, APP_DB_ENGINE_KEY +from .constants import APP_AIOPG_ENGINE_KEY, APP_CONFIG_KEY log = logging.getLogger(__name__) @@ -27,7 +27,7 @@ async def get_api_token_and_secret( app: web.Application, user_id: UserID ) -> tuple[str, str]: # from the client side together with the userid? - engine = app[APP_DB_ENGINE_KEY] + engine = app[APP_AIOPG_ENGINE_KEY] # defaults from config if any, othewise None api_token = app[APP_CONFIG_KEY].BF_API_KEY diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py index 88d3715c36e1..db5a1ab288b5 100644 --- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py +++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py @@ -38,8 +38,8 @@ from . import db_file_meta_data, db_projects, db_tokens from .constants import ( + APP_AIOPG_ENGINE_KEY, APP_CONFIG_KEY, - APP_DB_ENGINE_KEY, DATCORE_ID, EXPAND_DIR_MAX_ITEM_COUNT, MAX_CONCURRENT_S3_TASKS, @@ -1084,7 +1084,7 @@ def create_simcore_s3_data_manager(app: web.Application) -> SimcoreS3DataManager cfg: Settings = app[APP_CONFIG_KEY] assert cfg.STORAGE_S3 # nosec return SimcoreS3DataManager( - engine=app[APP_DB_ENGINE_KEY], + engine=app[APP_AIOPG_ENGINE_KEY], simcore_bucket_name=parse_obj_as(S3BucketName, cfg.STORAGE_S3.S3_BUCKET_NAME), app=app, settings=cfg, diff --git a/services/web/server/src/simcore_service_webserver/_constants.py b/services/web/server/src/simcore_service_webserver/_constants.py index 91b70f453074..aafa109b47da 100644 --- a/services/web/server/src/simcore_service_webserver/_constants.py +++ b/services/web/server/src/simcore_service_webserver/_constants.py @@ -4,8 +4,8 @@ from typing import Final from servicelib.aiohttp.application_keys import ( + APP_AIOPG_ENGINE_KEY, APP_CONFIG_KEY, - APP_DB_ENGINE_KEY, APP_FIRE_AND_FORGET_TASKS_KEY, APP_SETTINGS_KEY, ) @@ -30,7 +30,7 @@ __all__: tuple[str, ...] = ( "APP_CONFIG_KEY", - "APP_DB_ENGINE_KEY", + "APP_AIOPG_ENGINE_KEY", "APP_FIRE_AND_FORGET_TASKS_KEY", "APP_SETTINGS_KEY", "RQT_USERID_KEY", diff --git a/services/web/server/src/simcore_service_webserver/api_keys/_db.py b/services/web/server/src/simcore_service_webserver/api_keys/_db.py index a01a49c9a0b5..4a51464e1a9d 100644 --- a/services/web/server/src/simcore_service_webserver/api_keys/_db.py +++ b/services/web/server/src/simcore_service_webserver/api_keys/_db.py @@ -10,10 +10,11 @@ from models_library.basic_types import IdInt from models_library.products import ProductName from models_library.users import UserID -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY from simcore_postgres_database.models.api_keys import api_keys from sqlalchemy.dialects.postgresql import insert as pg_insert +from ..db.plugin import get_database_engine + _logger = logging.getLogger(__name__) @@ -23,7 +24,7 @@ class ApiKeyRepo: @classmethod def create_from_app(cls, app: web.Application): - return cls(engine=app[APP_DB_ENGINE_KEY]) + return cls(engine=get_database_engine(app)) async def list_names( self, *, user_id: UserID, product_name: ProductName diff --git a/services/web/server/src/simcore_service_webserver/db/_aiopg.py b/services/web/server/src/simcore_service_webserver/db/_aiopg.py new file mode 100644 index 000000000000..f6944e5ef679 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/db/_aiopg.py @@ -0,0 +1,105 @@ +""" +Helpers on aiopg + +SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529 +""" + +import logging +from collections.abc import AsyncIterator +from typing import Any, cast + +from aiohttp import web +from aiopg.sa import Engine, create_engine +from models_library.utils.json_serialization import json_dumps +from servicelib.aiohttp.aiopg_utils import is_pg_responsive +from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY +from servicelib.logging_utils import log_context +from servicelib.retry_policies import PostgresRetryPolicyUponInitialization +from simcore_postgres_database.errors import DBAPIError +from simcore_postgres_database.utils_aiopg import ( + DBMigrationError, + close_engine, + get_pg_engine_stateinfo, + raise_if_migration_not_ready, +) +from tenacity import retry + +from .settings import PostgresSettings, get_plugin_settings + +_logger = logging.getLogger(__name__) + + +@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs) +async def _ensure_pg_ready(settings: PostgresSettings) -> Engine: + engine: Engine = await create_engine( + settings.dsn, + application_name=settings.POSTGRES_CLIENT_NAME, + minsize=settings.POSTGRES_MINSIZE, + maxsize=settings.POSTGRES_MAXSIZE, + ) + + try: + await raise_if_migration_not_ready(engine) + except (DBMigrationError, DBAPIError): + await close_engine(engine) + raise + + return engine # tenacity rules guarantee exit with exc + + +async def postgres_cleanup_ctx(app: web.Application) -> AsyncIterator[None]: + + settings = get_plugin_settings(app) + + with log_context( + _logger, + logging.INFO, + "Connecting app[APP_AIOPG_ENGINE_KEY] to postgres with %s", + f"{settings=}", + ): + aiopg_engine = await _ensure_pg_ready(settings) + app[APP_AIOPG_ENGINE_KEY] = aiopg_engine + + _logger.info( + "app[APP_AIOPG_ENGINE_KEY] created %s", + json_dumps(get_engine_state(app), indent=1), + ) + + yield # ------------------- + + if aiopg_engine is not app.get(APP_AIOPG_ENGINE_KEY): + _logger.critical( + "app[APP_AIOPG_ENGINE_KEY] does not hold right db engine. Somebody has changed it??" + ) + + await close_engine(aiopg_engine) + + _logger.debug( + "app[APP_AIOPG_ENGINE_KEY] after shutdown %s (closed=%s): %s", + aiopg_engine.dsn, + aiopg_engine.closed, + json_dumps(get_engine_state(app), indent=1), + ) + + +def is_service_enabled(app: web.Application): + return app.get(APP_AIOPG_ENGINE_KEY) is not None + + +async def is_service_responsive(app: web.Application): + """Returns true if the app can connect to db service""" + if not is_service_enabled(app): + return False + return await is_pg_responsive(engine=app[APP_AIOPG_ENGINE_KEY]) + + +def get_engine_state(app: web.Application) -> dict[str, Any]: + engine: Engine | None = app.get(APP_AIOPG_ENGINE_KEY) + if engine: + pg_engine_stateinfo: dict[str, Any] = get_pg_engine_stateinfo(engine) + return pg_engine_stateinfo + return {} + + +def get_database_engine(app: web.Application) -> Engine: + return cast(Engine, app[APP_AIOPG_ENGINE_KEY]) diff --git a/services/web/server/src/simcore_service_webserver/db/_asyncpg.py b/services/web/server/src/simcore_service_webserver/db/_asyncpg.py new file mode 100644 index 000000000000..03bac23ea2c2 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/db/_asyncpg.py @@ -0,0 +1,38 @@ +""" +Helpers on asyncpg + +SEE migration aiopg->asyncpg https://github.com/ITISFoundation/osparc-simcore/issues/4529 +""" + +import logging +from collections.abc import AsyncIterator + +from aiohttp import web +from servicelib.aiohttp.db_asyncpg_engine import ( + close_db_connection, + connect_to_db, + get_async_engine, +) +from sqlalchemy.ext.asyncio import AsyncEngine + +from .settings import PostgresSettings, get_plugin_settings + +_logger = logging.getLogger(__name__) + + +async def postgres_cleanup_ctx(app: web.Application) -> AsyncIterator[None]: + settings: PostgresSettings = get_plugin_settings(app) + await connect_to_db(app, settings) + + assert get_async_engine(app) # nosec + assert isinstance(get_async_engine(app), AsyncEngine) # nosec + + yield + + await close_db_connection(app) + + +__all__: tuple[str, ...] = ( + "get_async_engine", + "postgres_cleanup_ctx", +) diff --git a/services/web/server/src/simcore_service_webserver/db/base_repository.py b/services/web/server/src/simcore_service_webserver/db/base_repository.py index 0e5902fbc187..f7c207fb1b00 100644 --- a/services/web/server/src/simcore_service_webserver/db/base_repository.py +++ b/services/web/server/src/simcore_service_webserver/db/base_repository.py @@ -2,7 +2,8 @@ from aiopg.sa.engine import Engine from models_library.users import UserID -from .._constants import APP_DB_ENGINE_KEY, RQT_USERID_KEY +from .._constants import RQT_USERID_KEY +from . import _aiopg class BaseRepository: @@ -15,12 +16,13 @@ def __init__(self, engine: Engine, user_id: UserID | None = None): @classmethod def create_from_request(cls, request: web.Request): return cls( - engine=request.app[APP_DB_ENGINE_KEY], user_id=request.get(RQT_USERID_KEY) + engine=_aiopg.get_database_engine(request.app), + user_id=request.get(RQT_USERID_KEY), ) @classmethod def create_from_app(cls, app: web.Application): - return cls(engine=app[APP_DB_ENGINE_KEY], user_id=None) + return cls(engine=_aiopg.get_database_engine(app), user_id=None) @property def engine(self) -> Engine: diff --git a/services/web/server/src/simcore_service_webserver/db/plugin.py b/services/web/server/src/simcore_service_webserver/db/plugin.py index 720637bf9935..b3dcdacf2809 100644 --- a/services/web/server/src/simcore_service_webserver/db/plugin.py +++ b/services/web/server/src/simcore_service_webserver/db/plugin.py @@ -3,95 +3,21 @@ """ import logging -from collections.abc import AsyncIterator -from typing import Any, cast from aiohttp import web -from aiopg.sa import Engine, create_engine -from models_library.utils.json_serialization import json_dumps -from servicelib.aiohttp.aiopg_utils import is_pg_responsive -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY +from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup -from servicelib.retry_policies import PostgresRetryPolicyUponInitialization -from simcore_postgres_database.errors import DBAPIError -from simcore_postgres_database.utils_aiopg import ( - DBMigrationError, - close_engine, - get_pg_engine_stateinfo, - raise_if_migration_not_ready, -) -from tenacity import retry -from .settings import PostgresSettings, get_plugin_settings +from . import _aiopg _logger = logging.getLogger(__name__) -@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs) -async def _ensure_pg_ready(settings: PostgresSettings) -> Engine: - - _logger.info("Connecting to postgres with %s", f"{settings=}") - engine: Engine = await create_engine( - settings.dsn, - application_name=settings.POSTGRES_CLIENT_NAME, - minsize=settings.POSTGRES_MINSIZE, - maxsize=settings.POSTGRES_MAXSIZE, - ) - - try: - await raise_if_migration_not_ready(engine) - except (DBMigrationError, DBAPIError): - await close_engine(engine) - raise - - _logger.info("Connection to postgres with %s succeeded", f"{settings=}") - return engine # tenacity rules guarantee exit with exc - - -async def postgres_cleanup_ctx(app: web.Application) -> AsyncIterator[None]: - - settings = get_plugin_settings(app) - aiopg_engine = await _ensure_pg_ready(settings) - app[APP_DB_ENGINE_KEY] = aiopg_engine - - _logger.info("pg engine created %s", json_dumps(get_engine_state(app), indent=1)) - - yield # ------------------- - - if aiopg_engine is not app.get(APP_DB_ENGINE_KEY): - _logger.critical("app does not hold right db engine. Somebody has changed it??") - - await close_engine(aiopg_engine) - - _logger.debug( - "pg engine created after shutdown %s (closed=%s): %s", - aiopg_engine.dsn, - aiopg_engine.closed, - json_dumps(get_engine_state(app), indent=1), - ) - - -def is_service_enabled(app: web.Application): - return app.get(APP_DB_ENGINE_KEY) is not None - - -async def is_service_responsive(app: web.Application): - """Returns true if the app can connect to db service""" - if not is_service_enabled(app): - return False - return await is_pg_responsive(engine=app[APP_DB_ENGINE_KEY]) - - -def get_engine_state(app: web.Application) -> dict[str, Any]: - engine: Engine | None = app.get(APP_DB_ENGINE_KEY) - if engine: - pg_engine_stateinfo: dict[str, Any] = get_pg_engine_stateinfo(engine) - return pg_engine_stateinfo - return {} - - -def get_database_engine(app: web.Application) -> Engine: - return cast(Engine, app[APP_DB_ENGINE_KEY]) +# API +get_database_engine = _aiopg.get_database_engine +get_engine_state = _aiopg.get_engine_state +is_service_responsive = _aiopg.is_service_responsive +is_service_enabled = _aiopg.is_service_enabled @app_module_setup( @@ -103,7 +29,8 @@ def get_database_engine(app: web.Application) -> Engine: def setup_db(app: web.Application): # ensures keys exist - app[APP_DB_ENGINE_KEY] = None + app[APP_AIOPG_ENGINE_KEY] = None + assert get_database_engine(app) is None # nosec - # async connection to db - app.cleanup_ctx.append(postgres_cleanup_ctx) + # init engines + app.cleanup_ctx.append(_aiopg.postgres_cleanup_ctx) diff --git a/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py b/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py index eba764a25046..f97f214d4a92 100644 --- a/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py +++ b/services/web/server/src/simcore_service_webserver/db_listener/_db_comp_tasks_listening_task.py @@ -18,10 +18,10 @@ from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from pydantic.types import PositiveInt -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects from sqlalchemy.sql import select +from ..db.plugin import get_database_engine from ..projects import exceptions, projects_api from ..projects.nodes_utils import update_node_outputs from ._utils import convert_state_from_db @@ -159,7 +159,7 @@ async def _comp_tasks_listening_task(app: web.Application) -> None: while True: try: # create a special connection here - db_engine = app[APP_DB_ENGINE_KEY] + db_engine = get_database_engine(app) _logger.info("listening to comp_task events...") await _listen(app, db_engine) except asyncio.CancelledError: # noqa: PERF203 diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py index 229932958197..48d781aee8de 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_tasks_users.py @@ -15,7 +15,7 @@ from tenacity.before_sleep import before_sleep_log from tenacity.wait import wait_exponential -from .._constants import APP_DB_ENGINE_KEY +from ..db.plugin import get_database_engine from ..login.utils import notify_user_logout from ..security.api import clean_auth_policy_cache from ..users.api import update_expired_users @@ -60,7 +60,7 @@ async def _update_expired_users(app: web.Application): """ It is resilient, i.e. if update goes wrong, it waits a bit and retries """ - engine: Engine = app[APP_DB_ENGINE_KEY] + engine: Engine = get_database_engine(app) assert engine # nosec if updated := await update_expired_users(engine): diff --git a/services/web/server/src/simcore_service_webserver/products/_events.py b/services/web/server/src/simcore_service_webserver/products/_events.py index 64726ce55c3c..f1e4601d7c7c 100644 --- a/services/web/server/src/simcore_service_webserver/products/_events.py +++ b/services/web/server/src/simcore_service_webserver/products/_events.py @@ -2,7 +2,6 @@ import tempfile from collections import OrderedDict from pathlib import Path -from typing import cast from aiohttp import web from aiopg.sa.engine import Engine @@ -14,7 +13,8 @@ get_or_create_product_group, ) -from .._constants import APP_DB_ENGINE_KEY, APP_PRODUCTS_KEY +from .._constants import APP_PRODUCTS_KEY +from ..db.plugin import get_database_engine from ..statics._constants import FRONTEND_APP_DEFAULT, FRONTEND_APPS_AVAILABLE from ._db import get_product_payment_fields, iter_products from ._model import Product @@ -46,7 +46,7 @@ async def auto_create_products_groups(app: web.Application) -> None: NOTE: could not add this in 'setup_groups' (groups plugin) since it has to be executed BEFORE 'load_products_on_startup' """ - engine = cast(Engine, app[APP_DB_ENGINE_KEY]) + engine = get_database_engine(app) async with engine.acquire() as connection: async for row in iter_products(connection): @@ -76,7 +76,7 @@ async def load_products_on_startup(app: web.Application): Loads info on products stored in the database into app's storage (i.e. memory) """ app_products: OrderedDict[str, Product] = OrderedDict() - engine: Engine = app[APP_DB_ENGINE_KEY] + engine: Engine = get_database_engine(app) async with engine.acquire() as connection: async for row in iter_products(connection): assert isinstance(row, RowProxy) # nosec diff --git a/services/web/server/src/simcore_service_webserver/projects/db.py b/services/web/server/src/simcore_service_webserver/projects/db.py index 7fa0bc000df3..89f07163890c 100644 --- a/services/web/server/src/simcore_service_webserver/projects/db.py +++ b/services/web/server/src/simcore_service_webserver/projects/db.py @@ -34,7 +34,7 @@ from models_library.workspaces import WorkspaceID from pydantic import parse_obj_as from pydantic.types import PositiveInt -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY +from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY from servicelib.logging_utils import get_log_record_extra, log_context from simcore_postgres_database.errors import UniqueViolation from simcore_postgres_database.models.groups import user_to_groups @@ -106,11 +106,11 @@ class ProjectDBAPI(BaseProjectDB): def __init__(self, app: web.Application) -> None: self._app = app - self._engine = cast(Engine, app.get(APP_DB_ENGINE_KEY)) + self._engine = cast(Engine, app.get(APP_AIOPG_ENGINE_KEY)) def _init_engine(self) -> None: # Delays creation of engine because it setup_db does it on_startup - self._engine = cast(Engine, self._app.get(APP_DB_ENGINE_KEY)) + self._engine = cast(Engine, self._app.get(APP_AIOPG_ENGINE_KEY)) if self._engine is None: msg = "Database subsystem was not initialized" raise ValueError(msg) diff --git a/services/web/server/src/simcore_service_webserver/scicrunch/db.py b/services/web/server/src/simcore_service_webserver/scicrunch/db.py index ca5df617ee8c..476e320f73d2 100644 --- a/services/web/server/src/simcore_service_webserver/scicrunch/db.py +++ b/services/web/server/src/simcore_service_webserver/scicrunch/db.py @@ -7,10 +7,10 @@ import sqlalchemy as sa from aiohttp import web from aiopg.sa.result import ResultProxy, RowProxy -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY from simcore_postgres_database.models.scicrunch_resources import scicrunch_resources from sqlalchemy.dialects.postgresql import insert as sa_pg_insert +from ..db.plugin import get_database_engine from .models import ResearchResource, ResearchResourceAtdB logger = logging.getLogger(__name__) @@ -26,7 +26,7 @@ class ResearchResourceRepository: # WARNING: interfaces to both ResarchResource and ResearchResourceAtDB def __init__(self, app: web.Application): - self._engine = app[APP_DB_ENGINE_KEY] + self._engine = get_database_engine(app) async def list_resources(self) -> list[ResearchResource]: async with self._engine.acquire() as conn: diff --git a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_core.py b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_core.py index a34a50832d94..dcafdf528deb 100644 --- a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_core.py +++ b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_core.py @@ -14,7 +14,7 @@ ) from sqlalchemy.dialects.postgresql import ARRAY, INTEGER -from .._constants import APP_DB_ENGINE_KEY +from ..db.plugin import get_database_engine from ._errors import FileToLarge, IncompatibleService from ._models import ViewerInfo from .settings import get_plugin_settings @@ -41,7 +41,7 @@ async def list_viewers_info( # consumers: deque = deque() - async with app[APP_DB_ENGINE_KEY].acquire() as conn: + async with get_database_engine(app).acquire() as conn: # FIXME: ADD CONDITION: service MUST be shared with EVERYBODY! query = services_consume_filetypes.select() if file_type: @@ -119,7 +119,7 @@ def _version(column_or_value): return await get_default_viewer(app, file_type, file_size) if service_key and service_version: - async with app[APP_DB_ENGINE_KEY].acquire() as conn: + async with get_database_engine(app).acquire() as conn: query = ( services_consume_filetypes.select() .where( diff --git a/services/web/server/src/simcore_service_webserver/tags/_handlers.py b/services/web/server/src/simcore_service_webserver/tags/_handlers.py index de0fc7dd5b1a..07925f4749e9 100644 --- a/services/web/server/src/simcore_service_webserver/tags/_handlers.py +++ b/services/web/server/src/simcore_service_webserver/tags/_handlers.py @@ -3,7 +3,6 @@ from aiohttp import web from aiopg.sa.engine import Engine from pydantic import parse_obj_as -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY from servicelib.aiohttp.requests_validation import ( parse_request_body_as, parse_request_path_parameters_as, @@ -17,6 +16,7 @@ ) from .._meta import API_VTAG as VTAG +from ..db.plugin import get_database_engine from ..login.decorators import login_required from ..security.decorators import permission_required from ..utils_aiohttp import envelope_json_response @@ -55,7 +55,7 @@ async def wrapper(request: web.Request) -> web.StreamResponse: @permission_required("tag.crud.*") @_handle_tags_exceptions async def create_tag(request: web.Request): - engine: Engine = request.app[APP_DB_ENGINE_KEY] + engine: Engine = get_database_engine(request.app) req_ctx = TagRequestContext.parse_obj(request) new_tag = await parse_request_body_as(TagCreate, request) @@ -77,7 +77,7 @@ async def create_tag(request: web.Request): @permission_required("tag.crud.*") @_handle_tags_exceptions async def list_tags(request: web.Request): - engine: Engine = request.app[APP_DB_ENGINE_KEY] + engine: Engine = get_database_engine(request.app) req_ctx = TagRequestContext.parse_obj(request) repo = TagsRepo(user_id=req_ctx.user_id) @@ -93,7 +93,7 @@ async def list_tags(request: web.Request): @permission_required("tag.crud.*") @_handle_tags_exceptions async def update_tag(request: web.Request): - engine: Engine = request.app[APP_DB_ENGINE_KEY] + engine: Engine = get_database_engine(request.app) req_ctx = TagRequestContext.parse_obj(request) path_params = parse_request_path_parameters_as(TagPathParams, request) tag_updates = await parse_request_body_as(TagUpdate, request) @@ -112,7 +112,7 @@ async def update_tag(request: web.Request): @permission_required("tag.crud.*") @_handle_tags_exceptions async def delete_tag(request: web.Request): - engine: Engine = request.app[APP_DB_ENGINE_KEY] + engine: Engine = get_database_engine(request.app) req_ctx = TagRequestContext.parse_obj(request) path_params = parse_request_path_parameters_as(TagPathParams, request) diff --git a/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__db_comp_tasks_listening_task.py b/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__db_comp_tasks_listening_task.py index dfea0bf713bf..49989b3fa313 100644 --- a/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__db_comp_tasks_listening_task.py +++ b/services/web/server/tests/unit/with_dbs/01/notifications/test_notifications__db_comp_tasks_listening_task.py @@ -17,7 +17,7 @@ from models_library.projects import ProjectAtDB, ProjectID from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.webserver_login import UserInfoDict -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY +from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY from simcore_postgres_database.models.comp_pipeline import StateType from simcore_postgres_database.models.comp_tasks import NodeClass, comp_tasks from simcore_postgres_database.models.users import UserRole @@ -139,7 +139,7 @@ async def test_listen_comp_tasks_task( task_class: NodeClass, faker: Faker, ): - db_engine: aiopg.sa.Engine = client.app[APP_DB_ENGINE_KEY] + db_engine: aiopg.sa.Engine = client.app[APP_AIOPG_ENGINE_KEY] some_project = await project(logged_user) pipeline(project_id=f"{some_project.uuid}") task = comp_task( diff --git a/services/web/server/tests/unit/with_dbs/01/test_db.py b/services/web/server/tests/unit/with_dbs/01/test_db.py index e4f77f379e0a..4b4dce7b1344 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_db.py +++ b/services/web/server/tests/unit/with_dbs/01/test_db.py @@ -5,21 +5,100 @@ from pathlib import Path +import aiopg.sa +import asyncpg +import pytest +import sqlalchemy as sa import yaml +from aiohttp import web from aiohttp.test_utils import TestServer +from pytest_mock import MockFixture, MockType +from simcore_service_webserver.application_settings import ( + ApplicationSettings, + get_application_settings, +) +from simcore_service_webserver.db import _aiopg, _asyncpg from simcore_service_webserver.db.plugin import ( is_service_enabled, is_service_responsive, + setup_db, ) +from simcore_service_webserver.login.storage import AsyncpgStorage, get_plugin_storage +from sqlalchemy.ext.asyncio import AsyncEngine + + +@pytest.fixture +def mock_asyncpg_in_setup_db(mocker: MockFixture) -> MockType: + + original_setup = setup_db + + mock_setup_db = mocker.patch( + "simcore_service_webserver.application.setup_db", autospec=True + ) + + def _wrapper_setup_db(app: web.Application): + original_setup(app) + + # NEW engine ! + app.cleanup_ctx.append(_asyncpg.postgres_cleanup_ctx) + + mock_setup_db.side_effect = _wrapper_setup_db + return mock_setup_db + + +async def test_all_pg_engines_in_app( + mock_asyncpg_in_setup_db: MockType, web_server: TestServer +): + assert mock_asyncpg_in_setup_db.called + + app = web_server.app + assert app + + settings: ApplicationSettings = get_application_settings(app) + assert settings.WEBSERVER_DB + assert settings.WEBSERVER_DB.POSTGRES_CLIENT_NAME + + # (1) aiopg engine (deprecated) + aiopg_engine = _aiopg.get_database_engine(app) + assert aiopg_engine + assert isinstance(aiopg_engine, aiopg.sa.Engine) + + # (2) asyncpg engine via sqlalchemy.ext.asyncio (new) + asyncpg_engine: AsyncEngine = _asyncpg.get_async_engine(app) + assert asyncpg_engine + assert isinstance(asyncpg_engine, AsyncEngine) + + # (3) low-level asyncpg Pool (deprecated) + # Will be replaced by (2) + login_storage: AsyncpgStorage = get_plugin_storage(app) + assert login_storage.pool + assert isinstance(login_storage.pool, asyncpg.Pool) + + # they ALL point to the SAME database + assert aiopg_engine.dsn + assert asyncpg_engine.url + + query = sa.text('SELECT "version_num" FROM "alembic_version"') + async with login_storage.pool.acquire() as conn: + result_pool = await conn.fetchval(str(query)) + + async with asyncpg_engine.connect() as conn: + result_asyncpg = (await conn.execute(query)).scalar_one_or_none() + + async with aiopg_engine.acquire() as conn: + result_aiopg = await (await conn.execute(query)).scalar() + + assert result_pool == result_asyncpg + assert result_pool == result_aiopg def test_uses_same_postgres_version( docker_compose_file: Path, osparc_simcore_root_dir: Path ): - with open(docker_compose_file) as fh: + with Path.open(docker_compose_file) as fh: fixture = yaml.safe_load(fh) - with open(osparc_simcore_root_dir / "services" / "docker-compose.yml") as fh: + with Path.open(osparc_simcore_root_dir / "services" / "docker-compose.yml") as fh: expected = yaml.safe_load(fh) assert ( diff --git a/services/web/server/tests/unit/with_dbs/01/test_groups_classifiers.py b/services/web/server/tests/unit/with_dbs/01/test_groups_classifiers.py index 7425b466206a..b2fc82f44e69 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_groups_classifiers.py +++ b/services/web/server/tests/unit/with_dbs/01/test_groups_classifiers.py @@ -7,7 +7,7 @@ import pytest import sqlalchemy as sa from servicelib.common_aiopg_utils import DataSourceName, create_pg_engine -from simcore_service_webserver._constants import APP_DB_ENGINE_KEY +from simcore_service_webserver._constants import APP_AIOPG_ENGINE_KEY from simcore_service_webserver.groups._classifiers import GroupClassifierRepository from sqlalchemy.sql import text @@ -35,7 +35,7 @@ async def app(postgres_dsn: dict, inject_tables): ) async with create_pg_engine(dsn) as engine: - fake_app = {APP_DB_ENGINE_KEY: engine} + fake_app = {APP_AIOPG_ENGINE_KEY: engine} yield fake_app diff --git a/services/web/server/tests/unit/with_dbs/03/meta_modeling/test_meta_modeling_iterations.py b/services/web/server/tests/unit/with_dbs/03/meta_modeling/test_meta_modeling_iterations.py index f4bfac46564d..20cb885bdfa3 100644 --- a/services/web/server/tests/unit/with_dbs/03/meta_modeling/test_meta_modeling_iterations.py +++ b/services/web/server/tests/unit/with_dbs/03/meta_modeling/test_meta_modeling_iterations.py @@ -23,7 +23,7 @@ ) from servicelib.aiohttp import status from simcore_postgres_database.models.projects import projects -from simcore_service_webserver._constants import APP_DB_ENGINE_KEY +from simcore_service_webserver._constants import APP_AIOPG_ENGINE_KEY from simcore_service_webserver.director_v2.api import get_project_run_policy from simcore_service_webserver.meta_modeling._handlers import ( Page, @@ -62,7 +62,7 @@ async def context_with_logged_user(client: TestClient, logged_user: UserInfoDict yield assert client.app - engine = client.app[APP_DB_ENGINE_KEY] + engine = client.app[APP_AIOPG_ENGINE_KEY] async with engine.acquire() as conn: # cascade deletes everything except projects_vc_snapshot await conn.execute(projects.delete()) diff --git a/services/web/server/tests/unit/with_dbs/03/products/test_products_db.py b/services/web/server/tests/unit/with_dbs/03/products/test_products_db.py index d51fe139f4e9..06022c2f9fa0 100644 --- a/services/web/server/tests/unit/with_dbs/03/products/test_products_db.py +++ b/services/web/server/tests/unit/with_dbs/03/products/test_products_db.py @@ -22,7 +22,7 @@ WebFeedback, products, ) -from simcore_service_webserver.db.plugin import APP_DB_ENGINE_KEY +from simcore_service_webserver.db.plugin import APP_AIOPG_ENGINE_KEY from simcore_service_webserver.products._db import ProductRepository from simcore_service_webserver.products._middlewares import ( _get_app_default_product_name, @@ -42,7 +42,7 @@ async def product_row(app: web.Application, product_data: dict[str, Any]) -> Row Note that product_data is a SUBSET of product_row (e.g. modified dattimes etc)! """ - engine = app[APP_DB_ENGINE_KEY] + engine = app[APP_AIOPG_ENGINE_KEY] assert engine async with engine.acquire() as conn: diff --git a/services/web/server/tests/unit/with_dbs/03/test_users_api.py b/services/web/server/tests/unit/with_dbs/03/test_users_api.py index 28b70592ce8b..89b5ddea4747 100644 --- a/services/web/server/tests/unit/with_dbs/03/test_users_api.py +++ b/services/web/server/tests/unit/with_dbs/03/test_users_api.py @@ -11,7 +11,7 @@ from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from pytest_simcore.helpers.webserver_login import NewUser from servicelib.aiohttp import status -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY +from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY from simcore_postgres_database.models.users import UserStatus from simcore_service_webserver.users.api import ( get_user_name_and_email, @@ -67,7 +67,7 @@ async def _rq_login(): await assert_status(r1, status.HTTP_200_OK) # apply update - expired = await update_expired_users(client.app[APP_DB_ENGINE_KEY]) + expired = await update_expired_users(client.app[APP_AIOPG_ENGINE_KEY]) if has_expired: assert expired == [user["id"]] else: diff --git a/services/web/server/tests/unit/with_dbs/03/version_control/conftest.py b/services/web/server/tests/unit/with_dbs/03/version_control/conftest.py index 64c0052efd65..7343176760e8 100644 --- a/services/web/server/tests/unit/with_dbs/03/version_control/conftest.py +++ b/services/web/server/tests/unit/with_dbs/03/version_control/conftest.py @@ -30,7 +30,7 @@ ) from simcore_service_webserver._meta import API_VTAG as VX from simcore_service_webserver.db.models import UserRole -from simcore_service_webserver.db.plugin import APP_DB_ENGINE_KEY +from simcore_service_webserver.db.plugin import APP_AIOPG_ENGINE_KEY from simcore_service_webserver.log import setup_logging from simcore_service_webserver.projects.models import ProjectDict from tenacity.asyncio import AsyncRetrying @@ -159,7 +159,7 @@ async def user_project( # cleanup repos assert client.app - engine = client.app[APP_DB_ENGINE_KEY] + engine = client.app[APP_AIOPG_ENGINE_KEY] async with engine.acquire() as conn: # cascade deletes everything except projects_vc_snapshot await conn.execute(projects_vc_repos.delete()) diff --git a/services/web/server/tests/unit/with_dbs/conftest.py b/services/web/server/tests/unit/with_dbs/conftest.py index ba3bed4b5938..f4f527179b17 100644 --- a/services/web/server/tests/unit/with_dbs/conftest.py +++ b/services/web/server/tests/unit/with_dbs/conftest.py @@ -41,12 +41,13 @@ from pydantic import ByteSize, parse_obj_as from pytest_mock import MockerFixture from pytest_simcore.helpers.dict_tools import ConfigDict +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from pytest_simcore.helpers.webserver_login import NewUser, UserInfoDict from pytest_simcore.helpers.webserver_parametrizations import MockedStorageSubsystem from pytest_simcore.helpers.webserver_projects import NewProject from redis import Redis -from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY +from servicelib.aiohttp.application_keys import APP_AIOPG_ENGINE_KEY from servicelib.aiohttp.long_running_tasks.client import LRTask from servicelib.aiohttp.long_running_tasks.server import ProgressPercent, TaskProgress from servicelib.common_aiopg_utils import DSN @@ -125,6 +126,7 @@ def app_cfg(default_app_cfg: ConfigDict, unused_tcp_port_factory) -> ConfigDict: @pytest.fixture def app_environment( + monkeypatch: pytest.MonkeyPatch, app_cfg: ConfigDict, monkeypatch_setenv_from_app_config: Callable[[ConfigDict], dict[str, str]], ) -> EnvVarsDict: @@ -140,7 +142,14 @@ def app_environment(app_environment: dict[str, str], monkeypatch: pytest.MonkeyP """ print("+ web_server:") cfg = deepcopy(app_cfg) - return monkeypatch_setenv_from_app_config(cfg) + envs = monkeypatch_setenv_from_app_config(cfg) + + # + # NOTE: this emulates hostname: "wb-{{.Node.Hostname}}-{{.Task.Slot}}" in docker-compose that + # affects PostgresSettings.POSTGRES_CLIENT_NAME + # + extra = setenvs_from_dict(monkeypatch, {"HOSTNAME": "wb-test_host.0"}) + return envs | extra @pytest.fixture @@ -193,7 +202,7 @@ def web_server( assert isinstance(postgres_db, sa.engine.Engine) - pg_settings = dict(e.split("=") for e in app[APP_DB_ENGINE_KEY].dsn.split()) + pg_settings = dict(e.split("=") for e in app[APP_AIOPG_ENGINE_KEY].dsn.split()) assert pg_settings["host"] == postgres_db.url.host assert int(pg_settings["port"]) == postgres_db.url.port assert pg_settings["user"] == postgres_db.url.username