Skip to content

Commit 278e5dd

Browse files
Revert "Reland "[lldb-dap] Improving consistency of tests by removing… (#165892)
… concurrency." (#165688)"" This reverts commit 17dbd86. This was causing timeouts on the premerge runners. Reverting for now until the timeouts trigger within lit and/or we have a better testing strategy for this.
1 parent 0d9c75b commit 278e5dd

File tree

8 files changed

+203
-161
lines changed

8 files changed

+203
-161
lines changed

lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py

Lines changed: 127 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import subprocess
1111
import signal
1212
import sys
13+
import threading
1314
import warnings
14-
import selectors
1515
import time
1616
from typing import (
1717
Any,
@@ -139,6 +139,35 @@ def dump_memory(base_addr, data, num_per_line, outfile):
139139
outfile.write("\n")
140140

141141

142+
def read_packet(
143+
f: IO[bytes], trace_file: Optional[IO[str]] = None
144+
) -> Optional[ProtocolMessage]:
145+
"""Decode a JSON packet that starts with the content length and is
146+
followed by the JSON bytes from a file 'f'. Returns None on EOF.
147+
"""
148+
line = f.readline().decode("utf-8")
149+
if len(line) == 0:
150+
return None # EOF.
151+
152+
# Watch for line that starts with the prefix
153+
prefix = "Content-Length: "
154+
if line.startswith(prefix):
155+
# Decode length of JSON bytes
156+
length = int(line[len(prefix) :])
157+
# Skip empty line
158+
separator = f.readline().decode()
159+
if separator != "":
160+
Exception("malformed DAP content header, unexpected line: " + separator)
161+
# Read JSON bytes
162+
json_str = f.read(length).decode()
163+
if trace_file:
164+
trace_file.write("from adapter:\n%s\n" % (json_str))
165+
# Decode the JSON bytes into a python dictionary
166+
return json.loads(json_str)
167+
168+
raise Exception("unexpected malformed message from lldb-dap: " + line)
169+
170+
142171
def packet_type_is(packet, packet_type):
143172
return "type" in packet and packet["type"] == packet_type
144173

@@ -170,8 +199,16 @@ def __init__(
170199
self.log_file = log_file
171200
self.send = send
172201
self.recv = recv
173-
self.selector = selectors.DefaultSelector()
174-
self.selector.register(recv, selectors.EVENT_READ)
202+
203+
# Packets that have been received and processed but have not yet been
204+
# requested by a test case.
205+
self._pending_packets: List[Optional[ProtocolMessage]] = []
206+
# Received packets that have not yet been processed.
207+
self._recv_packets: List[Optional[ProtocolMessage]] = []
208+
# Used as a mutex for _recv_packets and for notify when _recv_packets
209+
# changes.
210+
self._recv_condition = threading.Condition()
211+
self._recv_thread = threading.Thread(target=self._read_packet_thread)
175212

176213
# session state
177214
self.init_commands = init_commands
@@ -197,6 +234,9 @@ def __init__(
197234
# keyed by breakpoint id
198235
self.resolved_breakpoints: dict[str, Breakpoint] = {}
199236

237+
# trigger enqueue thread
238+
self._recv_thread.start()
239+
200240
@classmethod
201241
def encode_content(cls, s: str) -> bytes:
202242
return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
@@ -212,46 +252,17 @@ def validate_response(cls, command, response):
212252
f"seq mismatch in response {command['seq']} != {response['request_seq']}"
213253
)
214254

215-
def _read_packet(
216-
self,
217-
timeout: float = DEFAULT_TIMEOUT,
218-
) -> Optional[ProtocolMessage]:
219-
"""Decode a JSON packet that starts with the content length and is
220-
followed by the JSON bytes from self.recv. Returns None on EOF.
221-
"""
222-
223-
ready = self.selector.select(timeout)
224-
if not ready:
225-
warnings.warn(
226-
"timeout occurred waiting for a packet, check if the test has a"
227-
" negative assertion and see if it can be inverted.",
228-
stacklevel=4,
229-
)
230-
return None # timeout
231-
232-
line = self.recv.readline().decode("utf-8")
233-
if len(line) == 0:
234-
return None # EOF.
235-
236-
# Watch for line that starts with the prefix
237-
prefix = "Content-Length: "
238-
if line.startswith(prefix):
239-
# Decode length of JSON bytes
240-
length = int(line[len(prefix) :])
241-
# Skip empty line
242-
separator = self.recv.readline().decode()
243-
if separator != "":
244-
Exception("malformed DAP content header, unexpected line: " + separator)
245-
# Read JSON bytes
246-
json_str = self.recv.read(length).decode()
247-
if self.trace_file:
248-
self.trace_file.write(
249-
"%s from adapter:\n%s\n" % (time.time(), json_str)
250-
)
251-
# Decode the JSON bytes into a python dictionary
252-
return json.loads(json_str)
253-
254-
raise Exception("unexpected malformed message from lldb-dap: " + line)
255+
def _read_packet_thread(self):
256+
try:
257+
while True:
258+
packet = read_packet(self.recv, trace_file=self.trace_file)
259+
# `packet` will be `None` on EOF. We want to pass it down to
260+
# handle_recv_packet anyway so the main thread can handle unexpected
261+
# termination of lldb-dap and stop waiting for new packets.
262+
if not self._handle_recv_packet(packet):
263+
break
264+
finally:
265+
dump_dap_log(self.log_file)
255266

256267
def get_modules(
257268
self, start_module: Optional[int] = None, module_count: Optional[int] = None
@@ -299,6 +310,34 @@ def collect_output(
299310
output += self.get_output(category, clear=clear)
300311
return output
301312

313+
def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
314+
with self.recv_condition:
315+
self.recv_packets.append(packet)
316+
self.recv_condition.notify()
317+
318+
def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
319+
"""Handles an incoming packet.
320+
321+
Called by the read thread that is waiting for all incoming packets
322+
to store the incoming packet in "self._recv_packets" in a thread safe
323+
way. This function will then signal the "self._recv_condition" to
324+
indicate a new packet is available.
325+
326+
Args:
327+
packet: A new packet to store.
328+
329+
Returns:
330+
True if the caller should keep calling this function for more
331+
packets.
332+
"""
333+
with self._recv_condition:
334+
self._recv_packets.append(packet)
335+
self._recv_condition.notify()
336+
# packet is None on EOF
337+
return packet is not None and not (
338+
packet["type"] == "response" and packet["command"] == "disconnect"
339+
)
340+
302341
def _recv_packet(
303342
self,
304343
*,
@@ -322,34 +361,46 @@ def _recv_packet(
322361
The first matching packet for the given predicate, if specified,
323362
otherwise None.
324363
"""
325-
deadline = time.time() + timeout
326-
327-
while time.time() < deadline:
328-
packet = self._read_packet(timeout=deadline - time.time())
329-
if packet is None:
330-
return None
331-
self._process_recv_packet(packet)
332-
if not predicate or predicate(packet):
333-
return packet
334-
335-
def _process_recv_packet(self, packet) -> None:
364+
assert (
365+
threading.current_thread != self._recv_thread
366+
), "Must not be called from the _recv_thread"
367+
368+
def process_until_match():
369+
self._process_recv_packets()
370+
for i, packet in enumerate(self._pending_packets):
371+
if packet is None:
372+
# We need to return a truthy value to break out of the
373+
# wait_for, use `EOFError` as an indicator of EOF.
374+
return EOFError()
375+
if predicate and predicate(packet):
376+
self._pending_packets.pop(i)
377+
return packet
378+
379+
with self._recv_condition:
380+
packet = self._recv_condition.wait_for(process_until_match, timeout)
381+
return None if isinstance(packet, EOFError) else packet
382+
383+
def _process_recv_packets(self) -> None:
336384
"""Process received packets, updating the session state."""
337-
if packet and ("seq" not in packet or packet["seq"] == 0):
338-
warnings.warn(
339-
f"received a malformed packet, expected 'seq != 0' for {packet!r}"
340-
)
341-
# Handle events that may modify any stateful properties of
342-
# the DAP session.
343-
if packet and packet["type"] == "event":
344-
self._handle_event(packet)
345-
elif packet and packet["type"] == "request":
346-
# Handle reverse requests and keep processing.
347-
self._handle_reverse_request(packet)
385+
with self._recv_condition:
386+
for packet in self._recv_packets:
387+
if packet and ("seq" not in packet or packet["seq"] == 0):
388+
warnings.warn(
389+
f"received a malformed packet, expected 'seq != 0' for {packet!r}"
390+
)
391+
# Handle events that may modify any stateful properties of
392+
# the DAP session.
393+
if packet and packet["type"] == "event":
394+
self._handle_event(packet)
395+
elif packet and packet["type"] == "request":
396+
# Handle reverse requests and keep processing.
397+
self._handle_reverse_request(packet)
398+
# Move the packet to the pending queue.
399+
self._pending_packets.append(packet)
400+
self._recv_packets.clear()
348401

349402
def _handle_event(self, packet: Event) -> None:
350403
"""Handle any events that modify debug session state we track."""
351-
self.events.append(packet)
352-
353404
event = packet["event"]
354405
body: Optional[Dict] = packet.get("body", None)
355406

@@ -402,8 +453,6 @@ def _handle_event(self, packet: Event) -> None:
402453
self.invalidated_event = packet
403454
elif event == "memory":
404455
self.memory_event = packet
405-
elif event == "module":
406-
self.module_events.append(packet)
407456

408457
def _handle_reverse_request(self, request: Request) -> None:
409458
if request in self.reverse_requests:
@@ -472,14 +521,18 @@ def send_packet(self, packet: ProtocolMessage) -> int:
472521
473522
Returns the seq number of the request.
474523
"""
475-
packet["seq"] = self.sequence
476-
self.sequence += 1
524+
# Set the seq for requests.
525+
if packet["type"] == "request":
526+
packet["seq"] = self.sequence
527+
self.sequence += 1
528+
else:
529+
packet["seq"] = 0
477530

478531
# Encode our command dictionary as a JSON string
479532
json_str = json.dumps(packet, separators=(",", ":"))
480533

481534
if self.trace_file:
482-
self.trace_file.write("%s to adapter:\n%s\n" % (time.time(), json_str))
535+
self.trace_file.write("to adapter:\n%s\n" % (json_str))
483536

484537
length = len(json_str)
485538
if length > 0:
@@ -860,8 +913,6 @@ def request_restart(self, restartArguments=None):
860913
if restartArguments:
861914
command_dict["arguments"] = restartArguments
862915

863-
# Clear state, the process is about to restart...
864-
self._process_continued(True)
865916
response = self._send_recv(command_dict)
866917
# Caller must still call wait_for_stopped.
867918
return response
@@ -1428,10 +1479,8 @@ def request_testGetTargetBreakpoints(self):
14281479

14291480
def terminate(self):
14301481
self.send.close()
1431-
self.recv.close()
1432-
self.selector.close()
1433-
if self.log_file:
1434-
dump_dap_log(self.log_file)
1482+
if self._recv_thread.is_alive():
1483+
self._recv_thread.join()
14351484

14361485
def request_setInstructionBreakpoints(self, memory_reference=[]):
14371486
breakpoints = []
@@ -1528,7 +1577,6 @@ def launch(
15281577
stdout=subprocess.PIPE,
15291578
stderr=sys.stderr,
15301579
env=adapter_env,
1531-
bufsize=0,
15321580
)
15331581

15341582
if connection is None:

lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
# DAP tests as a whole have been flakey on the Windows on Arm bot. See:
1616
# https://github.com/llvm/llvm-project/issues/137660
1717
@skipIf(oslist=["windows"], archs=["aarch64"])
18-
# The Arm Linux bot needs stable resources before it can run these tests reliably.
19-
@skipIf(oslist=["linux"], archs=["arm$"])
2018
class DAPTestCaseBase(TestBase):
2119
# set timeout based on whether ASAN was enabled or not. Increase
2220
# timeout by a factor of 10 if ASAN is enabled.
@@ -418,7 +416,7 @@ def continue_to_next_stop(self):
418416
return self.dap_server.wait_for_stopped()
419417

420418
def continue_to_breakpoint(self, breakpoint_id: str):
421-
self.continue_to_breakpoints([breakpoint_id])
419+
self.continue_to_breakpoints((breakpoint_id))
422420

423421
def continue_to_breakpoints(self, breakpoint_ids):
424422
self.do_continue()

lldb/test/API/tools/lldb-dap/breakpoint-events/TestDAP_breakpointEvents.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,24 @@ def test_breakpoint_events(self):
8181
breakpoint["verified"], "expect foo breakpoint to not be verified"
8282
)
8383

84+
# Flush the breakpoint events.
85+
self.dap_server.wait_for_breakpoint_events()
86+
8487
# Continue to the breakpoint
85-
self.continue_to_breakpoint(foo_bp_id)
86-
self.continue_to_next_stop() # foo_bp2
87-
self.continue_to_breakpoint(main_bp_id)
88-
self.continue_to_exit()
88+
self.continue_to_breakpoints(dap_breakpoint_ids)
8989

90-
bp_events = [e for e in self.dap_server.events if e["event"] == "breakpoint"]
90+
verified_breakpoint_ids = []
91+
unverified_breakpoint_ids = []
92+
for breakpoint_event in self.dap_server.wait_for_breakpoint_events():
93+
breakpoint = breakpoint_event["body"]["breakpoint"]
94+
id = breakpoint["id"]
95+
if breakpoint["verified"]:
96+
verified_breakpoint_ids.append(id)
97+
else:
98+
unverified_breakpoint_ids.append(id)
9199

92-
main_bp_events = [
93-
e for e in bp_events if e["body"]["breakpoint"]["id"] == main_bp_id
94-
]
95-
foo_bp_events = [
96-
e for e in bp_events if e["body"]["breakpoint"]["id"] == foo_bp_id
97-
]
100+
self.assertIn(main_bp_id, unverified_breakpoint_ids)
101+
self.assertIn(foo_bp_id, unverified_breakpoint_ids)
98102

99-
self.assertTrue(main_bp_events)
100-
self.assertTrue(foo_bp_events)
103+
self.assertIn(main_bp_id, verified_breakpoint_ids)
104+
self.assertIn(foo_bp_id, verified_breakpoint_ids)

lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ def test_debuggerRoot(self):
156156
self.build_and_launch(
157157
program, debuggerRoot=program_parent_dir, initCommands=commands
158158
)
159-
self.continue_to_exit()
160159
output = self.get_console()
161160
self.assertTrue(output and len(output) > 0, "expect console output")
162161
lines = output.splitlines()
@@ -172,6 +171,7 @@ def test_debuggerRoot(self):
172171
% (program_parent_dir, line[len(prefix) :]),
173172
)
174173
self.assertTrue(found, "verified lldb-dap working directory")
174+
self.continue_to_exit()
175175

176176
def test_sourcePath(self):
177177
"""

0 commit comments

Comments
 (0)