From db536f657b848ca464fee8f6223b1cc38b0ebf63 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 15:08:47 +0200 Subject: [PATCH 01/12] migrates _methods_api --- .../payments/_methods_api.py | 8 +- .../payments/_methods_db.py | 90 ++++++++++--------- 2 files changed, 52 insertions(+), 46 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/payments/_methods_api.py b/services/web/server/src/simcore_service_webserver/payments/_methods_api.py index a5fefc9ebd2a..c8be49aa9bb3 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_methods_api.py +++ b/services/web/server/src/simcore_service_webserver/payments/_methods_api.py @@ -25,7 +25,7 @@ from . import _rpc from ._autorecharge_db import get_wallet_autorecharge from ._methods_db import ( - PaymentsMethodsDB, + PaymentsMethodsGetDB, delete_payment_method, get_successful_payment_method, insert_init_payment_method, @@ -52,7 +52,7 @@ def _get_payment_methods_from_fake_gateway(fake: Faker): def _to_api_model( - entry: PaymentsMethodsDB, payment_method_details_from_gateway: dict[str, Any] + entry: PaymentsMethodsGetDB, payment_method_details_from_gateway: dict[str, Any] ) -> PaymentMethodGet: assert entry.completed_at # nosec @@ -107,12 +107,12 @@ async def _ack_creation_of_wallet_payment_method( payment_method_id: PaymentMethodID, completion_state: InitPromptAckFlowState, message: str | None = None, -) -> PaymentsMethodsDB: +) -> PaymentsMethodsGetDB: """Acks as completed (i.e. SUCCESSFUL, FAILED, CANCELED )""" assert completion_state != InitPromptAckFlowState.PENDING # nosec # annotate - updated: PaymentsMethodsDB = await udpate_payment_method( + updated: PaymentsMethodsGetDB = await udpate_payment_method( app, payment_method_id=payment_method_id, state=completion_state, diff --git a/services/web/server/src/simcore_service_webserver/payments/_methods_db.py b/services/web/server/src/simcore_service_webserver/payments/_methods_db.py index 745c0e7caac2..33380a29fa61 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_methods_db.py +++ b/services/web/server/src/simcore_service_webserver/payments/_methods_db.py @@ -1,10 +1,9 @@ import datetime import logging -import simcore_postgres_database.aiopg_errors as db_errors import sqlalchemy as sa +import sqlalchemy.exc from aiohttp import web -from aiopg.sa.result import ResultProxy from models_library.api_schemas_webserver.wallets import PaymentMethodID from models_library.users import UserID from models_library.wallets import WalletID @@ -13,10 +12,13 @@ InitPromptAckFlowState, payments_methods, ) -from sqlalchemy import literal_column -from sqlalchemy.sql import func +from simcore_postgres_database.utils_repos import ( + pass_or_acquire_connection, + transaction_context, +) +from sqlalchemy.ext.asyncio import AsyncConnection -from ..db.plugin import get_database_engine_legacy +from ..db.plugin import get_asyncpg_engine from .errors import ( PaymentMethodAlreadyAckedError, PaymentMethodNotFoundError, @@ -26,7 +28,7 @@ _logger = logging.getLogger(__name__) -class PaymentsMethodsDB(BaseModel): +class PaymentsMethodsGetDB(BaseModel): payment_method_id: PaymentMethodID user_id: UserID wallet_id: WalletID @@ -40,13 +42,14 @@ class PaymentsMethodsDB(BaseModel): async def insert_init_payment_method( app: web.Application, + connection: AsyncConnection | None = None, *, payment_method_id: str, user_id: UserID, wallet_id: WalletID, initiated_at: datetime.datetime, ) -> None: - async with get_database_engine_legacy(app).acquire() as conn: + async with transaction_context(get_asyncpg_engine(app), connection) as conn: try: await conn.execute( payments_methods.insert().values( @@ -56,21 +59,22 @@ async def insert_init_payment_method( initiated_at=initiated_at, ) ) - except db_errors.UniqueViolation as err: + except sqlalchemy.exc.IntegrityError as err: raise PaymentMethodUniqueViolationError( payment_method_id=payment_method_id ) from err async def list_successful_payment_methods( - app, + app: web.Application, + connection: AsyncConnection | None = None, *, user_id: UserID, wallet_id: WalletID, -) -> list[PaymentsMethodsDB]: - async with get_database_engine_legacy(app).acquire() as conn: - result: ResultProxy = await conn.execute( - payments_methods.select() +) -> list[PaymentsMethodsGetDB]: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + result = await conn.execute( + sa.select(payments_methods) .where( (payments_methods.c.user_id == user_id) & (payments_methods.c.wallet_id == wallet_id) @@ -78,43 +82,45 @@ async def list_successful_payment_methods( ) .order_by(payments_methods.c.created.desc()) ) # newest first - rows = await result.fetchall() or [] - return TypeAdapter(list[PaymentsMethodsDB]).validate_python(rows) + rows = result.fetchall() + return TypeAdapter(list[PaymentsMethodsGetDB]).validate_python(rows) async def get_successful_payment_method( - app, + app: web.Application, + connection: AsyncConnection | None = None, *, user_id: UserID, wallet_id: WalletID, payment_method_id: PaymentMethodID, -) -> PaymentsMethodsDB: - async with get_database_engine_legacy(app).acquire() as conn: - result: ResultProxy = await conn.execute( - payments_methods.select().where( +) -> PaymentsMethodsGetDB: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + result = await conn.execute( + sa.select(payments_methods).where( (payments_methods.c.user_id == user_id) & (payments_methods.c.wallet_id == wallet_id) & (payments_methods.c.payment_method_id == payment_method_id) & (payments_methods.c.state == InitPromptAckFlowState.SUCCESS) ) ) - row = await result.first() + row = result.one_or_none() if row is None: raise PaymentMethodNotFoundError(payment_method_id=payment_method_id) - return PaymentsMethodsDB.model_validate(row) + return PaymentsMethodsGetDB.model_validate(row) async def get_pending_payment_methods_ids( app: web.Application, + connection: AsyncConnection | None = None, ) -> list[PaymentMethodID]: - async with get_database_engine_legacy(app).acquire() as conn: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: result = await conn.execute( sa.select(payments_methods.c.payment_method_id) .where(payments_methods.c.completed_at.is_(None)) .order_by(payments_methods.c.initiated_at.asc()) # oldest first ) - rows = await result.fetchall() or [] + rows = result.fetchall() return [ TypeAdapter(PaymentMethodID).validate_python(row.payment_method_id) for row in rows @@ -124,10 +130,11 @@ async def get_pending_payment_methods_ids( async def udpate_payment_method( app: web.Application, payment_method_id: PaymentMethodID, + connection: AsyncConnection | None = None, *, state: InitPromptAckFlowState, state_message: str | None, -) -> PaymentsMethodsDB: +) -> PaymentsMethodsGetDB: """ Raises: @@ -142,17 +149,16 @@ async def udpate_payment_method( if state_message: optional["state_message"] = state_message - async with get_database_engine_legacy(app).acquire() as conn, conn.begin(): - row = await ( - await conn.execute( - sa.select( - payments_methods.c.initiated_at, - payments_methods.c.completed_at, - ) - .where(payments_methods.c.payment_method_id == payment_method_id) - .with_for_update() + async with transaction_context(get_asyncpg_engine(app), connection) as conn: + result = await conn.execute( + sa.select( + payments_methods.c.initiated_at, + payments_methods.c.completed_at, ) - ).fetchone() + .where(payments_methods.c.payment_method_id == payment_method_id) + .with_for_update() + ) + row = result.one_or_none() if row is None: raise PaymentMethodNotFoundError(payment_method_id=payment_method_id) @@ -162,24 +168,24 @@ async def udpate_payment_method( result = await conn.execute( payments_methods.update() - .values(completed_at=func.now(), state=state, **optional) + .values(completed_at=sa.func.now(), state=state, **optional) .where(payments_methods.c.payment_method_id == payment_method_id) - .returning(literal_column("*")) + .returning(payments_methods) ) - row = await result.first() - assert row, "execute above should have caught this" # nosec + row = result.one() - return PaymentsMethodsDB.model_validate(row) + return PaymentsMethodsGetDB.model_validate(row) async def delete_payment_method( app: web.Application, + connection: AsyncConnection | None = None, *, user_id: UserID, wallet_id: WalletID, payment_method_id: PaymentMethodID, -): - async with get_database_engine_legacy(app).acquire() as conn: +) -> None: + async with transaction_context(get_asyncpg_engine(app), connection) as conn: await conn.execute( payments_methods.delete().where( (payments_methods.c.user_id == user_id) From 6ef23fb98cb297ab3b7e6527504b9bdaf3cbed6f Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 15:11:13 +0200 Subject: [PATCH 02/12] =?UTF-8?q?=F0=9F=90=9B=20Fix:=20rename=20PaymentsAu?= =?UTF-8?q?torechargeDB=20to=20PaymentsAutorechargeGetDB=20for=20consisten?= =?UTF-8?q?cy=20in=20autorecharge=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../payments/_autorecharge_api.py | 12 ++++++------ .../payments/_autorecharge_db.py | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_api.py b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_api.py index 346e48652a74..2c83ff4b05e6 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_api.py +++ b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_api.py @@ -11,7 +11,7 @@ from models_library.wallets import WalletID from ._autorecharge_db import ( - PaymentsAutorechargeDB, + PaymentsAutorechargeGetDB, get_wallet_autorecharge, replace_wallet_autorecharge, ) @@ -23,7 +23,7 @@ def _from_db_to_api_model( - db_model: PaymentsAutorechargeDB, min_balance_in_credits: NonNegativeDecimal + db_model: PaymentsAutorechargeGetDB, min_balance_in_credits: NonNegativeDecimal ) -> GetWalletAutoRecharge: return GetWalletAutoRecharge( enabled=db_model.enabled, @@ -36,8 +36,8 @@ def _from_db_to_api_model( def _from_api_to_db_model( wallet_id: WalletID, api_model: ReplaceWalletAutoRecharge -) -> PaymentsAutorechargeDB: - return PaymentsAutorechargeDB( +) -> PaymentsAutorechargeGetDB: + return PaymentsAutorechargeGetDB( wallet_id=wallet_id, enabled=api_model.enabled, primary_payment_method_id=api_model.payment_method_id, @@ -64,7 +64,7 @@ async def get_wallet_payment_autorecharge( app, user_id=user_id, wallet_id=wallet_id, product_name=product_name ) settings = get_plugin_settings(app) - got: PaymentsAutorechargeDB | None = await get_wallet_autorecharge( + got: PaymentsAutorechargeGetDB | None = await get_wallet_autorecharge( app, wallet_id=wallet_id ) if not got: @@ -104,7 +104,7 @@ async def replace_wallet_payment_autorecharge( ) settings = get_plugin_settings(app) - got: PaymentsAutorechargeDB = await replace_wallet_autorecharge( + got: PaymentsAutorechargeGetDB = await replace_wallet_autorecharge( app, user_id=user_id, wallet_id=wallet_id, diff --git a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py index b3ac90985f2e..ea2e378f5e52 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py +++ b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py @@ -18,7 +18,7 @@ AutoRechargeID: TypeAlias = PositiveInt -class PaymentsAutorechargeDB(BaseModel): +class PaymentsAutorechargeGetDB(BaseModel): wallet_id: WalletID enabled: bool primary_payment_method_id: PaymentMethodID @@ -31,12 +31,12 @@ async def get_wallet_autorecharge( app: web.Application, *, wallet_id: WalletID, -) -> PaymentsAutorechargeDB | None: +) -> PaymentsAutorechargeGetDB | None: async with get_database_engine_legacy(app).acquire() as conn: stmt = AutoRechargeStmts.get_wallet_autorecharge(wallet_id) result = await conn.execute(stmt) row = await result.first() - return PaymentsAutorechargeDB.model_validate(row) if row else None + return PaymentsAutorechargeGetDB.model_validate(row) if row else None async def replace_wallet_autorecharge( @@ -44,8 +44,8 @@ async def replace_wallet_autorecharge( *, user_id: UserID, wallet_id: WalletID, - new: PaymentsAutorechargeDB, -) -> PaymentsAutorechargeDB: + new: PaymentsAutorechargeGetDB, +) -> PaymentsAutorechargeGetDB: """ Raises: InvalidPaymentMethodError: if `new` includes some invalid 'primary_payment_method_id' @@ -73,4 +73,4 @@ async def replace_wallet_autorecharge( result = await conn.execute(stmt) row = await result.first() assert row # nosec - return PaymentsAutorechargeDB.model_validate(row) + return PaymentsAutorechargeGetDB.model_validate(row) From dbb11f1a8897b170fa8abd971587ba0812b357cc Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 15:40:08 +0200 Subject: [PATCH 03/12] =?UTF-8?q?=F0=9F=90=9B=20Fix:=20update=20database?= =?UTF-8?q?=20connection=20handling=20in=20autorecharge=20functions=20to?= =?UTF-8?q?=20use=20asyncpg=20engine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../payments/_autorecharge_db.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py index ea2e378f5e52..4cd6f3f4ad4a 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py +++ b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py @@ -8,8 +8,13 @@ from models_library.wallets import WalletID from pydantic import BaseModel, ConfigDict, PositiveInt from simcore_postgres_database.utils_payments_autorecharge import AutoRechargeStmts +from simcore_postgres_database.utils_repos import ( + pass_or_acquire_connection, + transaction_context, +) +from sqlalchemy.ext.asyncio import AsyncConnection -from ..db.plugin import get_database_engine_legacy +from ..db.plugin import get_asyncpg_engine from .errors import InvalidPaymentMethodError _logger = logging.getLogger(__name__) @@ -29,18 +34,20 @@ class PaymentsAutorechargeGetDB(BaseModel): async def get_wallet_autorecharge( app: web.Application, + connection: AsyncConnection | None = None, *, wallet_id: WalletID, ) -> PaymentsAutorechargeGetDB | None: - async with get_database_engine_legacy(app).acquire() as conn: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: stmt = AutoRechargeStmts.get_wallet_autorecharge(wallet_id) result = await conn.execute(stmt) - row = await result.first() + row = result.one_or_none() return PaymentsAutorechargeGetDB.model_validate(row) if row else None async def replace_wallet_autorecharge( app: web.Application, + connection: AsyncConnection | None = None, *, user_id: UserID, wallet_id: WalletID, @@ -51,7 +58,7 @@ async def replace_wallet_autorecharge( InvalidPaymentMethodError: if `new` includes some invalid 'primary_payment_method_id' """ - async with get_database_engine_legacy(app).acquire() as conn: + async with transaction_context(get_asyncpg_engine(app), connection) as conn: stmt = AutoRechargeStmts.is_valid_payment_method( user_id=user_id, wallet_id=new.wallet_id, @@ -71,6 +78,5 @@ async def replace_wallet_autorecharge( monthly_limit_in_usd=new.monthly_limit_in_usd, ) result = await conn.execute(stmt) - row = await result.first() - assert row # nosec + row = result.one() return PaymentsAutorechargeGetDB.model_validate(row) From a09d4053c16a0567620041d2290ea8e5ae4c3d6b Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 15:50:44 +0200 Subject: [PATCH 04/12] =?UTF-8?q?=F0=9F=90=9B=20Fix:=20rename=20PaymentsTr?= =?UTF-8?q?ansactionsDB=20to=20PaymentsTransactionsGetDB=20for=20consisten?= =?UTF-8?q?cy=20and=20update=20database=20connection=20handling=20to=20use?= =?UTF-8?q?=20asyncpg=20engine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../utils_payments.py | 44 +++++++++---------- .../test_models_payments_transactions.py | 26 ++++++----- .../payments/_onetime_api.py | 6 +-- .../payments/_onetime_db.py | 33 +++++++++----- 4 files changed, 58 insertions(+), 51 deletions(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_payments.py b/packages/postgres-database/src/simcore_postgres_database/utils_payments.py index de4db3abe11b..9c5693ba7f6c 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_payments.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_payments.py @@ -5,17 +5,16 @@ from typing import Final, TypeAlias import sqlalchemy as sa -from aiopg.sa.connection import SAConnection -from aiopg.sa.result import ResultProxy, RowProxy +import sqlalchemy.exc +from sqlalchemy.ext.asyncio import AsyncConnection -from . import aiopg_errors from .models.payments_transactions import PaymentTransactionState, payments_transactions _logger = logging.getLogger(__name__) PaymentID: TypeAlias = str -PaymentTransactionRow: TypeAlias = RowProxy +PaymentTransactionRow: TypeAlias = sa.Row UNSET: Final[str] = "__UNSET__" @@ -39,7 +38,7 @@ class PaymentAlreadyAcked(PaymentFailure): ... async def insert_init_payment_transaction( - connection: SAConnection, + connection: AsyncConnection, *, payment_id: str, price_dollars: Decimal, @@ -66,14 +65,14 @@ async def insert_init_payment_transaction( initiated_at=initiated_at, ) ) - except aiopg_errors.UniqueViolation: + except sqlalchemy.exc.IntegrityError: return PaymentAlreadyExists(payment_id) return payment_id async def update_payment_transaction_state( - connection: SAConnection, + connection: AsyncConnection, *, payment_id: str, completion_state: PaymentTransactionState, @@ -101,16 +100,15 @@ async def update_payment_transaction_state( optional["invoice_url"] = invoice_url async with connection.begin(): - row = await ( - await connection.execute( - sa.select( - payments_transactions.c.initiated_at, - payments_transactions.c.completed_at, - ) - .where(payments_transactions.c.payment_id == payment_id) - .with_for_update() + result = await connection.execute( + sa.select( + payments_transactions.c.initiated_at, + payments_transactions.c.completed_at, ) - ).fetchone() + .where(payments_transactions.c.payment_id == payment_id) + .with_for_update() + ) + row = result.one_or_none() if row is None: return PaymentNotFound(payment_id=payment_id) @@ -125,16 +123,14 @@ async def update_payment_transaction_state( payments_transactions.update() .values(completed_at=sa.func.now(), state=completion_state, **optional) .where(payments_transactions.c.payment_id == payment_id) - .returning(sa.literal_column("*")) + .returning(payments_transactions) ) - row = await result.first() - assert row, "execute above should have caught this" # nosec - assert isinstance(row, RowProxy) # nosec + row = result.one() return row async def get_user_payments_transactions( - connection: SAConnection, + connection: AsyncConnection, *, user_id: int, offset: int | None = None, @@ -149,7 +145,7 @@ async def get_user_payments_transactions( # NOTE: what if between these two calls there are new rows? can we get this in an atomic call?å stmt = ( - payments_transactions.select() + sa.select(payments_transactions) .where(payments_transactions.c.user_id == user_id) .order_by(payments_transactions.c.created.desc()) ) # newest first @@ -162,6 +158,6 @@ async def get_user_payments_transactions( # InvalidRowCountInLimitClause: LIMIT must not be negative stmt = stmt.limit(limit) - result: ResultProxy = await connection.execute(stmt) - rows = await result.fetchall() or [] + result = await connection.execute(stmt) + rows = result.fetchall() return total_number_of_items, rows diff --git a/packages/postgres-database/tests/test_models_payments_transactions.py b/packages/postgres-database/tests/test_models_payments_transactions.py index 6dde13b1abed..a26fae6aae33 100644 --- a/packages/postgres-database/tests/test_models_payments_transactions.py +++ b/packages/postgres-database/tests/test_models_payments_transactions.py @@ -10,8 +10,6 @@ import pytest import sqlalchemy as sa -from aiopg.sa.connection import SAConnection -from aiopg.sa.result import RowProxy from faker import Faker from pytest_simcore.helpers.faker_factories import random_payment_transaction, utcnow from simcore_postgres_database.models.payments_transactions import ( @@ -26,9 +24,10 @@ insert_init_payment_transaction, update_payment_transaction_state, ) +from sqlalchemy.ext.asyncio import AsyncConnection -async def test_numerics_precission_and_scale(connection: SAConnection): +async def test_numerics_precission_and_scale(connection: AsyncConnection): # https://docs.sqlalchemy.org/en/20/core/type_basics.html#sqlalchemy.types.Numeric # precision: This parameter specifies the total number of digits that can be stored, both before and after the decimal point. # scale: This parameter specifies the number of digits that can be stored to the right of the decimal point. @@ -58,7 +57,7 @@ def _remove_not_required(data: dict[str, Any]) -> dict[str, Any]: @pytest.fixture -def init_transaction(connection: SAConnection): +def init_transaction(connection: AsyncConnection): async def _init(payment_id: str): # get payment_id from payment-gateway values = _remove_not_required(random_payment_transaction(payment_id=payment_id)) @@ -81,7 +80,7 @@ def payment_id() -> str: async def test_init_transaction_sets_it_as_pending( - connection: SAConnection, init_transaction: Callable, payment_id: str + connection: AsyncConnection, init_transaction: Callable, payment_id: str ): values = await init_transaction(payment_id) assert values["payment_id"] == payment_id @@ -94,11 +93,11 @@ async def test_init_transaction_sets_it_as_pending( payments_transactions.c.state_message, ).where(payments_transactions.c.payment_id == payment_id) ) - row: RowProxy | None = await result.fetchone() + row = result.one_or_none() assert row is not None # tests that defaults are right? - assert dict(row.items()) == { + assert dict(row._mapping.items()) == { "completed_at": None, "state": PaymentTransactionState.PENDING, "state_message": None, @@ -127,7 +126,7 @@ def invoice_url(faker: Faker, expected_state: PaymentTransactionState) -> str | ], ) async def test_complete_transaction( - connection: SAConnection, + connection: AsyncConnection, init_transaction: Callable, payment_id: str, expected_state: PaymentTransactionState, @@ -152,7 +151,7 @@ async def test_complete_transaction( async def test_update_transaction_failures_and_exceptions( - connection: SAConnection, + connection: AsyncConnection, init_transaction: Callable, payment_id: str, ): @@ -188,7 +187,9 @@ def user_id() -> int: @pytest.fixture -def create_fake_user_transactions(connection: SAConnection, user_id: int) -> Callable: +def create_fake_user_transactions( + connection: AsyncConnection, user_id: int +) -> Callable: async def _go(expected_total=5): payment_ids = [] for _ in range(expected_total): @@ -204,7 +205,7 @@ async def _go(expected_total=5): async def test_get_user_payments_transactions( - connection: SAConnection, create_fake_user_transactions: Callable, user_id: int + connection: AsyncConnection, create_fake_user_transactions: Callable, user_id: int ): expected_payments_ids = await create_fake_user_transactions() expected_total = len(expected_payments_ids) @@ -216,7 +217,7 @@ async def test_get_user_payments_transactions( async def test_get_user_payments_transactions_with_pagination_options( - connection: SAConnection, create_fake_user_transactions: Callable, user_id: int + connection: AsyncConnection, create_fake_user_transactions: Callable, user_id: int ): expected_payments_ids = await create_fake_user_transactions() expected_total = len(expected_payments_ids) @@ -244,3 +245,4 @@ async def test_get_user_payments_transactions_with_pagination_options( connection, user_id=user_id, limit=0 ) assert not rows + assert not rows diff --git a/services/web/server/src/simcore_service_webserver/payments/_onetime_api.py b/services/web/server/src/simcore_service_webserver/payments/_onetime_api.py index 6d427cfc023a..f0afdb9d168c 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_onetime_api.py +++ b/services/web/server/src/simcore_service_webserver/payments/_onetime_api.py @@ -24,7 +24,7 @@ from simcore_postgres_database.utils_payments import insert_init_payment_transaction from yarl import URL -from ..db.plugin import get_database_engine_legacy +from ..db.plugin import get_asyncpg_engine from ..products import products_service from ..resource_usage.service import add_credits_to_wallet from ..users import users_service @@ -46,7 +46,7 @@ def _to_api_model( - transaction: _onetime_db.PaymentsTransactionsDB, + transaction: _onetime_db.PaymentsTransactionsGetDB, ) -> PaymentTransaction: data: dict[str, Any] = { "payment_id": transaction.payment_id, @@ -90,7 +90,7 @@ async def _fake_init_payment( .with_query(id=payment_id) ) # (2) Annotate INIT transaction - async with get_database_engine_legacy(app).acquire() as conn: + async with get_asyncpg_engine(app).begin() as conn: await insert_init_payment_transaction( conn, payment_id=payment_id, diff --git a/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py b/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py index d6014509461f..2e6c6a4efdea 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py +++ b/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py @@ -20,8 +20,13 @@ get_user_payments_transactions, update_payment_transaction_state, ) +from simcore_postgres_database.utils_repos import ( + pass_or_acquire_connection, + transaction_context, +) +from sqlalchemy.ext.asyncio import AsyncConnection -from ..db.plugin import get_database_engine_legacy +from ..db.plugin import get_asyncpg_engine from .errors import PaymentCompletedError, PaymentNotFoundError _logger = logging.getLogger(__name__) @@ -30,7 +35,7 @@ # # NOTE: this will be moved to the payments service # NOTE: with https://sqlmodel.tiangolo.com/ we would only define this once! -class PaymentsTransactionsDB(BaseModel): +class PaymentsTransactionsGetDB(BaseModel): payment_id: PaymentID price_dollars: Decimal # accepts negatives osparc_credits: Decimal # accepts negatives @@ -48,43 +53,47 @@ class PaymentsTransactionsDB(BaseModel): async def list_user_payment_transactions( - app, + app: web.Application, + connection: AsyncConnection | None = None, *, user_id: UserID, offset: PositiveInt, limit: PositiveInt, -) -> tuple[int, list[PaymentsTransactionsDB]]: +) -> tuple[int, list[PaymentsTransactionsGetDB]]: """List payments done by a give user (any wallet) Sorted by newest-first """ - async with get_database_engine_legacy(app).acquire() as conn: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: total_number_of_items, rows = await get_user_payments_transactions( conn, user_id=user_id, offset=offset, limit=limit ) - page = TypeAdapter(list[PaymentsTransactionsDB]).validate_python(rows) + page = TypeAdapter(list[PaymentsTransactionsGetDB]).validate_python(rows) return total_number_of_items, page -async def get_pending_payment_transactions_ids(app: web.Application) -> list[PaymentID]: - async with get_database_engine_legacy(app).acquire() as conn: +async def get_pending_payment_transactions_ids( + app: web.Application, connection: AsyncConnection | None = None +) -> list[PaymentID]: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: result = await conn.execute( sa.select(payments_transactions.c.payment_id) .where(payments_transactions.c.completed_at == None) # noqa: E711 .order_by(payments_transactions.c.initiated_at.asc()) # oldest first ) - rows = await result.fetchall() or [] + rows = result.fetchall() return [TypeAdapter(PaymentID).validate_python(row.payment_id) for row in rows] async def complete_payment_transaction( app: web.Application, + connection: AsyncConnection | None = None, *, payment_id: PaymentID, completion_state: PaymentTransactionState, state_message: str | None, invoice_url: HttpUrl | None = None, -) -> PaymentsTransactionsDB: +) -> PaymentsTransactionsGetDB: """ Raises: @@ -95,7 +104,7 @@ async def complete_payment_transaction( if invoice_url: optional_kwargs["invoice_url"] = invoice_url - async with get_database_engine_legacy(app).acquire() as conn: + async with transaction_context(get_asyncpg_engine(app), connection) as conn: row = await update_payment_transaction_state( conn, payment_id=payment_id, @@ -111,4 +120,4 @@ async def complete_payment_transaction( raise PaymentCompletedError(payment_id=row.payment_id) assert row # nosec - return PaymentsTransactionsDB.model_validate(row) + return PaymentsTransactionsGetDB.model_validate(row) From 88bec0d7db3dae841f20340d0a472c5eac65eb75 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:10:14 +0200 Subject: [PATCH 05/12] =?UTF-8?q?=F0=9F=90=9B=20Fix:=20refactor=20payment?= =?UTF-8?q?=20transaction=20state=20updates=20to=20use=20asyncpg=20engine?= =?UTF-8?q?=20and=20improve=20connection=20handling=20in=20tests=20and=20p?= =?UTF-8?q?ayment=20completion=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../utils_payments.py | 3 +- .../test_models_payments_transactions.py | 178 ++++++++++-------- .../payments/_onetime_db.py | 4 +- 3 files changed, 100 insertions(+), 85 deletions(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_payments.py b/packages/postgres-database/src/simcore_postgres_database/utils_payments.py index 9c5693ba7f6c..4f0c7bded762 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_payments.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_payments.py @@ -125,8 +125,7 @@ async def update_payment_transaction_state( .where(payments_transactions.c.payment_id == payment_id) .returning(payments_transactions) ) - row = result.one() - return row + return result.one() async def get_user_payments_transactions( diff --git a/packages/postgres-database/tests/test_models_payments_transactions.py b/packages/postgres-database/tests/test_models_payments_transactions.py index a26fae6aae33..e8ccc44a004d 100644 --- a/packages/postgres-database/tests/test_models_payments_transactions.py +++ b/packages/postgres-database/tests/test_models_payments_transactions.py @@ -24,23 +24,25 @@ insert_init_payment_transaction, update_payment_transaction_state, ) -from sqlalchemy.ext.asyncio import AsyncConnection +from simcore_postgres_database.utils_repos import transaction_context +from sqlalchemy.ext.asyncio import AsyncEngine -async def test_numerics_precission_and_scale(connection: AsyncConnection): +async def test_numerics_precission_and_scale(asyncpg_engine: AsyncEngine): # https://docs.sqlalchemy.org/en/20/core/type_basics.html#sqlalchemy.types.Numeric # precision: This parameter specifies the total number of digits that can be stored, both before and after the decimal point. # scale: This parameter specifies the number of digits that can be stored to the right of the decimal point. - for order_of_magnitude in range(8): - expected = 10**order_of_magnitude + 0.123 - got = await connection.scalar( - payments_transactions.insert() - .values(**random_payment_transaction(price_dollars=expected)) - .returning(payments_transactions.c.price_dollars) - ) - assert isinstance(got, decimal.Decimal) - assert float(got) == expected + async with asyncpg_engine.begin() as connection: + for order_of_magnitude in range(8): + expected = 10**order_of_magnitude + 0.123 + got = await connection.scalar( + payments_transactions.insert() + .values(**random_payment_transaction(price_dollars=expected)) + .returning(payments_transactions.c.price_dollars) + ) + assert isinstance(got, decimal.Decimal) + assert float(got) == expected def _remove_not_required(data: dict[str, Any]) -> dict[str, Any]: @@ -57,7 +59,7 @@ def _remove_not_required(data: dict[str, Any]) -> dict[str, Any]: @pytest.fixture -def init_transaction(connection: AsyncConnection): +def init_transaction(asyncpg_engine: AsyncEngine): async def _init(payment_id: str): # get payment_id from payment-gateway values = _remove_not_required(random_payment_transaction(payment_id=payment_id)) @@ -66,7 +68,8 @@ async def _init(payment_id: str): values["initiated_at"] = utcnow() # insert - ok = await insert_init_payment_transaction(connection, **values) + async with asyncpg_engine.begin() as connection: + ok = await insert_init_payment_transaction(connection, **values) assert ok return values @@ -80,19 +83,20 @@ def payment_id() -> str: async def test_init_transaction_sets_it_as_pending( - connection: AsyncConnection, init_transaction: Callable, payment_id: str + asyncpg_engine: AsyncEngine, init_transaction: Callable, payment_id: str ): values = await init_transaction(payment_id) assert values["payment_id"] == payment_id # check init-ed but not completed! - result = await connection.execute( - sa.select( - payments_transactions.c.completed_at, - payments_transactions.c.state, - payments_transactions.c.state_message, - ).where(payments_transactions.c.payment_id == payment_id) - ) + async with asyncpg_engine.connect() as connection: + result = await connection.execute( + sa.select( + payments_transactions.c.completed_at, + payments_transactions.c.state, + payments_transactions.c.state_message, + ).where(payments_transactions.c.payment_id == payment_id) + ) row = result.one_or_none() assert row is not None @@ -126,59 +130,64 @@ def invoice_url(faker: Faker, expected_state: PaymentTransactionState) -> str | ], ) async def test_complete_transaction( - connection: AsyncConnection, + asyncpg_engine: AsyncEngine, init_transaction: Callable, payment_id: str, expected_state: PaymentTransactionState, expected_message: str | None, invoice_url: str | None, ): + # init await init_transaction(payment_id) - payment_row = await update_payment_transaction_state( - connection, - payment_id=payment_id, - completion_state=expected_state, - state_message=expected_message, - invoice_url=invoice_url, - ) + async with asyncpg_engine.connect() as connection: + # NOTE: internal function uses transaction + payment_row = await update_payment_transaction_state( + connection, + payment_id=payment_id, + completion_state=expected_state, + state_message=expected_message, + invoice_url=invoice_url, + ) - assert isinstance(payment_row, PaymentTransactionRow) - assert payment_row.state_message == expected_message - assert payment_row.state == expected_state - assert payment_row.initiated_at < payment_row.completed_at - assert PaymentTransactionState(payment_row.state).is_completed() + assert isinstance(payment_row, PaymentTransactionRow) + assert payment_row.state_message == expected_message + assert payment_row.state == expected_state + assert payment_row.initiated_at < payment_row.completed_at + assert PaymentTransactionState(payment_row.state).is_completed() async def test_update_transaction_failures_and_exceptions( - connection: AsyncConnection, + asyncpg_engine: AsyncEngine, init_transaction: Callable, payment_id: str, ): - kwargs = { - "connection": connection, - "payment_id": payment_id, - "completion_state": PaymentTransactionState.SUCCESS, - } - ok = await update_payment_transaction_state(**kwargs) - assert isinstance(ok, PaymentNotFound) - assert not ok - - # init & complete - await init_transaction(payment_id) - ok = await update_payment_transaction_state(**kwargs) - assert isinstance(ok, PaymentTransactionRow) - assert ok + async with asyncpg_engine.connect() as connection: + kwargs = { + "connection": connection, + "payment_id": payment_id, + "completion_state": PaymentTransactionState.SUCCESS, + } + + ok = await update_payment_transaction_state(**kwargs) + assert isinstance(ok, PaymentNotFound) + assert not ok + + # init & complete + await init_transaction(payment_id) + ok = await update_payment_transaction_state(**kwargs) + assert isinstance(ok, PaymentTransactionRow) + assert ok - # repeat -> fails - ok = await update_payment_transaction_state(**kwargs) - assert isinstance(ok, PaymentAlreadyAcked) - assert not ok + # repeat -> fails + ok = await update_payment_transaction_state(**kwargs) + assert isinstance(ok, PaymentAlreadyAcked) + assert not ok - with pytest.raises(ValueError): - kwargs.update(completion_state=PaymentTransactionState.PENDING) - await update_payment_transaction_state(**kwargs) + with pytest.raises(ValueError, match="cannot update state with"): # noqa: PT012 + kwargs.update(completion_state=PaymentTransactionState.PENDING) + await update_payment_transaction_state(**kwargs) @pytest.fixture @@ -188,14 +197,18 @@ def user_id() -> int: @pytest.fixture def create_fake_user_transactions( - connection: AsyncConnection, user_id: int + asyncpg_engine: AsyncEngine, user_id: int ) -> Callable: + + assert asyncpg_engine + async def _go(expected_total=5): payment_ids = [] for _ in range(expected_total): values = _remove_not_required(random_payment_transaction(user_id=user_id)) - payment_id = await insert_init_payment_transaction(connection, **values) + async with transaction_context(asyncpg_engine) as connection: + payment_id = await insert_init_payment_transaction(connection, **values) assert payment_id payment_ids.append(payment_id) @@ -205,19 +218,21 @@ async def _go(expected_total=5): async def test_get_user_payments_transactions( - connection: AsyncConnection, create_fake_user_transactions: Callable, user_id: int + asyncpg_engine: AsyncEngine, create_fake_user_transactions: Callable, user_id: int ): expected_payments_ids = await create_fake_user_transactions() expected_total = len(expected_payments_ids) # test offset and limit defaults - total, rows = await get_user_payments_transactions(connection, user_id=user_id) + async with asyncpg_engine.connect() as connection: + total, rows = await get_user_payments_transactions(connection, user_id=user_id) + assert total == expected_total assert [r.payment_id for r in rows] == expected_payments_ids[::-1], "newest first" async def test_get_user_payments_transactions_with_pagination_options( - connection: AsyncConnection, create_fake_user_transactions: Callable, user_id: int + asyncpg_engine: AsyncEngine, create_fake_user_transactions: Callable, user_id: int ): expected_payments_ids = await create_fake_user_transactions() expected_total = len(expected_payments_ids) @@ -226,23 +241,24 @@ async def test_get_user_payments_transactions_with_pagination_options( offset = int(expected_total / 4) limit = int(expected_total / 2) - total, rows = await get_user_payments_transactions( - connection, user_id=user_id, limit=limit, offset=offset - ) - assert total == expected_total - assert [r.payment_id for r in rows] == expected_payments_ids[::-1][ - offset : (offset + limit) - ], "newest first" - - # test offset>=expected_total? - total, rows = await get_user_payments_transactions( - connection, user_id=user_id, offset=expected_total - ) - assert not rows - - # test limit==0? - total, rows = await get_user_payments_transactions( - connection, user_id=user_id, limit=0 - ) - assert not rows - assert not rows + async with asyncpg_engine.connect() as connection: + total, rows = await get_user_payments_transactions( + connection, user_id=user_id, limit=limit, offset=offset + ) + assert total == expected_total + assert [r.payment_id for r in rows] == expected_payments_ids[::-1][ + offset : (offset + limit) + ], "newest first" + + # test offset>=expected_total? + total, rows = await get_user_payments_transactions( + connection, user_id=user_id, offset=expected_total + ) + assert not rows + + # test limit==0? + total, rows = await get_user_payments_transactions( + connection, user_id=user_id, limit=0 + ) + assert not rows + assert not rows diff --git a/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py b/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py index 2e6c6a4efdea..a7e4b224cebe 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py +++ b/services/web/server/src/simcore_service_webserver/payments/_onetime_db.py @@ -22,7 +22,6 @@ ) from simcore_postgres_database.utils_repos import ( pass_or_acquire_connection, - transaction_context, ) from sqlalchemy.ext.asyncio import AsyncConnection @@ -104,7 +103,8 @@ async def complete_payment_transaction( if invoice_url: optional_kwargs["invoice_url"] = invoice_url - async with transaction_context(get_asyncpg_engine(app), connection) as conn: + async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: + # NOTE: update_payment_transaction_state() uses a transaction internally, therefore we use pass_or_acquire_connection(...) row = await update_payment_transaction_state( conn, payment_id=payment_id, From 10af811b34edba0bbed3d6080ea375e0a885ea3f Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 16:12:13 +0200 Subject: [PATCH 06/12] minor --- .../src/simcore_postgres_database/utils_payments.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_payments.py b/packages/postgres-database/src/simcore_postgres_database/utils_payments.py index 4f0c7bded762..31d182a48984 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_payments.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_payments.py @@ -6,6 +6,7 @@ import sqlalchemy as sa import sqlalchemy.exc +from sqlalchemy.engine import Row from sqlalchemy.ext.asyncio import AsyncConnection from .models.payments_transactions import PaymentTransactionState, payments_transactions @@ -14,7 +15,7 @@ PaymentID: TypeAlias = str -PaymentTransactionRow: TypeAlias = sa.Row +PaymentTransactionRow: TypeAlias = Row UNSET: Final[str] = "__UNSET__" From 57e08e53eb81891d47bf0138d9d66606e3ea5b03 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 18:18:05 +0200 Subject: [PATCH 07/12] fixes pylint --- .../tests/test_models_payments_transactions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/postgres-database/tests/test_models_payments_transactions.py b/packages/postgres-database/tests/test_models_payments_transactions.py index e8ccc44a004d..1ad923c7876e 100644 --- a/packages/postgres-database/tests/test_models_payments_transactions.py +++ b/packages/postgres-database/tests/test_models_payments_transactions.py @@ -3,6 +3,7 @@ # pylint: disable=unexpected-keyword-arg # pylint: disable=unused-argument # pylint: disable=unused-variable +# pytlin: disable=protected-access import decimal from collections.abc import Callable @@ -101,7 +102,7 @@ async def test_init_transaction_sets_it_as_pending( assert row is not None # tests that defaults are right? - assert dict(row._mapping.items()) == { + assert row._asdict() == { "completed_at": None, "state": PaymentTransactionState.PENDING, "state_message": None, From 15cf69934e1ffcad087223a451f74044e56e19c8 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 18:20:27 +0200 Subject: [PATCH 08/12] rm unused exceptions and models using aiopg --- .../simcore_service_webserver/garbage_collector/_core_guests.py | 2 -- .../simcore_service_webserver/garbage_collector/_core_utils.py | 1 - .../web/server/src/simcore_service_webserver/projects/models.py | 2 -- 3 files changed, 5 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_guests.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_guests.py index 6fda91dbb6af..0477c8c1c9b8 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_guests.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_guests.py @@ -8,7 +8,6 @@ from models_library.users import UserID, UserNameID from redis.asyncio import Redis from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE -from simcore_postgres_database.aiopg_errors import DatabaseError from simcore_postgres_database.models.users import UserRole from ..projects._projects_repository_legacy import ProjectDBAPI @@ -162,7 +161,6 @@ async def remove_guest_user_with_all_its_resources( await users_service.delete_user_without_projects(app, user_id=user_id) except ( - DatabaseError, asyncpg.exceptions.PostgresError, ProjectNotFoundError, UserNotFoundError, diff --git a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_utils.py b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_utils.py index 40f64423b2e8..0416dc441911 100644 --- a/services/web/server/src/simcore_service_webserver/garbage_collector/_core_utils.py +++ b/services/web/server/src/simcore_service_webserver/garbage_collector/_core_utils.py @@ -137,7 +137,6 @@ async def replace_current_owner( ) except ( - DatabaseError, asyncpg.exceptions.PostgresError, ProjectNotFoundError, UserNotFoundError, diff --git a/services/web/server/src/simcore_service_webserver/projects/models.py b/services/web/server/src/simcore_service_webserver/projects/models.py index 84bf04f8d531..5ba5615fce89 100644 --- a/services/web/server/src/simcore_service_webserver/projects/models.py +++ b/services/web/server/src/simcore_service_webserver/projects/models.py @@ -2,7 +2,6 @@ from enum import Enum from typing import Any, TypeAlias -from aiopg.sa.result import RowProxy from common_library.dict_tools import remap_keys from models_library.api_schemas_webserver.projects import ProjectPatch from models_library.api_schemas_webserver.projects_ui import StudyUI @@ -19,7 +18,6 @@ from simcore_postgres_database.models.projects import ProjectTemplateType, ProjectType ProjectDict: TypeAlias = dict[str, Any] -ProjectProxy: TypeAlias = RowProxy class ProjectTypeAPI(str, Enum): From a098dcff449320b9ee49f0da60ac26fc1e5fc1e7 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 18:35:18 +0200 Subject: [PATCH 09/12] fixes appkey --- .../src/servicelib/aiohttp/db_asyncpg_engine.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py b/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py index 9e5056f67bc5..14af08084a00 100644 --- a/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py +++ b/packages/service-library/src/servicelib/aiohttp/db_asyncpg_engine.py @@ -17,23 +17,22 @@ from ..db_asyncpg_utils import create_async_engine_and_database_ready from ..logging_utils import log_context -APP_DB_ASYNC_ENGINE_KEY: Final[str] = f"{__name__ }.AsyncEngine" - +DB_ASYNC_ENGINE_APPKEY: Final = web.AppKey("DB_ASYNC_ENGINE", AsyncEngine) _logger = logging.getLogger(__name__) def _set_async_engine_to_app_state(app: web.Application, engine: AsyncEngine): - if exists := app.get(APP_DB_ASYNC_ENGINE_KEY, None): - msg = f"An instance of {type(exists)} already in app[{APP_DB_ASYNC_ENGINE_KEY}]={exists}" + if exists := app.get(DB_ASYNC_ENGINE_APPKEY, None): + msg = f"An instance of {type(exists)} already in app[{DB_ASYNC_ENGINE_APPKEY}]={exists}" raise ValueError(msg) - app[APP_DB_ASYNC_ENGINE_KEY] = engine + app[DB_ASYNC_ENGINE_APPKEY] = engine return get_async_engine(app) def get_async_engine(app: web.Application) -> AsyncEngine: - engine: AsyncEngine = app[APP_DB_ASYNC_ENGINE_KEY] + engine: AsyncEngine = app[DB_ASYNC_ENGINE_APPKEY] assert engine # nosec return engine From eccda816a785b67e2fe0a8f922bbcb3ed5bf559f Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 18:46:53 +0200 Subject: [PATCH 10/12] =?UTF-8?q?=F0=9F=90=9B=20Fix:=20rename=20AutoRechar?= =?UTF-8?q?geStmts=20to=20AutoRechargeStatements=20for=20consistency=20and?= =?UTF-8?q?=20update=20references=20across=20the=20codebase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../utils_payments_autorecharge.py | 2 +- .../tests/test_utils_payments_autorecharge.py | 144 ++++++++++-------- .../db/auto_recharge_repo.py | 8 +- .../payments/_autorecharge_db.py | 8 +- 4 files changed, 86 insertions(+), 76 deletions(-) diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_payments_autorecharge.py b/packages/postgres-database/src/simcore_postgres_database/utils_payments_autorecharge.py index c8e482f26eed..27fbc2836200 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_payments_autorecharge.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_payments_autorecharge.py @@ -7,7 +7,7 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert -class AutoRechargeStmts: +class AutoRechargeStatements: @staticmethod def is_valid_payment_method(user_id, wallet_id, payment_method_id) -> sa.sql.Select: return sa.select(payments_methods.c.payment_method_id).where( diff --git a/packages/postgres-database/tests/test_utils_payments_autorecharge.py b/packages/postgres-database/tests/test_utils_payments_autorecharge.py index 1746b8720cc9..7fa6218430b7 100644 --- a/packages/postgres-database/tests/test_utils_payments_autorecharge.py +++ b/packages/postgres-database/tests/test_utils_payments_autorecharge.py @@ -4,37 +4,37 @@ # pylint: disable=too-many-arguments import datetime -from typing import TypeAlias +from collections.abc import AsyncIterable import pytest -import sqlalchemy as sa -from aiopg.sa.connection import SAConnection -from aiopg.sa.result import RowProxy from faker import Faker from pytest_simcore.helpers.faker_factories import random_payment_method, utcnow +from pytest_simcore.helpers.postgres_tools import insert_and_get_row_lifespan from simcore_postgres_database.models.payments_methods import ( InitPromptAckFlowState, payments_methods, ) -from simcore_postgres_database.utils_payments_autorecharge import AutoRechargeStmts +from simcore_postgres_database.utils_payments_autorecharge import AutoRechargeStatements +from sqlalchemy.engine.row import Row +from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine # # HELPERS # -async def _get_auto_recharge(connection, wallet_id) -> RowProxy | None: +async def _get_auto_recharge(connection: AsyncConnection, wallet_id) -> Row | None: # has recharge trigger? - stmt = AutoRechargeStmts.get_wallet_autorecharge(wallet_id) + stmt = AutoRechargeStatements.get_wallet_autorecharge(wallet_id) result = await connection.execute(stmt) - return await result.first() + return result.first() async def _is_valid_payment_method( - connection, user_id, wallet_id, payment_method_id + connection: AsyncConnection, user_id, wallet_id, payment_method_id ) -> bool: - stmt = AutoRechargeStmts.is_valid_payment_method( + stmt = AutoRechargeStatements.is_valid_payment_method( user_id, wallet_id, payment_method_id ) primary_payment_method_id = await connection.scalar(stmt) @@ -42,37 +42,43 @@ async def _is_valid_payment_method( async def _upsert_autorecharge( - connection, + connection: AsyncConnection, wallet_id, enabled, primary_payment_method_id, top_up_amount_in_usd, monthly_limit_in_usd, -) -> RowProxy: +) -> Row: # using this primary payment-method, create an autorecharge # NOTE: requires the entire - stmt = AutoRechargeStmts.upsert_wallet_autorecharge( + stmt = AutoRechargeStatements.upsert_wallet_autorecharge( wallet_id=wallet_id, enabled=enabled, primary_payment_method_id=primary_payment_method_id, top_up_amount_in_usd=top_up_amount_in_usd, monthly_limit_in_usd=monthly_limit_in_usd, ) - row = await (await connection.execute(stmt)).first() - assert row - return row + result = await connection.execute(stmt) + return result.one() -async def _update_autorecharge(connection, wallet_id, **settings) -> int | None: - stmt = AutoRechargeStmts.update_wallet_autorecharge(wallet_id, **settings) +async def _update_autorecharge( + connection: AsyncConnection, wallet_id, **settings +) -> int | None: + stmt = AutoRechargeStatements.update_wallet_autorecharge(wallet_id, **settings) return await connection.scalar(stmt) -PaymentMethodRow: TypeAlias = RowProxy +class PaymentMethodRow(dict): + # Convert dict to Row-like object for compatibility + def __getattr__(self, key): + return self[key] @pytest.fixture -async def payment_method(connection: SAConnection, faker: Faker) -> PaymentMethodRow: +async def payment_method( + asyncpg_engine: AsyncEngine, faker: Faker +) -> AsyncIterable[PaymentMethodRow]: payment_method_id = faker.uuid4().upper() raw_payment_method = random_payment_method( @@ -81,57 +87,61 @@ async def payment_method(connection: SAConnection, faker: Faker) -> PaymentMetho completed_at=utcnow() + datetime.timedelta(seconds=1), state=InitPromptAckFlowState.SUCCESS, ) - result = await connection.execute( - payments_methods.insert() - .values(**raw_payment_method) - .returning(sa.literal_column("*")) - ) - row = await result.first() - assert row - assert row.payment_method_id == payment_method_id - wallet_id = row.wallet_id - user_id = row.user_id - - assert await _is_valid_payment_method( - connection, user_id, wallet_id, payment_method_id - ) - return row + + # pylint: disable=contextmanager-generator-missing-cleanup + async with insert_and_get_row_lifespan( + asyncpg_engine, + table=payments_methods, + values=raw_payment_method, + pk_col=payments_methods.c.payment_method_id, + pk_value=payment_method_id, + ) as row_data: + wallet_id = row_data["wallet_id"] + user_id = row_data["user_id"] + + async with asyncpg_engine.connect() as connection: + assert await _is_valid_payment_method( + connection, user_id, wallet_id, payment_method_id + ) + + yield PaymentMethodRow(row_data) async def test_payments_automation_workflow( - connection: SAConnection, payment_method: PaymentMethodRow + asyncpg_engine: AsyncEngine, payment_method: PaymentMethodRow ): payment_method_id = payment_method.payment_method_id wallet_id = payment_method.wallet_id - # get - auto_recharge = await _get_auto_recharge(connection, wallet_id) - assert auto_recharge is None - - # new - await _upsert_autorecharge( - connection, - wallet_id, - enabled=True, - primary_payment_method_id=payment_method_id, - top_up_amount_in_usd=100, - monthly_limit_in_usd=None, - ) - - auto_recharge = await _get_auto_recharge(connection, wallet_id) - assert auto_recharge is not None - assert auto_recharge.primary_payment_method_id == payment_method_id - assert auto_recharge.enabled is True - - # upsert: deactivate countdown - auto_recharge = await _upsert_autorecharge( - connection, - wallet_id, - enabled=True, - primary_payment_method_id=payment_method_id, - top_up_amount_in_usd=100, - monthly_limit_in_usd=10000, # <---- - ) - assert auto_recharge.monthly_limit_in_usd == 10000 - - await _update_autorecharge(connection, wallet_id, monthly_limit_in_usd=None) + async with asyncpg_engine.begin() as connection: + # get + auto_recharge = await _get_auto_recharge(connection, wallet_id) + assert auto_recharge is None + + # new + await _upsert_autorecharge( + connection, + wallet_id, + enabled=True, + primary_payment_method_id=payment_method_id, + top_up_amount_in_usd=100, + monthly_limit_in_usd=None, + ) + + auto_recharge = await _get_auto_recharge(connection, wallet_id) + assert auto_recharge is not None + assert auto_recharge.primary_payment_method_id == payment_method_id + assert auto_recharge.enabled is True + + # upsert: deactivate countdown + auto_recharge = await _upsert_autorecharge( + connection, + wallet_id, + enabled=True, + primary_payment_method_id=payment_method_id, + top_up_amount_in_usd=100, + monthly_limit_in_usd=10000, # <---- + ) + assert auto_recharge.monthly_limit_in_usd == 10000 + + await _update_autorecharge(connection, wallet_id, monthly_limit_in_usd=None) diff --git a/services/payments/src/simcore_service_payments/db/auto_recharge_repo.py b/services/payments/src/simcore_service_payments/db/auto_recharge_repo.py index aa98896cf133..be1f21185b1f 100644 --- a/services/payments/src/simcore_service_payments/db/auto_recharge_repo.py +++ b/services/payments/src/simcore_service_payments/db/auto_recharge_repo.py @@ -6,7 +6,7 @@ from models_library.users import UserID from models_library.wallets import WalletID from pydantic import BaseModel, ConfigDict, PositiveInt -from simcore_postgres_database.utils_payments_autorecharge import AutoRechargeStmts +from simcore_postgres_database.utils_payments_autorecharge import AutoRechargeStatements from .base import BaseRepository @@ -33,7 +33,7 @@ async def get_wallet_autorecharge( """ async with self.db_engine.begin() as conn: - stmt = AutoRechargeStmts.get_wallet_autorecharge(wallet_id) + stmt = AutoRechargeStatements.get_wallet_autorecharge(wallet_id) result = await conn.execute(stmt) row = result.first() return PaymentsAutorechargeDB.model_validate(row) if row else None @@ -50,7 +50,7 @@ async def replace_wallet_autorecharge( """ async with self.db_engine.begin() as conn: - stmt = AutoRechargeStmts.is_valid_payment_method( + stmt = AutoRechargeStatements.is_valid_payment_method( user_id=user_id, wallet_id=new.wallet_id, payment_method_id=new.primary_payment_method_id, @@ -61,7 +61,7 @@ async def replace_wallet_autorecharge( payment_method_id=new.primary_payment_method_id ) - stmt = AutoRechargeStmts.upsert_wallet_autorecharge( + stmt = AutoRechargeStatements.upsert_wallet_autorecharge( wallet_id=wallet_id, enabled=new.enabled, primary_payment_method_id=new.primary_payment_method_id, diff --git a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py index 4cd6f3f4ad4a..7cd076085250 100644 --- a/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py +++ b/services/web/server/src/simcore_service_webserver/payments/_autorecharge_db.py @@ -7,7 +7,7 @@ from models_library.users import UserID from models_library.wallets import WalletID from pydantic import BaseModel, ConfigDict, PositiveInt -from simcore_postgres_database.utils_payments_autorecharge import AutoRechargeStmts +from simcore_postgres_database.utils_payments_autorecharge import AutoRechargeStatements from simcore_postgres_database.utils_repos import ( pass_or_acquire_connection, transaction_context, @@ -39,7 +39,7 @@ async def get_wallet_autorecharge( wallet_id: WalletID, ) -> PaymentsAutorechargeGetDB | None: async with pass_or_acquire_connection(get_asyncpg_engine(app), connection) as conn: - stmt = AutoRechargeStmts.get_wallet_autorecharge(wallet_id) + stmt = AutoRechargeStatements.get_wallet_autorecharge(wallet_id) result = await conn.execute(stmt) row = result.one_or_none() return PaymentsAutorechargeGetDB.model_validate(row) if row else None @@ -59,7 +59,7 @@ async def replace_wallet_autorecharge( """ async with transaction_context(get_asyncpg_engine(app), connection) as conn: - stmt = AutoRechargeStmts.is_valid_payment_method( + stmt = AutoRechargeStatements.is_valid_payment_method( user_id=user_id, wallet_id=new.wallet_id, payment_method_id=new.primary_payment_method_id, @@ -70,7 +70,7 @@ async def replace_wallet_autorecharge( payment_method_id=new.primary_payment_method_id ) - stmt = AutoRechargeStmts.upsert_wallet_autorecharge( + stmt = AutoRechargeStatements.upsert_wallet_autorecharge( wallet_id=wallet_id, enabled=new.enabled, primary_payment_method_id=new.primary_payment_method_id, From e149e70fd873f26543b626e2b0647adb7605a0a4 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 19:05:11 +0200 Subject: [PATCH 11/12] Update packages/postgres-database/tests/test_models_payments_transactions.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../tests/test_models_payments_transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/postgres-database/tests/test_models_payments_transactions.py b/packages/postgres-database/tests/test_models_payments_transactions.py index 1ad923c7876e..2a484fa1de14 100644 --- a/packages/postgres-database/tests/test_models_payments_transactions.py +++ b/packages/postgres-database/tests/test_models_payments_transactions.py @@ -3,7 +3,7 @@ # pylint: disable=unexpected-keyword-arg # pylint: disable=unused-argument # pylint: disable=unused-variable -# pytlin: disable=protected-access +# pylint: disable=protected-access import decimal from collections.abc import Callable From 0208ce7ed5de0db46e1d73f79e11d3674022577e Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Mon, 6 Oct 2025 19:05:21 +0200 Subject: [PATCH 12/12] Update packages/postgres-database/tests/test_models_payments_transactions.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../postgres-database/tests/test_models_payments_transactions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/postgres-database/tests/test_models_payments_transactions.py b/packages/postgres-database/tests/test_models_payments_transactions.py index 2a484fa1de14..e435acdc4fb8 100644 --- a/packages/postgres-database/tests/test_models_payments_transactions.py +++ b/packages/postgres-database/tests/test_models_payments_transactions.py @@ -262,4 +262,3 @@ async def test_get_user_payments_transactions_with_pagination_options( connection, user_id=user_id, limit=0 ) assert not rows - assert not rows