Skip to content

Commit 82fd8f4

Browse files
authored
Merge pull request #494 from dpriedel/FixTruncatedJediResponses
Attempt to fix truncated Jedi LSP responses
2 parents e9b115d + 6463113 commit 82fd8f4

File tree

4 files changed

+109
-29
lines changed

4 files changed

+109
-29
lines changed

atest/05_Features/Completion.robot

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ Trigger Completer
299299
Completer Should Include Documentation
300300
[Arguments] ${text}
301301
Wait Until Page Contains Element ${DOCUMENTATION_PANEL} timeout=10s
302+
Wait Until Keyword Succeeds 10 x 1 s Element Should Contain ${DOCUMENTATION_PANEL} ${text}
302303
Element Should Contain ${DOCUMENTATION_PANEL} ${text}
303304

304305
Restart Kernel

python_packages/jupyter_lsp/jupyter_lsp/session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def init_process(self):
137137
stdin=subprocess.PIPE,
138138
stdout=subprocess.PIPE,
139139
env=self.substitute_env(self.spec.get("env", {}), os.environ),
140+
bufsize=0,
140141
)
141142

142143
def init_queues(self):

python_packages/jupyter_lsp/jupyter_lsp/stdio.py

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ class LspStdIoBase(LoggingConfigurable):
3131
executor = None
3232

3333
stream = Instance(
34-
io.BufferedIOBase, help="the stream to read/write"
35-
) # type: io.BufferedIOBase
34+
io.RawIOBase, help="the stream to read/write"
35+
) # type: io.RawIOBase
3636
queue = Instance(Queue, help="queue to get/put")
3737

3838
def __repr__(self): # pragma: no cover
@@ -61,7 +61,7 @@ class LspStdIoReader(LspStdIoBase):
6161

6262
@default("max_wait")
6363
def _default_max_wait(self):
64-
return 2.0 if os.name == "nt" else self.min_wait
64+
return 0.1 if os.name == "nt" else self.min_wait * 2
6565

6666
async def sleep(self):
6767
"""Simple exponential backoff for sleeping"""
@@ -93,12 +93,17 @@ async def read(self) -> None:
9393
self.wake()
9494

9595
IOLoop.current().add_callback(self.queue.put_nowait, message)
96-
except Exception: # pragma: no cover
97-
self.log.exception("%s couldn't enqueue message: %s", self, message)
96+
except Exception as e: # pragma: no cover
97+
self.log.exception(
98+
"%s couldn't enqueue message: %s (%s)", self, message, e
99+
)
98100
await self.sleep()
99101

100-
def _read_content(self, length: int, max_parts=1000) -> Optional[bytes]:
101-
"""Read the full length of the message unless exceeding max_parts.
102+
async def _read_content(
103+
self, length: int, max_parts=1000, max_empties=200
104+
) -> Optional[bytes]:
105+
"""Read the full length of the message unless exceeding max_parts or
106+
max_empties empty reads occur.
102107
103108
See https://github.com/krassowski/jupyterlab-lsp/issues/450
104109
@@ -116,19 +121,27 @@ def _read_content(self, length: int, max_parts=1000) -> Optional[bytes]:
116121
raw = None
117122
raw_parts: List[bytes] = []
118123
received_size = 0
119-
while received_size < length and len(raw_parts) < max_parts:
120-
part = self.stream.read(length)
124+
while received_size < length and len(raw_parts) < max_parts and max_empties > 0:
125+
part = None
126+
try:
127+
part = self.stream.read(length)
128+
except OSError: # pragma: no cover
129+
pass
121130
if part is None:
122-
break # pragma: no cover
131+
max_empties -= 1
132+
await self.sleep()
133+
continue
123134
received_size += len(part)
124135
raw_parts.append(part)
125136

126137
if raw_parts:
127138
raw = b"".join(raw_parts)
128139
if len(raw) != length: # pragma: no cover
129140
self.log.warning(
130-
f"Readout and content-length mismatch:" f" {len(raw)} vs {length}"
141+
f"Readout and content-length mismatch: {len(raw)} vs {length};"
142+
f"remaining empties: {max_empties}; remaining parts: {max_parts}"
131143
)
144+
132145
return raw
133146

134147
async def read_one(self) -> Text:
@@ -146,24 +159,15 @@ async def read_one(self) -> Text:
146159
content_length = int(headers.get("content-length", "0"))
147160

148161
if content_length:
149-
raw = None
150-
retries = 5
151-
while raw is None and retries:
152-
try:
153-
raw = self._read_content(length=content_length)
154-
except OSError: # pragma: no cover
155-
raw = None
156-
if raw is None: # pragma: no cover
157-
self.log.warning(
158-
"%s failed to read message of length %s",
159-
self,
160-
content_length,
161-
)
162-
await self.sleep()
163-
retries -= 1
164-
else:
165-
message = raw.decode("utf-8").strip()
166-
break
162+
raw = await self._read_content(length=content_length)
163+
if raw is not None:
164+
message = raw.decode("utf-8").strip()
165+
else: # pragma: no cover
166+
self.log.warning(
167+
"%s failed to read message of length %s",
168+
self,
169+
content_length,
170+
)
167171

168172
return message
169173

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import asyncio
2+
import subprocess
3+
4+
import pytest
5+
from tornado.queues import Queue
6+
7+
from jupyter_lsp.stdio import LspStdIoReader
8+
9+
WRITER_TEMPLATE = """
10+
from time import sleep
11+
12+
print('Content-Length: {length}')
13+
print()
14+
15+
for repeat in range({repeats}):
16+
sleep({interval})
17+
print('{message}', end='')
18+
print()
19+
"""
20+
21+
22+
class CommunicatorSpawner:
23+
def __init__(self, tmp_path):
24+
self.tmp_path = tmp_path
25+
26+
def spawn_writer(self, message: str, repeats: int = 1, interval=None):
27+
length = len(message) * repeats
28+
commands_file = self.tmp_path / "writer.py"
29+
commands_file.write_text(
30+
WRITER_TEMPLATE.format(
31+
length=length, repeats=repeats, interval=interval or 0, message=message
32+
)
33+
)
34+
return subprocess.Popen(
35+
["python", "-u", str(commands_file)], stdout=subprocess.PIPE, bufsize=0
36+
)
37+
38+
39+
@pytest.fixture
40+
def communicator_spawner(tmp_path):
41+
return CommunicatorSpawner(tmp_path)
42+
43+
44+
async def join_process(process: subprocess.Popen, headstart=1, timeout=1):
45+
await asyncio.sleep(headstart)
46+
result = process.wait(timeout=timeout)
47+
if process.stdout:
48+
process.stdout.close()
49+
return result
50+
51+
52+
@pytest.mark.parametrize(
53+
"message,repeats,interval",
54+
[
55+
["short", 1, None],
56+
["ab" * 10_0000, 1, None],
57+
["ab", 2, 0.01],
58+
["ab", 45, 0.01],
59+
],
60+
ids=["short", "long", "intermittent", "intensive-intermittent"],
61+
)
62+
@pytest.mark.asyncio
63+
async def test_reader(message, repeats, interval, communicator_spawner):
64+
queue = Queue()
65+
66+
process = communicator_spawner.spawn_writer(
67+
message=message, repeats=repeats, interval=interval
68+
)
69+
reader = LspStdIoReader(stream=process.stdout, queue=queue)
70+
71+
await asyncio.gather(join_process(process, headstart=3, timeout=1), reader.read())
72+
73+
result = queue.get_nowait()
74+
assert result == message * repeats

0 commit comments

Comments
 (0)