Skip to content

Commit 112ca66

Browse files
minrkblink1073pre-commit-ci[bot]
authored
Avoid ResourceWarning on implicitly closed event pipe sockets (ipython#1125)
Co-authored-by: Steven Silvester <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 1c7f626 commit 112ca66

File tree

2 files changed

+78
-7
lines changed

2 files changed

+78
-7
lines changed

ipykernel/iostream.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# Copyright (c) IPython Development Team.
44
# Distributed under the terms of the Modified BSD License.
55

6+
import asyncio
67
import atexit
78
import io
89
import os
@@ -14,8 +15,7 @@
1415
from collections import deque
1516
from io import StringIO, TextIOBase
1617
from threading import local
17-
from typing import Any, Callable, Deque, Optional
18-
from weakref import WeakSet
18+
from typing import Any, Callable, Deque, Dict, Optional
1919

2020
import zmq
2121
from jupyter_client.session import extract_header
@@ -63,7 +63,10 @@ def __init__(self, socket, pipe=False):
6363
self._setup_pipe_in()
6464
self._local = threading.local()
6565
self._events: Deque[Callable[..., Any]] = deque()
66-
self._event_pipes: WeakSet[Any] = WeakSet()
66+
self._event_pipes: Dict[threading.Thread, Any] = {}
67+
self._event_pipe_gc_lock: threading.Lock = threading.Lock()
68+
self._event_pipe_gc_seconds: float = 10
69+
self._event_pipe_gc_task: Optional[asyncio.Task] = None
6770
self._setup_event_pipe()
6871
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
6972
self.thread.daemon = True
@@ -73,7 +76,18 @@ def __init__(self, socket, pipe=False):
7376

7477
def _thread_main(self):
7578
"""The inner loop that's actually run in a thread"""
79+
80+
def _start_event_gc():
81+
self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())
82+
83+
self.io_loop.run_sync(_start_event_gc)
7684
self.io_loop.start()
85+
if self._event_pipe_gc_task is not None:
86+
# cancel gc task to avoid pending task warnings
87+
async def _cancel():
88+
self._event_pipe_gc_task.cancel() # type:ignore
89+
90+
self.io_loop.run_sync(_cancel)
7791
self.io_loop.close(all_fds=True)
7892

7993
def _setup_event_pipe(self):
@@ -88,6 +102,26 @@ def _setup_event_pipe(self):
88102
self._event_puller = ZMQStream(pipe_in, self.io_loop)
89103
self._event_puller.on_recv(self._handle_event)
90104

105+
async def _run_event_pipe_gc(self):
106+
"""Task to run event pipe gc continuously"""
107+
while True:
108+
await asyncio.sleep(self._event_pipe_gc_seconds)
109+
try:
110+
await self._event_pipe_gc()
111+
except Exception as e:
112+
print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__)
113+
114+
async def _event_pipe_gc(self):
115+
"""run a single garbage collection on event pipes"""
116+
if not self._event_pipes:
117+
# don't acquire the lock if there's nothing to do
118+
return
119+
with self._event_pipe_gc_lock:
120+
for thread, socket in list(self._event_pipes.items()):
121+
if not thread.is_alive():
122+
socket.close()
123+
del self._event_pipes[thread]
124+
91125
@property
92126
def _event_pipe(self):
93127
"""thread-local event pipe for signaling events that should be processed in the thread"""
@@ -100,9 +134,11 @@ def _event_pipe(self):
100134
event_pipe.linger = 0
101135
event_pipe.connect(self._event_interface)
102136
self._local.event_pipe = event_pipe
103-
# WeakSet so that event pipes will be closed by garbage collection
104-
# when their threads are terminated
105-
self._event_pipes.add(event_pipe)
137+
# associate event pipes to their threads
138+
# so they can be closed explicitly
139+
# implicit close on __del__ throws a ResourceWarning
140+
with self._event_pipe_gc_lock:
141+
self._event_pipes[threading.current_thread()] = event_pipe
106142
return event_pipe
107143

108144
def _handle_event(self, msg):
@@ -188,7 +224,7 @@ def stop(self):
188224
# close *all* event pipes, created in any thread
189225
# event pipes can only be used from other threads while self.thread.is_alive()
190226
# so after thread.join, this should be safe
191-
for event_pipe in self._event_pipes:
227+
for _thread, event_pipe in self._event_pipes.items():
192228
event_pipe.close()
193229

194230
def close(self):

ipykernel/tests/test_io.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import os
55
import subprocess
66
import sys
7+
import threading
78
import time
89
import warnings
10+
from concurrent.futures import Future, ThreadPoolExecutor
911
from unittest import mock
1012

1113
import pytest
@@ -114,6 +116,39 @@ def test_outstream(iopub_thread):
114116
assert stream.writable()
115117

116118

119+
async def test_event_pipe_gc(iopub_thread):
120+
session = Session(key=b'abc')
121+
stream = OutStream(
122+
session,
123+
iopub_thread,
124+
"stdout",
125+
isatty=True,
126+
watchfd=False,
127+
)
128+
save_stdout = sys.stdout
129+
assert iopub_thread._event_pipes == {}
130+
with stream, mock.patch.object(sys, "stdout", stream), ThreadPoolExecutor(1) as pool:
131+
pool.submit(print, "x").result()
132+
pool_thread = pool.submit(threading.current_thread).result()
133+
assert list(iopub_thread._event_pipes) == [pool_thread]
134+
135+
# run gc once in the iopub thread
136+
f: Future = Future()
137+
138+
async def run_gc():
139+
try:
140+
await iopub_thread._event_pipe_gc()
141+
except Exception as e:
142+
f.set_exception(e)
143+
else:
144+
f.set_result(None)
145+
146+
iopub_thread.io_loop.add_callback(run_gc)
147+
# wait for call to finish in iopub thread
148+
f.result()
149+
assert iopub_thread._event_pipes == {}
150+
151+
117152
def subprocess_test_echo_watch():
118153
# handshake Pub subscription
119154
session = Session(key=b'abc')

0 commit comments

Comments
 (0)