Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

import arrow
from faststream.exceptions import NackMessage, RejectMessage
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitRouter
from faststream.rabbit import (
ExchangeType,
RabbitBroker,
RabbitExchange,
RabbitQueue,
RabbitRouter,
)
from pydantic import NonNegativeInt
from servicelib.logging_utils import log_catch, log_context
from servicelib.redis import RedisClientSDK
Expand Down Expand Up @@ -149,10 +155,14 @@ def __init__(
self._global_resources_prefix = f"{calling_module_name}"

self.common_exchange = RabbitExchange(
f"{self._global_resources_prefix}_common", type=ExchangeType.DIRECT
f"{self._global_resources_prefix}_common",
durable=True,
type=ExchangeType.DIRECT,
)
self.cancellation_exchange = RabbitExchange(
f"{self._global_resources_prefix}_cancellation", type=ExchangeType.FANOUT
f"{self._global_resources_prefix}_cancellation",
durable=True,
type=ExchangeType.FANOUT,
)

def patch_based_deferred_handlers(self) -> None:
Expand Down Expand Up @@ -243,8 +253,10 @@ def un_patch_base_deferred_handlers(cls) -> None:
subclass.is_present.original_is_present # type: ignore
)

def _get_global_queue_name(self, queue_name: _FastStreamRabbitQueue) -> str:
return f"{self._global_resources_prefix}_{queue_name}"
def _get_global_queue(self, queue_name: _FastStreamRabbitQueue) -> RabbitQueue:
return RabbitQueue(
f"{self._global_resources_prefix}_{queue_name}", durable=True
)

def __get_subclass(
self, class_unique_reference: ClassUniqueReference
Expand All @@ -259,7 +271,7 @@ async def __publish_to_queue(
) -> None:
await self.broker.publish(
task_uid,
queue=self._get_global_queue_name(queue),
queue=self._get_global_queue(queue),
exchange=(
self.cancellation_exchange
if queue == _FastStreamRabbitQueue.MANUALLY_CANCELLED
Expand Down Expand Up @@ -569,47 +581,43 @@ def _register_subscribers(self) -> None:
# pylint:disable=unexpected-keyword-arg
# pylint:disable=no-value-for-parameter
self._fs_handle_scheduled = self.router.subscriber(
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SCHEDULED),
queue=self._get_global_queue(_FastStreamRabbitQueue.SCHEDULED),
exchange=self.common_exchange,
retry=True,
)(self._fs_handle_scheduled)

self._fs_handle_submit_task = self.router.subscriber(
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SUBMIT_TASK),
queue=self._get_global_queue(_FastStreamRabbitQueue.SUBMIT_TASK),
exchange=self.common_exchange,
retry=True,
)(self._fs_handle_submit_task)

self._fs_handle_worker = self.router.subscriber(
queue=self._get_global_queue_name(_FastStreamRabbitQueue.WORKER),
queue=self._get_global_queue(_FastStreamRabbitQueue.WORKER),
exchange=self.common_exchange,
retry=True,
)(self._fs_handle_worker)

self._fs_handle_error_result = self.router.subscriber(
queue=self._get_global_queue_name(_FastStreamRabbitQueue.ERROR_RESULT),
queue=self._get_global_queue(_FastStreamRabbitQueue.ERROR_RESULT),
exchange=self.common_exchange,
retry=True,
)(self._fs_handle_error_result)

self._fs_handle_finished_with_error = self.router.subscriber(
queue=self._get_global_queue_name(
_FastStreamRabbitQueue.FINISHED_WITH_ERROR
),
queue=self._get_global_queue(_FastStreamRabbitQueue.FINISHED_WITH_ERROR),
exchange=self.common_exchange,
retry=True,
)(self._fs_handle_finished_with_error)

self._fs_handle_deferred_result = self.router.subscriber(
queue=self._get_global_queue_name(_FastStreamRabbitQueue.DEFERRED_RESULT),
queue=self._get_global_queue(_FastStreamRabbitQueue.DEFERRED_RESULT),
exchange=self.common_exchange,
retry=True,
)(self._fs_handle_deferred_result)

self._fs_handle_manually_cancelled = self.router.subscriber(
queue=self._get_global_queue_name(
_FastStreamRabbitQueue.MANUALLY_CANCELLED
),
queue=self._get_global_queue(_FastStreamRabbitQueue.MANUALLY_CANCELLED),
exchange=self.cancellation_exchange,
retry=True,
)(self._fs_handle_manually_cancelled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def _rpc_initialize(self) -> None:
self._channel = await self._connection.channel()

self._rpc = aio_pika.patterns.RPC(self._channel)
await self._rpc.initialize()
await self._rpc.initialize(durable=True)

async def close(self) -> None:
with log_context(
Expand Down Expand Up @@ -134,6 +134,7 @@ async def register_handler(
RPCNamespacedMethodName.from_namespace_and_method(namespace, method_name),
handler,
auto_delete=True,
durable=True,
)

async def register_router(
Expand Down
13 changes: 13 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ async def declare_queue(
# NOTE: setting a name will ensure multiple instance will take their data here
queue_parameters |= {"name": queue_name}

# avoids deprecated `transient_nonexcl_queues` warning in RabbitMQ
if (
queue_parameters.get("durable", False) is False
and queue_parameters.get("exclusive", False) is False
):
msg = (
"Queue must be `durable` or `exclusive`, but not both. "
"This is to avoid the `transient_nonexcl_queues` warning. "
"NOTE: if both `durable` and `exclusive` are missing they are considered False. "
f"{queue_parameters=}"
)
raise ValueError(msg)

# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
# most likely someone changed the signature of the queues (parameters etc...)
# Safest way to deal with it:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async def _() -> AsyncIterator[RabbitBroker]:

@pytest.fixture
def rabbit_exchange() -> RabbitExchange:
return RabbitExchange("test_exchange")
return RabbitExchange("test_exchange", durable=True)


async def _assert_call_count(
Expand Down Expand Up @@ -256,7 +256,9 @@ async def test_fan_out_exchange_message_delivery(
handler_1_call_count = Mock()
handler_2_call_count = Mock()

fan_out_exchange = RabbitExchange("test_fan_out_exchange", type=ExchangeType.FANOUT)
fan_out_exchange = RabbitExchange(
"test_fan_out_exchange", type=ExchangeType.FANOUT, durable=True
)

@rabbit_broker.subscriber(queue="handler_1", exchange=fan_out_exchange, retry=True)
async def handler_1(sleep_duration: float) -> None:
Expand Down
2 changes: 1 addition & 1 deletion services/autoscaling/tests/manual/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbit:
image: itisfoundation/rabbitmq:3.13.7-management
image: itisfoundation/rabbitmq:4.1.2-management
init: true
hostname: "{{.Node.Hostname}}-{{.Service.Name}}-{{.Task.Slot}}"
ports:
Expand Down
2 changes: 1 addition & 1 deletion services/director-v2/docker-compose-extra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
"log_line_prefix=[%p] [%a] [%c] [%x] "
]
rabbit:
image: itisfoundation/rabbitmq:3.13.7-management
image: itisfoundation/rabbitmq:4.1.2-management
init: true
environment:
- RABBITMQ_DEFAULT_USER=${RABBIT_USER}
Expand Down
2 changes: 1 addition & 1 deletion services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ services:
networks: *storage_networks

rabbit:
image: itisfoundation/rabbitmq:3.13.7-management
image: itisfoundation/rabbitmq:4.1.2-management
init: true
hostname: "{{.Node.Hostname}}-{{.Task.Slot}}"
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ services:
- "18081:8081"

rabbit:
image: itisfoundation/rabbitmq:3.13.7-management
image: itisfoundation/rabbitmq:4.1.2-management
init: true
environment:
- RABBITMQ_DEFAULT_USER=admin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ services:
"${TEST_REDIS_PASSWORD}"
]
rabbit:
image: itisfoundation/rabbitmq:3.13.7-management
image: itisfoundation/rabbitmq:4.1.2-management
init: true
2 changes: 1 addition & 1 deletion tests/swarm-deploy/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def simcore_stack_deployed_services(
# logs table like
# ID NAME IMAGE NODE DESIRED STATE CURRENT STATE ERROR
# xbrhmaygtb76 simcore_sidecar.1 itisfoundation/sidecar:latest crespo-wkstn Running Running 53 seconds ago
# zde7p8qdwk4j simcore_rabbit.1 itisfoundation/rabbitmq:3.13.7-management crespo-wkstn Running Running 59 seconds ago
# zde7p8qdwk4j simcore_rabbit.1 itisfoundation/rabbitmq:4.1.2-management crespo-wkstn Running Running 59 seconds ago
# f2gxmhwq7hhk simcore_postgres.1 postgres:10.10 crespo-wkstn Running Running about a minute ago
# 1lh2hulxmc4q simcore_director.1 itisfoundation/director:latest crespo-wkstn Running Running 34 seconds ago
# ...
Expand Down
Loading