Skip to content

Commit 3c2dae1

Browse files
Merge branch 'master' into is8102/add-search-api-in-storage
2 parents 74814c4 + 3ba41bc commit 3c2dae1

File tree

28 files changed

+640
-441
lines changed

28 files changed

+640
-441
lines changed

.github/workflows/cleanup-caches-by-branches.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,23 @@ on:
88
jobs:
99
cleanup:
1010
runs-on: ubuntu-latest
11+
permissions:
12+
actions: write
1113
steps:
1214
- name: Cleanup
1315
run: |
14-
gh extension install actions/gh-actions-cache
15-
16-
echo "Fetching list of cache key"
17-
cacheKeysForPR=$(gh actions-cache list -R $REPO -B $BRANCH -L 100 | cut -f 1 )
16+
echo "Fetching list of cache keys"
17+
cacheKeysForPR=$(gh cache list --ref $BRANCH --limit 100 --json id --jq '.[].id')
1818
1919
## Setting this to not fail the workflow while deleting cache keys.
2020
set +e
2121
echo "Deleting caches..."
2222
for cacheKey in $cacheKeysForPR
2323
do
24-
gh actions-cache delete $cacheKey -R $REPO -B $BRANCH --confirm
24+
gh cache delete $cacheKey
2525
done
2626
echo "Done"
2727
env:
28-
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
29-
REPO: ${{ github.repository }}
28+
GH_TOKEN: ${{ github.token }}
29+
GH_REPO: ${{ github.repository }}
3030
BRANCH: refs/pull/${{ github.event.pull_request.number }}/merge

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from uuid import uuid4
77

88
import aio_pika
9+
from aiormq import ChannelInvalidStateError
10+
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
911
from pydantic import NonNegativeInt
1012

1113
from ..logging_utils import log_catch, log_context
@@ -52,7 +54,7 @@ def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int:
5254
return count
5355

5456

55-
async def _safe_nack(
57+
async def _nack_message(
5658
message_handler: MessageHandler,
5759
max_retries_upon_error: int,
5860
message: aio_pika.abc.AbstractIncomingMessage,
@@ -72,7 +74,7 @@ async def _safe_nack(
7274
# NOTE: puts message to the Dead Letter Exchange
7375
await message.nack(requeue=False)
7476
else:
75-
_logger.exception(
77+
_logger.error(
7678
"Handler '%s' is giving up on message '%s' with body '%s'",
7779
message_handler,
7880
message,
@@ -85,21 +87,49 @@ async def _on_message(
8587
max_retries_upon_error: int,
8688
message: aio_pika.abc.AbstractIncomingMessage,
8789
) -> None:
88-
async with message.process(requeue=True, ignore_processed=True):
89-
try:
90-
with log_context(
91-
_logger,
92-
logging.DEBUG,
93-
msg=f"Received message from {message.exchange=}, {message.routing_key=}",
94-
):
95-
if not await message_handler(message.body):
96-
await _safe_nack(message_handler, max_retries_upon_error, message)
97-
except Exception: # pylint: disable=broad-exception-caught
98-
_logger.exception(
99-
"Exception raised when handling message. TIP: review your code"
90+
log_error_context = {
91+
"message_id": message.message_id,
92+
"message_body": message.body,
93+
"message_handler": f"{message_handler}",
94+
}
95+
try:
96+
async with message.process(requeue=True, ignore_processed=True):
97+
try:
98+
with log_context(
99+
_logger,
100+
logging.DEBUG,
101+
msg=f"Received message from {message.exchange=}, {message.routing_key=}",
102+
):
103+
if not await message_handler(message.body):
104+
await _nack_message(
105+
message_handler, max_retries_upon_error, message
106+
)
107+
except Exception as exc: # pylint: disable=broad-exception-caught
108+
_logger.exception(
109+
**create_troubleshooting_log_kwargs(
110+
"Unhandled exception raised in message handler or when nacking message",
111+
error=exc,
112+
error_context=log_error_context,
113+
tip="This could indicate an error in the message handler, please check the message handler code",
114+
)
115+
)
116+
with log_catch(_logger, reraise=False):
117+
await _nack_message(
118+
message_handler, max_retries_upon_error, message
119+
)
120+
except ChannelInvalidStateError as exc:
121+
# NOTE: aio-pika can raise this error when the channel is already closed, as can be seen in the aio-pika code
122+
# see https://github.com/mosquito/aio-pika/blob/master/aio_pika/robust_queue.py
123+
_logger.exception(
124+
**create_troubleshooting_log_kwargs(
125+
"Cannot process message because channel is closed. Message will be requeued by RabbitMQ",
126+
error=exc,
127+
error_context=log_error_context,
128+
tip="This could indicate the message handler takes > 30 minutes to complete "
129+
"(default time the RabbitMQ broker waits to close a channel when a "
130+
"message is not acknowledged) or an issue in RabbitMQ broker itself.",
100131
)
101-
with log_catch(_logger, reraise=False):
102-
await _safe_nack(message_handler, max_retries_upon_error, message)
132+
)
103133

104134

105135
@dataclass
@@ -144,6 +174,7 @@ async def close(self) -> None:
144174
async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
145175
assert self._connection_pool # nosec
146176
async with self._connection_pool.acquire() as connection:
177+
assert isinstance(connection, aio_pika.RobustConnection) # nosec
147178
channel: aio_pika.abc.AbstractChannel = await connection.channel()
148179
channel.close_callbacks.add(self._channel_close_callback)
149180
return channel

packages/service-library/src/servicelib/rabbitmq/_client_base.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import aio_pika
88
import aiormq
9+
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
910
from settings_library.rabbit import RabbitSettings
1011

1112
from ..logging_utils import log_catch
@@ -29,33 +30,49 @@ def _connection_close_callback(
2930
exc: BaseException | None,
3031
) -> None:
3132
if exc:
32-
if isinstance(exc, asyncio.CancelledError):
33-
_logger.info("Rabbit connection cancelled")
34-
elif isinstance(exc, aiormq.exceptions.ConnectionClosed):
35-
_logger.info("Rabbit connection closed: %s", exc)
33+
if isinstance(
34+
exc, asyncio.CancelledError | aiormq.exceptions.ConnectionClosed
35+
):
36+
_logger.info(
37+
**create_troubleshooting_log_kwargs(
38+
"RabbitMQ connection closed",
39+
error=exc,
40+
error_context={"sender": sender},
41+
)
42+
)
3643
else:
3744
_logger.error(
38-
"Rabbit connection closed with exception from %s:%s",
39-
type(exc),
40-
exc,
45+
**create_troubleshooting_log_kwargs(
46+
"RabbitMQ connection closed with unexpected error",
47+
error=exc,
48+
error_context={"sender": sender},
49+
)
4150
)
4251
self._healthy_state = False
4352

4453
def _channel_close_callback(
4554
self,
46-
sender: Any, # pylint: disable=unused-argument # noqa: ARG002
55+
sender: Any,
4756
exc: BaseException | None,
4857
) -> None:
4958
if exc:
50-
if isinstance(exc, asyncio.CancelledError):
51-
_logger.info("Rabbit channel cancelled")
52-
elif isinstance(exc, aiormq.exceptions.ChannelClosed):
53-
_logger.info("Rabbit channel closed")
59+
if isinstance(
60+
exc, asyncio.CancelledError | aiormq.exceptions.ChannelClosed
61+
):
62+
_logger.info(
63+
**create_troubleshooting_log_kwargs(
64+
"RabbitMQ channel closed",
65+
error=exc,
66+
error_context={"sender": sender},
67+
)
68+
)
5469
else:
5570
_logger.error(
56-
"Rabbit channel closed with exception from %s:%s",
57-
type(exc),
58-
exc,
71+
**create_troubleshooting_log_kwargs(
72+
"RabbitMQ channel closed with unexpected error",
73+
error=exc,
74+
error_context={"sender": sender},
75+
)
5976
)
6077
self._healthy_state = False
6178

packages/service-library/src/servicelib/redis/_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
from pydantic import NonNegativeInt
55

6+
DEFAULT_EXPECTED_LOCK_OVERALL_TIME: Final[datetime.timedelta] = datetime.timedelta(
7+
seconds=30
8+
)
69
DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
710
DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)
811

packages/service-library/src/servicelib/redis/_decorators.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from ..background_task import periodic
1616
from ._client import RedisClientSDK
17-
from ._constants import DEFAULT_LOCK_TTL
17+
from ._constants import DEFAULT_EXPECTED_LOCK_OVERALL_TIME, DEFAULT_LOCK_TTL
1818
from ._errors import CouldNotAcquireLockError, LockLostError
1919
from ._utils import auto_extend_lock
2020

@@ -95,6 +95,7 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
9595
):
9696
raise CouldNotAcquireLockError(lock=lock)
9797

98+
lock_acquisition_time = arrow.utcnow()
9899
try:
99100
async with asyncio.TaskGroup() as tg:
100101
started_event = asyncio.Event()
@@ -157,6 +158,19 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
157158
"Look for synchronous code that prevents refreshing the lock or asyncio loop overload.",
158159
)
159160
)
161+
finally:
162+
lock_release_time = arrow.utcnow()
163+
locking_time = lock_release_time - lock_acquisition_time
164+
if locking_time > DEFAULT_EXPECTED_LOCK_OVERALL_TIME:
165+
_logger.warning(
166+
"Lock `%s' for %s was held for %s which is longer than the expected (%s). "
167+
"TIP: consider reducing the locking time by optimizing the code inside "
168+
"the critical section or increasing the default locking time",
169+
redis_lock_key,
170+
coro.__name__,
171+
locking_time,
172+
DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
173+
)
160174

161175
return _wrapper
162176

packages/service-library/src/servicelib/redis/_semaphore_decorator.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
from contextlib import AbstractAsyncContextManager, asynccontextmanager
88
from typing import Any, ParamSpec, TypeVar
99

10+
import arrow
1011
from common_library.async_tools import cancel_wait_task
1112
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
1213

1314
from ..background_task import periodic
1415
from ._client import RedisClientSDK
1516
from ._constants import (
17+
DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
1618
DEFAULT_SEMAPHORE_TTL,
1719
DEFAULT_SOCKET_TIMEOUT,
1820
)
@@ -36,12 +38,14 @@ async def _managed_semaphore_execution(
3638
semaphore_key: str,
3739
ttl: datetime.timedelta,
3840
execution_context: str,
41+
expected_lock_overall_time: datetime.timedelta,
3942
) -> AsyncIterator:
4043
"""Common semaphore management logic with auto-renewal."""
4144
# Acquire the semaphore first
4245
if not await semaphore.acquire():
4346
raise SemaphoreAcquisitionError(name=semaphore_key, capacity=semaphore.capacity)
4447

48+
lock_acquisition_time = arrow.utcnow()
4549
try:
4650
# NOTE: Use TaskGroup for proper exception propagation; this ensures that in case of error the context manager will be properly exited
4751
# and the semaphore released.
@@ -100,6 +104,18 @@ async def _periodic_renewer() -> None:
100104
"Look for synchronous code that prevents refreshing the semaphore or asyncio loop overload.",
101105
)
102106
)
107+
finally:
108+
lock_release_time = arrow.utcnow()
109+
locking_time = lock_release_time - lock_acquisition_time
110+
if locking_time > expected_lock_overall_time:
111+
_logger.warning(
112+
"Semaphore '%s' was held for %s which is longer than expected (%s). "
113+
"TIP: consider reducing the locking time by optimizing the code inside "
114+
"the critical section or increasing the default locking time",
115+
semaphore_key,
116+
locking_time,
117+
expected_lock_overall_time,
118+
)
103119

104120

105121
def _create_semaphore(
@@ -142,6 +158,7 @@ def with_limited_concurrency(
142158
ttl: datetime.timedelta = DEFAULT_SEMAPHORE_TTL,
143159
blocking: bool = True,
144160
blocking_timeout: datetime.timedelta | None = DEFAULT_SOCKET_TIMEOUT,
161+
expected_lock_overall_time: datetime.timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
145162
) -> Callable[
146163
[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
147164
]:
@@ -159,6 +176,7 @@ def with_limited_concurrency(
159176
ttl: Time-to-live for semaphore entries (default: 5 minutes)
160177
blocking: Whether to block when semaphore is full (default: True)
161178
blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
179+
expected_lock_overall_time: Threshold used only for logging; a warning is logged if the semaphore is held longer than this (default: DEFAULT_EXPECTED_LOCK_OVERALL_TIME)
162180
163181
Example:
164182
@with_limited_concurrency(
@@ -194,7 +212,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
194212
)
195213

196214
async with _managed_semaphore_execution(
197-
semaphore, semaphore_key, ttl, f"coroutine_{coro.__name__}"
215+
semaphore,
216+
semaphore_key,
217+
ttl,
218+
f"coroutine_{coro.__name__}",
219+
expected_lock_overall_time,
198220
):
199221
return await coro(*args, **kwargs)
200222

@@ -211,6 +233,7 @@ def with_limited_concurrency_cm(
211233
ttl: datetime.timedelta = DEFAULT_SEMAPHORE_TTL,
212234
blocking: bool = True,
213235
blocking_timeout: datetime.timedelta | None = DEFAULT_SOCKET_TIMEOUT,
236+
expected_lock_overall_time: datetime.timedelta = DEFAULT_EXPECTED_LOCK_OVERALL_TIME,
214237
) -> Callable[
215238
[Callable[P, AbstractAsyncContextManager[R]]],
216239
Callable[P, AbstractAsyncContextManager[R]],
@@ -229,6 +252,7 @@ def with_limited_concurrency_cm(
229252
ttl: Time-to-live for semaphore entries (default: 5 minutes)
230253
blocking: Whether to block when semaphore is full (default: True)
231254
blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
255+
expected_lock_overall_time: Threshold used only for logging; a warning is logged if the semaphore is held longer than this (default: DEFAULT_EXPECTED_LOCK_OVERALL_TIME)
232256
233257
Example:
234258
@asynccontextmanager
@@ -266,7 +290,11 @@ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[R]:
266290

267291
async with (
268292
_managed_semaphore_execution(
269-
semaphore, semaphore_key, ttl, f"context_manager_{cm_func.__name__}"
293+
semaphore,
294+
semaphore_key,
295+
ttl,
296+
f"context_manager_{cm_func.__name__}",
297+
expected_lock_overall_time,
270298
),
271299
cm_func(*args, **kwargs) as value,
272300
):

packages/service-library/tests/rabbitmq/test_rabbitmq.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ async def _always_returning_fail(_: Any) -> bool:
314314

315315

316316
@pytest.mark.parametrize("topics", _TOPICS)
317-
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
317+
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors
318318
async def test_publish_with_no_registered_subscriber(
319319
on_message_spy: mock.Mock,
320320
create_rabbitmq_client: Callable[[str], RabbitMQClient],
@@ -476,7 +476,7 @@ def _raise_once_then_true(*args, **kwargs):
476476

477477
@pytest.fixture
478478
async def ensure_queue_deletion(
479-
create_rabbitmq_client: Callable[[str], RabbitMQClient]
479+
create_rabbitmq_client: Callable[[str], RabbitMQClient],
480480
) -> AsyncIterator[Callable[[QueueName], None]]:
481481
created_queues = set()
482482

@@ -723,7 +723,7 @@ async def test_rabbit_adding_topics_to_a_fanout_exchange(
723723
await _assert_message_received(mocked_message_parser, 0)
724724

725725

726-
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
726+
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors
727727
async def test_rabbit_not_using_the_same_exchange_type_raises(
728728
create_rabbitmq_client: Callable[[str], RabbitMQClient],
729729
random_exchange_name: Callable[[], str],
@@ -738,7 +738,7 @@ async def test_rabbit_not_using_the_same_exchange_type_raises(
738738
await client.subscribe(exchange_name, mocked_message_parser, topics=[])
739739

740740

741-
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
741+
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors
742742
async def test_unsubscribe_consumer(
743743
create_rabbitmq_client: Callable[[str], RabbitMQClient],
744744
random_exchange_name: Callable[[], str],

0 commit comments

Comments
 (0)