diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index bf243a5a12b3..097da98b8063 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -6,6 +6,8 @@ from uuid import uuid4 import aio_pika +from aiormq import ChannelInvalidStateError +from common_library.logging.logging_errors import create_troubleshooting_log_kwargs from pydantic import NonNegativeInt from ..logging_utils import log_catch, log_context @@ -52,7 +54,7 @@ def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int: return count -async def _safe_nack( +async def _nack_message( message_handler: MessageHandler, max_retries_upon_error: int, message: aio_pika.abc.AbstractIncomingMessage, @@ -72,7 +74,7 @@ async def _safe_nack( # NOTE: puts message to the Dead Letter Exchange await message.nack(requeue=False) else: - _logger.exception( + _logger.error( "Handler '%s' is giving up on message '%s' with body '%s'", message_handler, message, @@ -85,21 +87,49 @@ async def _on_message( max_retries_upon_error: int, message: aio_pika.abc.AbstractIncomingMessage, ) -> None: - async with message.process(requeue=True, ignore_processed=True): - try: - with log_context( - _logger, - logging.DEBUG, - msg=f"Received message from {message.exchange=}, {message.routing_key=}", - ): - if not await message_handler(message.body): - await _safe_nack(message_handler, max_retries_upon_error, message) - except Exception: # pylint: disable=broad-exception-caught - _logger.exception( - "Exception raised when handling message. TIP: review your code" + log_error_context = { + "message_id": message.message_id, + "message_body": message.body, + "message_handler": f"{message_handler}", + } + try: + async with message.process(requeue=True, ignore_processed=True): + try: + with log_context( + _logger, + logging.DEBUG, + msg=f"Received message from {message.exchange=}, {message.routing_key=}", + ): + if not await message_handler(message.body): + await _nack_message( + message_handler, max_retries_upon_error, message + ) + except Exception as exc: # pylint: disable=broad-exception-caught + _logger.exception( + **create_troubleshooting_log_kwargs( + "Unhandled exception raised in message handler or when nacking message", + error=exc, + error_context=log_error_context, + tip="This could indicate an error in the message handler, please check the message handler code", + ) + ) + with log_catch(_logger, reraise=False): + await _nack_message( + message_handler, max_retries_upon_error, message + ) + except ChannelInvalidStateError as exc: + # NOTE: this error can happen as can be seen in aio-pika code + # see https://github.com/mosquito/aio-pika/blob/master/aio_pika/robust_queue.py + _logger.exception( + **create_troubleshooting_log_kwargs( + "Cannot process message because channel is closed. Message will be requeued by RabbitMQ", + error=exc, + error_context=log_error_context, + tip="This could indicate the message handler takes > 30 minutes to complete " + "(default time the RabbitMQ broker waits to close a channel when a " + "message is not acknowledged) or an issue in RabbitMQ broker itself.", ) - with log_catch(_logger, reraise=False): - await _safe_nack(message_handler, max_retries_upon_error, message) + ) @dataclass @@ -144,6 +174,7 @@ async def close(self) -> None: async def _get_channel(self) -> aio_pika.abc.AbstractChannel: assert self._connection_pool # nosec async with self._connection_pool.acquire() as connection: + assert isinstance(connection, aio_pika.RobustConnection) # nosec channel: aio_pika.abc.AbstractChannel = await connection.channel() channel.close_callbacks.add(self._channel_close_callback) return channel diff --git a/packages/service-library/src/servicelib/rabbitmq/_client_base.py b/packages/service-library/src/servicelib/rabbitmq/_client_base.py index 09cea1d44529..ecc483f784b5 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client_base.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client_base.py @@ -6,6 +6,7 @@ import aio_pika import aiormq +from common_library.logging.logging_errors import create_troubleshooting_log_kwargs from settings_library.rabbit import RabbitSettings from ..logging_utils import log_catch @@ -29,33 +30,49 @@ def _connection_close_callback( exc: BaseException | None, ) -> None: if exc: - if isinstance(exc, asyncio.CancelledError): - _logger.info("Rabbit connection cancelled") - elif isinstance(exc, aiormq.exceptions.ConnectionClosed): - _logger.info("Rabbit connection closed: %s", exc) + if isinstance( + exc, asyncio.CancelledError | aiormq.exceptions.ConnectionClosed + ): + _logger.info( + **create_troubleshooting_log_kwargs( + "RabbitMQ connection closed", + error=exc, + error_context={"sender": sender}, + ) + ) else: _logger.error( - "Rabbit connection closed with exception from %s:%s", - type(exc), - exc, + **create_troubleshooting_log_kwargs( + "RabbitMQ connection closed with unexpected error", + error=exc, + error_context={"sender": sender}, + ) ) self._healthy_state = False def _channel_close_callback( self, - sender: Any, # pylint: disable=unused-argument # noqa: ARG002 + sender: Any, exc: BaseException | None, ) -> None: if exc: - if isinstance(exc, asyncio.CancelledError): - _logger.info("Rabbit channel cancelled") - elif isinstance(exc, aiormq.exceptions.ChannelClosed): - _logger.info("Rabbit channel closed") + if isinstance( + exc, asyncio.CancelledError | aiormq.exceptions.ChannelClosed + ): + _logger.info( + **create_troubleshooting_log_kwargs( + "RabbitMQ channel closed", + error=exc, + error_context={"sender": sender}, + ) + ) else: _logger.error( - "Rabbit channel closed with exception from %s:%s", - type(exc), - exc, + **create_troubleshooting_log_kwargs( + "RabbitMQ channel closed with unexpected error", + error=exc, + error_context={"sender": sender}, + ) ) self._healthy_state = False diff --git a/packages/service-library/src/servicelib/redis/_constants.py b/packages/service-library/src/servicelib/redis/_constants.py index 37aa19cb5a82..e34befa1536b 100644 --- a/packages/service-library/src/servicelib/redis/_constants.py +++ b/packages/service-library/src/servicelib/redis/_constants.py @@ -3,6 +3,9 @@ from pydantic import NonNegativeInt +DEFAULT_EXPECTED_LOCK_OVERALL_TIME: Final[datetime.timedelta] = datetime.timedelta( + seconds=30 +) DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10) DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30) diff --git a/packages/service-library/src/servicelib/redis/_decorators.py b/packages/service-library/src/servicelib/redis/_decorators.py index 2ec079c5e7e6..63b1019ba656 100644 --- a/packages/service-library/src/servicelib/redis/_decorators.py +++ b/packages/service-library/src/servicelib/redis/_decorators.py @@ -14,7 +14,7 @@ from ..background_task import periodic from ._client import RedisClientSDK -from ._constants import DEFAULT_LOCK_TTL +from ._constants import DEFAULT_EXPECTED_LOCK_OVERALL_TIME, DEFAULT_LOCK_TTL from ._errors import CouldNotAcquireLockError, LockLostError from ._utils import auto_extend_lock @@ -95,6 +95,7 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: ): raise CouldNotAcquireLockError(lock=lock) + lock_acquisition_time = arrow.utcnow() try: async with asyncio.TaskGroup() as tg: started_event = asyncio.Event() @@ -157,6 +158,19 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: "Look for synchronous code that prevents refreshing the lock or asyncio loop overload.", ) ) + finally: + lock_release_time = arrow.utcnow() + locking_time = lock_release_time - lock_acquisition_time + if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME: + _logger.warning( + "Lock `%s' for %s was held for %s which is longer than the expected (%s). " + "TIP: consider reducing the locking time by optimizing the code inside " + "the critical section or increasing the default locking time", + redis_lock_key, + coro.__name__, + locking_time, + DEFAULT_EXPECTED_LOCK_OVERALL_TIME, + ) return _wrapper diff --git a/packages/service-library/src/servicelib/redis/_semaphore_decorator.py b/packages/service-library/src/servicelib/redis/_semaphore_decorator.py index 4a03e3928602..84679ea2aeeb 100644 --- a/packages/service-library/src/servicelib/redis/_semaphore_decorator.py +++ b/packages/service-library/src/servicelib/redis/_semaphore_decorator.py @@ -7,12 +7,14 @@ from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import Any, ParamSpec, TypeVar +import arrow from common_library.async_tools import cancel_wait_task from common_library.logging.logging_errors import create_troubleshooting_log_kwargs from ..background_task import periodic from ._client import RedisClientSDK from ._constants import ( + DEFAULT_EXPECTED_LOCK_OVERALL_TIME, DEFAULT_SEMAPHORE_TTL, DEFAULT_SOCKET_TIMEOUT, ) @@ -42,6 +44,7 @@ async def _managed_semaphore_execution( if not await semaphore.acquire(): raise SemaphoreAcquisitionError(name=semaphore_key, capacity=semaphore.capacity) + lock_acquisition_time = arrow.utcnow() try: # NOTE: Use TaskGroup for proper exception propagation, this ensures that in case of error the context manager will be properly exited # and the semaphore released. @@ -100,6 +103,18 @@ async def _periodic_renewer() -> None: "Look for synchronous code that prevents refreshing the semaphore or asyncio loop overload.", ) ) + finally: + lock_release_time = arrow.utcnow() + locking_time = lock_release_time - lock_acquisition_time + if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME: + _logger.warning( + "Semaphore '%s' was held for %s which is longer than expected (%s). " + "TIP: consider reducing the locking time by optimizing the code inside " + "the critical section or increasing the default locking time", + semaphore_key, + locking_time, + DEFAULT_EXPECTED_LOCK_OVERALL_TIME, + ) def _create_semaphore( diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq.py b/packages/service-library/tests/rabbitmq/test_rabbitmq.py index d4c6c4b8ebb1..979d11d26777 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq.py @@ -314,7 +314,7 @@ async def _always_returning_fail(_: Any) -> bool: @pytest.mark.parametrize("topics", _TOPICS) -@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors async def test_publish_with_no_registered_subscriber( on_message_spy: mock.Mock, create_rabbitmq_client: Callable[[str], RabbitMQClient], @@ -476,7 +476,7 @@ def _raise_once_then_true(*args, **kwargs): @pytest.fixture async def ensure_queue_deletion( - create_rabbitmq_client: Callable[[str], RabbitMQClient] + create_rabbitmq_client: Callable[[str], RabbitMQClient], ) -> AsyncIterator[Callable[[QueueName], None]]: created_queues = set() @@ -723,7 +723,7 @@ async def test_rabbit_adding_topics_to_a_fanout_exchange( await _assert_message_received(mocked_message_parser, 0) -@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors async def test_rabbit_not_using_the_same_exchange_type_raises( create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], @@ -738,7 +738,7 @@ async def test_rabbit_not_using_the_same_exchange_type_raises( await client.subscribe(exchange_name, mocked_message_parser, topics=[]) -@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors async def test_unsubscribe_consumer( create_rabbitmq_client: Callable[[str], RabbitMQClient], random_exchange_name: Callable[[], str], diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 735286b77a2d..9b7b7c17b18b 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -249,9 +249,9 @@ def _comp_sidecar_fct( self.backend.client.publish_dataset(task_future, name=job_id) ) - _logger.debug( + _logger.info( "Dask task %s started [%s]", - f"{task_future.key=}", + f"{job_id=}", f"{node_image.command=}", ) return PublishedComputationTask(node_id=node_id, job_id=DaskJobID(job_id))