Skip to content

Commit eefa576

Browse files
committed
Use zmq-anyio v0.3.0
1 parent 3a14e83 commit eefa576

File tree

9 files changed

+25
-31
lines changed

9 files changed

+25
-31
lines changed

ipykernel/debugger.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ async def _send_request(self, msg):
241241
self.log.debug("DEBUGPYCLIENT:")
242242
self.log.debug(self.routing_id)
243243
self.log.debug(buf)
244-
await self.debugpy_socket.asend_multipart((self.routing_id, buf))
244+
await self.debugpy_socket.asend_multipart((self.routing_id, buf)).wait()
245245

246246
async def _wait_for_response(self):
247247
# Since events are never pushed to the message_queue
@@ -437,7 +437,7 @@ async def start(self):
437437
(self.shell_socket.getsockopt(ROUTING_ID)),
438438
)
439439

440-
msg = await self.shell_socket.arecv_multipart()
440+
msg = await self.shell_socket.arecv_multipart().wait()
441441
ident, msg = self.session.feed_identities(msg, copy=True)
442442
try:
443443
msg = self.session.deserialize(msg, content=True, copy=True)

ipykernel/inprocess/session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
class Session(_Session):
55
async def recv(self, socket, copy=True):
6-
return await socket.arecv_multipart()
6+
return await socket.arecv_multipart().wait()
77

88
def send(
99
self,

ipykernel/iostream.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ async def _handle_event(self):
143143
async with pipe_in:
144144
try:
145145
while True:
146-
await pipe_in.arecv()
146+
await pipe_in.arecv().wait()
147147
# freeze event count so new writes don't extend the queue
148148
# while we are processing
149149
n_events = len(self._events)
@@ -189,7 +189,7 @@ async def _handle_pipe_msgs(self):
189189

190190
async def _handle_pipe_msg(self, msg=None):
191191
"""handle a pipe message from a subprocess"""
192-
msg = msg or await self._pipe_in1.arecv_multipart()
192+
msg = msg or await self._pipe_in1.arecv_multipart().wait()
193193
if not self._pipe_flag or not self._is_main_process():
194194
return
195195
if msg[0] != self._pipe_uuid:

ipykernel/ipkernel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ async def receive_debugpy_message(self, msg=None):
236236

237237
if msg is None:
238238
assert self.debugpy_socket is not None
239-
msg = await self.debugpy_socket.arecv_multipart()
239+
msg = await self.debugpy_socket.arecv_multipart().wait()
240240
# The first frame is the socket id, we can drop it
241241
frame = msg[1].decode("utf-8")
242242
self.log.debug("Debugpy received: %s", frame)

ipykernel/kernelbase.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ async def process_control_message(self, msg=None):
269269
assert self.session is not None
270270
assert self.control_thread is None or threading.current_thread() == self.control_thread
271271

272-
msg = msg or await self.control_socket.arecv_multipart()
272+
msg = msg or await self.control_socket.arecv_multipart().wait()
273273
idents, msg = self.session.feed_identities(msg)
274274
try:
275275
msg = self.session.deserialize(msg, content=True)
@@ -369,7 +369,7 @@ async def shell_channel_thread_main(self):
369369
async with self.shell_socket, create_task_group() as tg:
370370
try:
371371
while True:
372-
msg = await self.shell_socket.arecv_multipart(copy=False)
372+
msg = await self.shell_socket.arecv_multipart(copy=False).wait()
373373
# deserialize only the header to get subshell_id
374374
# Keep original message to send to subshell_id unmodified.
375375
_, msg2 = self.session.feed_identities(msg, copy=False)
@@ -384,7 +384,7 @@ async def shell_channel_thread_main(self):
384384
assert socket is not None
385385
if not socket.started.is_set():
386386
await tg.start(socket.start)
387-
await socket.asend_multipart(msg, copy=False)
387+
socket.asend_multipart(msg, copy=False)
388388
except Exception:
389389
self.log.error("Invalid message", exc_info=True) # noqa: G201
390390
except BaseException:
@@ -444,8 +444,8 @@ async def process_shell_message(self, msg=None, socket=None):
444444
assert socket is None
445445
socket = self.shell_socket
446446

447-
no_msg = msg is None if self._is_test else not await socket.apoll(0)
448-
msg = msg or await socket.arecv_multipart(copy=False)
447+
no_msg = msg is None if self._is_test else not await socket.apoll(0).wait()
448+
msg = msg or await socket.arecv_multipart(copy=False).wait()
449449

450450
received_time = time.monotonic()
451451
copy = not isinstance(msg[0], zmq.Message)
@@ -499,7 +499,7 @@ async def process_shell_message(self, msg=None, socket=None):
499499
try:
500500
result = handler(socket, idents, msg)
501501
if inspect.isawaitable(result):
502-
result = await result
502+
await result
503503
except Exception:
504504
self.log.error("Exception in message handler:", exc_info=True) # noqa: G201
505505
except KeyboardInterrupt:
@@ -1090,8 +1090,8 @@ async def create_subshell_request(self, socket, ident, parent) -> None:
10901090
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
10911091
self.control_thread
10921092
)
1093-
await other_socket.asend_json({"type": "create"})
1094-
reply = await other_socket.arecv_json()
1093+
await other_socket.asend_json({"type": "create"}).wait()
1094+
reply = await other_socket.arecv_json().wait()
10951095

10961096
self.session.send(socket, "create_subshell_reply", reply, parent, ident)
10971097

@@ -1114,8 +1114,8 @@ async def delete_subshell_request(self, socket, ident, parent) -> None:
11141114
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
11151115
self.control_thread
11161116
)
1117-
await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id})
1118-
reply = await other_socket.arecv_json()
1117+
await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id}).wait()
1118+
reply = await other_socket.arecv_json().wait()
11191119

11201120
self.session.send(socket, "delete_subshell_reply", reply, parent, ident)
11211121

@@ -1131,8 +1131,8 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
11311131
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
11321132
self.control_thread
11331133
)
1134-
await other_socket.asend_json({"type": "list"})
1135-
reply = await other_socket.arecv_json()
1134+
await other_socket.asend_json({"type": "list"}).wait()
1135+
reply = await other_socket.arecv_json().wait()
11361136

11371137
self.session.send(socket, "list_subshell_reply", reply, parent, ident)
11381138

ipykernel/subshell_manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,9 @@ async def listen_from_control(self, subshell_task: t.Any) -> None:
158158
socket = self._control_shell_channel_socket
159159
async with socket:
160160
while True:
161-
request = await socket.arecv_json()
161+
request = await socket.arecv_json().wait()
162162
reply = await self._process_control_request(request, subshell_task)
163-
await socket.asend_json(reply)
163+
await socket.asend_json(reply).wait()
164164

165165
async def listen_from_subshells(self) -> None:
166166
"""Listen for reply messages on inproc sockets of all subshells and resend
@@ -270,9 +270,9 @@ async def _listen_for_subshell_reply(
270270
await shell_channel_socket.started.wait()
271271
try:
272272
while True:
273-
msg = await shell_channel_socket.arecv_multipart(copy=False)
273+
msg = await shell_channel_socket.arecv_multipart(copy=False).wait()
274274
with self._lock_shell_socket:
275-
await self._shell_socket.asend_multipart(msg)
275+
await self._shell_socket.asend_multipart(msg).wait()
276276
except BaseException:
277277
if not self._is_subshell(subshell_id):
278278
# Subshell no longer exists so exit gracefully

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ dependencies = [
3434
"psutil>=5.7",
3535
"packaging>=22",
3636
"anyio>=4.8.0,<5.0.0",
37-
"zmq-anyio >=0.2.5",
37+
"zmq-anyio >=0.3.0",
3838
]
3939

4040
[project.urls]

tests/test_io.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,8 @@ async def test_echo_watch(ctx):
221221
print(f"{p.stdout=}")
222222
print(f"{p.stderr}=", file=sys.stderr)
223223
assert p.returncode == 0
224-
while await s.apoll(timeout=100):
225-
msg = await s.arecv_multipart()
224+
while await s.apoll(timeout=100).wait():
225+
msg = await s.arecv_multipart().wait()
226226
ident, msg = session.feed_identities(msg, copy=True)
227227
msg = session.deserialize(msg, content=True, copy=True)
228228
assert msg is not None # for type narrowing

tests/test_kernel.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ def thread_target():
8383
msg = kc.get_iopub_msg(timeout=interval * 2)
8484
if msg["msg_type"] != "stream":
8585
continue
86-
print(f"{thread_msg_id=}")
87-
print(f"{msg=}")
8886
content = msg["content"]
8987
assert content["name"] == "stdout"
9088
assert content["text"] == str(received)
@@ -123,8 +121,6 @@ def parent_target():
123121
msg = kc.get_iopub_msg(timeout=interval * 2)
124122
if msg["msg_type"] != "stream":
125123
continue
126-
print(f"{thread_msg_id=}")
127-
print(f"{msg=}")
128124
content = msg["content"]
129125
assert content["name"] == "stdout"
130126
assert content["text"] == str(received)
@@ -157,8 +153,6 @@ async def async_task():
157153
msg = kc.get_iopub_msg(timeout=interval * 2)
158154
if msg["msg_type"] != "stream":
159155
continue
160-
print(f"{thread_msg_id=}")
161-
print(f"{msg=}")
162156
content = msg["content"]
163157
assert content["name"] == "stdout"
164158
assert content["text"] == str(received)

0 commit comments

Comments
 (0)