Skip to content

Commit 9bf767c

Browse files
authored
ensure instrumentation messages are non exclusive queues (ITISFoundation#3685)
1 parent 2fae099 commit 9bf767c

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

packages/service-library/src/servicelib/rabbitmq.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,15 @@ async def ping(self) -> bool:
9494
return connection.connected.is_set()
9595

9696
async def subscribe(
97-
self, exchange_name: str, message_handler: MessageHandler
97+
self,
98+
exchange_name: str,
99+
message_handler: MessageHandler,
100+
exclusive_queue: bool = True,
98101
) -> None:
102+
"""subscribe to exchange_name calling message_handler for every incoming message
103+
- exclusive_queue: True means that every instance of this application will receive the incoming messages
104+
- exclusive_queue: False means that only one instance of this application will reveice the incoming message
105+
"""
99106
assert self._channel_pool # nosec
100107
async with self._channel_pool.acquire() as channel:
101108
channel: aio_pika.RobustChannel
@@ -110,11 +117,15 @@ async def subscribe(
110117
# consumer/publisher must set the same configuration for same queue
111118
# exclusive means that the queue is only available for THIS very client
112119
# and will be deleted when the client disconnects
113-
queue = await channel.declare_queue(
120+
queue_parameters = dict(
114121
durable=True,
115-
exclusive=True,
122+
exclusive=exclusive_queue,
116123
arguments={"x-message-ttl": _RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S},
117124
)
125+
if not exclusive_queue:
126+
# NOTE: setting a name will ensure multiple instance will take their data here
127+
queue_parameters |= {"name": exchange_name}
128+
queue = await channel.declare_queue(**queue_parameters)
118129
await queue.bind(exchange)
119130

120131
async def _on_message(

services/web/server/src/simcore_service_webserver/computation_subscribe.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,22 +119,22 @@ async def events_message_parser(app: web.Application, data: bytes) -> bool:
119119
(
120120
LoggerRabbitMessage.get_channel_name(),
121121
log_message_parser,
122-
{"no_ack": True},
122+
{},
123123
),
124124
(
125125
ProgressRabbitMessage.get_channel_name(),
126126
progress_message_parser,
127-
{"no_ack": True},
127+
{},
128128
),
129129
(
130130
InstrumentationRabbitMessage.get_channel_name(),
131131
instrumentation_message_parser,
132-
{"no_ack": False},
132+
dict(exclusive_queue=False),
133133
),
134134
(
135135
EventRabbitMessage.get_channel_name(),
136136
events_message_parser,
137-
{"no_ack": False},
137+
{},
138138
),
139139
)
140140

@@ -151,9 +151,9 @@ async def setup_rabbitmq_consumer(app: web.Application) -> AsyncIterator[None]:
151151
):
152152
rabbit_client = RabbitMQClient("webserver", settings)
153153

154-
for exchange_name, parser_fct, _exchange_kwargs in EXCHANGE_TO_PARSER_CONFIG:
154+
for exchange_name, parser_fct, queue_kwargs in EXCHANGE_TO_PARSER_CONFIG:
155155
await rabbit_client.subscribe(
156-
exchange_name, functools.partial(parser_fct, app)
156+
exchange_name, functools.partial(parser_fct, app), **queue_kwargs
157157
)
158158

159159
yield

0 commit comments

Comments
 (0)