Skip to content

Commit 81e90f4

Browse files
committed
Modify lock internals
- There's a lot of internal Cpython behavior attached to why the specific yields here are needed. I need to go through it in depth later so that I can be more confident in the solution prior to a public export
1 parent ec68cb2 commit 81e90f4

File tree

2 files changed

+60
-22
lines changed

2 files changed

+60
-22
lines changed

_misc/_l.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import asyncio
2+
import random
3+
import time
4+
5+
from async_utils._simple_lock import AsyncLock # noqa: PLC2701
6+
from async_utils.bg_loop import threaded_loop
7+
8+
9+
async def check(lock: AsyncLock):
10+
async with lock:
11+
v = random.random()
12+
s = time.monotonic()
13+
await asyncio.sleep(v)
14+
e = time.monotonic()
15+
print(s, v, e, flush=True) # noqa: T201
16+
17+
18+
async def amain():
19+
lock = AsyncLock()
20+
with threaded_loop() as tl1, threaded_loop() as tl2:
21+
tsks = {loop.run(check(lock)) for loop in (tl1, tl2) for _ in range(10)}
22+
await asyncio.gather(*tsks)
23+
24+
25+
if __name__ == "__main__":
26+
asyncio.run(amain())

src/async_utils/_simple_lock.py

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,19 @@ def __init_subclass__(cls) -> t.Never:
3333

3434
def __init__(self) -> None:
3535
self._waiters: deque[cf.Future[None]] = deque()
36+
self._lockv: bool = False
3637
self._internal_lock: threading.RLock = threading.RLock()
37-
self._locked: bool = False
3838

39-
async def __aenter__(self, /) -> None:
39+
def __locked(self) -> bool:
4040
with self._internal_lock:
41-
if not self._locked and (all(w.cancelled() for w in self._waiters)):
42-
self._locked = True
41+
return self._lockv or (any(not w.cancelled() for w in (self._waiters)))
42+
43+
async def __aenter__(self) -> None:
44+
await asyncio.sleep(0) # This yield is non-optional.
45+
46+
with self._internal_lock:
47+
if not self.__locked():
48+
self._lockv = True
4349
return
4450

4551
fut: cf.Future[None] = cf.Future()
@@ -49,27 +55,33 @@ async def __aenter__(self, /) -> None:
4955

5056
try:
5157
await asyncio.wrap_future(fut)
52-
except (asyncio.CancelledError, cf.CancelledError):
53-
with self._internal_lock:
54-
if self._locked:
55-
self._maybe_wake()
58+
except asyncio.CancelledError:
59+
if fut.done() and not fut.cancelled():
60+
self._lockv = False
61+
raise
62+
5663
finally:
57-
self._waiters.remove(fut)
64+
self._maybe_wake()
65+
return
5866

59-
async def __aexit__(self, *_dont_care: object) -> t.Literal[False]:
67+
def _maybe_wake(self) -> None:
6068
with self._internal_lock:
61-
if self._locked:
62-
self._locked = False
63-
self._maybe_wake()
69+
while (not self._lockv) and self._waiters:
70+
next_waiter = self._waiters.popleft()
6471

65-
return False
72+
if not (next_waiter.done() or next_waiter.cancelled()):
73+
self._lockv = True
74+
next_waiter.set_result(None)
6675

67-
def _maybe_wake(self) -> None:
76+
while self._waiters:
77+
next_waiter = self._waiters.popleft()
78+
if not (next_waiter.done() or next_waiter.cancelled()):
79+
self._waiters.appendleft(next_waiter)
80+
break
81+
82+
async def __aexit__(self, *dont_care: object) -> t.Literal[False]:
83+
await asyncio.sleep(0) # this yield is not optional
6884
with self._internal_lock:
69-
if self._waiters:
70-
try:
71-
fut = next(iter(self._waiters))
72-
except StopIteration:
73-
return
74-
if not fut.done():
75-
fut.set_result(None)
85+
self._lockv = False
86+
self._maybe_wake()
87+
return False

0 commit comments

Comments
 (0)