Skip to content

Commit 9507936

Browse files
committed
catch also errors that might happen in the processing of messages
1 parent 0a583a5 commit 9507936

File tree

1 file changed

+28
-14
lines changed
  • packages/service-library/src/servicelib/rabbitmq

1 file changed

+28
-14
lines changed

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from uuid import uuid4
77

88
import aio_pika
9+
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
910
from pydantic import NonNegativeInt
1011

1112
from ..logging_utils import log_catch, log_context
@@ -85,21 +86,33 @@ async def _on_message(
8586
max_retries_upon_error: int,
8687
message: aio_pika.abc.AbstractIncomingMessage,
8788
) -> None:
88-
async with message.process(requeue=True, ignore_processed=True):
89-
try:
90-
with log_context(
91-
_logger,
92-
logging.DEBUG,
93-
msg=f"Received message from {message.exchange=}, {message.routing_key=}",
94-
):
95-
if not await message_handler(message.body):
96-
await _safe_nack(message_handler, max_retries_upon_error, message)
97-
except Exception: # pylint: disable=broad-exception-caught
98-
_logger.exception(
99-
"Exception raised when handling message. TIP: review your code"
100-
)
101-
with log_catch(_logger, reraise=False):
89+
with log_catch(_logger, reraise=False):
90+
async with message.process(requeue=True, ignore_processed=True):
91+
try:
92+
with log_context(
93+
_logger,
94+
logging.DEBUG,
95+
msg=f"Received message from {message.exchange=}, {message.routing_key=}",
96+
):
97+
if not await message_handler(message.body):
98+
await _safe_nack(
99+
message_handler, max_retries_upon_error, message
100+
)
101+
except Exception as exc:
102+
_logger.exception(
103+
**create_troubleshooting_log_kwargs(
104+
"Unhandled exception raised in message handler",
105+
error=exc,
106+
error_context={
107+
"message_id": message.message_id,
108+
"message_body": message.body,
109+
"message_handler": message_handler.__name__,
110+
},
111+
tip="This could indicate an error in the message handler, please check the message handler code",
112+
)
113+
)
102114
await _safe_nack(message_handler, max_retries_upon_error, message)
115+
raise
103116

104117

105118
@dataclass
@@ -144,6 +157,7 @@ async def close(self) -> None:
144157
async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
145158
assert self._connection_pool # nosec
146159
async with self._connection_pool.acquire() as connection:
160+
assert isinstance(connection, aio_pika.RobustConnection) # nosec
147161
channel: aio_pika.abc.AbstractChannel = await connection.channel()
148162
channel.close_callbacks.add(self._channel_close_callback)
149163
return channel

0 commit comments

Comments
 (0)