Skip to content

Commit d811720

Browse files
committed
Use Tornado's selector thread on Windows/asyncio/ProactorEventLoop
1 parent 0d7f721 commit d811720

File tree

4 files changed

+385
-14
lines changed

4 files changed

+385
-14
lines changed

ipykernel/_selector_thread.py

Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
"""Ensure asyncio selector methods (add_reader, etc.) are available.

Runs select in a thread and defines these methods on the running event loop.
Originally in tornado.platform.asyncio.
Redistributed under license Apache-2.0
"""
6+
7+
from __future__ import annotations
8+
9+
import asyncio
10+
import atexit
11+
import errno
12+
import functools
13+
import select
14+
import socket
15+
import sys
16+
import threading
17+
import typing
18+
from typing import (
19+
Any,
20+
Callable,
21+
Union,
22+
)
23+
from weakref import WeakKeyDictionary
24+
25+
from sniffio import current_async_library
26+
27+
# Names declared here exist only for static type checkers; at runtime this
# branch is skipped, so they may only be referenced from annotations (which
# are lazy in this module thanks to ``from __future__ import annotations``).
if typing.TYPE_CHECKING:
    from typing_extensions import Protocol

    class _HasFileno(Protocol):
        """Structural type for any object exposing ``fileno()``."""

        def fileno(self) -> int:
            pass

    # Either a raw integer descriptor or an object that provides ``fileno()``.
    _FileDescriptorLike = Union[int, _HasFileno]
35+
36+
37+
# Collection of selector thread event loops to shut down on exit.
_selector_loops: set[SelectorThread] = set()


def _atexit_callback() -> None:
    """Stop every registered selector thread at interpreter exit.

    For each ``SelectorThread`` still present in ``_selector_loops`` this
    sets its ``_closing_selector`` flag (under ``_select_cond``), notifies
    the condition, writes one byte to the waker socket so a blocking
    ``select`` call returns, and joins the thread if one was started.
    The registry is emptied afterwards.
    """
    for loop in _selector_loops:
        with loop._select_cond:
            loop._closing_selector = True
            loop._select_cond.notify()
        try:
            loop._waker_w.send(b"a")
        except BlockingIOError:
            # Non-blocking send could not queue the byte; presumably the
            # waker buffer is already full, so a wake-up is pending anyway.
            pass
        # The selector thread is created lazily (only once the wrapped event
        # loop actually runs), so a registered selector may have no thread.
        # Guard explicitly instead of asserting: an ``assert`` here would
        # raise at exit, or be stripped under ``-O`` and fail on ``None``.
        if loop._thread is not None:
            # If we don't join our (daemon) thread here, we may get a deadlock
            # during interpreter shutdown. I don't really understand why. This
            # deadlock happens every time in CI (both travis and appveyor) but
            # I've never been able to reproduce locally.
            loop._thread.join()
    _selector_loops.clear()


atexit.register(_atexit_callback)
60+
61+
62+
# SelectorThread from tornado 6.4.0
63+
64+
65+
class SelectorThread:
    """Define ``add_reader`` methods to be called in a background select thread.

    Instances of this class start a second thread to run a selector.
    This thread is completely hidden from the user;
    all callbacks are run on the wrapped event loop's thread.

    Typically used via ``AddThreadSelectorEventLoop``,
    but can be attached to a running asyncio loop.
    """

    # Class-level default; ``close()`` sets the instance attribute to True.
    _closed = False

    def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
        """Attach a selector to ``real_loop``.

        The selector thread itself is not started here: a callback is
        scheduled on ``real_loop`` that starts it once the loop runs.

        :param real_loop: event loop on which reader/writer callbacks are
            dispatched (via ``call_soon_threadsafe``).
        """
        self._real_loop = real_loop

        # Condition guarding ``_select_args`` and ``_closing_selector``, the
        # two values shared between the loop thread and the selector thread.
        self._select_cond = threading.Condition()
        # (readers, writers) handed to the selector thread for its next
        # ``select`` call; None while no request is pending.
        self._select_args: tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None = None
        self._closing_selector = False
        self._thread: threading.Thread | None = None
        # Calling the async generator function only creates the generator;
        # its body runs when it is first iterated (below).
        self._thread_manager_handle = self._thread_manager()

        async def thread_manager_anext() -> None:
            # the anext builtin wasn't added until 3.10. We just need to iterate
            # this generator one step.
            await self._thread_manager_handle.__anext__()

        # When the loop starts, start the thread. Not too soon because we can't
        # clean up if we get to this point but the event loop is closed without
        # starting.
        self._real_loop.call_soon(lambda: self._real_loop.create_task(thread_manager_anext()))

        # fd -> zero-argument callable (a ``functools.partial``).
        self._readers: dict[_FileDescriptorLike, Callable] = {}
        self._writers: dict[_FileDescriptorLike, Callable] = {}

        # Writing to _waker_w will wake up the selector thread, which
        # watches for _waker_r to be readable.
        self._waker_r, self._waker_w = socket.socketpair()
        self._waker_r.setblocking(False)
        self._waker_w.setblocking(False)
        # Register for the module-level atexit shutdown hook.
        _selector_loops.add(self)
        self.add_reader(self._waker_r, self._consume_waker)

    def close(self) -> None:
        """Stop the selector thread and release the waker sockets.

        Does nothing if this instance is already closed. Otherwise flags
        the selector as closing, wakes the thread, joins it if it was ever
        started, unregisters from ``_selector_loops`` and closes both ends
        of the waker socket pair.
        """
        if self._closed:
            return
        with self._select_cond:
            self._closing_selector = True
            self._select_cond.notify()
        # Also interrupt a ``select`` call that may currently be blocking.
        self._wake_selector()
        if self._thread is not None:
            self._thread.join()
        _selector_loops.discard(self)
        self.remove_reader(self._waker_r)
        self._waker_r.close()
        self._waker_w.close()
        # Set last: ``_wake_selector`` (used above, directly and through
        # ``remove_reader``) is a no-op once ``_closed`` is True.
        self._closed = True

    async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
        """Start the selector thread, then wait to be closed.

        Async generator: its first step starts the thread and issues the
        first select request; when the generator is closed (GeneratorExit)
        it calls ``close()``.
        """
        # Create a thread to run the select system call. We manage this thread
        # manually so we can trigger a clean shutdown from an atexit hook. Note
        # that due to the order of operations at shutdown, only daemon threads
        # can be shut down in this way (non-daemon threads would require the
        # introduction of a new hook: https://bugs.python.org/issue41962)
        self._thread = threading.Thread(
            name="Tornado selector",
            daemon=True,
            target=self._run_select,
        )
        # NOTE(review): presumably a marker telling the pydev/debugpy debugger
        # not to trace this thread -- confirm against the debugger's docs.
        self._thread.pydev_do_not_trace = True
        self._thread.start()
        self._start_select()
        try:
            # The presence of this yield statement means that this coroutine
            # is actually an asynchronous generator, which has a special
            # shutdown protocol. We wait at this yield point until the
            # event loop's shutdown_asyncgens method is called, at which point
            # we will get a GeneratorExit exception and can shut down the
            # selector thread.
            yield
        except GeneratorExit:
            self.close()
            raise

    def _wake_selector(self) -> None:
        """Write one byte to the waker socket; no-op once closed."""
        if self._closed:
            return
        try:
            self._waker_w.send(b"a")
        except BlockingIOError:
            # Non-blocking send could not queue the byte; presumably earlier
            # wake-up bytes are still unread, so the selector will wake anyway.
            pass

    def _consume_waker(self) -> None:
        """Read pending bytes (up to 1024) from the waker socket.

        Registered in ``__init__`` as the reader callback for ``_waker_r``.
        """
        try:
            self._waker_r.recv(1024)
        except BlockingIOError:
            pass

    def _start_select(self) -> None:
        """Hand the current reader/writer sets to the selector thread."""
        # Capture reader and writer sets here in the event loop
        # thread to avoid any problems with concurrent
        # modification while the select loop uses them.
        with self._select_cond:
            # At most one request may be pending at a time.
            assert self._select_args is None
            self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
            self._select_cond.notify()

    def _run_select(self) -> None:
        """Body of the selector thread.

        Repeatedly waits for a request posted by ``_start_select``, runs
        ``select.select`` on it and schedules ``_handle_select`` on the
        wrapped loop with the ready descriptors. Returns once
        ``_closing_selector`` is observed.
        """
        while True:
            with self._select_cond:
                while self._select_args is None and not self._closing_selector:
                    self._select_cond.wait()
                if self._closing_selector:
                    return
                assert self._select_args is not None
                to_read, to_write = self._select_args
                # Consume the request so ``_start_select`` may post a new one.
                self._select_args = None

            # We use the simpler interface of the select module instead of
            # the more stateful interface in the selectors module because
            # this class is only intended for use on windows, where
            # select.select is the only option. The selector interface
            # does not have well-documented thread-safety semantics that
            # we can rely on so ensuring proper synchronization would be
            # tricky.
            try:
                # On windows, selecting on a socket for write will not
                # return the socket when there is an error (but selecting
                # for reads works). Also select for errors when selecting
                # for writes, and merge the results.
                #
                # This pattern is also used in
                # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
                rs, ws, xs = select.select(to_read, to_write, to_write)
                ws = ws + xs
            except OSError as e:
                # After remove_reader or remove_writer is called, the file
                # descriptor may subsequently be closed on the event loop
                # thread. It's possible that this select thread hasn't
                # gotten into the select system call by the time that
                # happens in which case (at least on macOS), select may
                # raise a "bad file descriptor" error. If we get that
                # error, check and see if we're also being woken up by
                # polling the waker alone. If we are, just return to the
                # event loop and we'll get the updated set of file
                # descriptors on the next iteration. Otherwise, raise the
                # original error.
                if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
                    # Zero timeout: poll the waker without blocking.
                    rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
                    if rs:
                        ws = []
                    else:
                        raise
                else:
                    raise

            try:
                self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
            except RuntimeError:
                # "Event loop is closed". Swallow the exception for
                # consistency with PollIOLoop (and logical consistency
                # with the fact that we can't guarantee that an
                # add_callback that completes without error will
                # eventually execute).
                pass
            except AttributeError:
                # ProactorEventLoop may raise this instead of RuntimeError
                # if call_soon_threadsafe races with a call to close().
                # Swallow it too for consistency.
                pass

    def _handle_select(self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]) -> None:
        """Dispatch callbacks for ready descriptors, then request a new select.

        Scheduled on the wrapped loop by ``_run_select``.

        :param rs: descriptors reported readable.
        :param ws: descriptors reported writable (or in error).
        """
        for r in rs:
            self._handle_event(r, self._readers)
        for w in ws:
            self._handle_event(w, self._writers)
        self._start_select()

    def _handle_event(
        self,
        fd: _FileDescriptorLike,
        cb_map: dict[_FileDescriptorLike, Callable],
    ) -> None:
        """Invoke the callback registered for ``fd`` in ``cb_map``, if any.

        A descriptor with no entry in ``cb_map`` is silently ignored.
        """
        try:
            callback = cb_map[fd]
        except KeyError:
            return
        callback()

    def add_reader(
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Register ``callback(*args)`` for when ``fd`` is readable.

        Replaces any reader previously registered for the same ``fd`` and
        wakes the selector thread.
        """
        self._readers[fd] = functools.partial(callback, *args)
        self._wake_selector()

    def add_writer(
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Register ``callback(*args)`` for when ``fd`` is writable.

        Replaces any writer previously registered for the same ``fd`` and
        wakes the selector thread.
        """
        self._writers[fd] = functools.partial(callback, *args)
        self._wake_selector()

    def remove_reader(self, fd: _FileDescriptorLike) -> bool:
        """Unregister the reader for ``fd``.

        :return: False if no reader was registered for ``fd``, else True.
        """
        try:
            del self._readers[fd]
        except KeyError:
            return False
        self._wake_selector()
        return True

    def remove_writer(self, fd: _FileDescriptorLike) -> bool:
        """Unregister the writer for ``fd``.

        :return: False if no writer was registered for ``fd``, else True.
        """
        try:
            del self._writers[fd]
        except KeyError:
            return False
        self._wake_selector()
        return True
281+
282+
283+
# AddThreadSelectorEventLoop: unmodified from tornado 6.4.0
284+
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
    """Proxy around an event loop that supplies the ``add_reader`` method family.

    A ``SelectorThread`` is created for the wrapped loop; it runs ``select``
    in a second thread that stays hidden from the user, and every callback
    is still executed on the wrapped event loop's thread.

    Tornado uses this class automatically, so applications should not need
    to refer to it directly.

    Any event loop may safely be wrapped, although doing so is only useful
    for loops that lack their own ``add_reader`` family of methods
    (i.e. ``WindowsProactorEventLoop``).

    Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
    """

    # Attribute access is routed through ``__getattribute__``: the names in
    # this set are resolved on the proxy itself, everything else is looked
    # up on the wrapped loop.
    MY_ATTRIBUTES = {
        "add_reader",
        "remove_reader",
        "add_writer",
        "remove_writer",
        "close",
        "_selector",
        "_real_loop",
    }

    def __getattribute__(self, name: str) -> Any:
        """Resolve ``name`` locally if it is ours, otherwise on the wrapped loop."""
        if name not in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
            return getattr(self._real_loop, name)
        return super().__getattribute__(name)

    def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
        """Wrap ``real_loop`` and create its selector thread helper."""
        self._real_loop = real_loop
        self._selector = SelectorThread(real_loop)

    def close(self) -> None:
        """Close the selector first, then the wrapped event loop."""
        selector_thread = self._selector
        selector_thread.close()
        wrapped_loop = self._real_loop
        wrapped_loop.close()

    def add_reader(  # type: ignore[override]
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Forward to the selector's ``add_reader``."""
        register = self._selector.add_reader
        return register(fd, callback, *args)

    def add_writer(  # type: ignore[override]
        self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
    ) -> None:
        """Forward to the selector's ``add_writer``."""
        register = self._selector.add_writer
        return register(fd, callback, *args)

    def remove_reader(self, fd: _FileDescriptorLike) -> bool:
        """Forward to the selector's ``remove_reader`` and return its result."""
        unregister = self._selector.remove_reader
        return unregister(fd)

    def remove_writer(self, fd: _FileDescriptorLike) -> bool:
        """Forward to the selector's ``remove_writer`` and return its result."""
        unregister = self._selector.remove_writer
        return unregister(fd)
341+
342+
343+
# registry of asyncio loop : selector thread
_selectors: WeakKeyDictionary = WeakKeyDictionary()


def _set_selector_windows() -> None:
    """Set selector-compatible loop.

    Sets the ``add_reader`` family of methods (``add_reader``,
    ``remove_reader``, ``add_writer``, ``remove_writer``) on the running
    asyncio loop, backed by a selector thread.
    Workaround for the Windows proactor loop not providing these methods.

    Does nothing unless all of the following hold: the platform is
    ``win32``, the current async library is asyncio, and the event loop
    policy class is named ``WindowsProactorEventLoopPolicy``. Also does
    nothing if the running loop has already been patched.

    Must be called while an asyncio loop is running (it uses
    ``asyncio.get_running_loop()``).
    """
    if not (
        sys.platform == "win32"
        and current_async_library() == "asyncio"
        and asyncio.get_event_loop_policy().__class__.__name__ == "WindowsProactorEventLoopPolicy"
    ):
        return

    asyncio_loop = asyncio.get_running_loop()
    if asyncio_loop in _selectors:
        # Already patched; patching twice would start a second selector thread.
        return

    selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop(  # type: ignore[abstract]
        asyncio_loop
    )

    # patch loop.close to also close the selector thread
    loop_close = asyncio_loop.close

    def _close_selector_and_loop() -> None:
        # restore original before calling selector.close,
        # which in turn calls eventloop.close!
        asyncio_loop.close = loop_close  # type: ignore[method-assign]
        _selectors.pop(asyncio_loop, None)
        selector_loop.close()

    asyncio_loop.close = _close_selector_and_loop  # type: ignore[method-assign]
    asyncio_loop.add_reader = selector_loop.add_reader  # type: ignore[assignment]
    asyncio_loop.remove_reader = selector_loop.remove_reader  # type: ignore[method-assign]
    # Patch the writer half of the family as well, so that waiting for a
    # socket to become writable works on the proactor loop too.
    asyncio_loop.add_writer = selector_loop.add_writer  # type: ignore[assignment]
    asyncio_loop.remove_writer = selector_loop.remove_writer  # type: ignore[method-assign]

0 commit comments

Comments
 (0)