Skip to content

Commit 299a759

Browse files
committed
Use Tornado's selector thread on Windows/asyncio/ProactorEventLoop
1 parent 0d7f721 commit 299a759

File tree

3 files changed

+393
-11
lines changed

3 files changed

+393
-11
lines changed

ipykernel/_selector_thread.py

Lines changed: 388 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,388 @@
1+
"""Ensure asyncio selector methods (add_reader, etc.) are available.
2+
Running select in a thread and defining these methods on the running event loop.
3+
Originally in tornado.platform.asyncio.
4+
Redistributed under license Apache-2.0
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import asyncio
10+
import atexit
11+
import errno
12+
import functools
13+
import select
14+
import socket
15+
import sys
16+
import threading
17+
import typing
18+
from typing import (
19+
Any,
20+
Callable,
21+
Union,
22+
)
23+
from weakref import WeakKeyDictionary
24+
25+
from sniffio import current_async_library
26+
27+
if typing.TYPE_CHECKING:
28+
from typing_extensions import Protocol
29+
30+
class _HasFileno(Protocol):
31+
def fileno(self) -> int:
32+
pass
33+
34+
_FileDescriptorLike = Union[int, _HasFileno]
35+
36+
37+
# Collection of selector thread event loops to shut down on exit.
38+
_selector_loops: set[SelectorThread] = set()
39+
40+
41+
def _atexit_callback() -> None:
42+
for loop in _selector_loops:
43+
with loop._select_cond:
44+
loop._closing_selector = True
45+
loop._select_cond.notify()
46+
try:
47+
loop._waker_w.send(b"a")
48+
except BlockingIOError:
49+
pass
50+
# If we don't join our (daemon) thread here, we may get a deadlock
51+
# during interpreter shutdown. I don't really understand why. This
52+
# deadlock happens every time in CI (both travis and appveyor) but
53+
# I've never been able to reproduce locally.
54+
assert loop._thread is not None
55+
loop._thread.join()
56+
_selector_loops.clear()
57+
58+
59+
atexit.register(_atexit_callback)
60+
61+
62+
# SelectorThread from tornado 6.4.0
63+
64+
65+
class SelectorThread:
66+
"""Define ``add_reader`` methods to be called in a background select thread.
67+
68+
Instances of this class start a second thread to run a selector.
69+
This thread is completely hidden from the user;
70+
all callbacks are run on the wrapped event loop's thread.
71+
72+
Typically used via ``AddThreadSelectorEventLoop``,
73+
but can be attached to a running asyncio loop.
74+
"""
75+
76+
_closed = False
77+
78+
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
79+
self._real_loop = real_loop
80+
81+
self._select_cond = threading.Condition()
82+
self._select_args: (
83+
tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None
84+
) = None
85+
self._closing_selector = False
86+
self._thread: threading.Thread | None = None
87+
self._thread_manager_handle = self._thread_manager()
88+
89+
async def thread_manager_anext() -> None:
90+
# the anext builtin wasn't added until 3.10. We just need to iterate
91+
# this generator one step.
92+
await self._thread_manager_handle.__anext__()
93+
94+
# When the loop starts, start the thread. Not too soon because we can't
95+
# clean up if we get to this point but the event loop is closed without
96+
# starting.
97+
self._real_loop.call_soon(
98+
lambda: self._real_loop.create_task(thread_manager_anext())
99+
)
100+
101+
self._readers: dict[_FileDescriptorLike, Callable] = {}
102+
self._writers: dict[_FileDescriptorLike, Callable] = {}
103+
104+
# Writing to _waker_w will wake up the selector thread, which
105+
# watches for _waker_r to be readable.
106+
self._waker_r, self._waker_w = socket.socketpair()
107+
self._waker_r.setblocking(False)
108+
self._waker_w.setblocking(False)
109+
_selector_loops.add(self)
110+
self.add_reader(self._waker_r, self._consume_waker)
111+
112+
def close(self) -> None:
113+
if self._closed:
114+
return
115+
with self._select_cond:
116+
self._closing_selector = True
117+
self._select_cond.notify()
118+
self._wake_selector()
119+
if self._thread is not None:
120+
self._thread.join()
121+
_selector_loops.discard(self)
122+
self.remove_reader(self._waker_r)
123+
self._waker_r.close()
124+
self._waker_w.close()
125+
self._closed = True
126+
127+
async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
128+
# Create a thread to run the select system call. We manage this thread
129+
# manually so we can trigger a clean shutdown from an atexit hook. Note
130+
# that due to the order of operations at shutdown, only daemon threads
131+
# can be shut down in this way (non-daemon threads would require the
132+
# introduction of a new hook: https://bugs.python.org/issue41962)
133+
self._thread = threading.Thread(
134+
name="Tornado selector",
135+
daemon=True,
136+
target=self._run_select,
137+
)
138+
self._thread.pydev_do_not_trace = True
139+
self._thread.start()
140+
self._start_select()
141+
try:
142+
# The presense of this yield statement means that this coroutine
143+
# is actually an asynchronous generator, which has a special
144+
# shutdown protocol. We wait at this yield point until the
145+
# event loop's shutdown_asyncgens method is called, at which point
146+
# we will get a GeneratorExit exception and can shut down the
147+
# selector thread.
148+
yield
149+
except GeneratorExit:
150+
self.close()
151+
raise
152+
153+
def _wake_selector(self) -> None:
154+
if self._closed:
155+
return
156+
try:
157+
self._waker_w.send(b"a")
158+
except BlockingIOError:
159+
pass
160+
161+
def _consume_waker(self) -> None:
162+
try:
163+
self._waker_r.recv(1024)
164+
except BlockingIOError:
165+
pass
166+
167+
def _start_select(self) -> None:
168+
# Capture reader and writer sets here in the event loop
169+
# thread to avoid any problems with concurrent
170+
# modification while the select loop uses them.
171+
with self._select_cond:
172+
assert self._select_args is None
173+
self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
174+
self._select_cond.notify()
175+
176+
def _run_select(self) -> None:
177+
while True:
178+
with self._select_cond:
179+
while self._select_args is None and not self._closing_selector:
180+
self._select_cond.wait()
181+
if self._closing_selector:
182+
return
183+
assert self._select_args is not None
184+
to_read, to_write = self._select_args
185+
self._select_args = None
186+
187+
# We use the simpler interface of the select module instead of
188+
# the more stateful interface in the selectors module because
189+
# this class is only intended for use on windows, where
190+
# select.select is the only option. The selector interface
191+
# does not have well-documented thread-safety semantics that
192+
# we can rely on so ensuring proper synchronization would be
193+
# tricky.
194+
try:
195+
# On windows, selecting on a socket for write will not
196+
# return the socket when there is an error (but selecting
197+
# for reads works). Also select for errors when selecting
198+
# for writes, and merge the results.
199+
#
200+
# This pattern is also used in
201+
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
202+
rs, ws, xs = select.select(to_read, to_write, to_write)
203+
ws = ws + xs
204+
except OSError as e:
205+
# After remove_reader or remove_writer is called, the file
206+
# descriptor may subsequently be closed on the event loop
207+
# thread. It's possible that this select thread hasn't
208+
# gotten into the select system call by the time that
209+
# happens in which case (at least on macOS), select may
210+
# raise a "bad file descriptor" error. If we get that
211+
# error, check and see if we're also being woken up by
212+
# polling the waker alone. If we are, just return to the
213+
# event loop and we'll get the updated set of file
214+
# descriptors on the next iteration. Otherwise, raise the
215+
# original error.
216+
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
217+
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
218+
if rs:
219+
ws = []
220+
else:
221+
raise
222+
else:
223+
raise
224+
225+
try:
226+
self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
227+
except RuntimeError:
228+
# "Event loop is closed". Swallow the exception for
229+
# consistency with PollIOLoop (and logical consistency
230+
# with the fact that we can't guarantee that an
231+
# add_callback that completes without error will
232+
# eventually execute).
233+
pass
234+
except AttributeError:
235+
# ProactorEventLoop may raise this instead of RuntimeError
236+
# if call_soon_threadsafe races with a call to close().
237+
# Swallow it too for consistency.
238+
pass
239+
240+
def _handle_select(
241+
self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
242+
) -> None:
243+
for r in rs:
244+
self._handle_event(r, self._readers)
245+
for w in ws:
246+
self._handle_event(w, self._writers)
247+
self._start_select()
248+
249+
def _handle_event(
250+
self,
251+
fd: _FileDescriptorLike,
252+
cb_map: dict[_FileDescriptorLike, Callable],
253+
) -> None:
254+
try:
255+
callback = cb_map[fd]
256+
except KeyError:
257+
return
258+
callback()
259+
260+
def add_reader(
261+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
262+
) -> None:
263+
self._readers[fd] = functools.partial(callback, *args)
264+
self._wake_selector()
265+
266+
def add_writer(
267+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
268+
) -> None:
269+
self._writers[fd] = functools.partial(callback, *args)
270+
self._wake_selector()
271+
272+
def remove_reader(self, fd: _FileDescriptorLike) -> bool:
273+
try:
274+
del self._readers[fd]
275+
except KeyError:
276+
return False
277+
self._wake_selector()
278+
return True
279+
280+
def remove_writer(self, fd: _FileDescriptorLike) -> bool:
281+
try:
282+
del self._writers[fd]
283+
except KeyError:
284+
return False
285+
self._wake_selector()
286+
return True
287+
288+
289+
# AddThreadSelectorEventLoop: unmodified from tornado 6.4.0
290+
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
291+
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
292+
293+
Instances of this class start a second thread to run a selector.
294+
This thread is completely hidden from the user; all callbacks are
295+
run on the wrapped event loop's thread.
296+
297+
This class is used automatically by Tornado; applications should not need
298+
to refer to it directly.
299+
300+
It is safe to wrap any event loop with this class, although it only makes sense
301+
for event loops that do not implement the ``add_reader`` family of methods
302+
themselves (i.e. ``WindowsProactorEventLoop``)
303+
304+
Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
305+
"""
306+
307+
# This class is a __getattribute__-based proxy. All attributes other than those
308+
# in this set are proxied through to the underlying loop.
309+
MY_ATTRIBUTES = {
310+
"_real_loop",
311+
"_selector",
312+
"add_reader",
313+
"add_writer",
314+
"close",
315+
"remove_reader",
316+
"remove_writer",
317+
}
318+
319+
def __getattribute__(self, name: str) -> Any:
320+
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
321+
return super().__getattribute__(name)
322+
return getattr(self._real_loop, name)
323+
324+
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
325+
self._real_loop = real_loop
326+
self._selector = SelectorThread(real_loop)
327+
328+
def close(self) -> None:
329+
self._selector.close()
330+
self._real_loop.close()
331+
332+
def add_reader( # type: ignore[override]
333+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
334+
) -> None:
335+
return self._selector.add_reader(fd, callback, *args)
336+
337+
def add_writer( # type: ignore[override]
338+
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
339+
) -> None:
340+
return self._selector.add_writer(fd, callback, *args)
341+
342+
def remove_reader(self, fd: _FileDescriptorLike) -> bool:
343+
return self._selector.remove_reader(fd)
344+
345+
def remove_writer(self, fd: _FileDescriptorLike) -> bool:
346+
return self._selector.remove_writer(fd)
347+
348+
349+
# registry of asyncio loop : selector thread
350+
_selectors: WeakKeyDictionary = WeakKeyDictionary()
351+
352+
353+
def _set_selector_windows() -> None:
354+
"""Set selector-compatible loop.
355+
Sets ``add_reader`` family of methods on the asyncio loop.
356+
Workaround Windows proactor removal of *reader methods.
357+
"""
358+
if not (
359+
sys.platform == "win32"
360+
and current_async_library() == "asyncio"
361+
and asyncio.get_event_loop_policy().__class__.__name__
362+
== "WindowsProactorEventLoopPolicy"
363+
):
364+
return
365+
366+
asyncio_loop = asyncio.get_running_loop()
367+
if asyncio_loop in _selectors:
368+
return
369+
370+
from ._selector_thread import AddThreadSelectorEventLoop
371+
372+
selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop( # type: ignore[abstract]
373+
asyncio_loop
374+
)
375+
376+
# patch loop.close to also close the selector thread
377+
loop_close = asyncio_loop.close
378+
379+
def _close_selector_and_loop() -> None:
380+
# restore original before calling selector.close,
381+
# which in turn calls eventloop.close!
382+
asyncio_loop.close = loop_close # type: ignore[method-assign]
383+
_selectors.pop(asyncio_loop, None)
384+
selector_loop.close()
385+
386+
asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign]
387+
asyncio_loop.add_reader = selector_loop.add_reader # type: ignore[assignment]
388+
asyncio_loop.remove_reader = selector_loop.remove_reader # type: ignore[method-assign]

0 commit comments

Comments
 (0)