Skip to content

Commit e731fa7

Browse files
committed
Stop zmq sockets
1 parent 0d7f721 commit e731fa7

File tree

5 files changed

+44
-27
lines changed

5 files changed

+44
-27
lines changed

ipykernel/kernelbase.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,8 @@ async def shell_main(self, subshell_id: str | None):
428428
await to_thread.run_sync(self.shell_stop.wait)
429429
tg.cancel_scope.cancel()
430430

431+
await socket.stop()
432+
431433
async def process_shell(self, socket=None):
432434
# socket=None is valid if kernel subshells are not supported.
433435
try:

ipykernel/subshell.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,4 @@ async def create_pair_socket(
3232
self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR)
3333
self._pair_socket.connect(address)
3434
self.start_soon(self._pair_socket.start)
35-
36-
def run(self) -> None:
37-
try:
38-
super().run()
39-
finally:
40-
if self._pair_socket is not None:
41-
self._pair_socket.close()
42-
self._pair_socket = None
35+
self.add_teardown_callback(self._pair_socket.stop)

ipykernel/subshell_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,13 @@ def close(self) -> None:
116116
async def get_control_other_socket(self, thread: BaseThread) -> zmq_anyio.Socket:
117117
if not self._control_other_socket.started.is_set():
118118
await thread.task_group.start(self._control_other_socket.start)
119+
thread.add_teardown_callback(self._control_other_socket.stop)
119120
return self._control_other_socket
120121

121122
async def get_control_shell_channel_socket(self, thread: BaseThread) -> zmq_anyio.Socket:
122123
if not self._control_shell_channel_socket.started.is_set():
123124
await thread.task_group.start(self._control_shell_channel_socket.start)
125+
thread.add_teardown_callback(self._control_shell_channel_socket.stop)
124126
return self._control_shell_channel_socket
125127

126128
def get_other_socket(self, subshell_id: str | None) -> zmq_anyio.Socket:
@@ -281,6 +283,8 @@ async def _listen_for_subshell_reply(
281283
# Subshell no longer exists so exit gracefully
282284
return
283285
raise
286+
finally:
287+
await shell_channel_socket.stop()
284288

285289
async def _process_control_request(
286290
self, request: dict[str, t.Any], subshell_task: t.Any

ipykernel/thread.py

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
from collections.abc import Awaitable
6+
from inspect import isawaitable
67
from queue import Queue
78
from threading import Event, Thread
89
from typing import Any, Callable
@@ -26,6 +27,7 @@ def __init__(self, **kwargs):
2627
self.is_pydev_daemon_thread = True
2728
self._tasks: Queue[tuple[str, Callable[[], Awaitable[Any]]] | None] = Queue()
2829
self._result: Queue[Any] = Queue()
30+
self._teardown_callbacks: list[Callable[[], Any] | Callable[[], Awaitable[Any]]] = []
2931
self._exception: Exception | None = None
3032

3133
@property
@@ -47,6 +49,9 @@ def run_sync(self, func: Callable[..., Any]) -> Any:
4749
self._tasks.put(("run_sync", func))
4850
return self._result.get()
4951

52+
def add_teardown_callback(self, func: Callable[[], Any] | Callable[[], Awaitable[Any]]) -> None:
53+
self._teardown_callbacks.append(func)
54+
5055
def run(self) -> None:
5156
"""Run the thread."""
5257
try:
@@ -55,24 +60,37 @@ def run(self) -> None:
5560
self._exception = exc
5661

5762
async def _main(self) -> None:
58-
async with create_task_group() as tg:
59-
self._task_group = tg
60-
self.started.set()
61-
while True:
62-
task = await to_thread.run_sync(self._tasks.get)
63-
if task is None:
64-
break
65-
func, arg = task
66-
if func == "start_soon":
67-
tg.start_soon(arg)
68-
elif func == "run_async":
69-
res = await arg
70-
self._result.put(res)
71-
else: # func == "run_sync"
72-
res = arg()
73-
self._result.put(res)
74-
75-
tg.cancel_scope.cancel()
63+
try:
64+
async with create_task_group() as tg:
65+
self._task_group = tg
66+
self.started.set()
67+
while True:
68+
task = await to_thread.run_sync(self._tasks.get)
69+
if task is None:
70+
break
71+
func, arg = task
72+
if func == "start_soon":
73+
tg.start_soon(arg)
74+
elif func == "run_async":
75+
res = await arg
76+
self._result.put(res)
77+
else: # func == "run_sync"
78+
res = arg()
79+
self._result.put(res)
80+
81+
tg.cancel_scope.cancel()
82+
finally:
83+
exception = None
84+
for teardown_callback in self._teardown_callbacks[::-1]:
85+
try:
86+
res = teardown_callback()
87+
if isawaitable(res):
88+
await res
89+
except Exception as exc:
90+
if exception is None:
91+
exception = exc
92+
if exception is not None:
93+
raise exception
7694

7795
def stop(self) -> None:
7896
"""Stop the thread.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ dependencies = [
3333
"psutil>=5.7",
3434
"packaging>=22",
3535
"anyio>=4.8.0,<5.0.0",
36-
"zmq-anyio >=0.3.6",
36+
"zmq-anyio >=0.3.9",
3737
]
3838

3939
[project.urls]

0 commit comments

Comments
 (0)