Skip to content

Commit 024a156

Browse files
committed
Add tests for long stdio messages, fix await logic
1 parent b6b8ba4 commit 024a156

File tree

2 files changed

+105
-27
lines changed

2 files changed

+105
-27
lines changed

python_packages/jupyter_lsp/jupyter_lsp/stdio.py

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,14 @@ async def read(self) -> None:
9393
self.wake()
9494

9595
IOLoop.current().add_callback(self.queue.put_nowait, message)
96-
except Exception: # pragma: no cover
97-
self.log.exception("%s couldn't enqueue message: %s", self, message)
96+
except Exception as e: # pragma: no cover
97+
self.log.exception(
98+
"%s couldn't enqueue message: %s (%s)", self, message, e
99+
)
98100
await self.sleep()
99101

100-
def _read_content(
101-
self, length: int, max_parts=100, max_empties=50
102+
async def _read_content(
103+
self, length: int, max_parts=500, max_empties=50
102104
) -> Optional[bytes]:
103105
"""Read the full length of the message unless exceeding max_parts or
104106
max_empties empty reads occur.
@@ -119,13 +121,16 @@ def _read_content(
119121
raw = None
120122
raw_parts: List[bytes] = []
121123
received_size = 0
122-
while (
123-
received_size < length and len(raw_parts) < max_parts and max_empties > 0
124-
):
125-
part = self.stream.read(length)
124+
while received_size < length and len(raw_parts) < max_parts and max_empties > 0:
125+
part = None
126+
try:
127+
part = self.stream.read(length)
128+
except OSError: # pragma: no cover
129+
pass
126130
if part is None:
127131
max_empties -= 1
128-
continue # pragma: no cover
132+
await self.sleep()
133+
continue
129134
received_size += len(part)
130135
raw_parts.append(part)
131136

@@ -135,6 +140,7 @@ def _read_content(
135140
self.log.warning(
136141
f"Readout and content-length mismatch:" f" {len(raw)} vs {length}"
137142
)
143+
138144
return raw
139145

140146
async def read_one(self) -> Text:
@@ -152,24 +158,15 @@ async def read_one(self) -> Text:
152158
content_length = int(headers.get("content-length", "0"))
153159

154160
if content_length:
155-
raw = None
156-
retries = 5
157-
while raw is None and retries:
158-
try:
159-
raw = self._read_content(length=content_length)
160-
except OSError: # pragma: no cover
161-
raw = None
162-
if raw is None: # pragma: no cover
163-
self.log.warning(
164-
"%s failed to read message of length %s",
165-
self,
166-
content_length,
167-
)
168-
await self.sleep()
169-
retries -= 1
170-
else:
171-
message = raw.decode("utf-8").strip()
172-
break
161+
raw = await self._read_content(length=content_length)
162+
if raw is not None:
163+
message = raw.decode("utf-8").strip()
164+
else: # pragma: no cover
165+
self.log.warning(
166+
"%s failed to read message of length %s",
167+
self,
168+
content_length,
169+
)
173170

174171
return message
175172

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import asyncio
2+
import subprocess
3+
import time
4+
from threading import Thread
5+
6+
import pytest
7+
from tornado.queues import Queue
8+
9+
from jupyter_lsp.stdio import LspStdIoReader
10+
11+
WRITER_TEMPLATE = """
12+
from time import sleep
13+
14+
print('Content-Length: {length}')
15+
print()
16+
17+
for repeat in range({repeats}):
18+
sleep({interval})
19+
print('{message}', end='')
20+
print()
21+
"""
22+
23+
24+
class CommunicatorSpawner:
25+
def __init__(self, tmpdir):
26+
self.tmpdir = tmpdir
27+
28+
def spawn_writer(self, message: str, repeats: int = 1, interval=None):
29+
length = len(message) * repeats
30+
commands_file = self.tmpdir / "writer.py"
31+
commands_file.write(
32+
WRITER_TEMPLATE.format(
33+
length=length, repeats=repeats, interval=interval or 0, message=message
34+
)
35+
)
36+
return subprocess.Popen(
37+
["python", "-u", commands_file.realpath()],
38+
stdin=subprocess.PIPE,
39+
stdout=subprocess.PIPE,
40+
)
41+
42+
43+
@pytest.fixture
44+
def communicator_spawner(tmpdir):
45+
return CommunicatorSpawner(tmpdir)
46+
47+
48+
def communicate_and_close(process, wait=1):
49+
def communicate_and_close():
50+
time.sleep(wait)
51+
process.communicate()
52+
53+
thread = Thread(target=communicate_and_close)
54+
thread.start()
55+
56+
57+
@pytest.mark.parametrize(
58+
"message,repeats,interval",
59+
[
60+
["short", 1, None],
61+
["ab" * 10_0000, 1, None],
62+
["ab", 2, 0.01],
63+
["ab", 45, 0.01],
64+
],
65+
ids=["short", "long", "intermittent", "intensive-intermittent"],
66+
)
67+
@pytest.mark.asyncio
68+
async def test_reader(message, repeats, interval, communicator_spawner):
69+
queue = Queue()
70+
71+
process = communicator_spawner.spawn_writer(
72+
message=message, repeats=repeats, interval=interval
73+
)
74+
reader = LspStdIoReader(stream=process.stdout, queue=queue)
75+
timeout = 2 + (interval or 1) * repeats * 2
76+
77+
communicate_and_close(process)
78+
await asyncio.wait_for(reader.read(), timeout=timeout)
79+
80+
result = queue.get_nowait()
81+
assert result == message * repeats

0 commit comments

Comments
 (0)