diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index 716d8e2adae6..50195ce4e95a 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -1,5 +1,7 @@ import logging +import random import socket +import string from typing import Any, Final import aio_pika @@ -19,6 +21,9 @@ RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS: Final[int] = 15 * _MINUTE * 1000 +CHARACTERS = string.ascii_letters + string.digits +_GENERATE_RANDOM_STRING_LENGTH = 6 + class RabbitMQRetryPolicyUponInitialization: """Retry policy upon service initialization""" @@ -51,7 +56,10 @@ async def wait_till_rabbitmq_responsive(url: str) -> bool: def get_rabbitmq_client_unique_name(base_name: str) -> str: - return f"{base_name}_{socket.gethostname()}" + random_string = "".join( + random.choice(CHARACTERS) for _ in range(_GENERATE_RANDOM_STRING_LENGTH) + ) + return f"{base_name}_{socket.gethostname()}_{random_string}" async def declare_queue( diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index d9a6b1f08618..633a0dcfa945 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -35,7 +35,7 @@ _logger = logging.getLogger(__name__) -_APP_RABBITMQ_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers" +_APP_RABBITMQ_EXCLUSIVE_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers" async def _convert_to_node_update_event( @@ -181,7 +181,7 @@ async def _unsubscribe_from_rabbitmq(app) -> None: await logged_gather( *( rabbit_client.unsubscribe(queue_name) - for queue_name in app[_APP_RABBITMQ_CONSUMERS_KEY].values() + for queue_name in app[_APP_RABBITMQ_EXCLUSIVE_CONSUMERS_KEY].values() ), ) @@ -189,7 +189,7 @@ async def _unsubscribe_from_rabbitmq(app) -> None: async def on_cleanup_ctx_rabbitmq_consumers( app: web.Application, ) -> AsyncIterator[None]: - app[_APP_RABBITMQ_CONSUMERS_KEY] = await subscribe_to_rabbitmq( + app[_APP_RABBITMQ_EXCLUSIVE_CONSUMERS_KEY] = await subscribe_to_rabbitmq( app, _EXCHANGE_TO_PARSER_CONFIG ) yield