Skip to content

Commit 98c87fe

Browse files
authored
use c.f.Future to wait across threads (#940)
1 parent 2f6357f commit 98c87fe

File tree

1 file changed

+54
-22
lines changed

1 file changed

+54
-22
lines changed

jupyter_client/threaded.py

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
import atexit
66
import time
77
from concurrent.futures import Future
8-
from threading import Event, Thread
8+
from functools import partial
9+
from threading import Thread
910
from typing import Any, Dict, List, Optional
1011

1112
import zmq
@@ -54,17 +55,22 @@ def __init__(
5455
self.socket = socket
5556
self.session = session
5657
self.ioloop = loop
57-
evt = Event()
58+
f: Future = Future()
5859

5960
def setup_stream():
60-
assert self.socket is not None
61-
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
62-
self.stream.on_recv(self._handle_recv)
63-
evt.set()
61+
try:
62+
assert self.socket is not None
63+
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
64+
self.stream.on_recv(self._handle_recv)
65+
except Exception as e:
66+
f.set_exception(e)
67+
else:
68+
f.set_result(None)
6469

6570
assert self.ioloop is not None
6671
self.ioloop.add_callback(setup_stream)
67-
evt.wait()
72+
# don't wait forever, raise any errors
73+
f.result(timeout=10)
6874

6975
_is_alive = False
7076

@@ -179,13 +185,31 @@ def flush(self, timeout: float = 1.0) -> None:
179185
"""
180186
# We do the IOLoop callback process twice to ensure that the IOLoop
181187
# gets to perform at least one full poll.
182-
stop_time = time.time() + timeout
188+
stop_time = time.monotonic() + timeout
183189
assert self.ioloop is not None
190+
if self.stream is None or self.stream.closed():
191+
# don't bother scheduling flush on a thread if we're closed
192+
_msg = "Attempt to flush closed stream"
193+
raise OSError(_msg)
194+
195+
def flush(f):
196+
try:
197+
self._flush()
198+
except Exception as e:
199+
f.set_exception(e)
200+
else:
201+
f.set_result(None)
202+
184203
for _ in range(2):
185-
self._flushed = False
186-
self.ioloop.add_callback(self._flush)
187-
while not self._flushed and time.time() < stop_time:
188-
time.sleep(0.01)
204+
f: Future = Future()
205+
self.ioloop.add_callback(partial(flush, f))
206+
# wait for async flush, re-raise any errors
207+
timeout = max(stop_time - time.monotonic(), 0)
208+
try:
209+
f.result(max(stop_time - time.monotonic(), 0))
210+
except TimeoutError:
211+
# flush with a timeout means stop waiting, not raise
212+
return
189213

190214
def _flush(self) -> None:
191215
"""Callback for :method:`self.flush`."""
@@ -219,24 +243,32 @@ def start(self) -> None:
219243
Don't return until self.ioloop is defined,
220244
which is created in the thread
221245
"""
222-
self._start_event = Event()
246+
self._start_future: Future = Future()
223247
Thread.start(self)
224-
self._start_event.wait()
248+
# wait for start, re-raise any errors
249+
self._start_future.result(timeout=10)
225250

226251
def run(self) -> None:
227252
"""Run my loop, ignoring EINTR events in the poller"""
228-
loop = asyncio.new_event_loop()
229-
asyncio.set_event_loop(loop)
253+
try:
254+
loop = asyncio.new_event_loop()
255+
asyncio.set_event_loop(loop)
256+
257+
async def assign_ioloop():
258+
self.ioloop = IOLoop.current()
259+
260+
loop.run_until_complete(assign_ioloop())
261+
except Exception as e:
262+
self._start_future.set_exception(e)
263+
else:
264+
self._start_future.set_result(None)
265+
230266
loop.run_until_complete(self._async_run())
231267

232268
async def _async_run(self):
233-
self.ioloop = IOLoop.current()
234-
# signal that self.ioloop is defined
235-
self._start_event.set()
236-
while True:
269+
"""Run forever (until self._exiting is set)"""
270+
while not self._exiting:
237271
await asyncio.sleep(1)
238-
if self._exiting:
239-
break
240272

241273
def stop(self) -> None:
242274
"""Stop the channel's event loop and join its thread.

0 commit comments

Comments
 (0)