Skip to content

Commit af7ba09

Browse files
authored
🐛RabbitMQ client: prevent crashing consumer if channel is closed (#8382)
1 parent 8003f66 commit af7ba09

File tree

7 files changed

+118
-38
lines changed

7 files changed

+118
-38
lines changed

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from uuid import uuid4
77

88
import aio_pika
9+
from aiormq import ChannelInvalidStateError
10+
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
911
from pydantic import NonNegativeInt
1012

1113
from ..logging_utils import log_catch, log_context
@@ -52,7 +54,7 @@ def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int:
5254
return count
5355

5456

55-
async def _safe_nack(
57+
async def _nack_message(
5658
message_handler: MessageHandler,
5759
max_retries_upon_error: int,
5860
message: aio_pika.abc.AbstractIncomingMessage,
@@ -72,7 +74,7 @@ async def _safe_nack(
7274
# NOTE: puts message to the Dead Letter Exchange
7375
await message.nack(requeue=False)
7476
else:
75-
_logger.exception(
77+
_logger.error(
7678
"Handler '%s' is giving up on message '%s' with body '%s'",
7779
message_handler,
7880
message,
@@ -85,21 +87,49 @@ async def _on_message(
8587
max_retries_upon_error: int,
8688
message: aio_pika.abc.AbstractIncomingMessage,
8789
) -> None:
88-
async with message.process(requeue=True, ignore_processed=True):
89-
try:
90-
with log_context(
91-
_logger,
92-
logging.DEBUG,
93-
msg=f"Received message from {message.exchange=}, {message.routing_key=}",
94-
):
95-
if not await message_handler(message.body):
96-
await _safe_nack(message_handler, max_retries_upon_error, message)
97-
except Exception: # pylint: disable=broad-exception-caught
98-
_logger.exception(
99-
"Exception raised when handling message. TIP: review your code"
90+
log_error_context = {
91+
"message_id": message.message_id,
92+
"message_body": message.body,
93+
"message_handler": f"{message_handler}",
94+
}
95+
try:
96+
async with message.process(requeue=True, ignore_processed=True):
97+
try:
98+
with log_context(
99+
_logger,
100+
logging.DEBUG,
101+
msg=f"Received message from {message.exchange=}, {message.routing_key=}",
102+
):
103+
if not await message_handler(message.body):
104+
await _nack_message(
105+
message_handler, max_retries_upon_error, message
106+
)
107+
except Exception as exc: # pylint: disable=broad-exception-caught
108+
_logger.exception(
109+
**create_troubleshooting_log_kwargs(
110+
"Unhandled exception raised in message handler or when nacking message",
111+
error=exc,
112+
error_context=log_error_context,
113+
tip="This could indicate an error in the message handler, please check the message handler code",
114+
)
115+
)
116+
with log_catch(_logger, reraise=False):
117+
await _nack_message(
118+
message_handler, max_retries_upon_error, message
119+
)
120+
except ChannelInvalidStateError as exc:
121+
# NOTE: this error can happen as can be seen in aio-pika code
122+
# see https://github.com/mosquito/aio-pika/blob/master/aio_pika/robust_queue.py
123+
_logger.exception(
124+
**create_troubleshooting_log_kwargs(
125+
"Cannot process message because channel is closed. Message will be requeued by RabbitMQ",
126+
error=exc,
127+
error_context=log_error_context,
128+
tip="This could indicate the message handler takes > 30 minutes to complete "
129+
"(default time the RabbitMQ broker waits to close a channel when a "
130+
"message is not acknowledged) or an issue in RabbitMQ broker itself.",
100131
)
101-
with log_catch(_logger, reraise=False):
102-
await _safe_nack(message_handler, max_retries_upon_error, message)
132+
)
103133

104134

105135
@dataclass
@@ -144,6 +174,7 @@ async def close(self) -> None:
144174
async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
145175
assert self._connection_pool # nosec
146176
async with self._connection_pool.acquire() as connection:
177+
assert isinstance(connection, aio_pika.RobustConnection) # nosec
147178
channel: aio_pika.abc.AbstractChannel = await connection.channel()
148179
channel.close_callbacks.add(self._channel_close_callback)
149180
return channel

packages/service-library/src/servicelib/rabbitmq/_client_base.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import aio_pika
88
import aiormq
9+
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
910
from settings_library.rabbit import RabbitSettings
1011

1112
from ..logging_utils import log_catch
@@ -29,33 +30,49 @@ def _connection_close_callback(
2930
exc: BaseException | None,
3031
) -> None:
3132
if exc:
32-
if isinstance(exc, asyncio.CancelledError):
33-
_logger.info("Rabbit connection cancelled")
34-
elif isinstance(exc, aiormq.exceptions.ConnectionClosed):
35-
_logger.info("Rabbit connection closed: %s", exc)
33+
if isinstance(
34+
exc, asyncio.CancelledError | aiormq.exceptions.ConnectionClosed
35+
):
36+
_logger.info(
37+
**create_troubleshooting_log_kwargs(
38+
"RabbitMQ connection closed",
39+
error=exc,
40+
error_context={"sender": sender},
41+
)
42+
)
3643
else:
3744
_logger.error(
38-
"Rabbit connection closed with exception from %s:%s",
39-
type(exc),
40-
exc,
45+
**create_troubleshooting_log_kwargs(
46+
"RabbitMQ connection closed with unexpected error",
47+
error=exc,
48+
error_context={"sender": sender},
49+
)
4150
)
4251
self._healthy_state = False
4352

4453
def _channel_close_callback(
4554
self,
46-
sender: Any, # pylint: disable=unused-argument # noqa: ARG002
55+
sender: Any,
4756
exc: BaseException | None,
4857
) -> None:
4958
if exc:
50-
if isinstance(exc, asyncio.CancelledError):
51-
_logger.info("Rabbit channel cancelled")
52-
elif isinstance(exc, aiormq.exceptions.ChannelClosed):
53-
_logger.info("Rabbit channel closed")
59+
if isinstance(
60+
exc, asyncio.CancelledError | aiormq.exceptions.ChannelClosed
61+
):
62+
_logger.info(
63+
**create_troubleshooting_log_kwargs(
64+
"RabbitMQ channel closed",
65+
error=exc,
66+
error_context={"sender": sender},
67+
)
68+
)
5469
else:
5570
_logger.error(
56-
"Rabbit channel closed with exception from %s:%s",
57-
type(exc),
58-
exc,
71+
**create_troubleshooting_log_kwargs(
72+
"RabbitMQ channel closed with unexpected error",
73+
error=exc,
74+
error_context={"sender": sender},
75+
)
5976
)
6077
self._healthy_state = False
6178

packages/service-library/src/servicelib/redis/_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
from pydantic import NonNegativeInt
55

6+
DEFAULT_EXPECTED_LOCK_OVERALL_TIME: Final[datetime.timedelta] = datetime.timedelta(
7+
seconds=30
8+
)
69
DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
710
DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)
811

packages/service-library/src/servicelib/redis/_decorators.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from ..background_task import periodic
1616
from ._client import RedisClientSDK
17-
from ._constants import DEFAULT_LOCK_TTL
17+
from ._constants import DEFAULT_EXPECTED_LOCK_OVERALL_TIME, DEFAULT_LOCK_TTL
1818
from ._errors import CouldNotAcquireLockError, LockLostError
1919
from ._utils import auto_extend_lock
2020

@@ -95,6 +95,7 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
9595
):
9696
raise CouldNotAcquireLockError(lock=lock)
9797

98+
lock_acquisition_time = arrow.utcnow()
9899
try:
99100
async with asyncio.TaskGroup() as tg:
100101
started_event = asyncio.Event()
@@ -157,6 +158,19 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
157158
"Look for synchronous code that prevents refreshing the lock or asyncio loop overload.",
158159
)
159160
)
161+
finally:
162+
lock_release_time = arrow.utcnow()
163+
locking_time = lock_release_time - lock_acquisition_time
164+
if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME:
165+
_logger.warning(
166+
"Lock `%s' for %s was held for %s which is longer than the expected (%s). "
167+
"TIP: consider reducing the locking time by optimizing the code inside "
168+
"the critical section or increasing the default locking time",
169+
redis_lock_key,
170+
coro.__name__,
171+
locking_time,
172+
DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
173+
)
160174

161175
return _wrapper
162176

packages/service-library/src/servicelib/redis/_semaphore_decorator.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
from contextlib import AbstractAsyncContextManager, asynccontextmanager
88
from typing import Any, ParamSpec, TypeVar
99

10+
import arrow
1011
from common_library.async_tools import cancel_wait_task
1112
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
1213

1314
from ..background_task import periodic
1415
from ._client import RedisClientSDK
1516
from ._constants import (
17+
DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
1618
DEFAULT_SEMAPHORE_TTL,
1719
DEFAULT_SOCKET_TIMEOUT,
1820
)
@@ -42,6 +44,7 @@ async def _managed_semaphore_execution(
4244
if not await semaphore.acquire():
4345
raise SemaphoreAcquisitionError(name=semaphore_key, capacity=semaphore.capacity)
4446

47+
lock_acquisition_time = arrow.utcnow()
4548
try:
4649
# NOTE: Use TaskGroup for proper exception propagation, this ensures that in case of error the context manager will be properly exited
4750
# and the semaphore released.
@@ -100,6 +103,18 @@ async def _periodic_renewer() -> None:
100103
"Look for synchronous code that prevents refreshing the semaphore or asyncio loop overload.",
101104
)
102105
)
106+
finally:
107+
lock_release_time = arrow.utcnow()
108+
locking_time = lock_release_time - lock_acquisition_time
109+
if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME:
110+
_logger.warning(
111+
"Semaphore '%s' was held for %s which is longer than expected (%s). "
112+
"TIP: consider reducing the locking time by optimizing the code inside "
113+
"the critical section or increasing the default locking time",
114+
semaphore_key,
115+
locking_time,
116+
DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
117+
)
103118

104119

105120
def _create_semaphore(

packages/service-library/tests/rabbitmq/test_rabbitmq.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ async def _always_returning_fail(_: Any) -> bool:
314314

315315

316316
@pytest.mark.parametrize("topics", _TOPICS)
317-
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
317+
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors
318318
async def test_publish_with_no_registered_subscriber(
319319
on_message_spy: mock.Mock,
320320
create_rabbitmq_client: Callable[[str], RabbitMQClient],
@@ -476,7 +476,7 @@ def _raise_once_then_true(*args, **kwargs):
476476

477477
@pytest.fixture
478478
async def ensure_queue_deletion(
479-
create_rabbitmq_client: Callable[[str], RabbitMQClient]
479+
create_rabbitmq_client: Callable[[str], RabbitMQClient],
480480
) -> AsyncIterator[Callable[[QueueName], None]]:
481481
created_queues = set()
482482

@@ -723,7 +723,7 @@ async def test_rabbit_adding_topics_to_a_fanout_exchange(
723723
await _assert_message_received(mocked_message_parser, 0)
724724

725725

726-
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
726+
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors
727727
async def test_rabbit_not_using_the_same_exchange_type_raises(
728728
create_rabbitmq_client: Callable[[str], RabbitMQClient],
729729
random_exchange_name: Callable[[], str],
@@ -738,7 +738,7 @@ async def test_rabbit_not_using_the_same_exchange_type_raises(
738738
await client.subscribe(exchange_name, mocked_message_parser, topics=[])
739739

740740

741-
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
741+
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors
742742
async def test_unsubscribe_consumer(
743743
create_rabbitmq_client: Callable[[str], RabbitMQClient],
744744
random_exchange_name: Callable[[], str],

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,9 +249,9 @@ def _comp_sidecar_fct(
249249
self.backend.client.publish_dataset(task_future, name=job_id)
250250
)
251251

252-
_logger.debug(
252+
_logger.info(
253253
"Dask task %s started [%s]",
254-
f"{task_future.key=}",
254+
f"{job_id=}",
255255
f"{node_image.command=}",
256256
)
257257
return PublishedComputationTask(node_id=node_id, job_id=DaskJobID(job_id))

0 commit comments

Comments
 (0)