Skip to content

Commit 1994870

Browse files
committed
Switch from Tornado Queues to anyio MemoryObjectStreams
1 parent 341f810 commit 1994870

File tree

3 files changed

+32
-17
lines changed

3 files changed

+32
-17
lines changed

python_packages/jupyter_lsp/jupyter_lsp/connection.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@
1414
import anyio
1515
from anyio.streams.buffered import BufferedByteReceiveStream
1616
from anyio.streams.text import TextSendStream
17+
from anyio.streams.stapled import StapledObjectStream
1718
from tornado.httputil import HTTPHeaders
18-
from tornado.ioloop import IOLoop
19-
from tornado.queues import Queue
2019
from traitlets import Instance, Int
2120
from traitlets.config import LoggingConfigurable
2221
from traitlets.traitlets import MetaHasTraits
@@ -31,7 +30,7 @@ class LspStreamBase(LoggingConfigurable, ABC, metaclass=LspStreamMeta):
3130
streams
3231
"""
3332

34-
queue = Instance(Queue, help="queue to get/put")
33+
queue = Instance(StapledObjectStream, help="queue to get/put")
3534

3635
def __repr__(self): # pragma: no cover
3736
return "<{}(parent={})>".format(self.__class__.__name__, self.parent)
@@ -71,7 +70,7 @@ async def read(self) -> None:
7170
message = None
7271
try:
7372
message = await self.read_one()
74-
IOLoop.current().add_callback(self.queue.put_nowait, message)
73+
await self.queue.send(message)
7574
except anyio.ClosedResourceError:
7675
# stream was closed -> terminate
7776
self.log.debug("Stream closed while a read was still in progress")
@@ -151,7 +150,7 @@ async def close(self):
151150
async def write(self) -> None:
152151
"""Write to a Language Server until it closes"""
153152
while True:
154-
message = await self.queue.get()
153+
message = await self.queue.receive()
155154
try:
156155
n_bytes = len(message.encode("utf-8"))
157156
response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message)
@@ -165,8 +164,6 @@ async def write(self) -> None:
165164
break
166165
except Exception: # pragma: no cover
167166
self.log.exception("%s couldn't write message: %s", self, response)
168-
finally:
169-
self.queue.task_done()
170167

171168
async def _write_one(self, message) -> None:
172169
await self.stream.send(message)

python_packages/jupyter_lsp/jupyter_lsp/session.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
""" A session for managing a language server process
22
"""
33
import atexit
4+
import math
45
import os
56
import string
67
import subprocess
@@ -14,8 +15,8 @@
1415
import anyio
1516
from anyio import CancelScope
1617
from anyio.abc import Process, SocketStream
18+
from anyio.streams.stapled import StapledObjectStream
1719
from tornado.ioloop import IOLoop
18-
from tornado.queues import Queue
1920
from tornado.websocket import WebSocketHandler
2021
from traitlets import Bunch, Float, Instance, Set, Unicode, UseEnum, observe
2122
from traitlets.config import LoggingConfigurable
@@ -62,10 +63,14 @@ class LanguageServerSessionBase(
6263
writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True)
6364
reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True)
6465
from_lsp = Instance(
65-
Queue, help="a queue for string messages from the server", allow_none=True
66+
StapledObjectStream,
67+
help="a queue for string messages from the server",
68+
allow_none=True
6669
)
6770
to_lsp = Instance(
68-
Queue, help="a queue for string message to the server", allow_none=True
71+
StapledObjectStream,
72+
help="a queue for string messages to the server",
73+
allow_none=True
6974
)
7075
handlers = Set(
7176
trait=Instance(WebSocketHandler),
@@ -80,6 +85,10 @@ class LanguageServerSessionBase(
8085
5,
8186
help="timeout after which a process will be terminated forcefully",
8287
).tag(config=True)
88+
queue_size = Float(
89+
math.inf,
90+
help="the maximum number of messages that can be buffered in the queue"
91+
).tag(config=True)
8392

8493
_skip_serialize = ["argv", "debug_argv"]
8594

@@ -170,6 +179,12 @@ async def cleanup(self):
170179
if self.process is not None:
171180
await self.stop_process(self.stop_timeout)
172181
self.process = None
182+
if self.from_lsp is not None:
183+
await self.from_lsp.aclose()
184+
self.from_lsp = None
185+
if self.to_lsp is not None:
186+
await self.to_lsp.aclose()
187+
self.to_lsp = None
173188

174189
self.status = SessionStatus.STOPPED
175190

@@ -184,7 +199,7 @@ def _on_handlers(self, change: Bunch):
184199
def write(self, message):
185200
"""wrapper around the write queue to keep it mostly internal"""
186201
self.last_handler_message_at = self.now()
187-
self.thread_loop.add_callback(self.to_lsp.put_nowait, message)
202+
self.thread_loop.add_callback(self.to_lsp.send, message)
188203

189204
def now(self):
190205
return datetime.now(timezone.utc)
@@ -242,8 +257,10 @@ async def stop_process(self, timeout: int = 5):
242257

243258
def init_queues(self):
244259
"""create the queues"""
245-
self.from_lsp = Queue()
246-
self.to_lsp = Queue()
260+
self.from_lsp = StapledObjectStream(
261+
*anyio.create_memory_object_stream(max_buffer_size=self.queue_size))
262+
self.to_lsp = StapledObjectStream(
263+
*anyio.create_memory_object_stream(max_buffer_size=self.queue_size))
247264

248265
def substitute_env(self, env, base):
249266
final_env = copy(os.environ)
@@ -286,7 +303,6 @@ async def _broadcast_from_lsp(self):
286303
self.last_server_message_at = self.now()
287304
# handle message in the main thread's event loop
288305
self.main_loop.add_callback(self.parent.on_server_message, message, self)
289-
self.from_lsp.task_done()
290306

291307

292308
class LanguageServerSessionStdio(LanguageServerSessionBase):

python_packages/jupyter_lsp/jupyter_lsp/tests/test_reader.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import subprocess
22

33
import anyio
4+
from anyio.streams.stapled import StapledObjectStream
5+
import math
46
import pytest
5-
from tornado.queues import Queue
67

78
from jupyter_lsp.connection import LspStreamReader
89
from jupyter_lsp.utils import get_unused_port
@@ -127,7 +128,8 @@ async def join_process(process: anyio.abc.Process, headstart=1, timeout=1):
127128
async def test_reader(
128129
message, repeats, interval, add_excess, mode, communicator_spawner
129130
):
130-
queue = Queue()
131+
queue = StapledObjectStream(
132+
*anyio.create_memory_object_stream(max_buffer_size=math.inf))
131133

132134
port = get_unused_port() if mode == "tcp" else None
133135
process = await communicator_spawner.spawn_writer(
@@ -154,5 +156,5 @@ async def test_reader(
154156
if port is not None:
155157
await stream.aclose()
156158

157-
result = queue.get_nowait()
159+
result = await queue.receive()
158160
assert result == message * repeats

0 commit comments

Comments
 (0)