Skip to content

Commit 2e56085

Browse files
review @GitHK
1 parent fcb831e commit 2e56085

File tree

2 files changed

+16
-8
lines changed

2 files changed

+16
-8
lines changed

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
_logger = logging.getLogger(__name__)
3939

4040
_APP_RABBITMQ_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers"
41+
APP_WALLET_SUBSCRIPTIONS_KEY = "wallet_subscriptions"
42+
APP_WALLET_SUBSCRIPTION_LOCK_KEY = "wallet_subscription_lock"
4143

4244

4345
async def _convert_to_node_update_event(
@@ -195,8 +197,10 @@ async def on_cleanup_ctx_rabbitmq_consumers(
195197
app, _EXCHANGE_TO_PARSER_CONFIG
196198
)
197199

198-
app["wallet_subscriptions"] = defaultdict(int) # wallet_id -> subscriber count
199-
app["wallet_subscription_lock"] = asyncio.Lock() # For thread-safe operations
200+
app[APP_WALLET_SUBSCRIPTIONS_KEY] = defaultdict(
201+
int
202+
) # wallet_id -> subscriber count
203+
app[APP_WALLET_SUBSCRIPTION_LOCK_KEY] = asyncio.Lock() # For thread-safe operations
200204

201205
yield
202206

services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,19 @@
77
from servicelib.rabbitmq import RabbitMQClient
88

99
from ..rabbitmq import get_rabbitmq_client
10+
from ._rabbitmq_exclusive_queue_consumers import (
11+
APP_WALLET_SUBSCRIPTION_LOCK_KEY,
12+
APP_WALLET_SUBSCRIPTIONS_KEY,
13+
)
1014

1115
_logger = logging.getLogger(__name__)
1216

1317

1418
async def subscribe(app: web.Application, wallet_id: WalletID) -> None:
1519

16-
async with app["wallet_subscription_lock"]:
17-
counter = app["wallet_subscriptions"][wallet_id]
18-
app["wallet_subscriptions"][wallet_id] += 1
20+
async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]:
21+
counter = app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id]
22+
app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] += 1
1923

2024
if counter == 0: # First subscriber
2125
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
@@ -26,10 +30,10 @@ async def subscribe(app: web.Application, wallet_id: WalletID) -> None:
2630

2731
async def unsubscribe(app: web.Application, wallet_id: WalletID) -> None:
2832

29-
async with app["wallet_subscription_lock"]:
30-
counter = app["wallet_subscriptions"].get(wallet_id, 0)
33+
async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]:
34+
counter = app[APP_WALLET_SUBSCRIPTIONS_KEY].get(wallet_id, 0)
3135
if counter > 0:
32-
app["wallet_subscriptions"][wallet_id] -= 1
36+
app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] -= 1
3337

3438
if counter == 1: # Last subscriber
3539
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)

0 commit comments

Comments
 (0)