diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_user_preferences.py b/packages/postgres-database/src/simcore_postgres_database/utils_user_preferences.py
index 48857d189fb3..4e803d44752d 100644
--- a/packages/postgres-database/src/simcore_postgres_database/utils_user_preferences.py
+++ b/packages/postgres-database/src/simcore_postgres_database/utils_user_preferences.py
@@ -1,8 +1,8 @@
 from typing import Any
 
 import sqlalchemy as sa
-from aiopg.sa.connection import SAConnection
 from sqlalchemy.dialects.postgresql import insert as pg_insert
+from sqlalchemy.ext.asyncio import AsyncConnection
 
 from .models.user_preferences import (
     user_preferences_frontend,
@@ -10,8 +10,7 @@
 )
 
 
-class CouldNotCreateOrUpdateUserPreferenceError(Exception):
-    ...
+class CouldNotCreateOrUpdateUserPreferenceError(Exception): ...
 
 
 class BasePreferencesRepo:
@@ -20,7 +19,7 @@ class BasePreferencesRepo:
     @classmethod
     async def save(
         cls,
-        conn: SAConnection,
+        conn: AsyncConnection,
         *,
         user_id: int,
         product_name: str,
@@ -49,7 +48,7 @@ async def save(
     @classmethod
     async def load(
         cls,
-        conn: SAConnection,
+        conn: AsyncConnection,
         *,
         user_id: int,
         product_name: str,
diff --git a/packages/postgres-database/tests/conftest.py b/packages/postgres-database/tests/conftest.py
index 1d2dde70f974..4aaa0dad0b43 100644
--- a/packages/postgres-database/tests/conftest.py
+++ b/packages/postgres-database/tests/conftest.py
@@ -38,6 +38,7 @@
     user_to_groups,
     users,
 )
+from sqlalchemy.engine.row import Row
 from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
 
 pytest_plugins = [
@@ -324,18 +325,26 @@ async def _creator(project_uuid: uuid.UUID) -> ProjectNode:
 
 
 @pytest.fixture
-def create_fake_product(
-    connection: aiopg.sa.connection.SAConnection,
-) -> Callable[..., Awaitable[RowProxy]]:
-    async def _creator(product_name: str) -> RowProxy:
-        result = await connection.execute(
-            sa.insert(products)
-            .values(name=product_name, host_regex=".*")
-            .returning(sa.literal_column("*"))
-        )
-        assert result
-        row = await result.first()
-        assert row
+async def create_fake_product(
+    asyncpg_engine: AsyncEngine,
+) -> AsyncIterator[Callable[[str], Awaitable[Row]]]:
+    created_product_names = set()
+
+    async def _creator(product_name: str) -> Row:
+        async with asyncpg_engine.begin() as connection:
+            result = await connection.execute(
+                sa.insert(products)
+                .values(name=product_name, host_regex=".*")
+                .returning(sa.literal_column("*"))
+            )
+            assert result
+            row = result.one()
+            created_product_names.add(row.name)
         return row
 
-    return _creator
+    yield _creator
+
+    async with asyncpg_engine.begin() as conn:
+        await conn.execute(
+            products.delete().where(products.c.name.in_(created_product_names))
+        )
diff --git a/packages/postgres-database/tests/test_utils_groups_extra_properties.py b/packages/postgres-database/tests/test_utils_groups_extra_properties.py
index e7900de60828..ce3891439e64 100644
--- a/packages/postgres-database/tests/test_utils_groups_extra_properties.py
+++ b/packages/postgres-database/tests/test_utils_groups_extra_properties.py
@@ -21,6 +21,7 @@
     GroupExtraPropertiesRepo,
 )
 from sqlalchemy import literal_column
+from sqlalchemy.engine.row import Row
 from sqlalchemy.ext.asyncio import AsyncEngine
 
 
@@ -84,7 +85,7 @@ async def test_get(
     connection: aiopg.sa.connection.SAConnection,
     registered_user: RowProxy,
     product_name: str,
-    create_fake_product: Callable[..., Awaitable[RowProxy]],
+    create_fake_product: Callable[[str], Awaitable[Row]],
     create_fake_group_extra_properties: Callable[..., Awaitable[GroupExtraProperties]],
 ):
     with pytest.raises(GroupExtraPropertiesNotFoundError):
@@ -106,7 +107,7 @@ async def test_get_v2(
     asyncpg_engine: AsyncEngine,
     registered_user: RowProxy,
     product_name: str,
-    create_fake_product: Callable[..., Awaitable[RowProxy]],
+    create_fake_product: Callable[[str], Awaitable[Row]],
     create_fake_group_extra_properties: Callable[..., Awaitable[GroupExtraProperties]],
 ):
     with pytest.raises(GroupExtraPropertiesNotFoundError):
@@ -157,7 +158,7 @@ async def test_get_aggregated_properties_for_user_returns_properties_in_expected
     connection: aiopg.sa.connection.SAConnection,
     product_name: str,
    registered_user: RowProxy,
-    create_fake_product: Callable[..., Awaitable[RowProxy]],
+    create_fake_product: Callable[[str], Awaitable[Row]],
     create_fake_group: Callable[..., Awaitable[RowProxy]],
     create_fake_group_extra_properties: Callable[..., Awaitable[GroupExtraProperties]],
     everyone_group_id: int,
@@ -227,7 +228,7 @@ async def test_get_aggregated_properties_for_user_returns_properties_in_expected
     connection: aiopg.sa.connection.SAConnection,
     product_name: str,
     registered_user: RowProxy,
-    create_fake_product: Callable[..., Awaitable[RowProxy]],
+    create_fake_product: Callable[[str], Awaitable[Row]],
     create_fake_group: Callable[..., Awaitable[RowProxy]],
     create_fake_group_extra_properties: Callable[..., Awaitable[GroupExtraProperties]],
     everyone_group_id: int,
@@ -274,7 +275,7 @@ async def test_get_aggregated_properties_for_user_returns_property_values_as_tru
     connection: aiopg.sa.connection.SAConnection,
     product_name: str,
     registered_user: RowProxy,
-    create_fake_product: Callable[..., Awaitable[RowProxy]],
+    create_fake_product: Callable[[str], Awaitable[Row]],
     create_fake_group: Callable[..., Awaitable[RowProxy]],
     create_fake_group_extra_properties: Callable[..., Awaitable[GroupExtraProperties]],
     everyone_group_id: int,
@@ -385,7 +386,7 @@ async def test_get_aggregated_properties_for_user_returns_property_values_as_tru
     connection: aiopg.sa.connection.SAConnection,
     product_name: str,
     registered_user: RowProxy,
-    create_fake_product: Callable[..., Awaitable[RowProxy]],
+    create_fake_product: Callable[[str], Awaitable[Row]],
     create_fake_group: Callable[..., Awaitable[RowProxy]],
     create_fake_group_extra_properties: Callable[..., Awaitable[GroupExtraProperties]],
     everyone_group_id: int,
diff --git a/packages/postgres-database/tests/test_utils_user_preferences.py b/packages/postgres-database/tests/test_utils_user_preferences.py
index d432708e9e9c..0f70dcccf20b 100644
--- a/packages/postgres-database/tests/test_utils_user_preferences.py
+++ b/packages/postgres-database/tests/test_utils_user_preferences.py
@@ -5,8 +5,6 @@
 from typing import Any
 
 import pytest
-from aiopg.sa.connection import SAConnection
-from aiopg.sa.result import RowProxy
 from faker import Faker
 from pytest_simcore.helpers.faker_factories import random_user
 from simcore_postgres_database.models.users import UserRole, users
@@ -15,6 +13,8 @@
     FrontendUserPreferencesRepo,
     UserServicesUserPreferencesRepo,
 )
+from sqlalchemy.engine.row import Row
+from sqlalchemy.ext.asyncio import AsyncEngine
 
 
 @pytest.fixture
@@ -28,7 +28,9 @@ def preference_two() -> str:
 
 
 @pytest.fixture
-async def product_name(create_fake_product: Callable[..., Awaitable[RowProxy]]) -> str:
+async def product_name(
+    create_fake_product: Callable[[str], Awaitable[Row]],
+) -> str:
     product = await create_fake_product("fake-product")
     return product[0]
 
@@ -39,7 +41,7 @@ def preference_repo(request: pytest.FixtureRequest) -> type[BasePreferencesRepo]
 
 
 async def _assert_save_get_preference(
-    connection: SAConnection,
+    asyncpg_engine: AsyncEngine,
     preference_repo: type[BasePreferencesRepo],
     *,
     user_id: int,
@@ -47,37 +49,40 @@ async def _assert_save_get_preference(
     product_name: str,
     payload: Any,
 ) -> None:
-    await preference_repo.save(
-        connection,
-        user_id=user_id,
-        preference_name=preference_name,
-        product_name=product_name,
-        payload=payload,
-    )
-    get_res_2: Any | None = await preference_repo.load(
-        connection,
-        user_id=user_id,
-        preference_name=preference_name,
-        product_name=product_name,
-    )
+    async with asyncpg_engine.begin() as connection:
+        await preference_repo.save(
+            connection,
+            user_id=user_id,
+            preference_name=preference_name,
+            product_name=product_name,
+            payload=payload,
+        )
+    async with asyncpg_engine.connect() as connection:
+        get_res_2: Any | None = await preference_repo.load(
+            connection,
+            user_id=user_id,
+            preference_name=preference_name,
+            product_name=product_name,
+        )
     assert get_res_2 is not None
     assert get_res_2 == payload
 
 
 async def _assert_preference_not_saved(
-    connection: SAConnection,
+    asyncpg_engine: AsyncEngine,
     preference_repo: type[BasePreferencesRepo],
     *,
     user_id: int,
     preference_name: str,
     product_name: str,
 ) -> None:
-    not_found: Any | None = await preference_repo.load(
-        connection,
-        user_id=user_id,
-        preference_name=preference_name,
-        product_name=product_name,
-    )
+    async with asyncpg_engine.connect() as connection:
+        not_found: Any | None = await preference_repo.load(
+            connection,
+            user_id=user_id,
+            preference_name=preference_name,
+            product_name=product_name,
+        )
     assert not_found is None
 
 
@@ -92,26 +97,27 @@ def _get_random_payload(
     pytest.fail(f"Did not define a case for {preference_repo=}. Please add one.")
 
 
-async def _get_user_id(connection: SAConnection, faker: Faker) -> int:
+async def _create_user_id(asyncpg_engine: AsyncEngine, faker: Faker) -> int:
     data = random_user(role=faker.random_element(elements=UserRole))
-    user_id = await connection.scalar(
-        users.insert().values(**data).returning(users.c.id)
-    )
+    async with asyncpg_engine.begin() as connection:
+        user_id = await connection.scalar(
+            users.insert().values(**data).returning(users.c.id)
+        )
     assert user_id
     return user_id
 
 
 async def test_user_preference_repo_workflow(
-    connection: SAConnection,
+    asyncpg_engine: AsyncEngine,
     preference_repo: type[BasePreferencesRepo],
     preference_one: str,
     product_name: str,
     faker: Faker,
 ):
-    user_id = await _get_user_id(connection, faker)
+    user_id = await _create_user_id(asyncpg_engine, faker)
 
     # preference is not saved
     await _assert_preference_not_saved(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
@@ -124,7 +130,7 @@ async def test_user_preference_repo_workflow(
 
     # store the preference for the first time
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
@@ -134,7 +140,7 @@ async def test_user_preference_repo_workflow(
 
     # updating the preference still works
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
@@ -144,14 +150,14 @@ async def test_user_preference_repo_workflow(
 
 
 async def test__same_preference_name_product_name__different_users(
-    connection: SAConnection,
+    asyncpg_engine: AsyncEngine,
     preference_repo: type[BasePreferencesRepo],
     preference_one: str,
     product_name: str,
     faker: Faker,
 ):
-    user_id_1 = await _get_user_id(connection, faker)
-    user_id_2 = await _get_user_id(connection, faker)
+    user_id_1 = await _create_user_id(asyncpg_engine, faker)
+    user_id_2 = await _create_user_id(asyncpg_engine, faker)
 
     payload_1 = _get_random_payload(faker, preference_repo)
     payload_2 = _get_random_payload(faker, preference_repo)
@@ -159,14 +165,14 @@ async def test__same_preference_name_product_name__different_users(
 
     # save preference for first user
     await _assert_preference_not_saved(
-        connection,
+        asyncpg_engine,
        preference_repo,
         user_id=user_id_1,
         preference_name=preference_one,
         product_name=product_name,
     )
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id_1,
         preference_name=preference_one,
@@ -176,14 +182,14 @@ async def test__same_preference_name_product_name__different_users(
 
     # save preference for second user
     await _assert_preference_not_saved(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id_2,
         preference_name=preference_one,
         product_name=product_name,
     )
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id_2,
         preference_name=preference_one,
@@ -193,8 +199,8 @@ async def test__same_preference_name_product_name__different_users(
 
 
 async def test__same_user_preference_name__different_product_name(
-    connection: SAConnection,
-    create_fake_product: Callable[..., Awaitable[RowProxy]],
+    asyncpg_engine: AsyncEngine,
+    create_fake_product: Callable[[str], Awaitable[Row]],
     preference_repo: type[BasePreferencesRepo],
     preference_one: str,
     faker: Faker,
@@ -202,7 +208,7 @@ async def test__same_user_preference_name__different_product_name(
     product_1 = (await create_fake_product("fake-product-1"))[0]
     product_2 = (await create_fake_product("fake-product-2"))[0]
 
-    user_id = await _get_user_id(connection, faker)
+    user_id = await _create_user_id(asyncpg_engine, faker)
 
     payload_1 = _get_random_payload(faker, preference_repo)
     payload_2 = _get_random_payload(faker, preference_repo)
@@ -210,14 +216,14 @@ async def test__same_user_preference_name__different_product_name(
 
     # save for first product_name
     await _assert_preference_not_saved(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
         product_name=product_1,
     )
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
@@ -227,14 +233,14 @@ async def test__same_user_preference_name__different_product_name(
 
     # save for second product_name
     await _assert_preference_not_saved(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
         product_name=product_2,
     )
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
@@ -244,14 +250,14 @@ async def test__same_user_preference_name__different_product_name(
 
 
 async def test__same_product_name_user__different_preference_name(
-    connection: SAConnection,
+    asyncpg_engine: AsyncEngine,
     preference_repo: type[BasePreferencesRepo],
     preference_one: str,
     preference_two: str,
     product_name: str,
     faker: Faker,
 ):
-    user_id = await _get_user_id(connection, faker)
+    user_id = await _create_user_id(asyncpg_engine, faker)
 
     payload_1 = _get_random_payload(faker, preference_repo)
     payload_2 = _get_random_payload(faker, preference_repo)
@@ -259,14 +265,14 @@ async def test__same_product_name_user__different_preference_name(
 
     # save first preference
     await _assert_preference_not_saved(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
         product_name=product_name,
     )
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_one,
@@ -276,14 +282,14 @@ async def test__same_product_name_user__different_preference_name(
 
     # save second preference
     await _assert_preference_not_saved(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_two,
         product_name=product_name,
     )
     await _assert_save_get_preference(
-        connection,
+        asyncpg_engine,
         preference_repo,
         user_id=user_id,
         preference_name=preference_two,
diff --git a/packages/service-library/src/servicelib/db_async_engine.py b/packages/service-library/src/servicelib/db_async_engine.py
deleted file mode 100644
index dd9166e46fa2..000000000000
--- a/packages/service-library/src/servicelib/db_async_engine.py
+++ /dev/null
@@ -1,59 +0,0 @@
-import logging
-import warnings
-
-from fastapi import FastAPI
-from settings_library.postgres import PostgresSettings
-from simcore_postgres_database.utils_aiosqlalchemy import (  # type: ignore[import-not-found] # this on is unclear
-    get_pg_engine_stateinfo,
-    raise_if_migration_not_ready,
-)
-from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
-from tenacity import retry
-
-from .logging_utils import log_context
-from .retry_policies import PostgresRetryPolicyUponInitialization
-
-_logger = logging.getLogger(__name__)
-
-
-@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
-async def connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
-    warnings.warn(
-        "The 'connect_to_db' function is deprecated and will be removed in a future release. "
-        "Please use 'postgres_lifespan' instead for managing the database connection lifecycle.",
-        DeprecationWarning,
-        stacklevel=2,
-    )
-    with log_context(
-        _logger, logging.DEBUG, f"connection to db {settings.dsn_with_async_sqlalchemy}"
-    ):
-        engine: AsyncEngine = create_async_engine(
-            settings.dsn_with_async_sqlalchemy,
-            pool_size=settings.POSTGRES_MINSIZE,
-            max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE,
-            connect_args={
-                "server_settings": {"application_name": settings.POSTGRES_CLIENT_NAME}
-            },
-            pool_pre_ping=True,  # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects
-            future=True,  # this uses sqlalchemy 2.0 API, shall be removed when sqlalchemy 2.0 is released
-        )
-
-    with log_context(_logger, logging.DEBUG, "migration"):
-        try:
-            await raise_if_migration_not_ready(engine)
-        except Exception:
-            # NOTE: engine must be closed because retry will create a new engine
-            await engine.dispose()
-            raise
-
-    app.state.engine = engine
-    _logger.debug(
-        "Setup engine: %s",
-        await get_pg_engine_stateinfo(engine),
-    )
-
-
-async def close_db_connection(app: FastAPI) -> None:
-    with log_context(_logger, logging.DEBUG, f"db disconnect of {app.state.engine}"):
-        if engine := app.state.engine:
-            await engine.dispose()
diff --git a/packages/service-library/src/servicelib/db_asyncpg_utils.py b/packages/service-library/src/servicelib/db_asyncpg_utils.py
index 4abbcd3ac66f..f9dfd27c2d8c 100644
--- a/packages/service-library/src/servicelib/db_asyncpg_utils.py
+++ b/packages/service-library/src/servicelib/db_asyncpg_utils.py
@@ -1,5 +1,7 @@
+import contextlib
 import logging
 import time
+from collections.abc import AsyncIterator
 from datetime import timedelta
 
 from models_library.healthchecks import IsNonResponsive, IsResponsive, LivenessResult
@@ -8,6 +10,7 @@
 from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
 from tenacity import retry
 
+from .logging_utils import log_context
 from .retry_policies import PostgresRetryPolicyUponInitialization
 
 _logger = logging.getLogger(__name__)
@@ -29,6 +32,7 @@ async def create_async_engine_and_database_ready(
     server_settings = None
     if settings.POSTGRES_CLIENT_NAME:
+        assert isinstance(settings.POSTGRES_CLIENT_NAME, str)  # nosec
         server_settings = {
             "application_name": settings.POSTGRES_CLIENT_NAME,
         }
@@ -63,3 +67,37 @@ async def check_postgres_liveness(engine: AsyncEngine) -> LivenessResult:
         return IsResponsive(elapsed=timedelta(seconds=elapsed_time))
     except SQLAlchemyError as err:
         return IsNonResponsive(reason=f"{err}")
+
+
+@contextlib.asynccontextmanager
+async def with_async_pg_engine(
+    settings: PostgresSettings,
+) -> AsyncIterator[AsyncEngine]:
+    """
+    Creates an asyncpg engine and ensures it is properly closed after use.
+    """
+    try:
+        with log_context(
+            _logger,
+            logging.DEBUG,
+            f"connection to db {settings.dsn_with_async_sqlalchemy}",
+        ):
+            server_settings = None
+            if settings.POSTGRES_CLIENT_NAME:
+                assert isinstance(settings.POSTGRES_CLIENT_NAME, str)
+                server_settings = {
+                    "application_name": settings.POSTGRES_CLIENT_NAME,
+                }
+
+            engine = create_async_engine(
+                settings.dsn_with_async_sqlalchemy,
+                pool_size=settings.POSTGRES_MINSIZE,
+                max_overflow=settings.POSTGRES_MAXSIZE - settings.POSTGRES_MINSIZE,
+                connect_args={"server_settings": server_settings},
+                pool_pre_ping=True,  # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects
+                future=True,  # this uses sqlalchemy 2.0 API, shall be removed when sqlalchemy 2.0 is released
+            )
+        yield engine
+    finally:
+        with log_context(_logger, logging.DEBUG, f"db disconnect of {engine}"):
+            await engine.dispose()
diff --git a/packages/simcore-sdk/requirements/_base.in b/packages/simcore-sdk/requirements/_base.in
index 3ec9341a1779..9be327aed363 100644
--- a/packages/simcore-sdk/requirements/_base.in
+++ b/packages/simcore-sdk/requirements/_base.in
@@ -13,10 +13,9 @@
 aiocache
 aiofiles
 aiohttp
-aiopg[sa]
-opentelemetry-instrumentation-aiopg
 packaging
 pint
+sqlalchemy[asyncio]
 pydantic[email]
 tenacity
 tqdm
diff --git a/packages/simcore-sdk/requirements/_base.txt b/packages/simcore-sdk/requirements/_base.txt
index 44dd9b8ec12d..16690f9cea44 100644
--- a/packages/simcore-sdk/requirements/_base.txt
+++ b/packages/simcore-sdk/requirements/_base.txt
@@ -32,8 +32,6 @@
 aiohttp==3.11.13
     # via
     #   -c requirements/../../../requirements/constraints.txt
     #   -r requirements/_base.in
     #   aiodocker
-aiopg==1.4.0
-    # via -r requirements/_base.in
 aiormq==6.8.1
     # via aio-pika
 aiosignal==1.3.2
@@ -51,8 +49,6 @@
 arrow==1.3.0
     # via
     #   -r requirements/../../../packages/models-library/requirements/_base.in
     #   -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
     #   -r requirements/../../../packages/service-library/requirements/_base.in
-async-timeout==4.0.3
-    # via aiopg
 asyncpg==0.30.0
     # via sqlalchemy
 attrs==25.1.0
@@ -160,9 +156,7 @@
 opentelemetry-api==1.30.0
     # via
     #   opentelemetry-exporter-otlp-proto-grpc
     #   opentelemetry-exporter-otlp-proto-http
     #   opentelemetry-instrumentation
-    #   opentelemetry-instrumentation-aiopg
     #   opentelemetry-instrumentation-asyncpg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-logging
     #   opentelemetry-instrumentation-redis
     #   opentelemetry-instrumentation-requests
@@ -180,18 +174,12 @@
 opentelemetry-exporter-otlp-proto-http==1.30.0
     # via opentelemetry-exporter-otlp
 opentelemetry-instrumentation==0.51b0
     # via
-    #   opentelemetry-instrumentation-aiopg
     #   opentelemetry-instrumentation-asyncpg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-logging
     #   opentelemetry-instrumentation-redis
     #   opentelemetry-instrumentation-requests
-opentelemetry-instrumentation-aiopg==0.51b0
-    # via -r requirements/_base.in
 opentelemetry-instrumentation-asyncpg==0.51b0
     # via -r requirements/../../../packages/postgres-database/requirements/_base.in
-opentelemetry-instrumentation-dbapi==0.51b0
-    # via opentelemetry-instrumentation-aiopg
 opentelemetry-instrumentation-logging==0.51b0
     # via -r requirements/../../../packages/service-library/requirements/_base.in
 opentelemetry-instrumentation-redis==0.51b0
@@ -212,7 +200,6 @@
 opentelemetry-semantic-conventions==0.51b0
     # via
     #   opentelemetry-instrumentation
     #   opentelemetry-instrumentation-asyncpg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-redis
     #   opentelemetry-instrumentation-requests
     #   opentelemetry-sdk
@@ -264,9 +251,7 @@
 protobuf==5.29.3
 psutil==7.0.0
     # via -r requirements/../../../packages/service-library/requirements/_base.in
 psycopg2-binary==2.9.10
-    # via
-    #   aiopg
-    #   sqlalchemy
+    # via sqlalchemy
 pycryptodome==3.21.0
     # via stream-zip
 pydantic==2.10.6
@@ -429,7 +414,7 @@
 sqlalchemy==1.4.54
     # via
     #   -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
     #   -c requirements/../../../requirements/constraints.txt
     #   -r requirements/../../../packages/postgres-database/requirements/_base.in
-    #   aiopg
+    #   -r requirements/_base.in
     #   alembic
 stream-zip==0.0.83
     # via -r requirements/../../../packages/service-library/requirements/_base.in
@@ -484,8 +469,6 @@
 wrapt==1.17.2
     # via
     #   deprecated
     #   opentelemetry-instrumentation
-    #   opentelemetry-instrumentation-aiopg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-redis
 yarl==1.18.3
     # via
diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py
index ffcd384a9e47..73321e091278 100644
--- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py
+++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py
@@ -1,107 +1,79 @@
 import json
 import logging
-import os
-import socket
-from typing import Any
 
-import aiopg.sa
 import sqlalchemy as sa
-import tenacity
-from aiopg.sa.engine import Engine
-from aiopg.sa.result import RowProxy
 from models_library.projects import ProjectID
 from models_library.users import UserID
-from servicelib.common_aiopg_utils import DataSourceName, create_pg_engine
-from servicelib.retry_policies import PostgresRetryPolicyUponInitialization
+from pydantic import TypeAdapter
+from servicelib.db_asyncpg_utils import create_async_engine_and_database_ready
 from settings_library.node_ports import NodePortsSettings
 from simcore_postgres_database.models.comp_tasks import comp_tasks
 from simcore_postgres_database.models.projects import projects
-from simcore_postgres_database.utils_aiopg import (
-    close_engine,
-    raise_if_migration_not_ready,
-)
-from sqlalchemy import and_
+from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
 
 from .exceptions import NodeNotFound, ProjectNotFoundError
 
-log = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
 
 
 async def _get_node_from_db(
-    project_id: str, node_uuid: str, connection: aiopg.sa.SAConnection
-) -> RowProxy:
-    log.debug(
+    project_id: str, node_uuid: str, connection: AsyncConnection
+) -> sa.engine.Row:
+    _logger.debug(
         "Reading from comp_tasks table for node uuid %s, project %s",
         node_uuid,
         project_id,
     )
+    rows_count = await connection.scalar(
+        sa.select(sa.func.count())
+        .select_from(comp_tasks)
+        .where(
+            (comp_tasks.c.node_id == node_uuid)
+            & (comp_tasks.c.project_id == project_id),
+        )
+    )
+    if rows_count > 1:
+        _logger.error("the node id %s is not unique", node_uuid)
     result = await connection.execute(
         sa.select(comp_tasks).where(
-            and_(
-                comp_tasks.c.node_id == node_uuid,
-                comp_tasks.c.project_id == project_id,
-            )
+            (comp_tasks.c.node_id == node_uuid)
+            & (comp_tasks.c.project_id == project_id)
         )
     )
-    if result.rowcount > 1:
-        log.error("the node id %s is not unique", node_uuid)
-    node: RowProxy | None = await result.first()
+    node = result.one_or_none()
     if not node:
-        log.error("the node id %s was not found", node_uuid)
+        _logger.error("the node id %s was not found", node_uuid)
         raise NodeNotFound(node_uuid)
     return node
 
 
-@tenacity.retry(**PostgresRetryPolicyUponInitialization().kwargs)
-async def _ensure_postgres_ready(dsn: DataSourceName) -> Engine:
-    engine: aiopg.sa.Engine = await create_pg_engine(dsn, minsize=1, maxsize=4)
-    try:
-        await raise_if_migration_not_ready(engine)
-    except Exception:
-        await close_engine(engine)
-        raise
-    return engine
-
-
 class DBContextManager:
-    def __init__(self, db_engine: aiopg.sa.Engine | None = None):
-        self._db_engine: aiopg.sa.Engine | None = db_engine
+    def __init__(self, db_engine: AsyncEngine | None = None) -> None:
+        self._db_engine: AsyncEngine | None = db_engine
         self._db_engine_created: bool = False
 
     @staticmethod
-    async def _create_db_engine() -> aiopg.sa.Engine:
+    async def _create_db_engine() -> AsyncEngine:
         settings = NodePortsSettings.create_from_envs()
-        dsn = DataSourceName(
-            application_name=f"{__name__}_{socket.gethostname()}_{os.getpid()}",
-            database=settings.POSTGRES_SETTINGS.POSTGRES_DB,
-            user=settings.POSTGRES_SETTINGS.POSTGRES_USER,
-            password=settings.POSTGRES_SETTINGS.POSTGRES_PASSWORD.get_secret_value(),
-            host=settings.POSTGRES_SETTINGS.POSTGRES_HOST,
-            port=settings.POSTGRES_SETTINGS.POSTGRES_PORT,
+        engine = await create_async_engine_and_database_ready(
+            settings.POSTGRES_SETTINGS
         )
-
-        engine: aiopg.sa.Engine = await _ensure_postgres_ready(dsn)
+        assert isinstance(engine, AsyncEngine)  # nosec
         return engine
 
-    async def __aenter__(self):
+    async def __aenter__(self) -> AsyncEngine:
        if not self._db_engine:
             self._db_engine = await self._create_db_engine()
             self._db_engine_created = True
         return self._db_engine
 
-    async def __aexit__(self, exc_type, exc, tb):
+    async def __aexit__(self, exc_type, exc, tb) -> None:
         if self._db_engine and self._db_engine_created:
-            await close_engine(self._db_engine)
-            log.debug(
-                "engine '%s' after shutdown: closed=%s, size=%d",
-                self._db_engine.dsn,
-                self._db_engine.closed,
-                self._db_engine.size,
-            )
+            await self._db_engine.dispose()
 
 
 class DBManager:
-    def __init__(self, db_engine: aiopg.sa.Engine | None = None):
+    def __init__(self, db_engine: AsyncEngine | None = None):
         self._db_engine = db_engine
 
     async def write_ports_configuration(
@@ -111,20 +83,19 @@
             f"Writing port configuration to database for "
             f"project={project_id} node={node_uuid}: {json_configuration}"
         )
-        log.debug(message)
+        _logger.debug(message)
 
         node_configuration = json.loads(json_configuration)
-        async with DBContextManager(
-            self._db_engine
-        ) as engine, engine.acquire() as connection:
+        async with (
+            DBContextManager(self._db_engine) as engine,
+            engine.begin() as connection,
+        ):
             # update the necessary parts
             await connection.execute(
                 comp_tasks.update()
                 .where(
-                    and_(
-                        comp_tasks.c.node_id == node_uuid,
-                        comp_tasks.c.project_id == project_id,
-                    )
+                    (comp_tasks.c.node_id == node_uuid)
+                    & (comp_tasks.c.project_id == project_id),
                 )
                 .values(
                     schema=node_configuration["schema"],
@@ -137,13 +108,14 @@
     async def get_ports_configuration_from_node_uuid(
         self, project_id: str, node_uuid: str
     ) -> str:
-        log.debug(
+        _logger.debug(
             "Getting ports configuration of node %s from comp_tasks table", node_uuid
         )
-        async with DBContextManager(
-            self._db_engine
-        ) as engine, engine.acquire() as connection:
-            node: RowProxy = await _get_node_from_db(project_id, node_uuid, connection)
+        async with (
+            DBContextManager(self._db_engine) as engine,
+            engine.connect() as connection,
+        ):
+            node = await _get_node_from_db(project_id, node_uuid, connection)
             node_json_config = json.dumps(
                 {
                     "schema": node.schema,
@@ -152,18 +124,19 @@
                     "run_hash": node.run_hash,
                 }
             )
-        log.debug("Found and converted to json")
+        _logger.debug("Found and converted to json")
         return node_json_config
 
     async def get_project_owner_user_id(self, project_id: ProjectID) -> UserID:
-        async with DBContextManager(
-            self._db_engine
-        ) as engine, engine.acquire() as connection:
-            prj_owner: Any | None = await connection.scalar(
+        async with (
+            DBContextManager(self._db_engine) as engine,
+            engine.connect() as connection,
+        ):
+            prj_owner = await connection.scalar(
                 sa.select(projects.c.prj_owner).where(
                     projects.c.uuid == f"{project_id}"
                 )
             )
        if prj_owner is None:
            raise ProjectNotFoundError(project_id)
-        return UserID(prj_owner)
+        return TypeAdapter(UserID).validate_python(prj_owner)
diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/database.py b/services/api-server/src/simcore_service_api_server/api/dependencies/database.py
index b8f65c953294..186afa73bbe6 100644
--- a/services/api-server/src/simcore_service_api_server/api/dependencies/database.py
+++ b/services/api-server/src/simcore_service_api_server/api/dependencies/database.py
@@ -5,7 +5,9 @@
 from aiopg.sa import Engine
 from fastapi import Depends
 from fastapi.requests import Request
+from sqlalchemy.ext.asyncio import AsyncEngine
 
+from ...db.events import get_asyncpg_engine
 from ...db.repositories import BaseRepository
 
 logger = logging.getLogger(__name__)
@@ -15,6 +17,10 @@ def get_db_engine(request: Request) -> Engine:
     return cast(Engine, request.app.state.engine)
 
 
+def get_db_asyncpg_engine(request: Request) -> AsyncEngine:
+    return get_asyncpg_engine(request.app)
+
+
 def get_repository(repo_type: type[BaseRepository]) -> Callable:
     async def _get_repo(
         engine: Annotated[Engine, Depends(get_db_engine)],
diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py
index b86595f3d92f..a0e751fa2dcb 100644
--- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py
+++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py
@@ -18,6 +18,7 @@
 from pydantic.types import PositiveInt
 from servicelib.fastapi.requests_decorators import cancel_on_disconnect
 from servicelib.logging_utils import log_context
+from sqlalchemy.ext.asyncio import AsyncEngine
 
 from ..._service_solvers import SolverService
 from ...exceptions.custom_errors import InsufficientCreditsError, MissingWalletError
@@ -51,7 +52,7 @@
 from ...services_http.storage import StorageApi, to_file_api_model
 from ..dependencies.application import get_reverse_url_mapper
 from ..dependencies.authentication import get_current_user_id, get_product_name
-from ..dependencies.database import Engine, get_db_engine
+from ..dependencies.database import get_db_asyncpg_engine
 from ..dependencies.rabbitmq import get_log_check_timeout, get_log_distributor
 from ..dependencies.services import get_api_client
 from ..dependencies.webserver_http import AuthSession, get_webserver_session
@@ -262,7 +263,7 @@ async def get_job_outputs(
     version: VersionStr,
     job_id: JobID,
     user_id: Annotated[PositiveInt, Depends(get_current_user_id)],
-    db_engine: Annotated[Engine, Depends(get_db_engine)],
+    async_pg_engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
     webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
     storage_client: Annotated[StorageApi, Depends(get_api_client(StorageApi))],
 ):
@@ -289,7 +290,7 @@ async def get_job_outputs(
             user_id=user_id,
             project_uuid=job_id,
             node_uuid=UUID(node_ids[0]),
-            db_engine=db_engine,
+            db_engine=async_pg_engine,
         )
 
     results: dict[str, ArgumentTypes] = {}
diff --git a/services/api-server/src/simcore_service_api_server/core/events.py b/services/api-server/src/simcore_service_api_server/core/events.py
index d918d8469c8d..093351b92983 100644
--- a/services/api-server/src/simcore_service_api_server/core/events.py
+++ b/services/api-server/src/simcore_service_api_server/core/events.py
@@ -4,7 +4,13 @@
 from fastapi import FastAPI
 
 from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
-from ..db.events import close_db_connection, connect_to_db
+from ..db.events import (
+    asyncpg_close_db_connection,
+    asyncpg_connect_to_db,
+    close_db_connection,
+    connect_to_db,
+)
+from .settings import ApplicationSettings
 
 logger = logging.getLogger(__name__)
 
@@ -14,10 +20,13 @@ async def _on_startup() -> None:
         logger.info("Application starting ...")
         if app.state.settings.API_SERVER_POSTGRES:
             # database
+            assert isinstance(app.state.settings, ApplicationSettings)  # nosec
             await connect_to_db(app)
+            await asyncpg_connect_to_db(app, app.state.settings.API_SERVER_POSTGRES)
             assert app.state.engine  # nosec
+            assert app.state.asyncpg_engine  # nosec
 
-        print(APP_STARTED_BANNER_MSG, flush=True)
+        print(APP_STARTED_BANNER_MSG, flush=True)  # noqa: T201
 
     return _on_startup
 
@@ -27,7 +36,9 @@ async def _on_shutdown() -> None:
         logger.info("Application stopping, ...")
 
         if app.state.settings.API_SERVER_POSTGRES:
+            assert isinstance(app.state.settings, ApplicationSettings)  # nosec
             try:
+                await asyncpg_close_db_connection(app)
                 await close_db_connection(app)
 
             except Exception as err:  # pylint: disable=broad-except
@@ -38,6 +49,6 @@
                     stack_info=app.state.settings.debug,
                 )
 
-        print(APP_FINISHED_BANNER_MSG, flush=True)
+        print(APP_FINISHED_BANNER_MSG, flush=True)  # noqa: T201
 
     return _on_shutdown
diff --git a/services/api-server/src/simcore_service_api_server/db/events.py b/services/api-server/src/simcore_service_api_server/db/events.py
index 25ee257596c2..9bd8328a8040 100644
--- a/services/api-server/src/simcore_service_api_server/db/events.py
+++ b/services/api-server/src/simcore_service_api_server/db/events.py
@@ -1,7 +1,10 @@
 import logging
+from typing import cast
 
 from aiopg.sa import Engine, create_engine
 from fastapi import FastAPI
+from servicelib.db_asyncpg_utils import create_async_engine_and_database_ready
+from servicelib.logging_utils import log_context
 from servicelib.retry_policies import PostgresRetryPolicyUponInitialization
 from settings_library.postgres import PostgresSettings
 from simcore_postgres_database.utils_aiopg import (
@@ -9,6 +12,8 @@
     get_pg_engine_info,
     raise_if_migration_not_ready,
 )
+from simcore_postgres_database.utils_aiosqlalchemy import get_pg_engine_stateinfo
+from sqlalchemy.ext.asyncio import AsyncEngine
 from tenacity import retry
 
 from .._meta import PROJECT_NAME
@@ -53,3 +58,30 @@ async def close_db_connection(app: FastAPI) -> None:
         await close_engine(engine)
 
     logger.debug("Disconnected from %s", engine.dsn)
+
+
+async def asyncpg_connect_to_db(app: FastAPI, settings: PostgresSettings) -> None:
+    with log_context(
+        logger,
+        logging.DEBUG,
+        f"Connecting and migrating {settings.dsn_with_async_sqlalchemy}",
+    ):
+        engine = await create_async_engine_and_database_ready(settings)
+
+    app.state.asyncpg_engine = engine
+    logger.debug(
+        "Setup engine: %s",
+        await get_pg_engine_stateinfo(engine),
+    )
+
+
+async def asyncpg_close_db_connection(app: FastAPI) -> None:
+    with log_context(
+        logger, logging.DEBUG, f"db disconnect of {app.state.asyncpg_engine}"
+    ):
+        if engine := app.state.asyncpg_engine:
+            await engine.dispose()
+
+
+def get_asyncpg_engine(app: FastAPI) -> AsyncEngine:
+    return cast(AsyncEngine, app.state.asyncpg_engine)
diff --git a/services/api-server/src/simcore_service_api_server/services_http/solver_job_outputs.py b/services/api-server/src/simcore_service_api_server/services_http/solver_job_outputs.py
index 5457a259f8c6..4554bc0ccc35 100644
--- a/services/api-server/src/simcore_service_api_server/services_http/solver_job_outputs.py
+++ b/services/api-server/src/simcore_service_api_server/services_http/solver_job_outputs.py
@@ -1,12 +1,12 @@
 import logging
 from typing import Any, TypeAlias
 
-import aiopg
 from models_library.projects import ProjectID, ProjectIDStr
 from models_library.projects_nodes_io import BaseFileLink, NodeID, NodeIDStr
 from pydantic import StrictBool, StrictFloat, StrictInt, TypeAdapter
 from simcore_sdk import node_ports_v2
 from simcore_sdk.node_ports_v2 import DBManager, Nodeports
+from sqlalchemy.ext.asyncio import AsyncEngine
 
 from ..exceptions.backend_errors import SolverOutputNotFoundError
 
@@ -19,7 +19,7 @@
 
 
 async def get_solver_output_results(
-    user_id: int, project_uuid: ProjectID, node_uuid: NodeID, db_engine: aiopg.sa.Engine
+    user_id: int, project_uuid: ProjectID, node_uuid: NodeID, db_engine: AsyncEngine
 ) -> dict[str, ResultsTypes]:
     """
     Wraps calls via node_ports to retrieve project's output
diff --git a/services/api-server/tests/unit/conftest.py b/services/api-server/tests/unit/conftest.py
index ef41c0331d68..3c1b33864a8b 100644
--- a/services/api-server/tests/unit/conftest.py
+++ b/services/api-server/tests/unit/conftest.py
@@ -218,6 +218,8 @@ def auth(
     engine.freesize = 3
     engine.maxsize = 10
     app.state.engine = engine
+    async_engine = mocker.MagicMock()
+    app.state.asyncpg_engine = async_engine
 
     # NOTE: here, instead of using the database, we patch repositories interface
     mocker.patch(
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py
index 28b9373cec43..92c92967890c 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py
@@ -33,6 +33,7 @@
 from servicelib.logging_utils import log_catch, log_context
 from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
 from servicelib.redis import RedisClientSDK
+from sqlalchemy.ext.asyncio import AsyncEngine
 
 from ...constants import UNDEFINED_STR_METADATA
 from ...core.errors import (
@@ -159,6 +160,7 @@ async def _triage_changed_tasks(
 @dataclass
 class BaseCompScheduler(ABC):
     db_engine: Engine
+    asyncpg_db_engine: AsyncEngine
     rabbitmq_client: RabbitMQClient
     rabbitmq_rpc_client: RabbitMQRPCClient
     settings: ComputationalBackendSettings
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
index cc33c129f1b6..b46e849500bd 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py
@@ -282,7 +282,7 @@ async def _process_task_result(
         if isinstance(result, TaskOutputData):
             # success!
             await parse_output_data(
-                self.db_engine,
+                self.asyncpg_db_engine,
                 task.job_id,
                 result,
             )
@@ -307,7 +307,7 @@
                 simcore_platform_status = SimcorePlatformStatus.BAD
             # we need to remove any invalid files in the storage
             await clean_task_output_and_log_files_if_invalid(
-                self.db_engine, user_id, project_id, node_id
+                self.asyncpg_db_engine, user_id, project_id, node_id
             )
         except TaskSchedulingError as err:
             task_final_state = RunningState.FAILED
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py
index edda456f3031..3182a2d9ae5e 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py
@@ -6,7 +6,7 @@
 from ...core.settings import AppSettings
 from ..dask_clients_pool import DaskClientsPool
-from ..db import get_db_engine
+from ..db import get_asyncpg_engine, get_db_engine
 from ..rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_client
 from ..redis import get_redis_client_manager
 from ._scheduler_base import BaseCompScheduler
@@ -27,5 +27,6 @@ def create_scheduler(app: FastAPI) -> BaseCompScheduler:
         rabbitmq_rpc_client=get_rabbitmq_rpc_client(app),
         redis_client=get_redis_client_manager(app).client(RedisDatabase.LOCKS),
         db_engine=get_db_engine(app),
+        asyncpg_db_engine=get_asyncpg_engine(app),
         service_runtime_heartbeat_interval=app_settings.SERVICE_TRACKING_HEARTBEAT,
     )
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py
index b7d593bf0931..7e9a9039bbb6 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py
@@ -359,7 +359,7 @@ async def send_computation_tasks(
         try:
             # This instance is created only once so it can be reused in calls below
             node_ports = await dask_utils.create_node_ports(
-                db_engine=self.app.state.engine,
+                db_engine=self.app.state.asyncpg_engine,
                 user_id=user_id,
                 project_id=project_id,
                 node_id=node_id,
@@ -439,14 +439,14 @@ async def get_tasks_status(self, job_ids: list[str]) -> list[DaskClientTaskState
         def _get_pipeline_statuses(
             dask_scheduler: distributed.Scheduler,
         ) -> dict[dask.typing.Key, DaskSchedulerTaskState | None]:
-            statuses: dict[
-                dask.typing.Key, DaskSchedulerTaskState | None
-            ] = dask_scheduler.get_task_status(keys=job_ids)
+            statuses: dict[dask.typing.Key, DaskSchedulerTaskState | None] = (
+                dask_scheduler.get_task_status(keys=job_ids)
+            )
             return statuses
 
-        task_statuses: dict[
-            dask.typing.Key, DaskSchedulerTaskState | None
-        ] = await self.backend.client.run_on_scheduler(_get_pipeline_statuses)
+        task_statuses: dict[dask.typing.Key, DaskSchedulerTaskState | None] = (
+            await self.backend.client.run_on_scheduler(_get_pipeline_statuses)
+        )
         assert isinstance(task_statuses, dict)  # nosec
 
         _logger.debug("found dask task statuses: %s", f"{task_statuses=}")
@@ -578,10 +578,10 @@ def _get_worker_used_resources(
 
         with log_catch(_logger, reraise=False):
             # NOTE: this runs directly on the dask-scheduler and may raise exceptions
-            used_resources_per_worker: dict[
-                str, dict[str, Any]
-            ] = await dask_utils.wrap_client_async_routine(
-                self.backend.client.run_on_scheduler(_get_worker_used_resources)
+            used_resources_per_worker: dict[str, dict[str, Any]] = (
+                await dask_utils.wrap_client_async_routine(
+                    self.backend.client.run_on_scheduler(_get_worker_used_resources)
+                )
             )
 
             # let's update the scheduler info, with default to 0s since sometimes
diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py
index c2ab4781cc7a..6c02fd37d752 100644
--- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py
+++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py
@@ -6,7 +6,6 @@
 from uuid import uuid4
 
 import distributed
-from aiopg.sa.engine import Engine
 from common_library.json_serialization import json_dumps
 from dask_task_models_library.container_tasks.io import (
     FileUrl,
@@ -38,6 +37,7 @@
 )
 from simcore_sdk.node_ports_v2 import FileLinkType, Port, links, port_utils
 from simcore_sdk.node_ports_v2.links import ItemValue as _NPItemValue
+from sqlalchemy.ext.asyncio import AsyncEngine
 
 from ..constants import UNDEFINED_DOCKER_LABEL
 from ..core.errors import (
@@ -106,7 +106,10 @@ def parse_dask_job_id(
 
 
 async def create_node_ports(
-    db_engine: Engine, user_id: UserID, project_id: ProjectID, node_id: NodeID
+    db_engine: AsyncEngine,
+    user_id: UserID,
+    project_id: ProjectID,
+    node_id: NodeID,
 ) -> node_ports_v2.Nodeports:
     """
     This function creates a nodeports object by fetching the node state from the database
@@ -135,7 +138,7 @@
 
 
 async def parse_output_data(
-    db_engine: Engine,
+    db_engine: AsyncEngine,
     job_id: str,
     data: TaskOutputData,
     ports: node_ports_v2.Nodeports | None = None,
@@ -402,7 +405,7 @@ async def get_service_log_file_download_link(
 
 
 async def clean_task_output_and_log_files_if_invalid(
-    db_engine: Engine,
+    db_engine: AsyncEngine,
     user_id: UserID,
     project_id: ProjectID,
     node_id: NodeID,
diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
index 9783320ea2f5..ca022c2a5a1c 100644
--- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
+++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
@@ -16,7 +16,6 @@
 
 import aioboto3
 import aiodocker
-import aiopg.sa
 import httpx
 import pytest
 import sqlalchemy as sa
@@ -79,6 +78,7 @@
 from simcore_service_director_v2.core.settings import AppSettings
 from simcore_service_director_v2.modules import storage as dv2_modules_storage
 from sqlalchemy.dialects.postgresql import insert as pg_insert
+from sqlalchemy.ext.asyncio import AsyncEngine
 from tenacity import TryAgain
 from tenacity.asyncio import AsyncRetrying
 from tenacity.retry import retry_if_exception_type
@@ -293,8 +293,8 @@ def workbench_dynamic_services(
 
 
 @pytest.fixture
-async def db_manager(aiopg_engine: aiopg.sa.engine.Engine) -> DBManager:
-    return DBManager(aiopg_engine)
+async def db_manager(sqlalchemy_async_engine: AsyncEngine) -> DBManager:
+    return DBManager(sqlalchemy_async_engine)
 
 
 def _is_docker_r_clone_plugin_installed() -> bool:
diff --git a/services/director-v2/tests/unit/_helpers.py b/services/director-v2/tests/unit/_helpers.py
index 45632d0454a2..aba737fe932f 100644
--- a/services/director-v2/tests/unit/_helpers.py
+++ b/services/director-v2/tests/unit/_helpers.py
@@ -1,8 +1,7 @@
+from collections.abc import Callable
 from dataclasses import dataclass
-from typing import Any, Callable
+from typing import Any
 
-import aiopg
-import aiopg.sa
 import sqlalchemy as sa
 from models_library.projects import ProjectAtDB, ProjectID
 from models_library.projects_nodes_io import NodeID
@@ -31,12 +30,12 @@ class RunningProject(PublishedProject):
 
 
 async def set_comp_task_outputs(
-    aiopg_engine: aiopg.sa.engine.Engine,
+    sqlalchemy_async_engine: AsyncEngine,
     node_id: NodeID,
     outputs_schema: dict[str, Any],
     outputs: dict[str, Any],
 ) -> None:
-    async with aiopg_engine.acquire() as conn:
+    async with sqlalchemy_async_engine.begin() as conn:
         await conn.execute(
             comp_tasks.update()
             .where(comp_tasks.c.node_id == f"{node_id}")
@@ -45,12 +44,12 @@ async def set_comp_task_outputs(
 
 
 async def set_comp_task_inputs(
-    aiopg_engine: aiopg.sa.engine.Engine,
+    sqlalchemy_async_engine: AsyncEngine,
     node_id: NodeID,
     inputs_schema: dict[str, Any],
     inputs: dict[str, Any],
 ) -> None:
-    async with aiopg_engine.acquire() as conn:
+    async with sqlalchemy_async_engine.begin() as conn:
         await conn.execute(
             comp_tasks.update()
             .where(comp_tasks.c.node_id == f"{node_id}")
diff --git a/services/director-v2/tests/unit/conftest.py b/services/director-v2/tests/unit/conftest.py
index 7a6ab8e439de..f9c748633197 100644
--- a/services/director-v2/tests/unit/conftest.py
+++ b/services/director-v2/tests/unit/conftest.py
@@ -48,6 +48,7 @@ def disable_postgres(mocker) -> None:
     def mock_setup(app: FastAPI, *args, **kwargs) -> None:
         app.state.engine = mock.AsyncMock()
+        app.state.asyncpg_engine = mock.AsyncMock()
 
     mocker.patch("simcore_service_director_v2.modules.db.setup", side_effect=mock_setup)
 
@@ -152,7 +153,7 @@ def mock_service_inspect(
 
 @pytest.fixture
 def scheduler_data_from_service_inspect(
-    mock_service_inspect: Mapping[str, Any]
+    mock_service_inspect: Mapping[str, Any],
 ) -> SchedulerData:
     return SchedulerData.from_service_inspect(mock_service_inspect)
diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py
index 2527a4cbac6f..15937ff5871c 100644
--- a/services/director-v2/tests/unit/test_modules_dask_client.py
+++ b/services/director-v2/tests/unit/test_modules_dask_client.py
@@ -189,7 +189,7 @@ async def dask_client(
     }[request.param]()
 
     try:
-        assert client.app.state.engine is not None
+        assert client.app.state.asyncpg_engine is not None
 
         # check we can run some simple python script
         def _square(x):
@@ -207,8 +207,8 @@ def neg(x):
         result = await future
         assert result == -285
     except AttributeError:
-        # enforces existance of 'app.state.engine' and sets to None
-        client.app.state.engine = None
+        # enforces existence of 'app.state.asyncpg_engine' and sets to None
+        client.app.state.asyncpg_engine = None
 
     return client
 
@@ -390,7 +390,9 @@ def fct_that_raise_exception():
     )  # type: ignore
     assert task_exception
     assert isinstance(task_exception, exc)
-    task_traceback = await future.traceback(timeout=_ALLOW_TIME_FOR_GATEWAY_TO_CREATE_WORKERS)  # type: ignore
+    task_traceback = await future.traceback(
+        timeout=_ALLOW_TIME_FOR_GATEWAY_TO_CREATE_WORKERS
+    )  # type: ignore
     assert task_traceback
     trace = traceback.format_exception(task_exception)
     assert trace
@@ -625,7 +627,9 @@ def fake_sidecar_fct(
     assert published_computation_task[0].job_id in list_of_persisted_datasets
     assert list_of_persisted_datasets[0] == published_computation_task[0].job_id
     # get the persisted future from the scheduler back
-    task_future = await dask_client.backend.client.get_dataset(name=published_computation_task[0].job_id)  # type: ignore
+    task_future = await dask_client.backend.client.get_dataset(
+        name=published_computation_task[0].job_id
+    )  # type: ignore
     assert task_future
     assert isinstance(task_future, distributed.Future)
     assert task_future.key == published_computation_task[0].job_id
diff --git a/services/director-v2/tests/unit/test_modules_osparc_variables.py b/services/director-v2/tests/unit/test_modules_osparc_variables.py
index a28696f7b183..61a2996797f7 100644
--- a/services/director-v2/tests/unit/test_modules_osparc_variables.py
+++ b/services/director-v2/tests/unit/test_modules_osparc_variables.py
@@ -280,7 +280,6 @@ def compose_spec():
 def test_auto_inject_environments_added_to_all_services_in_compose(
     compose_spec: ComposeSpecLabelDict,
 ):
-
     before = deepcopy(compose_spec)
     after = auto_inject_environments(compose_spec)
 
@@ -290,7 +289,6 @@ def test_auto_inject_environments_added_to_all_services_in_compose(
     auto_injected_envs = set(_NEW_ENVIRONMENTS.keys())
     for name, service in compose_spec.get("services", {}).items():
-
         # all services have environment specs
         assert service["environment"], f"expected in {name} service"
diff --git a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py
index 7c991d3390da..a3a509e54530 100644
--- a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py
+++ b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py
@@ -13,8 +13,6 @@
 from typing import Any
 from unittest import mock
 
-import aiopg
-import aiopg.sa
 import pytest
 from _helpers import PublishedProject, set_comp_task_inputs, set_comp_task_outputs
 from dask_task_models_library.container_tasks.io import (
@@ -63,6 +61,7 @@
     parse_dask_job_id,
     parse_output_data,
 )
+from sqlalchemy.ext.asyncio import AsyncEngine
 from yarl import URL
 
 pytest_simcore_core_services_selection = [
@@ -243,7 +242,7 @@ def fake_task_output_data(
 
 
 async def test_parse_output_data(
-    aiopg_engine: aiopg.sa.engine.Engine,
+    sqlalchemy_async_engine: AsyncEngine,
     published_project: PublishedProject,
     user_id: UserID,
     fake_io_schema: dict[str, dict[str, str]],
@@ -254,7 +253,7 @@ async def test_parse_output_data(
     sleeper_task: CompTaskAtDB = published_project.tasks[1]
     no_outputs = {}
     await set_comp_task_outputs(
-        aiopg_engine, sleeper_task.node_id, fake_io_schema, no_outputs
+        sqlalchemy_async_engine, sleeper_task.node_id, fake_io_schema, no_outputs
     )
     # mock the set_value function so we can test it is called correctly
     mocked_node_ports_set_value_fct = mocker.patch(
@@ -269,7 +268,7 @@ async def test_parse_output_data(
         published_project.project.uuid,
         sleeper_task.node_id,
     )
-    await parse_output_data(aiopg_engine, dask_job_id, fake_task_output_data)
+    await parse_output_data(sqlalchemy_async_engine, dask_job_id, fake_task_output_data)
 
     # the FileUrl types are converted to a pure url
     expected_values = {
@@ -298,7 +297,7 @@ def _app_config_with_db(
 
 
 async def test_compute_input_data(
     _app_config_with_db: None,
-    aiopg_engine: aiopg.sa.engine.Engine,
+    sqlalchemy_async_engine: AsyncEngine,
     initialized_app: FastAPI,
     user_id: UserID,
     published_project: PublishedProject,
@@ -328,7 +327,7 @@ async def test_compute_input_data(
         for key, value_type in fake_io_schema.items()
     }
     await set_comp_task_inputs(
-        aiopg_engine, sleeper_task.node_id, fake_io_schema, fake_inputs
+        sqlalchemy_async_engine, sleeper_task.node_id, fake_io_schema, fake_inputs
     )
 
     # mock the get_value function so we can test it is called correctly
@@ -347,7 +346,7 @@ def return_fake_input_value(*args, **kwargs):
         side_effect=return_fake_input_value(),
     )
     node_ports = await create_node_ports(
-        db_engine=initialized_app.state.engine,
+        db_engine=sqlalchemy_async_engine,
         user_id=user_id,
         project_id=published_project.project.uuid,
         node_id=sleeper_task.node_id,
@@ -376,7 +375,7 @@ def tasks_file_link_scheme(tasks_file_link_type: FileLinkType) -> tuple:
 
 
 async def test_compute_output_data_schema(
     _app_config_with_db: None,
-    aiopg_engine: aiopg.sa.engine.Engine,
+    sqlalchemy_async_engine: AsyncEngine,
     initialized_app: FastAPI,
     user_id: UserID,
     published_project: PublishedProject,
@@ -389,11 +388,11 @@ async def test_compute_output_data_schema(
     # simulate pre-created file links
     no_outputs = {}
     await set_comp_task_outputs(
-        aiopg_engine, sleeper_task.node_id, fake_io_schema, no_outputs
+        sqlalchemy_async_engine, sleeper_task.node_id, fake_io_schema, no_outputs
     )
 
     node_ports = await create_node_ports(
-        db_engine=initialized_app.state.engine,
+        db_engine=sqlalchemy_async_engine,
         user_id=user_id,
         project_id=published_project.project.uuid,
         node_id=sleeper_task.node_id,
@@ -425,7 +424,7 @@
 
 @pytest.mark.parametrize("entry_exists_returns", [True, False])
 async def test_clean_task_output_and_log_files_if_invalid(
-    aiopg_engine: aiopg.sa.engine.Engine,
+    sqlalchemy_async_engine: AsyncEngine,
     user_id: UserID,
     published_project: PublishedProject,
     mocked_node_ports_filemanager_fcts: dict[str, mock.MagicMock],
@@ -438,9 +437,9 @@
     # BEFORE the task is actually run. In case there is a failure at running
     # the task, these entries shall be cleaned up. The way to check this is
     # by asking storage if these files really exist. If not they get deleted.
-    mocked_node_ports_filemanager_fcts[
-        "entry_exists"
-    ].return_value = entry_exists_returns
+    mocked_node_ports_filemanager_fcts["entry_exists"].return_value = (
+        entry_exists_returns
+    )
 
     sleeper_task = published_project.tasks[1]
 
@@ -456,11 +455,11 @@
     fake_outputs = {
         key: value_type
         for key, value_type in fake_io_schema.items()
         if value_type["type"] == "data:*/*"
     }
     await set_comp_task_outputs(
-        aiopg_engine, sleeper_task.node_id, fake_io_schema, fake_outputs
+        sqlalchemy_async_engine, sleeper_task.node_id, fake_io_schema, fake_outputs
     )
     # this should ask for the 2 files + the log file
     await clean_task_output_and_log_files_if_invalid(
-        aiopg_engine,
+        sqlalchemy_async_engine,
         user_id,
         published_project.project.uuid,
         published_project.tasks[1].node_id,
@@ -500,7 +499,7 @@ def _add_is_directory(entry: mock._Call) -> mock._Call:  # noqa: SLF001
 
 @pytest.mark.parametrize(
     "req_example", NodeRequirements.model_config["json_schema_extra"]["examples"]
 )
 def test_node_requirements_correctly_convert_to_dask_resources(
-    req_example: dict[str, Any]
+    req_example: dict[str, Any],
 ):
     node_reqs = NodeRequirements(**req_example)
     assert node_reqs
diff --git a/services/dynamic-sidecar/requirements/_base.txt b/services/dynamic-sidecar/requirements/_base.txt
index 493485a9d6f0..5ff8a3fc5ea6 100644
--- a/services/dynamic-sidecar/requirements/_base.txt
+++ b/services/dynamic-sidecar/requirements/_base.txt
@@ -57,8 +57,6 @@
 aiohttp==3.11.13
     # via
     #   -c requirements/../../../requirements/constraints.txt
     #   -r requirements/../../../packages/simcore-sdk/requirements/_base.in
     #   aiodocker
-aiopg==1.4.0
-    # via -r requirements/../../../packages/simcore-sdk/requirements/_base.in
 aioprocessing==2.0.1
     # via -r requirements/_base.in
 aiormq==6.8.1
@@ -88,8 +86,6 @@ arrow==1.3.0
     #   -r requirements/_base.in
 asgiref==3.8.1
     # via opentelemetry-instrumentation-asgi
-async-timeout==4.0.3
-    # via aiopg
 asyncpg==0.30.0
     # via sqlalchemy
 attrs==25.1.0
@@ -283,10 +279,8 @@
 opentelemetry-api==1.30.0
     # via
     #   opentelemetry-exporter-otlp-proto-grpc
     #   opentelemetry-exporter-otlp-proto-http
     #   opentelemetry-instrumentation
-    #   opentelemetry-instrumentation-aiopg
     #   opentelemetry-instrumentation-asgi
     #   opentelemetry-instrumentation-asyncpg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-fastapi
     #   opentelemetry-instrumentation-httpx
     #   opentelemetry-instrumentation-logging
@@ -308,25 +302,19 @@
 opentelemetry-exporter-otlp-proto-http==1.30.0
     # via opentelemetry-exporter-otlp
 opentelemetry-instrumentation==0.51b0
     # via
-    #   opentelemetry-instrumentation-aiopg
     #   opentelemetry-instrumentation-asgi
     #   opentelemetry-instrumentation-asyncpg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-fastapi
     #   opentelemetry-instrumentation-httpx
     #   opentelemetry-instrumentation-logging
     #   opentelemetry-instrumentation-redis
     #   opentelemetry-instrumentation-requests
-opentelemetry-instrumentation-aiopg==0.51b0
-    # via -r requirements/../../../packages/simcore-sdk/requirements/_base.in
 opentelemetry-instrumentation-asgi==0.51b0
     # via opentelemetry-instrumentation-fastapi
 opentelemetry-instrumentation-asyncpg==0.51b0
     # via
     #   -r requirements/../../../packages/postgres-database/requirements/_base.in
     #   -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/postgres-database/requirements/_base.in
-opentelemetry-instrumentation-dbapi==0.51b0
-    # via opentelemetry-instrumentation-aiopg
 opentelemetry-instrumentation-fastapi==0.51b0
     # via -r requirements/../../../packages/service-library/requirements/_fastapi.in
 opentelemetry-instrumentation-httpx==0.51b0
@@ -359,7 +347,6 @@
 opentelemetry-semantic-conventions==0.51b0
     # via
     #   opentelemetry-instrumentation
     #   opentelemetry-instrumentation-asgi
     #   opentelemetry-instrumentation-asyncpg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-fastapi
     #   opentelemetry-instrumentation-httpx
     #   opentelemetry-instrumentation-redis
@@ -449,9 +436,7 @@
 psutil==7.0.0
     # via
     #   -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
     #   -r requirements/_base.in
 psycopg2-binary==2.9.10
-    # via
-    #   aiopg
-    #   sqlalchemy
+    # via sqlalchemy
 pycryptodome==3.21.0
     # via stream-zip
 pydantic==2.10.6
@@ -742,7 +727,7 @@
 sqlalchemy==1.4.54
     # via
     #   -c requirements/../../../requirements/constraints.txt
     #   -r requirements/../../../packages/postgres-database/requirements/_base.in
     #   -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/postgres-database/requirements/_base.in
-    #   aiopg
+    #   -r requirements/../../../packages/simcore-sdk/requirements/_base.in
     #   alembic
 starlette==0.46.0
     # via
@@ -860,8 +845,6 @@
 wrapt==1.17.2
     # via
     #   deprecated
     #   opentelemetry-instrumentation
-    #   opentelemetry-instrumentation-aiopg
-    #   opentelemetry-instrumentation-dbapi
     #   opentelemetry-instrumentation-httpx
     #   opentelemetry-instrumentation-redis
 wsproto==1.2.0
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py
index 22e8c4729ede..ddb26e0e3a7a 100644
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py
@@ -137,8 +137,9 @@ def create_base_app() -> FastAPI:
     logger.debug(settings.model_dump_json(indent=2))
 
     # minimal
+    assert settings.SC_BOOT_MODE  # nosec
     app = FastAPI(
-        debug=settings.SC_BOOT_MODE.is_devel_mode(),  # pylint: disable=no-member
+        debug=settings.SC_BOOT_MODE.is_devel_mode(),
         title=PROJECT_NAME,
         description=SUMMARY,
         version=API_VERSION,
@@ -199,7 +200,8 @@ def create_app():
 
     # ERROR HANDLERS  ------------
     app.add_exception_handler(
-        NodeNotFound, node_not_found_error_handler  # type: ignore[arg-type]
+        NodeNotFound,
+        node_not_found_error_handler,  # type: ignore[arg-type]
     )
     app.add_exception_handler(BaseDynamicSidecarError, http_error_handler)  # type: ignore[arg-type]
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/external_dependencies.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/external_dependencies.py
index 278f29e7ad19..d10d1ce58c57 100644
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/external_dependencies.py
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/external_dependencies.py
@@ -2,7 +2,7 @@
 from fastapi import FastAPI
 from servicelib.utils import logged_gather
 
-from .postgres import wait_for_postgres_liveness
+from ..modules.database import wait_for_database_liveness
 from .rabbitmq import wait_for_rabbitmq_liveness
 from .registry import wait_for_registries_liveness
 from .storage import wait_for_storage_liveness
@@ -23,7 +23,7 @@ def setup_check_dependencies(app: FastAPI) -> None:
     async def on_startup() -> None:
         liveliness_results = await logged_gather(
             *[
-                wait_for_postgres_liveness(app),
+                wait_for_database_liveness(app),
                 wait_for_rabbitmq_liveness(app),
                 wait_for_registries_liveness(app),
                 wait_for_storage_liveness(app),
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/postgres.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/postgres.py
deleted file mode 100644
index 813f0729842e..000000000000
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/postgres.py
+++ /dev/null
@@ -1,18 +0,0 @@
-from fastapi import FastAPI
-from servicelib.common_aiopg_utils import is_postgres_responsive_async
-from settings_library.postgres import PostgresSettings
-
-from ..modules.service_liveness import wait_for_service_liveness
-from .settings import ApplicationSettings
-
-
-async def wait_for_postgres_liveness(app: FastAPI) -> None:
-    app_settings: ApplicationSettings = app.state.settings
-    postgres_settings: PostgresSettings = app_settings.POSTGRES_SETTINGS
-
-    await wait_for_service_liveness(
-        is_postgres_responsive_async,
-        service_name="Postgres",
-        endpoint=postgres_settings.dsn,
-        dsn=postgres_settings.dsn,
-    )
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/database.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/database.py
new file mode 100644
index 000000000000..a1ccfb9805c0
--- /dev/null
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/database.py
@@ -0,0 +1,26 @@
+from fastapi import FastAPI
+from servicelib.db_asyncpg_utils import check_postgres_liveness, with_async_pg_engine
+from settings_library.postgres import PostgresSettings
+
+from ..core.settings import ApplicationSettings
+from .service_liveness import (
+    wait_for_service_liveness,
+)
+
+
+async def wait_for_database_liveness(app: FastAPI) -> None:
+    """
+    Checks if the postgres engine is alive and can be used.
+    """
+
+    app_settings = app.state.settings
+    assert isinstance(app_settings, ApplicationSettings)  # nosec
+    postgres_settings = app_settings.POSTGRES_SETTINGS
+    assert isinstance(postgres_settings, PostgresSettings)  # nosec
+    async with with_async_pg_engine(postgres_settings) as engine:
+        await wait_for_service_liveness(
+            check_postgres_liveness,
+            engine,
+            service_name="Postgres",
+            endpoint=postgres_settings.dsn,
+        )
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py
index 0d010794e235..3942e23b1845 100644
--- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py
+++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py
@@ -40,7 +40,7 @@ async def save_preferences(
         service_key=service_key, service_version=service_version, value=dir_content
     )
 
-    async with DBContextManager() as engine, engine.acquire() as conn:
+    async with DBContextManager() as engine, engine.begin() as conn:
         await UserServicesUserPreferencesRepo.save(
             conn,
             user_id=user_id,
@@ -61,7 +61,7 @@ async def load_preferences(
 ) -> None:
     preference_class = get_model_class(service_key)
 
-    async with DBContextManager() as engine, engine.acquire() as conn:
+    async with DBContextManager() as engine, engine.connect() as conn:
         payload = await UserServicesUserPreferencesRepo.load(
             conn,
             user_id=user_id,
diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py
index 5ad10622acd9..b0cf6b67413e 100644
--- a/services/dynamic-sidecar/tests/conftest.py
+++ b/services/dynamic-sidecar/tests/conftest.py
@@ -8,7 +8,7 @@
 import json
 import logging
 import sys
-from collections.abc import AsyncIterable, Iterable, Iterator
+from
collections.abc import Iterable, Iterator from pathlib import Path from unittest.mock import AsyncMock @@ -40,6 +40,7 @@ "pytest_simcore.docker_swarm", "pytest_simcore.faker_users_data", "pytest_simcore.minio_service", + "pytest_simcore.postgres_service", "pytest_simcore.pytest_global_environs", "pytest_simcore.rabbit_service", "pytest_simcore.redis_service", @@ -152,7 +153,7 @@ def mock_storage_check(mocker: MockerFixture) -> None: @pytest.fixture def mock_postgres_check(mocker: MockerFixture) -> None: mocker.patch( - "simcore_service_dynamic_sidecar.core.external_dependencies.wait_for_postgres_liveness", + "simcore_service_dynamic_sidecar.core.external_dependencies.wait_for_database_liveness", ) @@ -358,7 +359,7 @@ def mock_metrics_params(faker: Faker) -> CreateServiceMetricsAdditionalParams: @pytest.fixture -def cleanup_reserved_disk_space() -> AsyncIterable[None]: +def cleanup_reserved_disk_space() -> Iterator[None]: remove_reserved_disk_space() yield remove_reserved_disk_space() diff --git a/services/dynamic-sidecar/tests/unit/conftest.py b/services/dynamic-sidecar/tests/unit/conftest.py index 47488a06e48e..75b9d316c103 100644 --- a/services/dynamic-sidecar/tests/unit/conftest.py +++ b/services/dynamic-sidecar/tests/unit/conftest.py @@ -72,7 +72,7 @@ async def test_client( @pytest.fixture async def ensure_external_volumes( app: FastAPI, -) -> AsyncIterator[tuple[DockerVolume]]: +) -> AsyncIterator[tuple[DockerVolume, ...]]: """ensures inputs and outputs volumes for the service are present Emulates creation of volumes by the directorv2 when it spawns the dynamic-sidecar service diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py b/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py index 27ec615b6319..182743dca578 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rest_containers.py @@ -5,10 +5,10 @@ import asyncio import json -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, AsyncIterator from inspect import signature from pathlib import Path -from typing import Any, AsyncIterator, Final +from typing import Any, Final from unittest.mock import AsyncMock, Mock from uuid import uuid4 @@ -135,7 +135,6 @@ async def _start_containers( async def _docker_ps_a_container_names() -> list[str]: - # TODO: replace with aiodocker this is legacy by now command = 'docker ps -a --format "{{.Names}}"' success, stdout, *_ = await async_command(command=command, timeout=None) @@ -163,7 +162,9 @@ async def _assert_compose_spec_pulled(compose_spec: str, settings: ApplicationSe @pytest.fixture -def mock_environment(mock_rabbitmq_envs: EnvVarsDict) -> EnvVarsDict: +def mock_environment( + mock_environment: EnvVarsDict, mock_rabbitmq_envs: EnvVarsDict +) -> EnvVarsDict: return mock_rabbitmq_envs @@ -174,12 +175,12 @@ def app(app: FastAPI) -> FastAPI: @pytest.fixture -def test_client( +async def test_client( ensure_shared_store_dir: Path, - ensure_run_in_sequence_context_is_empty: None, ensure_external_volumes: tuple[DockerVolume], - cleanup_containers: AsyncIterator[None], test_client: TestClient, + cleanup_containers: AsyncIterator[None], + ensure_run_in_sequence_context_is_empty: None, ) -> TestClient: """creates external volumes and provides a client to dy-sidecar service""" return test_client @@ -451,7 +452,6 @@ def _expected_error_string(status_code: int) -> dict[str, Any]: } for container in started_containers: - # inspect container response = await 
test_client.get(f"/{API_VTAG}/containers/{container}") assert response.status_code == mock_aiodocker_containers_get, response.text @@ -702,9 +702,9 @@ def define_inactivity_command( @pytest.fixture def mock_shared_store(app: FastAPI) -> None: shared_store: SharedStore = app.state.shared_store - shared_store.original_to_container_names[ + shared_store.original_to_container_names["mock_container_name"] = ( "mock_container_name" - ] = "mock_container_name" + ) async def test_containers_activity_command_failed( diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py index 75bc03dab744..9d8fdbbb2d64 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py @@ -389,9 +389,11 @@ async def test_create_containers_task( mock_metrics_params: CreateServiceMetricsAdditionalParams, shared_store: SharedStore, ) -> None: - last_progress_message: tuple[str, float] | None = None + last_progress_message: tuple[str, ProgressPercent | None] | None = None - async def create_progress(message: str, percent: float, _: TaskId) -> None: + async def create_progress( + message: str, percent: ProgressPercent | None, _: TaskId + ) -> None: nonlocal last_progress_message last_progress_message = (message, percent) print(message, percent) diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_health.py b/services/dynamic-sidecar/tests/unit/test_api_rest_health.py index b7dca61cf637..c94d81b8db7f 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_rest_health.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rest_health.py @@ -1,7 +1,6 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument -import pytest from async_asgi_testclient import TestClient from fastapi import status from simcore_service_dynamic_sidecar.models.schemas.application_health import ( @@ -9,11 +8,6 @@ ) -@pytest.fixture -def test_client(test_client: TestClient) -> TestClient: - return test_client - - async def test_is_healthy(test_client: TestClient) -> None: test_client.application.state.application_health.is_healthy = True response = await test_client.get("/health") diff --git a/services/dynamic-sidecar/tests/unit/test_api_rpc__disk_usage.py b/services/dynamic-sidecar/tests/unit/test_api_rpc__disk_usage.py index 1383f1654162..8baea6080148 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_rpc__disk_usage.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rpc__disk_usage.py @@ -2,8 +2,7 @@ # pylint:disable=redefined-outer-name # pylint:disable=unused-argument -from collections.abc import Awaitable, Callable -from typing import AsyncIterable +from collections.abc import AsyncIterable, Awaitable, Callable from unittest.mock import AsyncMock import pytest diff --git a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py index bf647faa5e4d..e39c908dbc90 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py +++ b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py @@ -97,10 +97,10 @@ async def test__get_containers_inspect_from_names( started_services: None, container_names: list[str], faker: Faker ): MISSING_CONTAINER_NAME = f"missing-container-{faker.uuid4()}" - container_details: dict[ - str, DockerContainer | None - ] = await 
_get_containers_inspect_from_names( - [*container_names, MISSING_CONTAINER_NAME] + container_details: dict[str, DockerContainer | None] = ( + await _get_containers_inspect_from_names( + [*container_names, MISSING_CONTAINER_NAME] + ) ) # containers which do not exist always return None assert MISSING_CONTAINER_NAME in container_details diff --git a/services/dynamic-sidecar/tests/unit/test_core_registry.py b/services/dynamic-sidecar/tests/unit/test_core_registry.py index c253d2762212..32e71fe2b173 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_registry.py +++ b/services/dynamic-sidecar/tests/unit/test_core_registry.py @@ -77,7 +77,9 @@ def mock_registry_settings_with_auth( monkeypatch.setenv( "DY_DEPLOYMENT_REGISTRY_SETTINGS", _get_registry_config( - url=docker_registry, user="testuser", password="testpassword" # noqa: S106 + url=docker_registry, + user="testuser", + password="testpassword", # noqa: S106 ), ) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py index e8e00ac9c316..ff5ccfccee66 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py @@ -38,7 +38,6 @@ from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from servicelib.utils import logged_gather from settings_library.rabbit import RabbitSettings -from simcore_service_dynamic_sidecar.core.application import create_app from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings from simcore_service_dynamic_sidecar.modules.notifications import ( PortNotifier, @@ -80,10 +79,10 @@ def mock_environment( @pytest.fixture async def app( + app: FastAPI, mock_environment: EnvVarsDict, mock_registry_service: AsyncMock, mock_storage_check: None, - mock_postgres_check: None, mocker: MockerFixture, ) -> AsyncIterable[FastAPI]: mocker.patch( @@ -91,7 +90,6 @@ async def app( return_value=[], ) - app: FastAPI = create_app() async with LifespanManager(app): yield app diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py index 52c573bc86c2..16b57b218c95 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py @@ -2,8 +2,8 @@ # pylint:disable=unused-argument import asyncio +from collections.abc import AsyncIterator from pathlib import Path -from typing import AsyncIterator from unittest.mock import AsyncMock import pytest diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py index f4e5c2dd18c2..fe106cd55c89 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py @@ -4,10 +4,10 @@ import asyncio import inspect +from collections.abc import AsyncIterator, Iterator from dataclasses import dataclass from inspect import FullArgSpec from pathlib import Path -from typing import AsyncIterator, Iterator from unittest.mock import AsyncMock import pytest diff --git a/services/storage/src/simcore_service_storage/modules/db/__init__.py b/services/storage/src/simcore_service_storage/modules/db/__init__.py index 41372f5a2c27..4fb5dacb2a7c 100644 --- a/services/storage/src/simcore_service_storage/modules/db/__init__.py +++ 
b/services/storage/src/simcore_service_storage/modules/db/__init__.py @@ -1,8 +1,7 @@ import logging from fastapi import FastAPI -from servicelib.db_async_engine import close_db_connection -from servicelib.fastapi.db_asyncpg_engine import connect_to_db +from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db from servicelib.retry_policies import PostgresRetryPolicyUponInitialization from sqlalchemy.ext.asyncio import AsyncEngine from tenacity import retry diff --git a/services/web/server/src/simcore_service_webserver/users/_preferences_repository.py b/services/web/server/src/simcore_service_webserver/users/_preferences_repository.py index e64ce5e579b9..316da7534bc6 100644 --- a/services/web/server/src/simcore_service_webserver/users/_preferences_repository.py +++ b/services/web/server/src/simcore_service_webserver/users/_preferences_repository.py @@ -4,7 +4,7 @@ from models_library.users import UserID from simcore_postgres_database.utils_user_preferences import FrontendUserPreferencesRepo -from ..db.plugin import get_database_engine +from ..db.plugin import get_asyncpg_engine def _get_user_preference_name(user_id: UserID, preference_name: PreferenceName) -> str: @@ -18,7 +18,7 @@ async def get_user_preference( product_name: ProductName, preference_class: type[FrontendUserPreference], ) -> FrontendUserPreference | None: - async with get_database_engine(app).acquire() as conn: + async with get_asyncpg_engine(app).connect() as conn: preference_payload: dict | None = await FrontendUserPreferencesRepo.load( conn, user_id=user_id, @@ -42,7 +42,7 @@ async def set_user_preference( product_name: ProductName, preference: FrontendUserPreference, ) -> None: - async with get_database_engine(app).acquire() as conn: + async with get_asyncpg_engine(app).begin() as conn: await FrontendUserPreferencesRepo.save( conn, user_id=user_id,
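Note on the connection-handling pattern in this changeset: wherever repository code is touched, aiopg's `engine.acquire()` is replaced by SQLAlchemy's async context managers, with `engine.connect()` on the read paths (the `load` calls in `user_services_preferences/_db.py` and `_preferences_repository.py`) and `engine.begin()` on the write paths (the corresponding `save` calls), so writes get commit-on-success / rollback-on-error semantics instead of relying on aiopg's autocommit behaviour. A minimal self-contained sketch of that split follows; the `example` table and both helper functions are illustrative placeholders, not part of this changeset.

    import sqlalchemy as sa
    from sqlalchemy.ext.asyncio import AsyncEngine

    metadata = sa.MetaData()
    # hypothetical table, for illustration only
    example = sa.Table(
        "example",
        metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("value", sa.String),
    )


    async def load_value(engine: AsyncEngine, row_id: int) -> str | None:
        # read path: connect() yields a connection; the implicit transaction
        # is rolled back on exit, which is fine since nothing was written
        async with engine.connect() as conn:
            result = await conn.execute(
                sa.select(example.c.value).where(example.c.id == row_id)
            )
            row = result.one_or_none()
            return row.value if row is not None else None


    async def save_value(engine: AsyncEngine, row_id: int, value: str) -> None:
        # write path: begin() commits on successful exit and rolls back on
        # error, replacing aiopg's acquire() with autocommit
        async with engine.begin() as conn:
            await conn.execute(
                sa.update(example)
                .where(example.c.id == row_id)
                .values(value=value)
            )

In practice the engine comes from the application, e.g. `get_asyncpg_engine(app)` in the webserver hunk above, or the dynamic-sidecar's `DBContextManager`, which composes exactly as shown in `_db.py`: `async with DBContextManager() as engine, engine.begin() as conn: ...`.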
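The liveness helpers used by the new `modules/database.py` (`with_async_pg_engine`, `check_postgres_liveness`) live in servicelib and are not shown in this diff; only their call shape is visible here (a context manager taking `PostgresSettings` and yielding an engine, and a probe invoked as `check_postgres_liveness(engine)` via `wait_for_service_liveness`). As an assumption, a minimal equivalent pair could look like the sketch below; the bodies are guesses for orientation, not the servicelib implementations.

    from collections.abc import AsyncIterator
    from contextlib import asynccontextmanager

    import sqlalchemy as sa
    from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine


    @asynccontextmanager
    async def with_async_pg_engine(dsn: str) -> AsyncIterator[AsyncEngine]:
        # assumed shape: build a short-lived engine and guarantee disposal;
        # the real helper takes PostgresSettings rather than a raw DSN
        engine = create_async_engine(dsn, pool_size=1)
        try:
            yield engine
        finally:
            await engine.dispose()


    async def check_postgres_liveness(engine: AsyncEngine) -> None:
        # assumed probe: one trivial round trip; an exception here signals
        # "not alive yet", which the wait_for_service_liveness wrapper can
        # presumably retry until the database responds
        async with engine.connect() as conn:
            await conn.execute(sa.text("SELECT 1"))

Compared with the deleted `core/postgres.py`, which probed through aiopg's `is_postgres_responsive_async`, this keeps the liveness probe on the same asyncpg engine stack the service now uses everywhere else.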
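Several test hunks above override a fixture while requesting the same-named fixture from an outer scope (`mock_environment`, `app` in `test_modules_notifier.py`, `test_client` in `test_api_rest_containers.py`): pytest resolves a fixture parameter with the same name to the next definition up the conftest chain, so the override extends the outer fixture instead of rebuilding it. A generic two-file sketch of the mechanism, unrelated to this repo's actual fixtures:

    # conftest.py
    import pytest


    @pytest.fixture
    def config() -> dict:
        return {"extended": False}

    # test_override.py
    import pytest


    @pytest.fixture
    def config(config: dict) -> dict:
        # receives the conftest.py fixture of the same name and extends it
        config["extended"] = True
        return config


    def test_config_was_extended(config: dict) -> None:
        assert config["extended"] is True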