Skip to content

Commit 7eef365

Browse files
committed
Start adding N:M concurrency building blocks
1 parent a5eeb67 commit 7eef365

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# Copyright 2020-present Michael Hall
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# This used to include CPython code, some minor performance losses have been
16+
# taken to not tightly include upstream code
17+
"""Building blocks used to create thread-safe multi-event loop async things."""
18+
19+
from __future__ import annotations
20+
21+
import asyncio
22+
import concurrent.futures as cf
23+
import threading
24+
from collections import deque
25+
from collections.abc import Callable, Generator
26+
27+
from . import _typings as t
28+
29+
30+
class _NMWaiter:
31+
# This is for internal use only, tested on both 3.12 and 3.13
32+
# This will be tested for 3.14 prior to 3.14's release.
33+
__slots__ = ("_future", "_val")
34+
35+
def __init__(self, val: int = 1, /) -> None:
36+
self._future: cf.Future[None] = (
37+
cf.Future()
38+
) # This is why this is for internal use only
39+
self._val = val
40+
41+
def __lt__(self, other: object) -> bool:
42+
if isinstance(other, _NMWaiter):
43+
return self._val < other._val
44+
return NotImplemented
45+
46+
def __init_subclass__(cls) -> t.Never:
47+
msg = "Don't subclass this"
48+
raise RuntimeError(msg)
49+
50+
__final__ = True
51+
52+
def __await__(self) -> Generator[t.Any, None, None]:
53+
return asyncio.wrap_future(self._future).__await__()
54+
55+
def wake(self) -> None:
56+
if not self._future.done():
57+
try:
58+
self._future.set_result(None) # This is why this is for internal use only
59+
except cf.InvalidStateError:
60+
# Race condition possible with multiple attempts to wake
61+
# Racing in some cases is less expensive than locking in all
62+
pass
63+
64+
def cancelled(self):
65+
return self._future.cancelled()
66+
67+
def done(self):
68+
return self._future.done()
69+
70+
def add_done_callback(self, cb: Callable[[cf.Future[None]], None]) -> None:
71+
self._future.add_done_callback(cb)
72+
73+
74+
class _WrappedRLock:
75+
__slots__ = ("_lock",)
76+
77+
def __init__(self, lock: threading.RLock) -> None:
78+
self._lock = lock
79+
80+
async def __aenter__(self) -> None:
81+
acquired = False
82+
while not acquired:
83+
acquired = self._lock.acquire(blocking=False)
84+
await asyncio.sleep(0)
85+
86+
async def __aexit__(self, *dont_care: object) -> None:
87+
self._lock.release()
88+
89+
90+
class NMSemaphore:
91+
__final__ = True
92+
__slots__ = ("_lock", "_value", "_waiters")
93+
94+
def __init_subclass__(cls) -> t.Never:
95+
msg = "Don't subclass this"
96+
raise RuntimeError(msg)
97+
98+
def __init__(self, value: int = 1, /) -> None:
99+
self._waiters: deque[_NMWaiter] = deque()
100+
self._lock = _WrappedRLock(threading.RLock())
101+
self._value: int = value
102+
103+
def __repr__(self) -> str:
104+
res = super().__repr__()
105+
extra = "locked" if self.__locked() else f"unlocked, value:{self._value}"
106+
if self._waiters:
107+
extra = f"{extra}, waiters:{len(self._waiters)}"
108+
return f"<{res[1:-1]} [{extra}]>"
109+
110+
def __locked(self) -> bool:
111+
return self._value == 0 or (any(not w.cancelled() for w in (self._waiters or ())))
112+
113+
async def __aexit__(self, *dont_care: object) -> None:
114+
async with self._lock:
115+
self._value += 1
116+
await self._async_maybe_wake()
117+
118+
async def __aenter__(self) -> None:
119+
async with self._lock:
120+
if not self.__locked():
121+
self._value -= 1
122+
return
123+
124+
waiter = _NMWaiter()
125+
self._waiters.append(waiter)
126+
127+
try:
128+
await waiter
129+
except asyncio.CancelledError:
130+
if waiter.done() and not waiter.cancelled():
131+
self._value += 1
132+
raise
133+
134+
finally:
135+
await self._async_maybe_wake()
136+
137+
async def _async_maybe_wake(self) -> None:
138+
async with self._lock:
139+
while self._value > 0 and self._waiters:
140+
next_waiter = self._waiters.popleft()
141+
142+
if not (next_waiter.done() or next_waiter.cancelled()):
143+
self._value -= 1
144+
next_waiter.wake()
145+
146+
while self._waiters:
147+
next_waiter = self._waiters.popleft()
148+
if not (next_waiter.done() or next_waiter.cancelled()):
149+
self._waiters.appendleft(next_waiter)
150+
break

0 commit comments

Comments
 (0)