From 9cf273a162042a916a35fb581aaf0338980f4840 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 16 Jun 2025 18:24:54 +0200 Subject: [PATCH 01/11] fix bug --- .../_rabbitmq_exclusive_queue_consumers.py | 6 +++ .../notifications/wallet_osparc_credits.py | 37 +++++++++++-------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index 7c3925b8c50f..de6b5c633b61 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -1,4 +1,6 @@ +import asyncio import logging +from collections import defaultdict from collections.abc import AsyncIterator, Generator from typing import Final @@ -192,6 +194,10 @@ async def on_cleanup_ctx_rabbitmq_consumers( app[_APP_RABBITMQ_CONSUMERS_KEY] = await subscribe_to_rabbitmq( app, _EXCHANGE_TO_PARSER_CONFIG ) + + app["wallet_subscriptions"] = defaultdict(int) # wallet_id -> subscriber count + app["wallet_subscription_lock"] = asyncio.Lock() # For thread-safe operations + yield # cleanup diff --git a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py index f66293bf3775..830235aecf10 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py +++ b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py @@ -11,24 +11,29 @@ _logger = logging.getLogger(__name__) -_SUBSCRIBABLE_EXCHANGES = [ - WalletCreditsMessage, -] - - async def subscribe(app: web.Application, wallet_id: WalletID) -> None: - rabbit_client: RabbitMQClient = get_rabbitmq_client(app) - for exchange in _SUBSCRIBABLE_EXCHANGES: - exchange_name = exchange.get_channel_name() - await rabbit_client.add_topics(exchange_name, topics=[f"{wallet_id}"]) + async with app["subscription_lock"]: + counter = app["wallet_subscriptions"][wallet_id] + app["wallet_subscriptions"][wallet_id] += 1 + + if counter == 0: # First subscriber + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + await rabbit_client.add_topics( + WalletCreditsMessage.get_channel_name(), topics=[f"{wallet_id}"] + ) async def unsubscribe(app: web.Application, wallet_id: WalletID) -> None: - rabbit_client: RabbitMQClient = get_rabbitmq_client(app) - for exchange in _SUBSCRIBABLE_EXCHANGES: - exchange_name = exchange.get_channel_name() - with log_catch(_logger, reraise=False): - # NOTE: in case something bad happenned with the connection to the RabbitMQ server - # such as a network disconnection. this call can fail. - await rabbit_client.remove_topics(exchange_name, topics=[f"{wallet_id}"]) + + async with app["subscription_lock"]: + counter = app["wallet_subscriptions"].get(wallet_id, 0) + if counter > 0: + app["wallet_subscriptions"][wallet_id] -= 1 + + if counter == 1: # Last subscriber + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + with log_catch(_logger, reraise=False): + await rabbit_client.remove_topics( + WalletCreditsMessage.get_channel_name(), topics=[f"{wallet_id}"] + ) From 2a1a8e230c6ac1b85330773792249fc842ae3bae Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 16 Jun 2025 18:27:15 +0200 Subject: [PATCH 02/11] fix bug --- .../notifications/wallet_osparc_credits.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py index 830235aecf10..4b01aa45de8c 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py +++ b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py @@ -13,7 +13,7 @@ async def subscribe(app: web.Application, wallet_id: WalletID) -> None: - async with app["subscription_lock"]: + async with app["wallet_subscription_lock"]: counter = app["wallet_subscriptions"][wallet_id] app["wallet_subscriptions"][wallet_id] += 1 @@ -26,7 +26,7 @@ async def subscribe(app: web.Application, wallet_id: WalletID) -> None: async def unsubscribe(app: web.Application, wallet_id: WalletID) -> None: - async with app["subscription_lock"]: + async with app["wallet_subscription_lock"]: counter = app["wallet_subscriptions"].get(wallet_id, 0) if counter > 0: app["wallet_subscriptions"][wallet_id] -= 1 From fcb831e742b2b41a985891cc6fe8193a803eb8eb Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 08:59:57 +0200 Subject: [PATCH 03/11] fix bug --- .../test_wallet_osparc_credits.py | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py diff --git a/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py new file mode 100644 index 000000000000..007c6ce1f3f7 --- /dev/null +++ b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py @@ -0,0 +1,68 @@ +# Unit tests for wallet_osparc_credits.py subscribe and unsubscribe functions +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest +from models_library.wallets import WalletID +from simcore_service_webserver.notifications import wallet_osparc_credits + + +@pytest.fixture +def app_with_wallets(): + app = { + "wallet_subscription_lock": asyncio.Lock(), + "wallet_subscriptions": {}, + } + return app + + +@pytest.fixture +def wallet_id(): + return WalletID(1) + + +async def test_subscribe_first_and_second(app_with_wallets, wallet_id): + app = app_with_wallets + app["wallet_subscriptions"][wallet_id] = 0 + mock_rabbit = AsyncMock() + with patch( + "simcore_service_webserver.notifications.wallet_osparc_credits.get_rabbitmq_client", + return_value=mock_rabbit, + ): + await wallet_osparc_credits.subscribe(app, wallet_id) + mock_rabbit.add_topics.assert_awaited_once() + # Second subscribe should not call add_topics again + await wallet_osparc_credits.subscribe(app, wallet_id) + assert mock_rabbit.add_topics.await_count == 1 + assert app["wallet_subscriptions"][wallet_id] == 2 + + +async def test_unsubscribe_last_and_not_last(app_with_wallets, wallet_id): + app = app_with_wallets + app["wallet_subscriptions"][wallet_id] = 2 + mock_rabbit = AsyncMock() + with patch( + "simcore_service_webserver.notifications.wallet_osparc_credits.get_rabbitmq_client", + return_value=mock_rabbit, + ): + # Not last unsubscribe + await wallet_osparc_credits.unsubscribe(app, wallet_id) + mock_rabbit.remove_topics.assert_not_awaited() + assert app["wallet_subscriptions"][wallet_id] == 1 + # Last unsubscribe + await wallet_osparc_credits.unsubscribe(app, wallet_id) + mock_rabbit.remove_topics.assert_awaited_once() + assert app["wallet_subscriptions"][wallet_id] == 0 + + +async def test_unsubscribe_when_not_subscribed(app_with_wallets, wallet_id): + app = app_with_wallets + # wallet_id not present + mock_rabbit = AsyncMock() + with patch( + "simcore_service_webserver.notifications.wallet_osparc_credits.get_rabbitmq_client", + return_value=mock_rabbit, + ): + await wallet_osparc_credits.unsubscribe(app, wallet_id) + mock_rabbit.remove_topics.assert_not_awaited() + assert app["wallet_subscriptions"].get(wallet_id, 0) == 0 From 2e5608599a55f18fc41a7cf51916e741a6663936 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 09:11:47 +0200 Subject: [PATCH 04/11] review @GitHK --- .../_rabbitmq_exclusive_queue_consumers.py | 8 ++++++-- .../notifications/wallet_osparc_credits.py | 16 ++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index de6b5c633b61..acf4e605e16a 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -38,6 +38,8 @@ _logger = logging.getLogger(__name__) _APP_RABBITMQ_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers" +APP_WALLET_SUBSCRIPTIONS_KEY = "wallet_subscriptions" +APP_WALLET_SUBSCRIPTION_LOCK_KEY = "wallet_subscription_lock" async def _convert_to_node_update_event( @@ -195,8 +197,10 @@ async def on_cleanup_ctx_rabbitmq_consumers( app, _EXCHANGE_TO_PARSER_CONFIG ) - app["wallet_subscriptions"] = defaultdict(int) # wallet_id -> subscriber count - app["wallet_subscription_lock"] = asyncio.Lock() # For thread-safe operations + app[APP_WALLET_SUBSCRIPTIONS_KEY] = defaultdict( + int + ) # wallet_id -> subscriber count + app[APP_WALLET_SUBSCRIPTION_LOCK_KEY] = asyncio.Lock() # For thread-safe operations yield diff --git a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py index 4b01aa45de8c..2f314bd93330 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py +++ b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py @@ -7,15 +7,19 @@ from servicelib.rabbitmq import RabbitMQClient from ..rabbitmq import get_rabbitmq_client +from ._rabbitmq_exclusive_queue_consumers import ( + APP_WALLET_SUBSCRIPTION_LOCK_KEY, + APP_WALLET_SUBSCRIPTIONS_KEY, +) _logger = logging.getLogger(__name__) async def subscribe(app: web.Application, wallet_id: WalletID) -> None: - async with app["wallet_subscription_lock"]: - counter = app["wallet_subscriptions"][wallet_id] - app["wallet_subscriptions"][wallet_id] += 1 + async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]: + counter = app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] + app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] += 1 if counter == 0: # First subscriber rabbit_client: RabbitMQClient = get_rabbitmq_client(app) @@ -26,10 +30,10 @@ async def subscribe(app: web.Application, wallet_id: WalletID) -> None: async def unsubscribe(app: web.Application, wallet_id: WalletID) -> None: - async with app["wallet_subscription_lock"]: - counter = app["wallet_subscriptions"].get(wallet_id, 0) + async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]: + counter = app[APP_WALLET_SUBSCRIPTIONS_KEY].get(wallet_id, 0) if counter > 0: - app["wallet_subscriptions"][wallet_id] -= 1 + app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] -= 1 if counter == 1: # Last subscriber rabbit_client: RabbitMQClient = get_rabbitmq_client(app) From 6a8401e75cffe5d9343b8b2926bbcb6c55a84c22 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 09:12:36 +0200 Subject: [PATCH 05/11] review @GitHK --- .../notifications/_rabbitmq_exclusive_queue_consumers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index acf4e605e16a..0f90118ff082 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -38,8 +38,8 @@ _logger = logging.getLogger(__name__) _APP_RABBITMQ_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers" -APP_WALLET_SUBSCRIPTIONS_KEY = "wallet_subscriptions" -APP_WALLET_SUBSCRIPTION_LOCK_KEY = "wallet_subscription_lock" +APP_WALLET_SUBSCRIPTIONS_KEY: Final[str] = "wallet_subscriptions" +APP_WALLET_SUBSCRIPTION_LOCK_KEY: Final[str] = "wallet_subscription_lock" async def _convert_to_node_update_event( From 7b02c946d0225e3623c259a08cf03c1570e041e4 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 09:37:21 +0200 Subject: [PATCH 06/11] review @sanderegg --- .../notifications/_rabbitmq_exclusive_queue_consumers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index 0f90118ff082..45c45bdfed1f 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -200,7 +200,7 @@ async def on_cleanup_ctx_rabbitmq_consumers( app[APP_WALLET_SUBSCRIPTIONS_KEY] = defaultdict( int ) # wallet_id -> subscriber count - app[APP_WALLET_SUBSCRIPTION_LOCK_KEY] = asyncio.Lock() # For thread-safe operations + app[APP_WALLET_SUBSCRIPTION_LOCK_KEY] = asyncio.Lock() # Ensures exclusive access yield From 9d0dab4fcf254349fd4856d6341b7c22686630a5 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 11:05:13 +0200 Subject: [PATCH 07/11] pylint ignore --- .../projects/_controller/nodes_rest.py | 1 + .../simcore_service_webserver/projects/_projects_service.py | 2 +- .../isolated/notifications/test_wallet_osparc_credits.py | 5 +++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py b/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py index 4e556e95d22d..b8b4f5b244fa 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py @@ -178,6 +178,7 @@ async def patch_project_node(request: web.Request) -> web.Response: path_params = parse_request_path_parameters_as(NodePathParams, request) node_patch = await parse_request_body_as(NodePatch, request) + # MD: Here is the issue await _projects_service.patch_project_node( request.app, product_name=req_ctx.product_name, diff --git a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py index 08d99259eb2c..7797ba4bf0ef 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_projects_service.py +++ b/services/web/server/src/simcore_service_webserver/projects/_projects_service.py @@ -1114,7 +1114,7 @@ async def patch_project_node( project_id=project_id, user_id=user_id, product_name=product_name, - permission="write", # NOTE: MD: before only read was sufficient, double check this + permission="write", ) # 2. If patching service key or version make sure it's valid diff --git a/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py index 007c6ce1f3f7..40c1af264e17 100644 --- a/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py +++ b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py @@ -1,5 +1,6 @@ -# Unit tests for wallet_osparc_credits.py subscribe and unsubscribe functions -import asyncio +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-importimport asyncio from unittest.mock import AsyncMock, patch import pytest From e0fff37520cec3efab93ffd61c8aeb00b0153bf0 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 11:06:47 +0200 Subject: [PATCH 08/11] pylint ignore --- .../unit/isolated/notifications/test_wallet_osparc_credits.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py index 40c1af264e17..f115a1b6967f 100644 --- a/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py +++ b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py @@ -1,6 +1,7 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-importimport asyncio +import asyncio from unittest.mock import AsyncMock, patch import pytest From 6e85d350ce361c727a26bf598193887b85e0f76c Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 11:18:27 +0200 Subject: [PATCH 09/11] pylint ignore --- .../unit/isolated/notifications/test_wallet_osparc_credits.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py index f115a1b6967f..227bfbe484cf 100644 --- a/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py +++ b/services/web/server/tests/unit/isolated/notifications/test_wallet_osparc_credits.py @@ -1,6 +1,6 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument -# pylint: disable=unused-importimport asyncio +# pylint: disable=unused-import import asyncio from unittest.mock import AsyncMock, patch From 98b191d5aaff8c29c12be8325f92fa953dfdcde1 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 11:38:55 +0200 Subject: [PATCH 10/11] review @pcrespov --- .../notifications/wallet_osparc_credits.py | 2 +- .../projects/_controller/nodes_rest.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py index 2f314bd93330..a8f9402e93f9 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py +++ b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py @@ -31,7 +31,7 @@ async def subscribe(app: web.Application, wallet_id: WalletID) -> None: async def unsubscribe(app: web.Application, wallet_id: WalletID) -> None: async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]: - counter = app[APP_WALLET_SUBSCRIPTIONS_KEY].get(wallet_id, 0) + counter = app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] if counter > 0: app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] -= 1 diff --git a/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py b/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py index b8b4f5b244fa..4e556e95d22d 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py +++ b/services/web/server/src/simcore_service_webserver/projects/_controller/nodes_rest.py @@ -178,7 +178,6 @@ async def patch_project_node(request: web.Request) -> web.Response: path_params = parse_request_path_parameters_as(NodePathParams, request) node_patch = await parse_request_body_as(NodePatch, request) - # MD: Here is the issue await _projects_service.patch_project_node( request.app, product_name=req_ctx.product_name, From f98dc077929d0c445e86bb3581b24447f656744e Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 17 Jun 2025 12:50:15 +0200 Subject: [PATCH 11/11] defensive programming --- .../notifications/wallet_osparc_credits.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py index a8f9402e93f9..2f314bd93330 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py +++ b/services/web/server/src/simcore_service_webserver/notifications/wallet_osparc_credits.py @@ -31,7 +31,7 @@ async def subscribe(app: web.Application, wallet_id: WalletID) -> None: async def unsubscribe(app: web.Application, wallet_id: WalletID) -> None: async with app[APP_WALLET_SUBSCRIPTION_LOCK_KEY]: - counter = app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] + counter = app[APP_WALLET_SUBSCRIPTIONS_KEY].get(wallet_id, 0) if counter > 0: app[APP_WALLET_SUBSCRIPTIONS_KEY][wallet_id] -= 1