Skip to content

Commit e838279

Browse files
committed
Merge branch 'enh/uniform-chats' of github.com:odeimaiz/osparc-simcore into enh/uniform-chats
2 parents a22c31e + b38249d commit e838279

File tree

28 files changed

+1118
-697
lines changed

28 files changed

+1118
-697
lines changed

packages/celery-library/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ tests: ## runs unit tests
2727
--durations=10 \
2828
--exitfirst \
2929
--failed-first \
30+
--keep-docker-up \
3031
--pdb \
3132
-vv \
3233
$(CURDIR)/tests
@@ -41,6 +42,7 @@ tests-ci: ## runs unit tests
4142
--cov-report=term-missing \
4243
--cov-report=xml \
4344
--junitxml=junit.xml -o junit_family=legacy \
45+
--keep-docker-up \
4446
--cov=celery_library \
4547
--durations=10 \
4648
--log-date-format="%Y-%m-%d %H:%M:%S" \

packages/pytest-simcore/src/pytest_simcore/redis_service.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import logging
66
from collections.abc import AsyncIterator
7-
from datetime import timedelta
87

98
import pytest
109
import tenacity
@@ -116,14 +115,6 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
116115
await client.aclose(close_connection_pool=True)
117116

118117

119-
@pytest.fixture
120-
def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
121-
# lowered to allow CI to properly shutdown RedisClientSDK instances
122-
mocker.patch(
123-
"servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=0.25)
124-
)
125-
126-
127118
@pytest.fixture
128119
async def use_in_memory_redis(mocker: MockerFixture) -> RedisSettings:
129120
mocker.patch("redis.asyncio.from_url", FakeAsyncRedis)

packages/service-library/src/servicelib/redis/_client.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
DEFAULT_DECODE_RESPONSES,
2121
DEFAULT_HEALTH_CHECK_INTERVAL,
2222
DEFAULT_LOCK_TTL,
23-
DEFAULT_SOCKET_TIMEOUT,
2423
)
2524

2625
_logger = logging.getLogger(__name__)
@@ -49,6 +48,7 @@ class RedisClientSDK:
4948
_client: aioredis.Redis = field(init=False)
5049
_task_health_check: Task | None = None
5150
_started_event_task_health_check: asyncio.Event | None = None
51+
_cancelled_event_task_health_check: asyncio.Event | None = None
5252
_is_healthy: bool = False
5353

5454
@property
@@ -65,20 +65,24 @@ def __post_init__(self) -> None:
6565
redis.exceptions.ConnectionError,
6666
],
6767
retry_on_timeout=True,
68-
socket_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(),
68+
socket_timeout=None, # NOTE: setting a timeout here can lead to issues with long running commands
6969
encoding="utf-8",
7070
decode_responses=self.decode_responses,
7171
client_name=self.client_name,
7272
)
7373
self._is_healthy = False
7474
self._started_event_task_health_check = asyncio.Event()
75+
self._cancelled_event_task_health_check = asyncio.Event()
7576

7677
async def setup(self) -> None:
7778
@periodic(interval=self.health_check_interval)
7879
async def _periodic_check_health() -> None:
7980
assert self._started_event_task_health_check # nosec
81+
assert self._cancelled_event_task_health_check # nosec
8082
self._started_event_task_health_check.set()
8183
self._is_healthy = await self.ping()
84+
if self._cancelled_event_task_health_check.is_set():
85+
raise asyncio.CancelledError
8286

8387
self._task_health_check = asyncio.create_task(
8488
_periodic_check_health(),
@@ -100,10 +104,9 @@ async def shutdown(self) -> None:
100104
if self._task_health_check:
101105
assert self._started_event_task_health_check # nosec
102106
await self._started_event_task_health_check.wait()
103-
104-
await cancel_wait_task(
105-
self._task_health_check, max_delay=_HEALTHCHECK_TIMEOUT_S
106-
)
107+
assert self._cancelled_event_task_health_check # nosec
108+
self._cancelled_event_task_health_check.set()
109+
await cancel_wait_task(self._task_health_check, max_delay=None)
107110

108111
await self._client.aclose(close_connection_pool=True)
109112

packages/service-library/src/servicelib/redis/_constants.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
seconds=30
88
)
99
DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
10-
DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)
1110

11+
DEFAULT_SEMAPHORE_BLOCK_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(
12+
seconds=30
13+
)
1214
DEFAULT_SEMAPHORE_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
1315
SEMAPHORE_KEY_PREFIX: Final[str] = "semaphores:"
14-
SEMAPHORE_HOLDER_KEY_PREFIX: Final[str] = "semaphores:holders:"
1516

1617
DEFAULT_DECODE_RESPONSES: Final[bool] = True
1718
DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5)

packages/service-library/src/servicelib/redis/_errors.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,23 @@ class LockLostError(BaseRedisError):
2626
ProjectLockError: TypeAlias = redis.exceptions.LockError # NOTE: backwards compatible
2727

2828

29-
class SemaphoreAcquisitionError(BaseRedisError):
30-
msg_template: str = "Could not acquire semaphore '{name}' (capacity: {capacity})"
29+
class SemaphoreError(BaseRedisError):
30+
msg_template: str = (
31+
"Unexpected error with semaphore '{name}' by this instance `{instance_id}`"
32+
)
33+
34+
35+
class SemaphoreAcquisitionError(SemaphoreError):
36+
msg_template: str = (
37+
"Could not acquire semaphore '{name}' by this instance `{instance_id}`"
38+
)
3139

3240

33-
class SemaphoreNotAcquiredError(BaseRedisError):
34-
msg_template: str = "Semaphore '{name}' was not acquired by this instance"
41+
class SemaphoreNotAcquiredError(SemaphoreError):
42+
msg_template: str = (
43+
"Semaphore '{name}' was not acquired by this instance `{instance_id}`"
44+
)
3545

3646

37-
class SemaphoreLostError(BaseRedisError):
47+
class SemaphoreLostError(SemaphoreError):
3848
msg_template: str = "Semaphore '{name}' was lost by this instance `{instance_id}`"

0 commit comments

Comments
 (0)