|
6 | 6 | from uuid import uuid4 |
7 | 7 |
|
8 | 8 | import aio_pika |
| 9 | +from common_library.logging.logging_errors import create_troubleshooting_log_kwargs |
9 | 10 | from pydantic import NonNegativeInt |
10 | 11 |
|
11 | 12 | from ..logging_utils import log_catch, log_context |
@@ -85,21 +86,33 @@ async def _on_message( |
85 | 86 | max_retries_upon_error: int, |
86 | 87 | message: aio_pika.abc.AbstractIncomingMessage, |
87 | 88 | ) -> None: |
88 | | - async with message.process(requeue=True, ignore_processed=True): |
89 | | - try: |
90 | | - with log_context( |
91 | | - _logger, |
92 | | - logging.DEBUG, |
93 | | - msg=f"Received message from {message.exchange=}, {message.routing_key=}", |
94 | | - ): |
95 | | - if not await message_handler(message.body): |
96 | | - await _safe_nack(message_handler, max_retries_upon_error, message) |
97 | | - except Exception: # pylint: disable=broad-exception-caught |
98 | | - _logger.exception( |
99 | | - "Exception raised when handling message. TIP: review your code" |
100 | | - ) |
101 | | - with log_catch(_logger, reraise=False): |
| 89 | + with log_catch(_logger, reraise=False): |
| 90 | + async with message.process(requeue=True, ignore_processed=True): |
| 91 | + try: |
| 92 | + with log_context( |
| 93 | + _logger, |
| 94 | + logging.DEBUG, |
| 95 | + msg=f"Received message from {message.exchange=}, {message.routing_key=}", |
| 96 | + ): |
| 97 | + if not await message_handler(message.body): |
| 98 | + await _safe_nack( |
| 99 | + message_handler, max_retries_upon_error, message |
| 100 | + ) |
| 101 | + except Exception as exc: |
| 102 | + _logger.exception( |
| 103 | + **create_troubleshooting_log_kwargs( |
| 104 | + "Unhandled exception raised in message handler", |
| 105 | + error=exc, |
| 106 | + error_context={ |
| 107 | + "message_id": message.message_id, |
| 108 | + "message_body": message.body, |
| 109 | + "message_handler": message_handler.__name__, |
| 110 | + }, |
| 111 | + tip="This could indicate an error in the message handler, please check the message handler code", |
| 112 | + ) |
| 113 | + ) |
102 | 114 | await _safe_nack(message_handler, max_retries_upon_error, message) |
| 115 | + raise |
103 | 116 |
|
104 | 117 |
|
105 | 118 | @dataclass |
@@ -144,6 +157,7 @@ async def close(self) -> None: |
144 | 157 | async def _get_channel(self) -> aio_pika.abc.AbstractChannel: |
145 | 158 | assert self._connection_pool # nosec |
146 | 159 | async with self._connection_pool.acquire() as connection: |
| 160 | + assert isinstance(connection, aio_pika.RobustConnection) # nosec |
147 | 161 | channel: aio_pika.abc.AbstractChannel = await connection.channel() |
148 | 162 | channel.close_callbacks.add(self._channel_close_callback) |
149 | 163 | return channel |
|
0 commit comments