|
6 | 6 | from uuid import uuid4 |
7 | 7 |
|
8 | 8 | import aio_pika |
| 9 | +from aiormq import ChannelInvalidStateError |
9 | 10 | from common_library.logging.logging_errors import create_troubleshooting_log_kwargs |
10 | 11 | from pydantic import NonNegativeInt |
11 | 12 |
|
@@ -86,32 +87,47 @@ async def _on_message( |
86 | 87 | max_retries_upon_error: int, |
87 | 88 | message: aio_pika.abc.AbstractIncomingMessage, |
88 | 89 | ) -> None: |
89 | | - async with message.process(requeue=True, ignore_processed=True): |
90 | | - try: |
91 | | - with log_context( |
92 | | - _logger, |
93 | | - logging.DEBUG, |
94 | | - msg=f"Received message from {message.exchange=}, {message.routing_key=}", |
95 | | - ): |
96 | | - if not await message_handler(message.body): |
| 90 | + log_error_context = { |
| 91 | + "message_id": message.message_id, |
| 92 | + "message_body": message.body, |
| 93 | + "message_handler": f"{message_handler}", |
| 94 | + } |
| 95 | + try: |
| 96 | + async with message.process(requeue=True, ignore_processed=True): |
| 97 | + try: |
| 98 | + with log_context( |
| 99 | + _logger, |
| 100 | + logging.DEBUG, |
| 101 | + msg=f"Received message from {message.exchange=}, {message.routing_key=}", |
| 102 | + ): |
| 103 | + if not await message_handler(message.body): |
| 104 | + await _nack_message( |
| 105 | + message_handler, max_retries_upon_error, message |
| 106 | + ) |
| 107 | + except Exception as exc: # pylint: disable=broad-exception-caught |
| 108 | + _logger.exception( |
| 109 | + **create_troubleshooting_log_kwargs( |
| 110 | + "Unhandled exception raised in message handler or when nacking message", |
| 111 | + error=exc, |
| 112 | + error_context=log_error_context, |
| 113 | + tip="This could indicate an error in the message handler, please check the message handler code", |
| 114 | + ) |
| 115 | + ) |
| 116 | + with log_catch(_logger, reraise=False): |
97 | 117 | await _nack_message( |
98 | 118 | message_handler, max_retries_upon_error, message |
99 | 119 | ) |
100 | | - except Exception as exc: # pylint: disable=broad-exception-caught |
101 | | - _logger.exception( |
102 | | - **create_troubleshooting_log_kwargs( |
103 | | - "Unhandled exception raised in message handler or when nacking message", |
104 | | - error=exc, |
105 | | - error_context={ |
106 | | - "message_id": message.message_id, |
107 | | - "message_body": message.body, |
108 | | - "message_handler": f"{message_handler}", |
109 | | - }, |
110 | | - tip="This could indicate an error in the message handler, please check the message handler code", |
111 | | - ) |
| 120 | + except ChannelInvalidStateError as exc: |
| 121 | + _logger.exception( |
| 122 | + **create_troubleshooting_log_kwargs( |
| 123 | + "Cannot process message because channel is closed. Message will be requeued by RabbitMQ", |
| 124 | + error=exc, |
| 125 | + error_context=log_error_context, |
| 126 | + tip="This could indicate the message handler takes > 30 minutes to complete " |
| 127 | + "(default time the RabbitMQ broker waits to close a channel when a " |
| 128 | + "message is not acknowledged) or an issue in RabbitMQ broker itself.", |
112 | 129 | ) |
113 | | - with log_catch(_logger, reraise=False): |
114 | | - await _nack_message(message_handler, max_retries_upon_error, message) |
| 130 | + ) |
115 | 131 |
|
116 | 132 |
|
117 | 133 | @dataclass |
|
0 commit comments