Skip to content

Commit c09126d

Browse files
author
Andrei Neagu
committed
wip: made queues durable
1 parent 9d55fc6 commit c09126d

File tree

4 files changed

+48
-20
lines changed

4 files changed

+48
-20
lines changed

packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88

99
import arrow
1010
from faststream.exceptions import NackMessage, RejectMessage
11-
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitRouter
11+
from faststream.rabbit import (
12+
ExchangeType,
13+
RabbitBroker,
14+
RabbitExchange,
15+
RabbitQueue,
16+
RabbitRouter,
17+
)
1218
from pydantic import NonNegativeInt
1319
from servicelib.logging_utils import log_catch, log_context
1420
from servicelib.redis import RedisClientSDK
@@ -149,10 +155,14 @@ def __init__(
149155
self._global_resources_prefix = f"{calling_module_name}"
150156

151157
self.common_exchange = RabbitExchange(
152-
f"{self._global_resources_prefix}_common", type=ExchangeType.DIRECT
158+
f"{self._global_resources_prefix}_common",
159+
durable=True,
160+
type=ExchangeType.DIRECT,
153161
)
154162
self.cancellation_exchange = RabbitExchange(
155-
f"{self._global_resources_prefix}_cancellation", type=ExchangeType.FANOUT
163+
f"{self._global_resources_prefix}_cancellation",
164+
durable=True,
165+
type=ExchangeType.FANOUT,
156166
)
157167

158168
def patch_based_deferred_handlers(self) -> None:
@@ -243,8 +253,10 @@ def un_patch_base_deferred_handlers(cls) -> None:
243253
subclass.is_present.original_is_present # type: ignore
244254
)
245255

246-
def _get_global_queue_name(self, queue_name: _FastStreamRabbitQueue) -> str:
247-
return f"{self._global_resources_prefix}_{queue_name}"
256+
def _get_global_queue(self, queue_name: _FastStreamRabbitQueue) -> RabbitQueue:
257+
return RabbitQueue(
258+
f"{self._global_resources_prefix}_{queue_name}", durable=True
259+
)
248260

249261
def __get_subclass(
250262
self, class_unique_reference: ClassUniqueReference
@@ -259,7 +271,7 @@ async def __publish_to_queue(
259271
) -> None:
260272
await self.broker.publish(
261273
task_uid,
262-
queue=self._get_global_queue_name(queue),
274+
queue=self._get_global_queue(queue),
263275
exchange=(
264276
self.cancellation_exchange
265277
if queue == _FastStreamRabbitQueue.MANUALLY_CANCELLED
@@ -569,47 +581,43 @@ def _register_subscribers(self) -> None:
569581
# pylint:disable=unexpected-keyword-arg
570582
# pylint:disable=no-value-for-parameter
571583
self._fs_handle_scheduled = self.router.subscriber(
572-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SCHEDULED),
584+
queue=self._get_global_queue(_FastStreamRabbitQueue.SCHEDULED),
573585
exchange=self.common_exchange,
574586
retry=True,
575587
)(self._fs_handle_scheduled)
576588

577589
self._fs_handle_submit_task = self.router.subscriber(
578-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SUBMIT_TASK),
590+
queue=self._get_global_queue(_FastStreamRabbitQueue.SUBMIT_TASK),
579591
exchange=self.common_exchange,
580592
retry=True,
581593
)(self._fs_handle_submit_task)
582594

583595
self._fs_handle_worker = self.router.subscriber(
584-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.WORKER),
596+
queue=self._get_global_queue(_FastStreamRabbitQueue.WORKER),
585597
exchange=self.common_exchange,
586598
retry=True,
587599
)(self._fs_handle_worker)
588600

589601
self._fs_handle_error_result = self.router.subscriber(
590-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.ERROR_RESULT),
602+
queue=self._get_global_queue(_FastStreamRabbitQueue.ERROR_RESULT),
591603
exchange=self.common_exchange,
592604
retry=True,
593605
)(self._fs_handle_error_result)
594606

595607
self._fs_handle_finished_with_error = self.router.subscriber(
596-
queue=self._get_global_queue_name(
597-
_FastStreamRabbitQueue.FINISHED_WITH_ERROR
598-
),
608+
queue=self._get_global_queue(_FastStreamRabbitQueue.FINISHED_WITH_ERROR),
599609
exchange=self.common_exchange,
600610
retry=True,
601611
)(self._fs_handle_finished_with_error)
602612

603613
self._fs_handle_deferred_result = self.router.subscriber(
604-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.DEFERRED_RESULT),
614+
queue=self._get_global_queue(_FastStreamRabbitQueue.DEFERRED_RESULT),
605615
exchange=self.common_exchange,
606616
retry=True,
607617
)(self._fs_handle_deferred_result)
608618

609619
self._fs_handle_manually_cancelled = self.router.subscriber(
610-
queue=self._get_global_queue_name(
611-
_FastStreamRabbitQueue.MANUALLY_CANCELLED
612-
),
620+
queue=self._get_global_queue(_FastStreamRabbitQueue.MANUALLY_CANCELLED),
613621
exchange=self.cancellation_exchange,
614622
retry=True,
615623
)(self._fs_handle_manually_cancelled)

packages/service-library/src/servicelib/rabbitmq/_client_rpc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async def _rpc_initialize(self) -> None:
4949
self._channel = await self._connection.channel()
5050

5151
self._rpc = aio_pika.patterns.RPC(self._channel)
52-
await self._rpc.initialize()
52+
await self._rpc.initialize(durable=True)
5353

5454
async def close(self) -> None:
5555
with log_context(
@@ -134,6 +134,7 @@ async def register_handler(
134134
RPCNamespacedMethodName.from_namespace_and_method(namespace, method_name),
135135
handler,
136136
auto_delete=True,
137+
durable=True,
137138
)
138139

139140
async def register_router(

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,23 @@ async def declare_queue(
8585
# NOTE: setting a name will ensure multiple instance will take their data here
8686
queue_parameters |= {"name": queue_name}
8787

88+
# avoids deprecated `transient_nonexcl_queues` warning
89+
# TODO: raise an error here to avoid this configuration
90+
if (
91+
queue_parameters.get("durable", False) is False
92+
and queue_parameters.get("exclusive", False) is False
93+
):
94+
_logger.warning(
95+
"Avoid `transient_nonexcl_queues` warning for queue_setup: %s",
96+
queue_parameters,
97+
)
98+
# this is the closes equivalen of the old `transient_nonexcl_queues`
99+
# without violating the deprecation warning
100+
# https://github.com/rabbitmq/rabbitmq-server/discussions/13161
101+
queue_parameters["durable"] = True
102+
queue_parameters["message-ttl"] = 86400000 # 24 hours
103+
_logger.warning("NEW params: %s", queue_parameters)
104+
88105
# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
89106
# most likely someone changed the signature of the queues (parameters etc...)
90107
# Safest way to deal with it:

packages/service-library/tests/deferred_tasks/test__utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ async def _() -> AsyncIterator[RabbitBroker]:
5757

5858
@pytest.fixture
5959
def rabbit_exchange() -> RabbitExchange:
60-
return RabbitExchange("test_exchange")
60+
return RabbitExchange("test_exchange", durable=True)
6161

6262

6363
async def _assert_call_count(
@@ -256,7 +256,9 @@ async def test_fan_out_exchange_message_delivery(
256256
handler_1_call_count = Mock()
257257
handler_2_call_count = Mock()
258258

259-
fan_out_exchange = RabbitExchange("test_fan_out_exchange", type=ExchangeType.FANOUT)
259+
fan_out_exchange = RabbitExchange(
260+
"test_fan_out_exchange", type=ExchangeType.FANOUT, durable=True
261+
)
260262

261263
@rabbit_broker.subscriber(queue="handler_1", exchange=fan_out_exchange, retry=True)
262264
async def handler_1(sleep_duration: float) -> None:

0 commit comments

Comments
 (0)