Skip to content

Commit 4025160

Browse files
author
Anton
committed
fix: better semaphore
1 parent e4d00ba commit 4025160

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

taskiq/semaphore.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66

77

88
class DequeSemaphore:
9-
"""Custom deque based semaphore."""
9+
"""
10+
Custom deque based semaphore.
11+
12+
https://neopythonic.blogspot.com/2022/10/reasoning-about-asynciosemaphore.html
13+
"""
1014

1115
def __init__(self, value: int) -> None:
1216
self._value = value
@@ -27,7 +31,9 @@ def locked(self) -> bool:
2731
2832
:returns: true or false
2933
"""
30-
return self._value == 0
34+
return self._value == 0 or (
35+
any(not waiter.cancelled() for waiter in (self._waiters or ()))
36+
)
3137

3238
def release(self) -> None:
3339
"""Release a semaphore, incrementing the internal counter by one.
@@ -46,7 +52,7 @@ async def acquire(self, first: bool = False) -> Literal[True]: # noqa: C901
4652
:raises asyncio.exceptions.CancelledError: task cancelled
4753
:returns: true
4854
"""
49-
if not self.locked() and not self._waiters:
55+
if not self.locked():
5056
# No need to wait as the semaphore is not locked
5157
# and no one is waiting
5258
self._value -= 1
@@ -72,7 +78,7 @@ async def acquire(self, first: bool = False) -> Literal[True]: # noqa: C901
7278
self._wakeup_next()
7379
raise
7480

75-
if not self.locked():
81+
if self._value > 0:
7682
# This is required for strict FIFO ordering
7783
# otherwise it can cause starvation on the waiting tasks
7884
# The next loop iteration will wake up the task and switch

0 commit comments

Comments
 (0)