Skip to content

Commit e4849d4

Browse files
committed
Try to fix unclosed resources.
I think these might be one of the few places that create those socket, so maybe closing them is the right call. I'm not sure. There are random failures on some tests. as _handle_pipe_msg is use in a single place and `self._async_pipe_in1` is used only to be passed between _handle_pipe_msgs and _handle_pipe_msg I'm tempted to refactor and pass _async_pipe_in1 as a parameter maybe ? and also rename to _handle_single_pipe_msg to make it visually a bit more distinctive.
1 parent f3f2a60 commit e4849d4

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

ipykernel/iostream.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -170,20 +170,20 @@ async def _handle_event(self):
170170
*all* waiting events are processed in order.
171171
"""
172172
# create async wrapper within coroutine
173-
pipe_in = zmq.asyncio.Socket(self._pipe_in0)
174-
try:
175-
while True:
176-
await pipe_in.recv()
177-
# freeze event count so new writes don't extend the queue
178-
# while we are processing
179-
n_events = len(self._events)
180-
for _ in range(n_events):
181-
event_f = self._events.popleft()
182-
event_f()
183-
except Exception:
184-
if self.thread.__stop.is_set():
185-
return
186-
raise
173+
with zmq.asyncio.Socket(self._pipe_in0) as pipe_in:
174+
try:
175+
while True:
176+
await pipe_in.recv()
177+
# freeze event count so new writes don't extend the queue
178+
# while we are processing
179+
n_events = len(self._events)
180+
for _ in range(n_events):
181+
event_f = self._events.popleft()
182+
event_f()
183+
except Exception:
184+
if self.thread.__stop.is_set():
185+
return
186+
raise
187187

188188
def _setup_pipe_in(self):
189189
"""setup listening pipe for IOPub from forked subprocesses"""
@@ -218,6 +218,8 @@ async def _handle_pipe_msgs(self):
218218
if self.thread.__stop.is_set():
219219
return
220220
raise
221+
finally:
222+
self._async_pipe_in1.close()
221223

222224
async def _handle_pipe_msg(self, msg=None):
223225
"""handle a pipe message from a subprocess"""

0 commit comments

Comments
 (0)