Skip to content

Commit 3195f6c

Browse files
committed
direct access to semaphore without decorator prohibited
1 parent 023c19c commit 3195f6c

File tree

4 files changed

+201
-186
lines changed

4 files changed

+201
-186
lines changed

packages/service-library/src/servicelib/redis/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
from ._semaphore import (
2222
SemaphoreAcquisitionError,
2323
SemaphoreNotAcquiredError,
24-
with_limited_concurrency,
2524
)
25+
from ._semaphore_decorator import with_limited_concurrency
2626
from ._utils import handle_redis_returns_union_types
2727

2828
__all__: tuple[str, ...] = (

packages/service-library/src/servicelib/redis/_semaphore.py

Lines changed: 2 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
import asyncio
22
import datetime
3-
import functools
43
import logging
5-
import socket
64
import uuid
7-
from collections.abc import Callable, Coroutine
85
from types import TracebackType
9-
from typing import Annotated, Any, ParamSpec, TypeVar
6+
from typing import Annotated
107

118
from pydantic import (
129
BaseModel,
@@ -17,8 +14,6 @@
1714
field_validator,
1815
)
1916

20-
from ..background_task import periodic
21-
from ..logging_errors import create_troubleshootting_log_kwargs
2217
from ..logging_utils import log_catch
2318
from ._client import RedisClientSDK
2419
from ._constants import (
@@ -32,31 +27,9 @@
3227
_logger = logging.getLogger(__name__)
3328

3429

35-
async def _renew_semaphore_entry(semaphore: "DistributedSemaphore") -> None:
36-
"""
37-
Manually renew a semaphore entry by updating its timestamp and TTL.
38-
39-
This function is intended to be called by decorators or external renewal mechanisms.
40-
41-
Args:
42-
semaphore: The semaphore instance to renew
43-
44-
Raises:
45-
Exception: If the renewal operation fails
46-
"""
47-
48-
current_time = asyncio.get_event_loop().time()
49-
ttl_seconds = semaphore.ttl.total_seconds()
50-
51-
# Update timestamp in sorted set and refresh holder key
52-
async with semaphore.redis_client.redis.pipeline(transaction=True) as pipe:
53-
await pipe.zadd(semaphore.semaphore_key, {semaphore.instance_id: current_time})
54-
await pipe.expire(semaphore.holder_key, int(ttl_seconds))
55-
await pipe.execute()
56-
57-
5830
class DistributedSemaphore(BaseModel):
5931
"""
32+
Warning: This should only be used directly via the decorator
6033
A distributed semaphore implementation using Redis.
6134
6235
This semaphore allows limiting the number of concurrent operations across
@@ -283,155 +256,3 @@ async def __aexit__(
283256
) -> None:
284257
if self._acquired:
285258
await self.release()
286-
287-
288-
P = ParamSpec("P")
289-
R = TypeVar("R")
290-
291-
292-
def with_limited_concurrency(
293-
redis_client: RedisClientSDK | Callable[..., RedisClientSDK],
294-
*,
295-
key: str | Callable[..., str],
296-
capacity: int | Callable[..., int],
297-
ttl: datetime.timedelta = DEFAULT_SEMAPHORE_TTL,
298-
blocking: bool = True,
299-
blocking_timeout: datetime.timedelta | None = DEFAULT_SOCKET_TIMEOUT,
300-
) -> Callable[
301-
[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
302-
]:
303-
"""
304-
Decorator to limit concurrent execution of a function using a distributed semaphore.
305-
306-
This decorator ensures that only a specified number of instances of the decorated
307-
function can run concurrently across multiple processes/instances using Redis
308-
as the coordination backend.
309-
310-
Args:
311-
redis_client: Redis client for coordination (can be callable)
312-
key: Unique identifier for the semaphore (can be callable)
313-
capacity: Maximum number of concurrent executions (can be callable)
314-
ttl: Time-to-live for semaphore entries (default: 5 minutes)
315-
blocking: Whether to block when semaphore is full (default: True)
316-
blocking_timeout: Maximum time to wait when blocking (default: socket timeout)
317-
318-
Example:
319-
@with_limited_concurrency(
320-
redis_client,
321-
key=f"{user_id}-{wallet_id}",
322-
capacity=20,
323-
blocking=True,
324-
blocking_timeout=None
325-
)
326-
async def process_user_wallet(user_id: str, wallet_id: str):
327-
# Only 20 instances of this function can run concurrently
328-
# for the same user_id-wallet_id combination
329-
await do_processing()
330-
331-
Raises:
332-
SemaphoreAcquisitionError: If semaphore cannot be acquired and blocking=True
333-
"""
334-
335-
def _decorator(
336-
coro: Callable[P, Coroutine[Any, Any, R]],
337-
) -> Callable[P, Coroutine[Any, Any, R]]:
338-
@functools.wraps(coro)
339-
async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
340-
# Resolve callable parameters
341-
semaphore_key = key(*args, **kwargs) if callable(key) else key
342-
semaphore_capacity = (
343-
capacity(*args, **kwargs) if callable(capacity) else capacity
344-
)
345-
client = (
346-
redis_client(*args, **kwargs)
347-
if callable(redis_client)
348-
else redis_client
349-
)
350-
351-
assert isinstance(semaphore_key, str) # nosec
352-
assert isinstance(semaphore_capacity, int) # nosec
353-
assert isinstance(client, RedisClientSDK) # nosec
354-
355-
# Create the semaphore (without auto-renewal)
356-
semaphore = DistributedSemaphore(
357-
redis_client=client,
358-
key=semaphore_key,
359-
capacity=semaphore_capacity,
360-
ttl=ttl,
361-
blocking=blocking,
362-
blocking_timeout=blocking_timeout,
363-
)
364-
365-
# Acquire the semaphore first
366-
if not await semaphore.acquire():
367-
raise SemaphoreAcquisitionError(
368-
name=semaphore_key, capacity=semaphore_capacity
369-
)
370-
371-
try:
372-
# Use TaskGroup for proper exception propagation (similar to exclusive decorator)
373-
async with asyncio.TaskGroup() as tg:
374-
started_event = asyncio.Event()
375-
376-
# Create auto-renewal task
377-
@periodic(interval=ttl / 3, raise_on_error=True)
378-
async def _periodic_renewer() -> None:
379-
await _renew_semaphore_entry(semaphore)
380-
started_event.set()
381-
382-
# Start the renewal task
383-
renewal_task = tg.create_task(
384-
_periodic_renewer(),
385-
name=f"semaphore/autorenewal_{semaphore_key}_{semaphore.instance_id}",
386-
)
387-
388-
# Wait for first renewal to complete (ensures task is running)
389-
await started_event.wait()
390-
391-
# Run the user work
392-
work_task = tg.create_task(
393-
coro(*args, **kwargs),
394-
name=f"semaphore/work_{coro.__module__}.{coro.__name__}",
395-
)
396-
397-
result = await work_task
398-
399-
# Cancel renewal task (work is done)
400-
renewal_task.cancel()
401-
402-
return result
403-
404-
except BaseExceptionGroup as eg:
405-
# Handle exceptions similar to exclusive decorator
406-
# If renewal fails, the TaskGroup will propagate the exception
407-
# and cancel the work task automatically
408-
409-
# Re-raise the first exception in the group
410-
if eg.exceptions:
411-
raise eg.exceptions[0] from eg
412-
raise
413-
414-
finally:
415-
# Always release the semaphore
416-
if semaphore.is_acquired():
417-
try:
418-
await semaphore.release()
419-
except SemaphoreNotAcquiredError as exc:
420-
_logger.exception(
421-
**create_troubleshootting_log_kwargs(
422-
"Unexpected error while releasing semaphore",
423-
error=exc,
424-
error_context={
425-
"semaphore_key": semaphore_key,
426-
"client_name": client.client_name,
427-
"hostname": socket.gethostname(),
428-
"coroutine": coro.__name__,
429-
},
430-
tip="This might happen if the semaphore was lost before releasing it. "
431-
"Look for synchronous code that prevents refreshing the semaphore or asyncio loop overload.",
432-
)
433-
)
434-
435-
return _wrapper
436-
437-
return _decorator

0 commit comments

Comments
 (0)