Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
415cb0a
ongoing fair queuing semaphore
sanderegg Sep 22, 2025
c65c0c8
copy from branch
sanderegg Sep 22, 2025
c2f6100
implementing fair queuing
sanderegg Sep 22, 2025
550f003
ongoing
sanderegg Sep 22, 2025
f711353
fixing code
sanderegg Sep 22, 2025
4feca5b
remove holder key
sanderegg Sep 22, 2025
5698788
revert
sanderegg Sep 22, 2025
409a173
fixes handling
sanderegg Sep 22, 2025
82a658a
adjusting
sanderegg Sep 22, 2025
4377bc3
handle blocking timeouts
sanderegg Sep 22, 2025
2ec306c
remove unused code
sanderegg Sep 22, 2025
0d69791
clean
sanderegg Sep 22, 2025
0a86174
clean
sanderegg Sep 22, 2025
85d7d52
ensure blocking semaphore works as expected
sanderegg Sep 22, 2025
0a0fb45
maybe
sanderegg Sep 22, 2025
86ac7f5
we are fair?
sanderegg Sep 22, 2025
0865b0b
cleanup
sanderegg Sep 22, 2025
d414233
cleanup
sanderegg Sep 22, 2025
0f9cbe2
cleanup
sanderegg Sep 22, 2025
41550f9
cleanup
sanderegg Sep 22, 2025
c5f3384
mypy
sanderegg Sep 22, 2025
8e1bfc3
ensure we do not wait too much
sanderegg Sep 22, 2025
055dd8c
sonar
sanderegg Sep 22, 2025
39ba892
improve testing
sanderegg Sep 22, 2025
ef9e8b0
ongoing
sanderegg Sep 22, 2025
d80df65
ongoing
sanderegg Sep 22, 2025
f4dc251
fixing test
sanderegg Sep 23, 2025
5a98fe0
adjust
sanderegg Sep 23, 2025
a189d9d
clean
sanderegg Sep 23, 2025
56d3611
minor
sanderegg Sep 23, 2025
5f4f043
check ttl expiration
sanderegg Sep 23, 2025
2b223a1
adjust test
sanderegg Sep 23, 2025
7698eda
refactor lua scripts
sanderegg Sep 23, 2025
3a3a725
minor
sanderegg Sep 23, 2025
4cef369
work with lost tokens
sanderegg Sep 23, 2025
46a0baa
release pushes the token back in
sanderegg Sep 23, 2025
c6079e5
release pushes the token back in
sanderegg Sep 23, 2025
be8b3ff
re-created context manager
sanderegg Sep 23, 2025
48a4eb4
ruff
sanderegg Sep 23, 2025
5745241
ongoing
sanderegg Sep 23, 2025
f7f8c86
rename
sanderegg Sep 23, 2025
738750f
minor
sanderegg Sep 23, 2025
5be7c64
getting there
sanderegg Sep 23, 2025
5acf5a2
getting there
sanderegg Sep 23, 2025
a9b0649
ensure we test it all
sanderegg Sep 23, 2025
73d783d
clean
sanderegg Sep 23, 2025
51b3268
fixed blocking and non-blocking
sanderegg Sep 23, 2025
42a31e0
change test
sanderegg Sep 23, 2025
62bc8f3
use underlying context manager
sanderegg Sep 23, 2025
063c452
ensure cancellation happens for real
sanderegg Sep 23, 2025
ba10d17
ok
sanderegg Sep 23, 2025
8736204
fixed variable not accessed
sanderegg Sep 23, 2025
2a3e7e8
found a problem
sanderegg Sep 23, 2025
44d16ae
should be fixed now
sanderegg Sep 23, 2025
bd8e191
clean
sanderegg Sep 23, 2025
c4d46a3
add some docs
sanderegg Sep 23, 2025
fc0c2d0
ruff
sanderegg Sep 23, 2025
e3e2afd
removed socket timeout on redis clients as this is a dangerous setting
sanderegg Sep 23, 2025
b0f4018
ensure we have auto expiry on keys
sanderegg Sep 23, 2025
b8f9b8f
ensure we keep docker up
sanderegg Sep 23, 2025
d39b298
ensure cancellation is done
sanderegg Sep 23, 2025
6933f97
missing declaration
sanderegg Sep 23, 2025
0dc0e93
mypy
sanderegg Sep 23, 2025
421cbfc
ensure changing capacity changes the key as well
sanderegg Sep 23, 2025
d063248
rename key
sanderegg Sep 23, 2025
c5d5b3d
clean the cleanup script
sanderegg Sep 23, 2025
84c7212
@pcrespov review: rename scripts
sanderegg Sep 23, 2025
4544179
use self call
sanderegg Sep 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/celery-library/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tests: ## runs unit tests
--durations=10 \
--exitfirst \
--failed-first \
--keep-docker-up \
--pdb \
-vv \
$(CURDIR)/tests
Expand All @@ -41,6 +42,7 @@ tests-ci: ## runs unit tests
--cov-report=term-missing \
--cov-report=xml \
--junitxml=junit.xml -o junit_family=legacy \
--keep-docker-up \
--cov=celery_library \
--durations=10 \
--log-date-format="%Y-%m-%d %H:%M:%S" \
Expand Down
9 changes: 0 additions & 9 deletions packages/pytest-simcore/src/pytest_simcore/redis_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import logging
from collections.abc import AsyncIterator
from datetime import timedelta

import pytest
import tenacity
Expand Down Expand Up @@ -116,14 +115,6 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
await client.aclose(close_connection_pool=True)


@pytest.fixture
def mock_redis_socket_timeout(mocker: MockerFixture) -> None:
# lowered to allow CI to properly shutdown RedisClientSDK instances
mocker.patch(
"servicelib.redis._client.DEFAULT_SOCKET_TIMEOUT", timedelta(seconds=0.25)
)


@pytest.fixture
async def use_in_memory_redis(mocker: MockerFixture) -> RedisSettings:
mocker.patch("redis.asyncio.from_url", FakeAsyncRedis)
Expand Down
15 changes: 9 additions & 6 deletions packages/service-library/src/servicelib/redis/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
DEFAULT_DECODE_RESPONSES,
DEFAULT_HEALTH_CHECK_INTERVAL,
DEFAULT_LOCK_TTL,
DEFAULT_SOCKET_TIMEOUT,
)

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,6 +48,7 @@ class RedisClientSDK:
_client: aioredis.Redis = field(init=False)
_task_health_check: Task | None = None
_started_event_task_health_check: asyncio.Event | None = None
_cancelled_event_task_health_check: asyncio.Event | None = None
_is_healthy: bool = False

@property
Expand All @@ -65,20 +65,24 @@ def __post_init__(self) -> None:
redis.exceptions.ConnectionError,
],
retry_on_timeout=True,
socket_timeout=DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_timeout=None, # NOTE: setting a timeout here can lead to issues with long running commands
encoding="utf-8",
decode_responses=self.decode_responses,
client_name=self.client_name,
)
self._is_healthy = False
self._started_event_task_health_check = asyncio.Event()
self._cancelled_event_task_health_check = asyncio.Event()

async def setup(self) -> None:
@periodic(interval=self.health_check_interval)
async def _periodic_check_health() -> None:
assert self._started_event_task_health_check # nosec
assert self._cancelled_event_task_health_check # nosec
self._started_event_task_health_check.set()
self._is_healthy = await self.ping()
if self._cancelled_event_task_health_check.is_set():
raise asyncio.CancelledError

self._task_health_check = asyncio.create_task(
_periodic_check_health(),
Expand All @@ -100,10 +104,9 @@ async def shutdown(self) -> None:
if self._task_health_check:
assert self._started_event_task_health_check # nosec
await self._started_event_task_health_check.wait()

await cancel_wait_task(
self._task_health_check, max_delay=_HEALTHCHECK_TIMEOUT_S
)
assert self._cancelled_event_task_health_check # nosec
self._cancelled_event_task_health_check.set()
await cancel_wait_task(self._task_health_check, max_delay=None)

await self._client.aclose(close_connection_pool=True)

Expand Down
5 changes: 3 additions & 2 deletions packages/service-library/src/servicelib/redis/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
seconds=30
)
DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)

DEFAULT_SEMAPHORE_BLOCK_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(
seconds=30
)
DEFAULT_SEMAPHORE_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
SEMAPHORE_KEY_PREFIX: Final[str] = "semaphores:"
SEMAPHORE_HOLDER_KEY_PREFIX: Final[str] = "semaphores:holders:"

DEFAULT_DECODE_RESPONSES: Final[bool] = True
DEFAULT_HEALTH_CHECK_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5)
Expand Down
20 changes: 15 additions & 5 deletions packages/service-library/src/servicelib/redis/_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,23 @@ class LockLostError(BaseRedisError):
ProjectLockError: TypeAlias = redis.exceptions.LockError # NOTE: backwards compatible


class SemaphoreAcquisitionError(BaseRedisError):
msg_template: str = "Could not acquire semaphore '{name}' (capacity: {capacity})"
class SemaphoreError(BaseRedisError):
msg_template: str = (
"Unexpected error with semaphore '{name}' by this instance `{instance_id}`"
)


class SemaphoreAcquisitionError(SemaphoreError):
msg_template: str = (
"Could not acquire semaphore '{name}' by this instance `{instance_id}`"
)


class SemaphoreNotAcquiredError(BaseRedisError):
msg_template: str = "Semaphore '{name}' was not acquired by this instance"
class SemaphoreNotAcquiredError(SemaphoreError):
msg_template: str = (
"Semaphore '{name}' was not acquired by this instance `{instance_id}`"
)


class SemaphoreLostError(BaseRedisError):
class SemaphoreLostError(SemaphoreError):
msg_template: str = "Semaphore '{name}' was lost by this instance `{instance_id}`"
Loading
Loading