Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
from typing import Final, TypeAlias

import sqlalchemy as sa
from aiopg.sa.connection import SAConnection
from aiopg.sa.result import ResultProxy, RowProxy
import sqlalchemy.exc
from sqlalchemy.engine import Row
from sqlalchemy.ext.asyncio import AsyncConnection

from . import aiopg_errors
from .models.payments_transactions import PaymentTransactionState, payments_transactions

_logger = logging.getLogger(__name__)


PaymentID: TypeAlias = str
PaymentTransactionRow: TypeAlias = RowProxy
PaymentTransactionRow: TypeAlias = Row


UNSET: Final[str] = "__UNSET__"
Expand All @@ -39,7 +39,7 @@ class PaymentAlreadyAcked(PaymentFailure): ...


async def insert_init_payment_transaction(
connection: SAConnection,
connection: AsyncConnection,
*,
payment_id: str,
price_dollars: Decimal,
Expand All @@ -66,14 +66,14 @@ async def insert_init_payment_transaction(
initiated_at=initiated_at,
)
)
except aiopg_errors.UniqueViolation:
except sqlalchemy.exc.IntegrityError:
return PaymentAlreadyExists(payment_id)

return payment_id


async def update_payment_transaction_state(
connection: SAConnection,
connection: AsyncConnection,
*,
payment_id: str,
completion_state: PaymentTransactionState,
Expand Down Expand Up @@ -101,16 +101,15 @@ async def update_payment_transaction_state(
optional["invoice_url"] = invoice_url

async with connection.begin():
row = await (
await connection.execute(
sa.select(
payments_transactions.c.initiated_at,
payments_transactions.c.completed_at,
)
.where(payments_transactions.c.payment_id == payment_id)
.with_for_update()
result = await connection.execute(
sa.select(
payments_transactions.c.initiated_at,
payments_transactions.c.completed_at,
)
).fetchone()
.where(payments_transactions.c.payment_id == payment_id)
.with_for_update()
)
row = result.one_or_none()

if row is None:
return PaymentNotFound(payment_id=payment_id)
Expand All @@ -125,16 +124,13 @@ async def update_payment_transaction_state(
payments_transactions.update()
.values(completed_at=sa.func.now(), state=completion_state, **optional)
.where(payments_transactions.c.payment_id == payment_id)
.returning(sa.literal_column("*"))
.returning(payments_transactions)
)
row = await result.first()
assert row, "execute above should have caught this" # nosec
assert isinstance(row, RowProxy) # nosec
return row
return result.one()


async def get_user_payments_transactions(
connection: SAConnection,
connection: AsyncConnection,
*,
user_id: int,
offset: int | None = None,
Expand All @@ -149,7 +145,7 @@ async def get_user_payments_transactions(

# NOTE: what if between these two calls there are new rows? can we get this in an atomic call?å
stmt = (
payments_transactions.select()
sa.select(payments_transactions)
.where(payments_transactions.c.user_id == user_id)
.order_by(payments_transactions.c.created.desc())
) # newest first
Expand All @@ -162,6 +158,6 @@ async def get_user_payments_transactions(
# InvalidRowCountInLimitClause: LIMIT must not be negative
stmt = stmt.limit(limit)

result: ResultProxy = await connection.execute(stmt)
rows = await result.fetchall() or []
result = await connection.execute(stmt)
rows = result.fetchall()
return total_number_of_items, rows
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from sqlalchemy.dialects.postgresql import insert as pg_insert


class AutoRechargeStmts:
class AutoRechargeStatements:
@staticmethod
def is_valid_payment_method(user_id, wallet_id, payment_method_id) -> sa.sql.Select:
return sa.select(payments_methods.c.payment_method_id).where(
Expand Down
Loading
Loading