Skip to content

Commit a48e192

Browse files
✨ introduce rabbit exchange for reaching 0 credits (#4887)
1 parent 90a73d7 commit a48e192

File tree

7 files changed

+188
-12
lines changed

7 files changed

+188
-12
lines changed

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
from abc import abstractmethod
44
from decimal import Decimal
5-
from enum import Enum, auto
5+
from enum import Enum, IntEnum, auto
66
from typing import Any, Literal, TypeAlias
77

88
import arrow
@@ -267,3 +267,29 @@ class WalletCreditsMessage(RabbitMessageBase):
267267

268268
def routing_key(self) -> str | None:
269269
return f"{self.wallet_id}"
270+
271+
272+
class CreditsLimit(IntEnum):
273+
MIN_CREDITS = 0
274+
275+
276+
class WalletCreditsLimitReachedMessage(RabbitMessageBase):
277+
channel_name: Literal["io.simcore.service.wallets-credit-limit-reached"] = Field(
278+
default="io.simcore.service.wallets-credit-limit-reached", const=True
279+
)
280+
created_at: datetime.datetime = Field(
281+
default_factory=lambda: arrow.utcnow().datetime,
282+
description="message creation datetime",
283+
)
284+
service_run_id: str = Field(
285+
..., description="uniquely identitifies the service run"
286+
)
287+
user_id: UserID
288+
project_id: ProjectID
289+
node_id: NodeID
290+
wallet_id: WalletID
291+
credits: Decimal
292+
credits_limit: CreditsLimit
293+
294+
def routing_key(self) -> str | None:
295+
return f"{self.wallet_id}.{self.credits_limit}"

scripts/release/monitor/monitor_release/settings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def get_settings(env_file, deployment):
6767
portainer_password=portainer_password,
6868
starts_with="production-simcore_production",
6969
swarm_stack_name="production-simcore",
70-
portainer_endpoint_version=1,
70+
portainer_endpoint_version=2,
7171
)
7272
if deployment == "aws-staging":
7373
portainer_url = os.getenv("AWS_STAGING_PORTAINER_URL")

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,12 @@ async def update_service_run_stopped_at(
174174
async def list_service_runs_by_product_and_user_and_wallet(
175175
self,
176176
product_name: ProductName,
177+
*,
177178
user_id: UserID | None,
178179
wallet_id: WalletID | None,
179180
offset: int,
180181
limit: int,
182+
service_run_status: ServiceRunStatus | None = None,
181183
) -> list[ServiceRunWithCreditsDB]:
182184
async with self.db_engine.begin() as conn:
183185
query = (
@@ -228,6 +230,11 @@ async def list_service_runs_by_product_and_user_and_wallet(
228230
query = query.where(
229231
resource_tracker_service_runs.c.wallet_id == wallet_id
230232
)
233+
if service_run_status:
234+
query = query.where(
235+
resource_tracker_service_runs.c.service_run_status
236+
== service_run_status
237+
)
231238

232239
result = await conn.execute(query)
233240

@@ -236,8 +243,10 @@ async def list_service_runs_by_product_and_user_and_wallet(
236243
async def total_service_runs_by_product_and_user_and_wallet(
237244
self,
238245
product_name: ProductName,
246+
*,
239247
user_id: UserID | None,
240248
wallet_id: WalletID | None,
249+
service_run_status: ServiceRunStatus | None = None,
241250
) -> PositiveInt:
242251
async with self.db_engine.begin() as conn:
243252
query = (
@@ -252,6 +261,11 @@ async def total_service_runs_by_product_and_user_and_wallet(
252261
query = query.where(
253262
resource_tracker_service_runs.c.wallet_id == wallet_id
254263
)
264+
if service_run_status:
265+
query = query.where(
266+
resource_tracker_service_runs.c.service_run_status
267+
== service_run_status
268+
)
255269

256270
result = await conn.execute(query)
257271
row = result.first()

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_process_messages.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from fastapi import FastAPI
77
from fastapi.encoders import jsonable_encoder
88
from models_library.rabbitmq_messages import (
9+
CreditsLimit,
910
RabbitResourceTrackingHeartbeatMessage,
1011
RabbitResourceTrackingMessages,
1112
RabbitResourceTrackingMessageType,
@@ -36,6 +37,7 @@
3637
from .modules.rabbitmq import RabbitMQClient, get_rabbitmq_client
3738
from .resource_tracker_utils import (
3839
make_negative,
40+
publish_to_rabbitmq_wallet_credits_limit_reached,
3941
sum_credit_transactions_and_publish_to_rabbitmq,
4042
)
4143

@@ -87,7 +89,7 @@ async def _process_start_event(
8789
user_id=msg.user_id,
8890
user_email=msg.user_email,
8991
project_id=msg.project_id,
90-
project_name=msg.product_name,
92+
project_name=msg.project_name,
9193
node_id=msg.node_id,
9294
node_name=msg.node_name,
9395
service_key=msg.service_key,
@@ -160,12 +162,21 @@ async def _process_heartbeat_event(
160162
update_credit_transaction
161163
)
162164
# Publish wallet total credits to RabbitMQ
163-
await sum_credit_transactions_and_publish_to_rabbitmq(
165+
wallet_total_credits = await sum_credit_transactions_and_publish_to_rabbitmq(
164166
resource_tracker_repo,
165167
rabbitmq_client,
166168
running_service.product_name,
167169
running_service.wallet_id,
168170
)
171+
if wallet_total_credits.available_osparc_credits < CreditsLimit.MIN_CREDITS:
172+
await publish_to_rabbitmq_wallet_credits_limit_reached(
173+
resource_tracker_repo,
174+
rabbitmq_client,
175+
product_name=running_service.product_name,
176+
wallet_id=running_service.wallet_id,
177+
credits_=wallet_total_credits.available_osparc_credits,
178+
credits_limit=CreditsLimit.MIN_CREDITS,
179+
)
169180

170181

171182
async def _process_stop_event(

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_utils.py

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,23 @@
1+
import asyncio
12
import logging
23
from datetime import datetime, timezone
4+
from decimal import Decimal
35

6+
from models_library.api_schemas_resource_usage_tracker.credit_transactions import (
7+
WalletTotalCredits,
8+
)
49
from models_library.products import ProductName
5-
from models_library.rabbitmq_messages import WalletCreditsMessage
10+
from models_library.projects import ProjectID
11+
from models_library.projects_nodes_io import NodeID
12+
from models_library.rabbitmq_messages import (
13+
CreditsLimit,
14+
WalletCreditsLimitReachedMessage,
15+
WalletCreditsMessage,
16+
)
17+
from models_library.resource_tracker import ServiceRunId, ServiceRunStatus
18+
from models_library.users import UserID
619
from models_library.wallets import WalletID
20+
from pydantic import PositiveInt
721
from servicelib.rabbitmq import RabbitMQClient
822

923
from .modules.db.repositories.resource_tracker import ResourceTrackerRepository
@@ -20,7 +34,7 @@ async def sum_credit_transactions_and_publish_to_rabbitmq(
2034
rabbitmq_client: RabbitMQClient,
2135
product_name: ProductName,
2236
wallet_id: WalletID,
23-
):
37+
) -> WalletTotalCredits:
2438
wallet_total_credits = (
2539
await resource_tracker_repo.sum_credit_transactions_by_product_and_wallet(
2640
product_name,
@@ -33,3 +47,74 @@ async def sum_credit_transactions_and_publish_to_rabbitmq(
3347
credits=wallet_total_credits.available_osparc_credits,
3448
)
3549
await rabbitmq_client.publish(publish_message.channel_name, publish_message)
50+
return wallet_total_credits
51+
52+
53+
_BATCH_SIZE = 20
54+
55+
56+
async def _publish_to_rabbitmq_wallet_credits_limit_reached(
57+
rabbitmq_client: RabbitMQClient,
58+
service_run_id: ServiceRunId,
59+
user_id: UserID,
60+
project_id: ProjectID,
61+
node_id: NodeID,
62+
wallet_id: WalletID,
63+
credits_: Decimal,
64+
credits_limit: CreditsLimit,
65+
):
66+
publish_message = WalletCreditsLimitReachedMessage(
67+
service_run_id=service_run_id,
68+
user_id=user_id,
69+
project_id=project_id,
70+
node_id=node_id,
71+
wallet_id=wallet_id,
72+
credits=credits_,
73+
credits_limit=credits_limit,
74+
)
75+
await rabbitmq_client.publish(publish_message.channel_name, publish_message)
76+
77+
78+
async def publish_to_rabbitmq_wallet_credits_limit_reached(
79+
resource_tracker_repo: ResourceTrackerRepository,
80+
rabbitmq_client: RabbitMQClient,
81+
product_name: ProductName,
82+
wallet_id: WalletID,
83+
credits_: Decimal,
84+
credits_limit: CreditsLimit,
85+
):
86+
# Get all current running services for that wallet
87+
total_count: PositiveInt = (
88+
await resource_tracker_repo.total_service_runs_by_product_and_user_and_wallet(
89+
product_name,
90+
user_id=None,
91+
wallet_id=wallet_id,
92+
service_run_status=ServiceRunStatus.RUNNING,
93+
)
94+
)
95+
96+
for offset in range(0, total_count, _BATCH_SIZE):
97+
batch_services = await resource_tracker_repo.list_service_runs_by_product_and_user_and_wallet(
98+
product_name,
99+
user_id=None,
100+
wallet_id=wallet_id,
101+
offset=offset,
102+
limit=_BATCH_SIZE,
103+
service_run_status=ServiceRunStatus.RUNNING,
104+
)
105+
106+
await asyncio.gather(
107+
*(
108+
_publish_to_rabbitmq_wallet_credits_limit_reached(
109+
rabbitmq_client=rabbitmq_client,
110+
service_run_id=service.service_run_id,
111+
user_id=service.user_id,
112+
project_id=service.project_id,
113+
node_id=service.node_id,
114+
wallet_id=wallet_id,
115+
credits_=credits_,
116+
credits_limit=credits_limit,
117+
)
118+
for service in batch_services
119+
)
120+
)

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/resource_tracker_service_runs.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,32 +30,44 @@ async def list_service_runs(
3030
# Situation when we want to see all usage of a specific user
3131
if wallet_id is None and access_all_wallet_usage is None:
3232
total_service_runs: PositiveInt = await resource_tacker_repo.total_service_runs_by_product_and_user_and_wallet(
33-
product_name, user_id, None
33+
product_name, user_id=user_id, wallet_id=None
3434
)
3535
service_runs_db_model: list[
3636
ServiceRunWithCreditsDB
3737
] = await resource_tacker_repo.list_service_runs_by_product_and_user_and_wallet(
38-
product_name, user_id, None, page_params.offset, page_params.limit
38+
product_name,
39+
user_id=user_id,
40+
wallet_id=None,
41+
offset=page_params.offset,
42+
limit=page_params.limit,
3943
)
4044
# Situation when accountant user can see all users usage of the wallet
4145
elif wallet_id and access_all_wallet_usage is True:
4246
total_service_runs: PositiveInt = await resource_tacker_repo.total_service_runs_by_product_and_user_and_wallet( # type: ignore[no-redef]
43-
product_name, None, wallet_id
47+
product_name, user_id=None, wallet_id=wallet_id
4448
)
4549
service_runs_db_model: list[ # type: ignore[no-redef]
4650
ServiceRunWithCreditsDB
4751
] = await resource_tacker_repo.list_service_runs_by_product_and_user_and_wallet(
48-
product_name, None, wallet_id, page_params.offset, page_params.limit
52+
product_name,
53+
user_id=None,
54+
wallet_id=wallet_id,
55+
offset=page_params.offset,
56+
limit=page_params.limit,
4957
)
5058
# Situation when regular user can see only his usage of the wallet
5159
elif wallet_id and access_all_wallet_usage is False:
5260
total_service_runs: PositiveInt = await resource_tacker_repo.total_service_runs_by_product_and_user_and_wallet( # type: ignore[no-redef]
53-
product_name, user_id, wallet_id
61+
product_name, user_id=user_id, wallet_id=wallet_id
5462
)
5563
service_runs_db_model: list[ # type: ignore[no-redef]
5664
ServiceRunWithCreditsDB
5765
] = await resource_tacker_repo.list_service_runs_by_product_and_user_and_wallet(
58-
product_name, user_id, wallet_id, page_params.offset, page_params.limit
66+
product_name,
67+
user_id=user_id,
68+
wallet_id=wallet_id,
69+
offset=page_params.offset,
70+
limit=page_params.limit,
5971
)
6072
else:
6173
msg = "wallet_id and access_all_wallet_usage parameters must be specified together"

services/resource-usage-tracker/tests/unit/with_dbs/test_process_rabbitmq_message_with_billing.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
from collections.abc import Callable, Iterator
33
from datetime import datetime, timezone
44
from decimal import Decimal
5+
from unittest import mock
56

67
import pytest
78
import sqlalchemy as sa
89
from models_library.rabbitmq_messages import (
910
RabbitResourceTrackingHeartbeatMessage,
1011
RabbitResourceTrackingStoppedMessage,
1112
SimcorePlatformStatus,
13+
WalletCreditsLimitReachedMessage,
1214
)
15+
from pytest_mock.plugin import MockerFixture
1316
from servicelib.rabbitmq import RabbitMQClient
1417
from simcore_postgres_database.models.resource_tracker_credit_transactions import (
1518
resource_tracker_credit_transactions,
@@ -34,6 +37,10 @@
3437
_process_start_event,
3538
_process_stop_event,
3639
)
40+
from tenacity._asyncio import AsyncRetrying
41+
from tenacity.retry import retry_if_exception_type
42+
from tenacity.stop import stop_after_delay
43+
from tenacity.wait import wait_fixed
3744

3845
from .conftest import assert_credit_transactions_db_row
3946

@@ -152,6 +159,11 @@ def resource_tracker_pricing_tables_db(postgres_db: sa.engine.Engine) -> Iterato
152159
con.execute(resource_tracker_credit_transactions.delete())
153160

154161

162+
@pytest.fixture
163+
async def mocked_message_parser(mocker: MockerFixture) -> mock.AsyncMock:
164+
return mocker.AsyncMock(return_value=True)
165+
166+
155167
async def test_process_event_functions(
156168
rabbitmq_client: Callable[[str], RabbitMQClient],
157169
random_rabbit_message_start,
@@ -160,9 +172,16 @@ async def test_process_event_functions(
160172
resource_tracker_service_run_db,
161173
resource_tracker_pricing_tables_db,
162174
initialized_app,
175+
mocked_message_parser,
163176
):
164177
engine = initialized_app.state.engine
165178
publisher = rabbitmq_client("publisher")
179+
consumer = rabbitmq_client("consumer")
180+
await consumer.subscribe(
181+
WalletCreditsLimitReachedMessage.get_channel_name(),
182+
mocked_message_parser,
183+
topics=["#"],
184+
)
166185

167186
msg = random_rabbit_message_start(
168187
wallet_id=1,
@@ -210,3 +229,12 @@ async def test_process_event_functions(
210229
)
211230
assert output.osparc_credits < first_credits_used
212231
assert output.transaction_status == "BILLED"
232+
233+
async for attempt in AsyncRetrying(
234+
wait=wait_fixed(0.1),
235+
stop=stop_after_delay(5),
236+
retry=retry_if_exception_type(AssertionError),
237+
reraise=True,
238+
):
239+
with attempt:
240+
mocked_message_parser.assert_called_once()

0 commit comments

Comments
 (0)