|
4 | 4 | import asyncio
|
5 | 5 | import atexit
|
6 | 6 | import time
|
| 7 | +from concurrent.futures import Future |
7 | 8 | from threading import Event, Thread
|
8 | 9 | from typing import Any, Dict, List, Optional
|
9 | 10 |
|
10 | 11 | import zmq
|
11 | 12 | from tornado.ioloop import IOLoop
|
12 | 13 | from traitlets import Instance, Type
|
| 14 | +from traitlets.log import get_logger |
13 | 15 | from zmq.eventloop import zmqstream
|
14 | 16 |
|
15 | 17 | from .channels import HBChannel
|
@@ -45,7 +47,7 @@ def __init__(
|
45 | 47 | session : :class:`session.Session`
|
46 | 48 | The session to use.
|
47 | 49 | loop
|
48 |
| - A pyzmq ioloop to connect the socket to using a ZMQStream |
| 50 | + A tornado ioloop to connect the socket to using a ZMQStream |
49 | 51 | """
|
50 | 52 | super().__init__()
|
51 | 53 |
|
@@ -79,7 +81,30 @@ def stop(self) -> None:
|
79 | 81 | self._is_alive = False
|
80 | 82 |
|
81 | 83 | def close(self) -> None:
|
82 |
| - """ "Close the channel.""" |
| 84 | + """Close the channel.""" |
| 85 | + if self.stream is not None and self.ioloop is not None: |
| 86 | + # c.f.Future for threadsafe results |
| 87 | + f: Future = Future() |
| 88 | + |
| 89 | + def close_stream(): |
| 90 | + try: |
| 91 | + if self.stream is not None: |
| 92 | + self.stream.close(linger=0) |
| 93 | + self.stream = None |
| 94 | + except Exception as e: |
| 95 | + f.set_exception(e) |
| 96 | + else: |
| 97 | + f.set_result(None) |
| 98 | + |
| 99 | + self.ioloop.add_callback(close_stream) |
| 100 | + # wait for result |
| 101 | + try: |
| 102 | + f.result(timeout=5) |
| 103 | + except Exception as e: |
| 104 | + log = get_logger() |
| 105 | + msg = f"Error closing stream {self.stream}: {e}" |
| 106 | + log.warning(msg, RuntimeWarning, stacklevel=2) |
| 107 | + |
83 | 108 | if self.socket is not None:
|
84 | 109 | try:
|
85 | 110 | self.socket.close(linger=0)
|
|
0 commit comments