Skip to content

Commit 8444ec6

Browse files
committed
Remove old synchronous code from Reader and Writer
1 parent f492327 commit 8444ec6

File tree

2 files changed

+25
-91
lines changed

2 files changed

+25
-91
lines changed

python_packages/jupyter_lsp/jupyter_lsp/connection.py

Lines changed: 18 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@
77
> > MIT License https://github.com/palantir/python-jsonrpc-server/blob/0.2.0/LICENSE
88
> > Copyright 2018 Palantir Technologies, Inc.
99
"""
10-
import os
1110
from abc import ABC, ABCMeta, abstractmethod
12-
from typing import List, Optional, Text
11+
from typing import Optional, Text
1312

1413
# pylint: disable=broad-except
1514
import anyio
1615
from anyio.streams.buffered import BufferedByteReceiveStream
1716
from anyio.streams.text import TextSendStream
18-
from tornado.gen import convert_yielded
1917
from tornado.httputil import HTTPHeaders
2018
from tornado.ioloop import IOLoop
2119
from tornado.queues import Queue
22-
from traitlets import Float, Instance, Int, default
20+
from traitlets import Instance, Int
2321
from traitlets.config import LoggingConfigurable
2422
from traitlets.traitlets import MetaHasTraits
2523

@@ -48,15 +46,8 @@ async def close(self):
4846

4947

5048
class LspStreamReader(LspStreamBase):
51-
"""Language Server Reader
49+
"""Language Server Reader"""
5250

53-
Because non-blocking (but still synchronous) IO is used, rudimentary
54-
exponential backoff is used.
55-
"""
56-
57-
max_wait = Float(help="maximum time to wait on idle stream").tag(config=True)
58-
min_wait = Float(0.05, help="minimum time to wait on idle stream").tag(config=True)
59-
next_wait = Float(0.05, help="next time to wait on idle stream").tag(config=True)
6051
receive_max_bytes = Int(
6152
65536,
6253
help="the maximum size a header line send by the language server may have",
@@ -74,32 +65,12 @@ async def close(self):
7465
await self.stream.aclose()
7566
self.log.debug("%s closed", self)
7667

77-
@default("max_wait")
78-
def _default_max_wait(self):
79-
return 0.1 if os.name == "nt" else self.min_wait * 2
80-
81-
async def sleep(self):
82-
"""Simple exponential backoff for sleeping"""
83-
self.next_wait = min(self.next_wait * 2, self.max_wait)
84-
await anyio.sleep(self.next_wait)
85-
86-
def wake(self):
87-
"""Reset the wait time"""
88-
self.wait = self.min_wait
89-
9068
async def read(self) -> None:
9169
"""Read from a Language Server until it is closed"""
9270
while True:
9371
message = None
9472
try:
9573
message = await self.read_one()
96-
97-
if not message:
98-
await self.sleep()
99-
continue
100-
else:
101-
self.wake()
102-
10374
IOLoop.current().add_callback(self.queue.put_nowait, message)
10475
except anyio.ClosedResourceError:
10576
# stream was closed -> terminate
@@ -109,89 +80,48 @@ async def read(self) -> None:
10980
self.log.exception(
11081
"%s couldn't enqueue message: %s (%s)", self, message, e
11182
)
112-
await self.sleep()
113-
114-
async def _read_content(
115-
self, length: int, max_parts=1000, max_empties=200
116-
) -> Optional[bytes]:
117-
"""Read the full length of the message unless exceeding max_parts or
118-
max_empties empty reads occur.
11983

120-
See https://github.com/jupyter-lsp/jupyterlab-lsp/issues/450
121-
122-
Crucial docs or read():
123-
"If the argument is positive, and the underlying raw
124-
stream is not interactive, multiple raw reads may be issued
125-
to satisfy the byte count (unless EOF is reached first)"
84+
async def _read_content(self, length: int) -> Optional[bytes]:
85+
"""Read the full length of the message.
12686
12787
Args:
12888
- length: the content length
129-
- max_parts: prevent absurdly long messages (1000 parts is several MBs):
130-
1 part is usually sufficient but not enough for some long
131-
messages 2 or 3 parts are often needed.
13289
"""
133-
raw = None
134-
raw_parts: List[bytes] = []
135-
received_size = 0
136-
while received_size < length and len(raw_parts) < max_parts and max_empties > 0:
137-
part = None
138-
try:
139-
part = await self.stream.receive_exactly(length - received_size)
140-
except anyio.IncompleteRead: # pragma: no cover
141-
pass
142-
if part is None: # pragma: no cover
143-
max_empties -= 1
144-
await self.sleep()
145-
continue
146-
received_size += len(part)
147-
raw_parts.append(part)
148-
149-
if raw_parts:
150-
raw = b"".join(raw_parts)
151-
if len(raw) != length: # pragma: no cover
152-
self.log.warning(
153-
f"Readout and content-length mismatch: {len(raw)} vs {length};"
154-
f"remaining empties: {max_empties}; remaining parts: {max_parts}"
155-
)
156-
157-
return raw
90+
try:
91+
return await self.stream.receive_exactly(length)
92+
except anyio.IncompleteRead: # pragma: no cover
93+
# resource has been closed before the requested bytes could be retrieved
94+
# -> signal recource closed
95+
raise anyio.ClosedResourceError
15896

15997
async def read_one(self) -> Text:
16098
"""Read a single message"""
16199
message = ""
162100
headers = HTTPHeaders()
163101

164-
line = await convert_yielded(self._readline())
102+
line = await self._readline()
165103

166104
if line:
167105
while line and line.strip():
168106
headers.parse_line(line)
169-
line = await convert_yielded(self._readline())
107+
line = await self._readline()
170108

171109
content_length = int(headers.get("content-length", "0"))
172110

173111
if content_length:
174112
raw = await self._read_content(length=content_length)
175-
if raw is not None:
176-
message = raw.decode("utf-8").strip()
177-
else: # pragma: no cover
178-
self.log.warning(
179-
"%s failed to read message of length %s",
180-
self,
181-
content_length,
182-
)
113+
message = raw.decode("utf-8").strip()
183114

184115
return message
185116

186117
async def _readline(self) -> Text:
187-
"""Read a line (or immediately return None)"""
118+
"""Read a line"""
188119
try:
189120
# use same max_bytes as is default for receive for now. It seems there is no
190121
# way of getting the bytes read until max_bytes is reached, so we cannot
191122
# iterate the receive_until call with smaller max_bytes values
192-
async with anyio.move_on_after(0.2):
193-
line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes)
194-
return line.decode("utf-8").strip()
123+
line = await self.stream.receive_until(b"\r\n", self.receive_max_bytes)
124+
return line.decode("utf-8").strip()
195125
except anyio.IncompleteRead:
196126
# resource has been closed before the requested bytes could be retrieved
197127
# -> signal recource closed
@@ -225,7 +155,7 @@ async def write(self) -> None:
225155
try:
226156
n_bytes = len(message.encode("utf-8"))
227157
response = "Content-Length: {}\r\n\r\n{}".format(n_bytes, message)
228-
await convert_yielded(self._write_one(response))
158+
await self._write_one(response)
229159
except (
230160
anyio.ClosedResourceError,
231161
anyio.BrokenResourceError,

python_packages/jupyter_lsp/jupyter_lsp/session.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ class LanguageServerSessionBase(
5757
)
5858
main_loop = Instance(
5959
IOLoop, help="the event loop of the main thread", allow_none=True)
60+
thread_loop = Instance(
61+
IOLoop, help="the event loop of the worker thread", allow_none=True)
6062
writer = Instance(LspStreamWriter, help="the JSON-RPC writer", allow_none=True)
6163
reader = Instance(LspStreamReader, help="the JSON-RPC reader", allow_none=True)
6264
from_lsp = Instance(
@@ -118,8 +120,7 @@ def start(self):
118120
def stop(self):
119121
"""shut down the session"""
120122
if self.cancelscope is not None:
121-
self.cancelscope.cancel()
122-
self.cancelscope = None
123+
self.thread_loop.add_callback(self.cancelscope.cancel)
123124

124125
# wait for the session to get cleaned up
125126
if self.thread and self.thread.is_alive():
@@ -132,12 +133,15 @@ async def run(self):
132133
the event `self.started` will be set when everything is set up and the session
133134
will be ready for communication
134135
"""
136+
self.thread_loop = IOLoop.current()
135137
async with CancelScope() as scope:
136138
self.cancelscope = scope
137139
await self.initialize()
138140
self.started.set()
139141
await self.listen()
140142
await self.cleanup()
143+
self.cancelscope = None
144+
self.thread_loop = None
141145

142146
async def initialize(self):
143147
"""initialize a language server session"""
@@ -187,7 +191,7 @@ def _on_handlers(self, change: Bunch):
187191
def write(self, message):
188192
"""wrapper around the write queue to keep it mostly internal"""
189193
self.last_handler_message_at = self.now()
190-
IOLoop.current().add_callback(self.to_lsp.put_nowait, message)
194+
self.thread_loop.add_callback(self.to_lsp.put_nowait, message)
191195

192196
def now(self):
193197
return datetime.now(timezone.utc)

0 commit comments

Comments
 (0)