Skip to content

Commit f976a56

Browse files
GitHKAndrei Neagu
andauthored
⬆️ upgrading rabbitmq to 4.1.2 (ITISFoundation#8109)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 26bc00b commit f976a56

File tree

10 files changed

+53
-26
lines changed

10 files changed

+53
-26
lines changed

packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88

99
import arrow
1010
from faststream.exceptions import NackMessage, RejectMessage
11-
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitRouter
11+
from faststream.rabbit import (
12+
ExchangeType,
13+
RabbitBroker,
14+
RabbitExchange,
15+
RabbitQueue,
16+
RabbitRouter,
17+
)
1218
from pydantic import NonNegativeInt
1319
from servicelib.logging_utils import log_catch, log_context
1420
from servicelib.redis import RedisClientSDK
@@ -149,10 +155,14 @@ def __init__(
149155
self._global_resources_prefix = f"{calling_module_name}"
150156

151157
self.common_exchange = RabbitExchange(
152-
f"{self._global_resources_prefix}_common", type=ExchangeType.DIRECT
158+
f"{self._global_resources_prefix}_common",
159+
durable=True,
160+
type=ExchangeType.DIRECT,
153161
)
154162
self.cancellation_exchange = RabbitExchange(
155-
f"{self._global_resources_prefix}_cancellation", type=ExchangeType.FANOUT
163+
f"{self._global_resources_prefix}_cancellation",
164+
durable=True,
165+
type=ExchangeType.FANOUT,
156166
)
157167

158168
def patch_based_deferred_handlers(self) -> None:
@@ -243,8 +253,10 @@ def un_patch_base_deferred_handlers(cls) -> None:
243253
subclass.is_present.original_is_present # type: ignore
244254
)
245255

246-
def _get_global_queue_name(self, queue_name: _FastStreamRabbitQueue) -> str:
247-
return f"{self._global_resources_prefix}_{queue_name}"
256+
def _get_global_queue(self, queue_name: _FastStreamRabbitQueue) -> RabbitQueue:
257+
return RabbitQueue(
258+
f"{self._global_resources_prefix}_{queue_name}", durable=True
259+
)
248260

249261
def __get_subclass(
250262
self, class_unique_reference: ClassUniqueReference
@@ -259,7 +271,7 @@ async def __publish_to_queue(
259271
) -> None:
260272
await self.broker.publish(
261273
task_uid,
262-
queue=self._get_global_queue_name(queue),
274+
queue=self._get_global_queue(queue),
263275
exchange=(
264276
self.cancellation_exchange
265277
if queue == _FastStreamRabbitQueue.MANUALLY_CANCELLED
@@ -569,47 +581,43 @@ def _register_subscribers(self) -> None:
569581
# pylint:disable=unexpected-keyword-arg
570582
# pylint:disable=no-value-for-parameter
571583
self._fs_handle_scheduled = self.router.subscriber(
572-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SCHEDULED),
584+
queue=self._get_global_queue(_FastStreamRabbitQueue.SCHEDULED),
573585
exchange=self.common_exchange,
574586
retry=True,
575587
)(self._fs_handle_scheduled)
576588

577589
self._fs_handle_submit_task = self.router.subscriber(
578-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SUBMIT_TASK),
590+
queue=self._get_global_queue(_FastStreamRabbitQueue.SUBMIT_TASK),
579591
exchange=self.common_exchange,
580592
retry=True,
581593
)(self._fs_handle_submit_task)
582594

583595
self._fs_handle_worker = self.router.subscriber(
584-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.WORKER),
596+
queue=self._get_global_queue(_FastStreamRabbitQueue.WORKER),
585597
exchange=self.common_exchange,
586598
retry=True,
587599
)(self._fs_handle_worker)
588600

589601
self._fs_handle_error_result = self.router.subscriber(
590-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.ERROR_RESULT),
602+
queue=self._get_global_queue(_FastStreamRabbitQueue.ERROR_RESULT),
591603
exchange=self.common_exchange,
592604
retry=True,
593605
)(self._fs_handle_error_result)
594606

595607
self._fs_handle_finished_with_error = self.router.subscriber(
596-
queue=self._get_global_queue_name(
597-
_FastStreamRabbitQueue.FINISHED_WITH_ERROR
598-
),
608+
queue=self._get_global_queue(_FastStreamRabbitQueue.FINISHED_WITH_ERROR),
599609
exchange=self.common_exchange,
600610
retry=True,
601611
)(self._fs_handle_finished_with_error)
602612

603613
self._fs_handle_deferred_result = self.router.subscriber(
604-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.DEFERRED_RESULT),
614+
queue=self._get_global_queue(_FastStreamRabbitQueue.DEFERRED_RESULT),
605615
exchange=self.common_exchange,
606616
retry=True,
607617
)(self._fs_handle_deferred_result)
608618

609619
self._fs_handle_manually_cancelled = self.router.subscriber(
610-
queue=self._get_global_queue_name(
611-
_FastStreamRabbitQueue.MANUALLY_CANCELLED
612-
),
620+
queue=self._get_global_queue(_FastStreamRabbitQueue.MANUALLY_CANCELLED),
613621
exchange=self.cancellation_exchange,
614622
retry=True,
615623
)(self._fs_handle_manually_cancelled)

packages/service-library/src/servicelib/rabbitmq/_client_rpc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async def _rpc_initialize(self) -> None:
4949
self._channel = await self._connection.channel()
5050

5151
self._rpc = aio_pika.patterns.RPC(self._channel)
52-
await self._rpc.initialize()
52+
await self._rpc.initialize(durable=True, auto_delete=True)
5353

5454
async def close(self) -> None:
5555
with log_context(
@@ -134,6 +134,7 @@ async def register_handler(
134134
RPCNamespacedMethodName.from_namespace_and_method(namespace, method_name),
135135
handler,
136136
auto_delete=True,
137+
durable=True,
137138
)
138139

139140
async def register_router(

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,19 @@ async def declare_queue(
8585
# NOTE: setting a name will ensure multiple instance will take their data here
8686
queue_parameters |= {"name": queue_name}
8787

88+
# avoids deprecated `transient_nonexcl_queues` warning in RabbitMQ
89+
if (
90+
queue_parameters.get("durable", False) is False
91+
and queue_parameters.get("exclusive", False) is False
92+
):
93+
msg = (
94+
"Queue must be `durable` or `exclusive`, but not both. "
95+
"This is to avoid the `transient_nonexcl_queues` warning. "
96+
"NOTE: if both `durable` and `exclusive` are missing they are considered False. "
97+
f"{queue_parameters=}"
98+
)
99+
raise ValueError(msg)
100+
88101
# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
89102
# most likely someone changed the signature of the queues (parameters etc...)
90103
# Safest way to deal with it:

packages/service-library/tests/deferred_tasks/test__utils.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async def _() -> AsyncIterator[RabbitBroker]:
5757

5858
@pytest.fixture
5959
def rabbit_exchange() -> RabbitExchange:
60-
return RabbitExchange("test_exchange")
60+
return RabbitExchange("test_exchange", durable=True, auto_delete=True)
6161

6262

6363
async def _assert_call_count(
@@ -256,7 +256,12 @@ async def test_fan_out_exchange_message_delivery(
256256
handler_1_call_count = Mock()
257257
handler_2_call_count = Mock()
258258

259-
fan_out_exchange = RabbitExchange("test_fan_out_exchange", type=ExchangeType.FANOUT)
259+
fan_out_exchange = RabbitExchange(
260+
"test_fan_out_exchange",
261+
type=ExchangeType.FANOUT,
262+
durable=True,
263+
auto_delete=True,
264+
)
260265

261266
@rabbit_broker.subscriber(queue="handler_1", exchange=fan_out_exchange, retry=True)
262267
async def handler_1(sleep_duration: float) -> None:

services/autoscaling/tests/manual/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
rabbit:
3-
image: itisfoundation/rabbitmq:3.13.7-management
3+
image: itisfoundation/rabbitmq:4.1.2-management
44
init: true
55
hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}"
66
ports:

services/director-v2/docker-compose-extra.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ services:
2424
"log_line_prefix=[%p] [%a] [%c] [%x] "
2525
]
2626
rabbit:
27-
image: itisfoundation/rabbitmq:3.13.7-management
27+
image: itisfoundation/rabbitmq:4.1.2-management
2828
init: true
2929
environment:
3030
- RABBITMQ_DEFAULT_USER=${RABBIT_USER}

services/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1288,7 +1288,7 @@ services:
12881288
networks: *storage_networks
12891289

12901290
rabbit:
1291-
image: itisfoundation/rabbitmq:3.13.7-management
1291+
image: itisfoundation/rabbitmq:4.1.2-management
12921292
init: true
12931293
hostname: "{{.Node.Hostname}}-{{.Task.Slot}}"
12941294
environment:

services/web/server/tests/unit/with_dbs/docker-compose-devel.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ services:
8888
- "18081:8081"
8989

9090
rabbit:
91-
image: itisfoundation/rabbitmq:3.13.7-management
91+
image: itisfoundation/rabbitmq:4.1.2-management
9292
init: true
9393
environment:
9494
- RABBITMQ_DEFAULT_USER=admin

services/web/server/tests/unit/with_dbs/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,5 @@ services:
5353
"${TEST_REDIS_PASSWORD}"
5454
]
5555
rabbit:
56-
image: itisfoundation/rabbitmq:3.13.7-management
56+
image: itisfoundation/rabbitmq:4.1.2-management
5757
init: true

tests/swarm-deploy/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def simcore_stack_deployed_services(
111111
# logs table like
112112
# ID NAME IMAGE NODE DESIRED STATE CURRENT STATE ERROR
113113
# xbrhmaygtb76 simcore_sidecar.1 itisfoundation/sidecar:latest crespo-wkstn Running Running 53 seconds ago
114-
# zde7p8qdwk4j simcore_rabbit.1 itisfoundation/rabbitmq:3.13.7-management crespo-wkstn Running Running 59 seconds ago
114+
# zde7p8qdwk4j simcore_rabbit.1 itisfoundation/rabbitmq:4.1.2-management crespo-wkstn Running Running 59 seconds ago
115115
# f2gxmhwq7hhk simcore_postgres.1 postgres:10.10 crespo-wkstn Running Running about a minute ago
116116
# 1lh2hulxmc4q simcore_director.1 itisfoundation/director:latest crespo-wkstn Running Running 34 seconds ago
117117
# ...

0 commit comments

Comments
 (0)