Skip to content

Commit ad80ca5

Browse files
committed
Ensure heapq use is safe under freethreading
1 parent ab33abd commit ad80ca5

File tree

5 files changed

+60
-22
lines changed

5 files changed

+60
-22
lines changed

src/async_utils/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
__author__ = "Michael Hall"
1010
__license__ = "Apache-2.0"
1111
__copyright__ = "Copyright 2020-Present Michael Hall"
12-
__version__ = "2025.06.05b"
12+
__version__ = "2025.06.07b"
1313

1414
import os
1515
import sys

src/async_utils/_qs.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import threading
2121
from collections import deque
2222
from collections.abc import Generator
23+
24+
# PYUPDATE: 3.14 release + 3.14 minimum: reaudit
25+
# heapq methods are not threadsafe pre 3.14
26+
# see: GH: cpython 135036
2327
from heapq import heappop, heappush
2428

2529
from . import _typings as t
@@ -566,7 +570,7 @@ def _get(self, /) -> T:
566570
class PriorityQueue[T](BaseQueue[T]):
567571
"""A thread-safe queue with both sync and async access methods."""
568572

569-
__slots__ = ("_data",)
573+
__slots__ = ("_data", "_lock")
570574

571575
def __init_subclass__(cls) -> t.Never:
572576
msg = "Don't subclass this"
@@ -577,6 +581,8 @@ def __init_subclass__(cls) -> t.Never:
577581
def __init__(self, /, maxsize: int | None = None) -> None:
578582
super().__init__(maxsize)
579583
self._data: list[T] = []
584+
# heapq not threadsafe till 3.14
585+
self._lock = threading.RLock()
580586

581587
def _qsize(self, /) -> int:
582588
return len(self._data)
@@ -585,7 +591,9 @@ def _items(self, /) -> list[T]:
585591
return self._data.copy()
586592

587593
def _put(self, item: T, /) -> None:
588-
heappush(self._data, item)
594+
with self._lock:
595+
heappush(self._data, item)
589596

590597
def _get(self, /) -> T:
591-
return heappop(self._data)
598+
with self._lock:
599+
return heappop(self._data)

src/async_utils/lockout.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616

1717
import asyncio
1818
import concurrent.futures as cf
19+
20+
# PYUPDATE: 3.14 release + 3.14 minimum: reaudit
21+
# heapq methods are not threadsafe pre 3.14
22+
# see: GH: cpython 135036
1923
import heapq
24+
import threading
2025
import time
2126
from collections import deque
2227
from functools import partial
@@ -76,6 +81,7 @@ def __repr__(self) -> str:
7681

7782
def __init__(self) -> None:
7883
self._lockouts: list[float] = []
84+
self._internal_lock: threading.RLock = threading.RLock()
7985

8086
def lockout_for(self, seconds: float, /) -> None:
8187
"""Lock a resource for an amount of time."""
@@ -86,9 +92,13 @@ async def __aenter__(self) -> None:
8692
now = time.monotonic()
8793
# There must not be an async context switch between here
8894
# and replacing the lockout when lockout is in the future
89-
ts = heapq.heappop(self._lockouts)
90-
if (sleep_for := ts - now) > 0:
91-
heapq.heappush(self._lockouts, ts)
95+
# PYUPDATE: The lock here can be removed at 3.14 minimum
96+
with self._internal_lock:
97+
ts = heapq.heappop(self._lockouts)
98+
if (sleep_for := ts - now) > 0:
99+
heapq.heappush(self._lockouts, ts)
100+
101+
if sleep_for > 0:
92102
await asyncio.sleep(sleep_for)
93103

94104
async def __aexit__(self, *_dont_care: object) -> None:

src/async_utils/lru.py

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616

1717
from __future__ import annotations
1818

19+
# PYUPDATE: 3.14 release + 3.14 minimum: reaudit
20+
# heapq methods are not threadsafe pre 3.14
21+
# see: GH: cpython 135036
1922
import heapq
2023
import math
24+
import threading
2125
import time
2226

2327
from . import _typings as t
@@ -191,7 +195,14 @@ class TTLLRU[K, V]:
191195
Getting items does not refresh their ttl.
192196
"""
193197

194-
__slots__ = ("_cache", "_expirations", "_maxsize", "_smooth", "_ttl")
198+
__slots__ = (
199+
"_cache",
200+
"_expirations",
201+
"_internal_lock",
202+
"_maxsize",
203+
"_smooth",
204+
"_ttl",
205+
)
195206

196207
def __init_subclass__(cls) -> t.Never:
197208
msg = "Don't subclass this"
@@ -205,29 +216,31 @@ def __init__(self, maxsize: int, ttl: float) -> None:
205216
self._ttl: float = ttl
206217
self._expirations: list[tuple[float, K]] = []
207218
self._smooth: int = max(int(math.log2(maxsize // 2)), 1)
219+
self._internal_lock: threading.RLock = threading.RLock()
208220

209221
def _remove_some_expired(self) -> None:
210222
"""Remove some number of expired entries."""
211223
now = time.monotonic()
212224
tr = max((len(self._expirations) - self._maxsize) >> self._smooth, 2)
213225

214226
while self._expirations and tr > 0:
215-
try:
216-
ts, k = heapq.heappop(self._expirations)
217-
except IndexError:
218-
continue
219-
220-
if ts < now:
221-
tr -= 1
227+
with self._internal_lock:
222228
try:
223-
ts, _v = self._cache[k]
224-
except KeyError:
229+
ts, k = heapq.heappop(self._expirations)
230+
except IndexError:
225231
continue
232+
226233
if ts < now:
227-
self._cache.pop(k, None)
228-
else:
229-
heapq.heappush(self._expirations, (ts, k))
230-
break
234+
tr -= 1
235+
try:
236+
ts, _v = self._cache[k]
237+
except KeyError:
238+
continue
239+
if ts < now:
240+
self._cache.pop(k, None)
241+
else:
242+
heapq.heappush(self._expirations, (ts, k))
243+
break
231244

232245
# needed because in the absence of __iter__ or __contains__ python will
233246
# attempt iteration/containment checks via __getitem__
@@ -251,7 +264,8 @@ def __getitem__(self, key: K, /) -> V:
251264

252265
def __setitem__(self, key: K, value: V, /) -> None:
253266
ts = time.monotonic() + self._ttl
254-
heapq.heappush(self._expirations, (ts, key))
267+
with self._internal_lock:
268+
heapq.heappush(self._expirations, (ts, key))
255269
self._cache[key] = (ts, value)
256270
self._remove_some_expired()
257271
if len(self._cache) > self._maxsize:

src/async_utils/priority_sem.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717

1818
import asyncio
1919
import contextvars
20+
21+
# PYUPDATE: 3.14 release + 3.14 minimum: reaudit
22+
# heapq methods are not threadsafe pre 3.14
23+
# see: GH: cpython 135036
24+
# not currently affected, needs audit when making these loop-agnostic
2025
import heapq
2126
import threading
2227
from collections.abc import Generator
@@ -110,6 +115,7 @@ def __init_subclass__(cls) -> t.Never:
110115

111116
__final__ = True
112117

118+
# When making this loop-agnostic later, heapq isn't threadsafe
113119
_loop: asyncio.AbstractEventLoop | None = None
114120

115121
def __init__(self, value: int = 1) -> None:

0 commit comments

Comments
 (0)