Skip to content

Commit 38c3fc4

Browse files
Merge branch 'master' into extract-celery-code
2 parents 912d041 + 1a1b0b2 commit 38c3fc4

File tree

4 files changed

+106
-17
lines changed

4 files changed

+106
-17
lines changed

services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import logging
3+
from collections import defaultdict
24
from collections.abc import AsyncIterator, Generator
35
from typing import Final
46

@@ -36,6 +38,8 @@
3638
_logger = logging.getLogger(__name__)
3739

3840
_APP_RABBITMQ_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers"
41+
APP_WALLET_SUBSCRIPTIONS_KEY: Final[str] = "wallet_subscriptions"
42+
APP_WALLET_SUBSCRIPTION_LOCK_KEY: Final[str] = "wallet_subscription_lock"
3943

4044

4145
async def _convert_to_node_update_event(
@@ -192,6 +196,12 @@ async def on_cleanup_ctx_rabbitmq_consumers(
192196
app[_APP_RABBITMQ_CONSUMERS_KEY] = await subscribe_to_rabbitmq(
193197
app, _EXCHANGE_TO_PARSER_CONFIG
194198
)
199+
200+
app[APP_WALLET_SUBSCRIPTIONS_KEY] = defaultdict(
201+
int
202+
) # wallet_id -> subscriber count
203+
app[APP_WALLET_SUBSCRIPTION_LOCK_KEY] = asyncio.Lock() # Ensures exclusive access
204+
195205
yield
196206

197207
# cleanup

services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,37 @@
77
from servicelib.rabbitmq import RabbitMQClient
88

99
from ..rabbitmq import get_rabbitmq_client
10+
from ._rabbitmq_exclusive_queue_consumers import (
11+
APP_WALLET_SUBSCRIPTION_LOCK_KEY,
12+
APP_WALLET_SUBSCRIPTIONS_KEY,
13+
)
1014

1115
_logger = logging.getLogger(__name__)
1216

1317

14-
_SUBSCRIBABLE_EXCHANGES = [
15-
WalletCreditsMessage,
16-
]
17-
18-
1918
async def subscribe(app: web.Application, wallet_id: WalletID) -> None:
20-
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
2119

22-
for exchange in _SUBSCRIBABLE_EXCHANGES:
23-
exchange_name = exchange.get_channel_name()
24-
await rabbit_client.add_topics(exchange_name, topics=[f"{wallet_id}"])
20+
async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]:
21+
counter = app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id]
22+
app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] += 1
23+
24+
if counter == 0: # First subscriber
25+
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
26+
await rabbit_client.add_topics(
27+
WalletCreditsMessage.get_channel_name(), topics=[f"{wallet_id}"]
28+
)
2529

2630

2731
async def unsubscribe(app: web.Application, wallet_id: WalletID) -> None:
28-
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
29-
for exchange in _SUBSCRIBABLE_EXCHANGES:
30-
exchange_name = exchange.get_channel_name()
31-
with log_catch(_logger, reraise=False):
32-
# NOTE: in case something bad happenned with the connection to the RabbitMQ server
33-
# such as a network disconnection. this call can fail.
34-
await rabbit_client.remove_topics(exchange_name, topics=[f"{wallet_id}"])
32+
33+
async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]:
34+
counter = app[APP_WALLET_SUBSCRIPTIONS_KEY].get(wallet_id, 0)
35+
if counter > 0:
36+
app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] -= 1
37+
38+
if counter == 1: # Last subscriber
39+
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
40+
with log_catch(_logger, reraise=False):
41+
await rabbit_client.remove_topics(
42+
WalletCreditsMessage.get_channel_name(), topics=[f"{wallet_id}"]
43+
)

services/web/server/src/simcore_service_webserver/projects/_projects_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1114,7 +1114,7 @@ async def patch_project_node(
11141114
project_id=project_id,
11151115
user_id=user_id,
11161116
product_name=product_name,
1117-
permission="write", # NOTE: MD: before only read was sufficient, double check this
1117+
permission="write",
11181118
)
11191119

11201120
# 2. If patching service key or version make sure it's valid
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# pylint: disable=redefined-outer-name
2+
# pylint: disable=unused-argument
3+
# pylint: disable=unused-import
4+
import asyncio
5+
from unittest.mock import AsyncMock, patch
6+
7+
import pytest
8+
from models_library.wallets import WalletID
9+
from simcore_service_webserver.notifications import wallet_osparc_credits
10+
11+
12+
@pytest.fixture
13+
def app_with_wallets():
14+
app = {
15+
"wallet_subscription_lock": asyncio.Lock(),
16+
"wallet_subscriptions": {},
17+
}
18+
return app
19+
20+
21+
@pytest.fixture
22+
def wallet_id():
23+
return WalletID(1)
24+
25+
26+
async def test_subscribe_first_and_second(app_with_wallets, wallet_id):
27+
app = app_with_wallets
28+
app["wallet_subscriptions"][wallet_id] = 0
29+
mock_rabbit = AsyncMock()
30+
with patch(
31+
"simcore_service_webserver.notifications.wallet_osparc_credits.get_rabbitmq_client",
32+
return_value=mock_rabbit,
33+
):
34+
await wallet_osparc_credits.subscribe(app, wallet_id)
35+
mock_rabbit.add_topics.assert_awaited_once()
36+
# Second subscribe should not call add_topics again
37+
await wallet_osparc_credits.subscribe(app, wallet_id)
38+
assert mock_rabbit.add_topics.await_count == 1
39+
assert app["wallet_subscriptions"][wallet_id] == 2
40+
41+
42+
async def test_unsubscribe_last_and_not_last(app_with_wallets, wallet_id):
43+
app = app_with_wallets
44+
app["wallet_subscriptions"][wallet_id] = 2
45+
mock_rabbit = AsyncMock()
46+
with patch(
47+
"simcore_service_webserver.notifications.wallet_osparc_credits.get_rabbitmq_client",
48+
return_value=mock_rabbit,
49+
):
50+
# Not last unsubscribe
51+
await wallet_osparc_credits.unsubscribe(app, wallet_id)
52+
mock_rabbit.remove_topics.assert_not_awaited()
53+
assert app["wallet_subscriptions"][wallet_id] == 1
54+
# Last unsubscribe
55+
await wallet_osparc_credits.unsubscribe(app, wallet_id)
56+
mock_rabbit.remove_topics.assert_awaited_once()
57+
assert app["wallet_subscriptions"][wallet_id] == 0
58+
59+
60+
async def test_unsubscribe_when_not_subscribed(app_with_wallets, wallet_id):
61+
app = app_with_wallets
62+
# wallet_id not present
63+
mock_rabbit = AsyncMock()
64+
with patch(
65+
"simcore_service_webserver.notifications.wallet_osparc_credits.get_rabbitmq_client",
66+
return_value=mock_rabbit,
67+
):
68+
await wallet_osparc_credits.unsubscribe(app, wallet_id)
69+
mock_rabbit.remove_topics.assert_not_awaited()
70+
assert app["wallet_subscriptions"].get(wallet_id, 0) == 0

0 commit comments

Comments
 (0)