Skip to content

Commit 32a63e9

Browse files
improvements and tests
1 parent d7c6e36 commit 32a63e9

File tree

19 files changed

+632
-91
lines changed

19 files changed

+632
-91
lines changed

packages/postgres-database/src/simcore_postgres_database/migration/versions/a3a58471b0f1_add_credit_transaction_classification_.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""add credit transaction classification enums
22
33
Revision ID: a3a58471b0f1
4-
Revises: 307017ee1a49
4+
Revises: f19905923355
55
Create Date: 2025-01-14 13:44:05.025647+00:00
66
77
"""
@@ -10,7 +10,7 @@
1010

1111
# revision identifiers, used by Alembic.
1212
revision = "a3a58471b0f1"
13-
down_revision = "307017ee1a49"
13+
down_revision = "f19905923355"
1414
branch_labels = None
1515
depends_on = None
1616

packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/resource_usage_tracker/credit_transactions.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,34 @@ async def get_wallet_total_credits(
3232
*,
3333
product_name: ProductName,
3434
wallet_id: WalletID,
35-
transaction_status: CreditTransactionStatus | None = None,
36-
project_id: ProjectID | None = None,
3735
) -> WalletTotalCredits:
3836
result = await rabbitmq_rpc_client.request(
3937
RESOURCE_USAGE_TRACKER_RPC_NAMESPACE,
4038
_RPC_METHOD_NAME_ADAPTER.validate_python("get_wallet_total_credits"),
4139
product_name=product_name,
4240
wallet_id=wallet_id,
43-
transaction_status=transaction_status,
41+
timeout_s=_DEFAULT_TIMEOUT_S,
42+
)
43+
assert isinstance(result, WalletTotalCredits) # nosec
44+
return result
45+
46+
47+
@log_decorator(_logger, level=logging.DEBUG)
48+
async def get_project_wallet_total_credits(
49+
rabbitmq_rpc_client: RabbitMQRPCClient,
50+
*,
51+
product_name: ProductName,
52+
wallet_id: WalletID,
53+
project_id: ProjectID,
54+
transaction_status: CreditTransactionStatus | None = None,
55+
) -> WalletTotalCredits:
56+
result = await rabbitmq_rpc_client.request(
57+
RESOURCE_USAGE_TRACKER_RPC_NAMESPACE,
58+
_RPC_METHOD_NAME_ADAPTER.validate_python("get_project_wallet_total_credits"),
59+
product_name=product_name,
60+
wallet_id=wallet_id,
4461
project_id=project_id,
62+
transaction_status=transaction_status,
4563
timeout_s=_DEFAULT_TIMEOUT_S,
4664
)
4765
assert isinstance(result, WalletTotalCredits) # nosec

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rest/_resource_tracker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
async def get_credit_transactions_sum(
3535
wallet_total_credits: Annotated[
3636
WalletTotalCredits,
37-
Depends(credit_transactions.sum_credit_transactions_by_product_and_wallet),
37+
Depends(credit_transactions.sum_wallet_credits),
3838
],
3939
):
4040
return wallet_total_credits

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/api/rpc/_credit_transactions.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from models_library.wallets import WalletID
1010
from servicelib.rabbitmq import RPCRouter
1111

12-
from ...services import credit_transactions
12+
from ...services import credit_transactions, service_runs
1313

1414
router = RPCRouter()
1515

@@ -20,20 +20,33 @@ async def get_wallet_total_credits(
2020
*,
2121
product_name: ProductName,
2222
wallet_id: WalletID,
23-
# internal filters
23+
) -> WalletTotalCredits:
24+
return await credit_transactions.sum_wallet_credits(
25+
db_engine=app.state.engine,
26+
product_name=product_name,
27+
wallet_id=wallet_id,
28+
)
29+
30+
31+
@router.expose(reraise_if_error_type=())
32+
async def get_project_wallet_total_credits(
33+
app: FastAPI,
34+
*,
35+
product_name: ProductName,
36+
wallet_id: WalletID,
37+
project_id: ProjectID,
2438
transaction_status: CreditTransactionStatus | None = None,
25-
project_id: ProjectID | None = None,
2639
) -> WalletTotalCredits:
27-
return await credit_transactions.sum_credit_transactions_by_product_and_wallet(
40+
return await service_runs.sum_project_wallet_total_credits(
2841
db_engine=app.state.engine,
2942
product_name=product_name,
3043
wallet_id=wallet_id,
31-
transaction_status=transaction_status,
3244
project_id=project_id,
45+
transaction_status=transaction_status,
3346
)
3447

3548

36-
@router.expose(reraise_if_error_type=())
49+
@router.expose(reraise_if_error_type=(ValueError,))
3750
async def pay_project_debt(
3851
app: FastAPI,
3952
*,

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/credit_transactions.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020

2121
from ..api.rest.dependencies import get_resource_tracker_db_engine
2222
from ..models.credit_transactions import CreditTransactionCreate
23+
from ..services.modules.db import service_runs_db
2324
from .modules.db import credit_transactions_db
2425
from .modules.rabbitmq import get_rabbitmq_client_from_request
25-
from .utils import make_negative, sum_credit_transactions_and_publish_to_rabbitmq
26+
from .utils import sum_credit_transactions_and_publish_to_rabbitmq
2627

2728

2829
async def create_credit_transaction(
@@ -74,21 +75,16 @@ async def create_credit_transaction(
7475
return transaction_id
7576

7677

77-
async def sum_credit_transactions_by_product_and_wallet(
78+
async def sum_wallet_credits(
7879
db_engine: Annotated[AsyncEngine, Depends(get_resource_tracker_db_engine)],
7980
*,
8081
product_name: ProductName,
8182
wallet_id: WalletID,
82-
# attribute filters
83-
transaction_status: CreditTransactionStatus | None = None,
84-
project_id: ProjectID | None = None,
8583
) -> WalletTotalCredits:
86-
return await credit_transactions_db.sum_credit_transactions_by_product_and_wallet(
84+
return await credit_transactions_db.sum_wallet_credits(
8785
db_engine,
8886
product_name=product_name,
8987
wallet_id=wallet_id,
90-
transaction_status=transaction_status,
91-
project_id=project_id,
9288
)
9389

9490

@@ -102,27 +98,42 @@ async def pay_project_debt(
10298
):
10399
# NOTE: `current_wallet_transaction` is the Wallet in DEBT
104100

105-
total_project_debt_amount = (
106-
await credit_transactions_db.sum_credit_transactions_by_product_and_wallet(
107-
db_engine,
108-
product_name=current_wallet_transaction.product_name,
109-
wallet_id=current_wallet_transaction.wallet_id,
110-
transaction_status=CreditTransactionStatus.IN_DEBT,
111-
project_id=project_id,
112-
)
101+
total_project_debt_amount = await service_runs_db.sum_project_wallet_total_credits(
102+
db_engine,
103+
product_name=current_wallet_transaction.product_name,
104+
wallet_id=current_wallet_transaction.wallet_id,
105+
project_id=project_id,
106+
transaction_status=CreditTransactionStatus.IN_DEBT,
113107
)
114108

115109
if (
116110
total_project_debt_amount.available_osparc_credits
117111
!= new_wallet_transaction.osparc_credits
118112
):
119-
msg = f"Project DEBT of {total_project_debt_amount.available_osparc_credits} does not equal to payment: new_wallet {new_wallet_transaction.osparc_credits}, current wallet {current_wallet_transaction.osparc_credits}"
113+
msg = f"Project DEBT of {total_project_debt_amount.available_osparc_credits} does not equal to payment: new_wallet {new_wallet_transaction.wallet_id} credits {new_wallet_transaction.osparc_credits}, current wallet {current_wallet_transaction.wallet_id} credits {current_wallet_transaction.osparc_credits}"
120114
raise ValueError(msg)
121115
if (
122-
make_negative(total_project_debt_amount.available_osparc_credits)
116+
-total_project_debt_amount.available_osparc_credits
123117
!= current_wallet_transaction.osparc_credits
124118
):
125-
msg = f"Project DEBT of {total_project_debt_amount.available_osparc_credits} does not equal to payment: new_wallet {new_wallet_transaction.osparc_credits}, current wallet {current_wallet_transaction.osparc_credits}"
119+
msg = f"Project DEBT of {total_project_debt_amount.available_osparc_credits} does not equal to payment: new_wallet {new_wallet_transaction.wallet_id} credits {new_wallet_transaction.osparc_credits}, current wallet {current_wallet_transaction.wallet_id} credits {current_wallet_transaction.osparc_credits}"
120+
raise ValueError(msg)
121+
if current_wallet_transaction.product_name != new_wallet_transaction.product_name:
122+
msg = f"Currently we do not support credit exchange between different products. New wallet {new_wallet_transaction.wallet_id}, current wallet {current_wallet_transaction.wallet_id}"
123+
raise ValueError(msg)
124+
125+
# Does the new wallet has enough credits to pay the debt?
126+
new_wallet_total_credit_amount = await credit_transactions_db.sum_wallet_credits(
127+
db_engine,
128+
product_name=new_wallet_transaction.product_name,
129+
wallet_id=new_wallet_transaction.wallet_id,
130+
)
131+
if (
132+
new_wallet_total_credit_amount.available_osparc_credits
133+
+ total_project_debt_amount.available_osparc_credits
134+
< 0
135+
):
136+
msg = f"New wallet {new_wallet_transaction.wallet_id} doesn't have enough credits {new_wallet_total_credit_amount.available_osparc_credits} to pay the debt {total_project_debt_amount.available_osparc_credits} of current wallet {current_wallet_transaction.wallet_id}"
126137
raise ValueError(msg)
127138

128139
new_wallet_transaction_create = CreditTransactionCreate(

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/credit_transactions_db.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
from simcore_postgres_database.models.resource_tracker_credit_transactions import (
1414
resource_tracker_credit_transactions,
1515
)
16+
from simcore_postgres_database.models.resource_tracker_service_runs import (
17+
resource_tracker_service_runs,
18+
)
1619
from simcore_postgres_database.utils_repos import transaction_context
1720
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
1821

@@ -158,22 +161,19 @@ async def batch_update_credit_transaction_status_for_in_debt_transactions(
158161

159162
if project_id:
160163
update_stmt = update_stmt.where(
161-
resource_tracker_credit_transactions.c.project_id == f"{project_id}"
164+
resource_tracker_service_runs.c.project_id == f"{project_id}"
162165
)
163166
async with transaction_context(engine, connection) as conn:
164167
result = await conn.execute(update_stmt)
165168
print(result)
166169

167170

168-
async def sum_credit_transactions_by_product_and_wallet(
171+
async def sum_wallet_credits(
169172
engine: AsyncEngine,
170173
connection: AsyncConnection | None = None,
171174
*,
172175
product_name: ProductName,
173176
wallet_id: WalletID,
174-
# attribute filters
175-
transaction_status: CreditTransactionStatus | None = None,
176-
project_id: ProjectID | None = None,
177177
) -> WalletTotalCredits:
178178
async with transaction_context(engine, connection) as conn:
179179
sum_stmt = sa.select(
@@ -192,16 +192,6 @@ async def sum_credit_transactions_by_product_and_wallet(
192192
)
193193
)
194194

195-
if project_id:
196-
sum_stmt = sum_stmt.where(
197-
resource_tracker_credit_transactions.c.project_id == f"{project_id}"
198-
)
199-
if transaction_status:
200-
sum_stmt = sum_stmt.where(
201-
resource_tracker_credit_transactions.c.transaction_status
202-
== transaction_status
203-
)
204-
205195
result = await conn.execute(sum_stmt)
206196
row = result.first()
207197
if row is None or row[0] is None:

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/service_runs_db.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
# pylint: disable=too-many-arguments
21
import logging
32
from datetime import datetime
3+
4+
# pylint: disable=too-many-arguments
5+
from decimal import Decimal
46
from typing import cast
57

68
import sqlalchemy as sa
9+
from models_library.api_schemas_resource_usage_tracker.credit_transactions import (
10+
WalletTotalCredits,
11+
)
712
from models_library.api_schemas_storage import S3BucketName
813
from models_library.products import ProductName
914
from models_library.projects import ProjectID
@@ -434,6 +439,70 @@ async def get_osparc_credits_aggregated_by_service(
434439
)
435440

436441

442+
async def sum_project_wallet_total_credits(
443+
engine: AsyncEngine,
444+
connection: AsyncConnection | None = None,
445+
*,
446+
product_name: ProductName,
447+
wallet_id: WalletID,
448+
project_id: ProjectID,
449+
transaction_status: CreditTransactionStatus | None = None,
450+
) -> WalletTotalCredits:
451+
async with pass_or_acquire_connection(engine, connection) as conn:
452+
sum_stmt = (
453+
sa.select(
454+
sa.func.SUM(resource_tracker_credit_transactions.c.osparc_credits),
455+
)
456+
.select_from(
457+
resource_tracker_service_runs.join(
458+
resource_tracker_credit_transactions,
459+
(
460+
resource_tracker_service_runs.c.product_name
461+
== resource_tracker_credit_transactions.c.product_name
462+
)
463+
& (
464+
resource_tracker_service_runs.c.service_run_id
465+
== resource_tracker_credit_transactions.c.service_run_id
466+
),
467+
isouter=True,
468+
)
469+
)
470+
.where(
471+
(resource_tracker_service_runs.c.product_name == product_name)
472+
& (resource_tracker_service_runs.c.project_id == f"{project_id}")
473+
& (
474+
resource_tracker_credit_transactions.c.transaction_classification
475+
== CreditClassification.DEDUCT_SERVICE_RUN
476+
)
477+
& (resource_tracker_credit_transactions.c.wallet_id == wallet_id)
478+
)
479+
)
480+
481+
if transaction_status:
482+
sum_stmt = sum_stmt.where(
483+
resource_tracker_credit_transactions.c.transaction_status
484+
== transaction_status
485+
)
486+
else:
487+
sum_stmt = sum_stmt.where(
488+
resource_tracker_credit_transactions.c.transaction_status.in_(
489+
[
490+
CreditTransactionStatus.BILLED,
491+
CreditTransactionStatus.PENDING,
492+
CreditTransactionStatus.IN_DEBT,
493+
]
494+
)
495+
)
496+
497+
result = await conn.execute(sum_stmt)
498+
row = result.first()
499+
if row is None or row[0] is None:
500+
return WalletTotalCredits(
501+
wallet_id=wallet_id, available_osparc_credits=Decimal(0)
502+
)
503+
return WalletTotalCredits(wallet_id=wallet_id, available_osparc_credits=row[0])
504+
505+
437506
async def export_service_runs_table_to_s3(
438507
engine: AsyncEngine,
439508
connection: AsyncConnection | None = None,

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -293,12 +293,10 @@ async def _process_stop_event(
293293
running_service.pricing_unit_cost,
294294
)
295295

296-
wallet_total_credits = (
297-
await credit_transactions_db.sum_credit_transactions_by_product_and_wallet(
298-
db_engine,
299-
product_name=running_service.product_name,
300-
wallet_id=running_service.wallet_id,
301-
)
296+
wallet_total_credits = await credit_transactions_db.sum_wallet_credits(
297+
db_engine,
298+
product_name=running_service.product_name,
299+
wallet_id=running_service.wallet_id,
302300
)
303301
_transaction_status = (
304302
CreditTransactionStatus.BILLED

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/service_runs.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
# pylint: disable=too-many-arguments
12
from datetime import UTC, datetime, timedelta
23

34
import shortuuid
45
from aws_library.s3 import SimcoreS3API
6+
from models_library.api_schemas_resource_usage_tracker.credit_transactions import (
7+
WalletTotalCredits,
8+
)
59
from models_library.api_schemas_resource_usage_tracker.service_runs import (
610
OsparcCreditsAggregatedByServiceGet,
711
OsparcCreditsAggregatedUsagesPage,
@@ -182,6 +186,23 @@ async def export_service_runs(
182186
)
183187

184188

189+
async def sum_project_wallet_total_credits(
190+
db_engine: AsyncEngine,
191+
*,
192+
product_name: ProductName,
193+
wallet_id: WalletID,
194+
project_id: ProjectID,
195+
transaction_status: CreditTransactionStatus | None = None,
196+
) -> WalletTotalCredits:
197+
return await service_runs_db.sum_project_wallet_total_credits(
198+
db_engine,
199+
product_name=product_name,
200+
wallet_id=wallet_id,
201+
project_id=project_id,
202+
transaction_status=transaction_status,
203+
)
204+
205+
185206
async def get_osparc_credits_aggregated_usages_page(
186207
user_id: UserID,
187208
product_name: ProductName,

0 commit comments

Comments
 (0)