Skip to content

Commit b64b3fb

Browse files
ashgtiaokblast
authored andcommitted
[lldb-dap] Improving consistency of tests by removing concurrency. (llvm#165496)
We currently use a background thread to read the DAP output. This means the test thread and the background thread can race at times and we may have inconsistent timing due to these races. To improve the consistency I've removed the reader thread and instead switched to using the `selectors` module that wraps `select` in a platform independent way.
1 parent 9d50c26 commit b64b3fb

File tree

8 files changed

+159
-203
lines changed

8 files changed

+159
-203
lines changed

lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py

Lines changed: 79 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import subprocess
1111
import signal
1212
import sys
13-
import threading
1413
import warnings
14+
import selectors
1515
import time
1616
from typing import (
1717
Any,
@@ -139,35 +139,6 @@ def dump_memory(base_addr, data, num_per_line, outfile):
139139
outfile.write("\n")
140140

141141

142-
def read_packet(
143-
f: IO[bytes], trace_file: Optional[IO[str]] = None
144-
) -> Optional[ProtocolMessage]:
145-
"""Decode a JSON packet that starts with the content length and is
146-
followed by the JSON bytes from a file 'f'. Returns None on EOF.
147-
"""
148-
line = f.readline().decode("utf-8")
149-
if len(line) == 0:
150-
return None # EOF.
151-
152-
# Watch for line that starts with the prefix
153-
prefix = "Content-Length: "
154-
if line.startswith(prefix):
155-
# Decode length of JSON bytes
156-
length = int(line[len(prefix) :])
157-
# Skip empty line
158-
separator = f.readline().decode()
159-
if separator != "":
160-
Exception("malformed DAP content header, unexpected line: " + separator)
161-
# Read JSON bytes
162-
json_str = f.read(length).decode()
163-
if trace_file:
164-
trace_file.write("from adapter:\n%s\n" % (json_str))
165-
# Decode the JSON bytes into a python dictionary
166-
return json.loads(json_str)
167-
168-
raise Exception("unexpected malformed message from lldb-dap: " + line)
169-
170-
171142
def packet_type_is(packet, packet_type):
172143
return "type" in packet and packet["type"] == packet_type
173144

@@ -199,16 +170,8 @@ def __init__(
199170
self.log_file = log_file
200171
self.send = send
201172
self.recv = recv
202-
203-
# Packets that have been received and processed but have not yet been
204-
# requested by a test case.
205-
self._pending_packets: List[Optional[ProtocolMessage]] = []
206-
# Received packets that have not yet been processed.
207-
self._recv_packets: List[Optional[ProtocolMessage]] = []
208-
# Used as a mutex for _recv_packets and for notify when _recv_packets
209-
# changes.
210-
self._recv_condition = threading.Condition()
211-
self._recv_thread = threading.Thread(target=self._read_packet_thread)
173+
self.selector = selectors.DefaultSelector()
174+
self.selector.register(recv, selectors.EVENT_READ)
212175

213176
# session state
214177
self.init_commands = init_commands
@@ -234,9 +197,6 @@ def __init__(
234197
# keyed by breakpoint id
235198
self.resolved_breakpoints: dict[str, Breakpoint] = {}
236199

237-
# trigger enqueue thread
238-
self._recv_thread.start()
239-
240200
@classmethod
241201
def encode_content(cls, s: str) -> bytes:
242202
return ("Content-Length: %u\r\n\r\n%s" % (len(s), s)).encode("utf-8")
@@ -252,17 +212,46 @@ def validate_response(cls, command, response):
252212
f"seq mismatch in response {command['seq']} != {response['request_seq']}"
253213
)
254214

255-
def _read_packet_thread(self):
256-
try:
257-
while True:
258-
packet = read_packet(self.recv, trace_file=self.trace_file)
259-
# `packet` will be `None` on EOF. We want to pass it down to
260-
# handle_recv_packet anyway so the main thread can handle unexpected
261-
# termination of lldb-dap and stop waiting for new packets.
262-
if not self._handle_recv_packet(packet):
263-
break
264-
finally:
265-
dump_dap_log(self.log_file)
215+
def _read_packet(
216+
self,
217+
timeout: float = DEFAULT_TIMEOUT,
218+
) -> Optional[ProtocolMessage]:
219+
"""Decode a JSON packet that starts with the content length and is
220+
followed by the JSON bytes from self.recv. Returns None on EOF.
221+
"""
222+
223+
ready = self.selector.select(timeout)
224+
if not ready:
225+
warnings.warn(
226+
"timeout occurred waiting for a packet, check if the test has a"
227+
" negative assertion and see if it can be inverted.",
228+
stacklevel=4,
229+
)
230+
return None # timeout
231+
232+
line = self.recv.readline().decode("utf-8")
233+
if len(line) == 0:
234+
return None # EOF.
235+
236+
# Watch for line that starts with the prefix
237+
prefix = "Content-Length: "
238+
if line.startswith(prefix):
239+
# Decode length of JSON bytes
240+
length = int(line[len(prefix) :])
241+
# Skip empty line
242+
separator = self.recv.readline().decode()
243+
if separator != "":
244+
Exception("malformed DAP content header, unexpected line: " + separator)
245+
# Read JSON bytes
246+
json_str = self.recv.read(length).decode()
247+
if self.trace_file:
248+
self.trace_file.write(
249+
"%s from adapter:\n%s\n" % (time.time(), json_str)
250+
)
251+
# Decode the JSON bytes into a python dictionary
252+
return json.loads(json_str)
253+
254+
raise Exception("unexpected malformed message from lldb-dap: " + line)
266255

267256
def get_modules(
268257
self, start_module: Optional[int] = None, module_count: Optional[int] = None
@@ -310,34 +299,6 @@ def collect_output(
310299
output += self.get_output(category, clear=clear)
311300
return output
312301

313-
def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
314-
with self.recv_condition:
315-
self.recv_packets.append(packet)
316-
self.recv_condition.notify()
317-
318-
def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
319-
"""Handles an incoming packet.
320-
321-
Called by the read thread that is waiting for all incoming packets
322-
to store the incoming packet in "self._recv_packets" in a thread safe
323-
way. This function will then signal the "self._recv_condition" to
324-
indicate a new packet is available.
325-
326-
Args:
327-
packet: A new packet to store.
328-
329-
Returns:
330-
True if the caller should keep calling this function for more
331-
packets.
332-
"""
333-
with self._recv_condition:
334-
self._recv_packets.append(packet)
335-
self._recv_condition.notify()
336-
# packet is None on EOF
337-
return packet is not None and not (
338-
packet["type"] == "response" and packet["command"] == "disconnect"
339-
)
340-
341302
def _recv_packet(
342303
self,
343304
*,
@@ -361,46 +322,34 @@ def _recv_packet(
361322
The first matching packet for the given predicate, if specified,
362323
otherwise None.
363324
"""
364-
assert (
365-
threading.current_thread != self._recv_thread
366-
), "Must not be called from the _recv_thread"
367-
368-
def process_until_match():
369-
self._process_recv_packets()
370-
for i, packet in enumerate(self._pending_packets):
371-
if packet is None:
372-
# We need to return a truthy value to break out of the
373-
# wait_for, use `EOFError` as an indicator of EOF.
374-
return EOFError()
375-
if predicate and predicate(packet):
376-
self._pending_packets.pop(i)
377-
return packet
378-
379-
with self._recv_condition:
380-
packet = self._recv_condition.wait_for(process_until_match, timeout)
381-
return None if isinstance(packet, EOFError) else packet
382-
383-
def _process_recv_packets(self) -> None:
325+
deadline = time.time() + timeout
326+
327+
while time.time() < deadline:
328+
packet = self._read_packet(timeout=deadline - time.time())
329+
if packet is None:
330+
return None
331+
self._process_recv_packet(packet)
332+
if not predicate or predicate(packet):
333+
return packet
334+
335+
def _process_recv_packet(self, packet) -> None:
384336
"""Process received packets, updating the session state."""
385-
with self._recv_condition:
386-
for packet in self._recv_packets:
387-
if packet and ("seq" not in packet or packet["seq"] == 0):
388-
warnings.warn(
389-
f"received a malformed packet, expected 'seq != 0' for {packet!r}"
390-
)
391-
# Handle events that may modify any stateful properties of
392-
# the DAP session.
393-
if packet and packet["type"] == "event":
394-
self._handle_event(packet)
395-
elif packet and packet["type"] == "request":
396-
# Handle reverse requests and keep processing.
397-
self._handle_reverse_request(packet)
398-
# Move the packet to the pending queue.
399-
self._pending_packets.append(packet)
400-
self._recv_packets.clear()
337+
if packet and ("seq" not in packet or packet["seq"] == 0):
338+
warnings.warn(
339+
f"received a malformed packet, expected 'seq != 0' for {packet!r}"
340+
)
341+
# Handle events that may modify any stateful properties of
342+
# the DAP session.
343+
if packet and packet["type"] == "event":
344+
self._handle_event(packet)
345+
elif packet and packet["type"] == "request":
346+
# Handle reverse requests and keep processing.
347+
self._handle_reverse_request(packet)
401348

402349
def _handle_event(self, packet: Event) -> None:
403350
"""Handle any events that modify debug session state we track."""
351+
self.events.append(packet)
352+
404353
event = packet["event"]
405354
body: Optional[Dict] = packet.get("body", None)
406355

@@ -453,6 +402,8 @@ def _handle_event(self, packet: Event) -> None:
453402
self.invalidated_event = packet
454403
elif event == "memory":
455404
self.memory_event = packet
405+
elif event == "module":
406+
self.module_events.append(packet)
456407

457408
def _handle_reverse_request(self, request: Request) -> None:
458409
if request in self.reverse_requests:
@@ -521,18 +472,14 @@ def send_packet(self, packet: ProtocolMessage) -> int:
521472
522473
Returns the seq number of the request.
523474
"""
524-
# Set the seq for requests.
525-
if packet["type"] == "request":
526-
packet["seq"] = self.sequence
527-
self.sequence += 1
528-
else:
529-
packet["seq"] = 0
475+
packet["seq"] = self.sequence
476+
self.sequence += 1
530477

531478
# Encode our command dictionary as a JSON string
532479
json_str = json.dumps(packet, separators=(",", ":"))
533480

534481
if self.trace_file:
535-
self.trace_file.write("to adapter:\n%s\n" % (json_str))
482+
self.trace_file.write("%s to adapter:\n%s\n" % (time.time(), json_str))
536483

537484
length = len(json_str)
538485
if length > 0:
@@ -913,6 +860,8 @@ def request_restart(self, restartArguments=None):
913860
if restartArguments:
914861
command_dict["arguments"] = restartArguments
915862

863+
# Clear state, the process is about to restart...
864+
self._process_continued(True)
916865
response = self._send_recv(command_dict)
917866
# Caller must still call wait_for_stopped.
918867
return response
@@ -1479,8 +1428,10 @@ def request_testGetTargetBreakpoints(self):
14791428

14801429
def terminate(self):
14811430
self.send.close()
1482-
if self._recv_thread.is_alive():
1483-
self._recv_thread.join()
1431+
self.recv.close()
1432+
self.selector.close()
1433+
if self.log_file:
1434+
dump_dap_log(self.log_file)
14841435

14851436
def request_setInstructionBreakpoints(self, memory_reference=[]):
14861437
breakpoints = []
@@ -1577,6 +1528,7 @@ def launch(
15771528
stdout=subprocess.PIPE,
15781529
stderr=sys.stderr,
15791530
env=adapter_env,
1531+
bufsize=0,
15801532
)
15811533

15821534
if connection is None:

lldb/packages/Python/lldbsuite/test/tools/lldb-dap/lldbdap_testcase.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ def continue_to_next_stop(self):
416416
return self.dap_server.wait_for_stopped()
417417

418418
def continue_to_breakpoint(self, breakpoint_id: str):
419-
self.continue_to_breakpoints((breakpoint_id))
419+
self.continue_to_breakpoints([breakpoint_id])
420420

421421
def continue_to_breakpoints(self, breakpoint_ids):
422422
self.do_continue()

lldb/test/API/tools/lldb-dap/breakpoint-events/TestDAP_breakpointEvents.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,24 +81,20 @@ def test_breakpoint_events(self):
8181
breakpoint["verified"], "expect foo breakpoint to not be verified"
8282
)
8383

84-
# Flush the breakpoint events.
85-
self.dap_server.wait_for_breakpoint_events()
86-
8784
# Continue to the breakpoint
88-
self.continue_to_breakpoints(dap_breakpoint_ids)
85+
self.continue_to_breakpoint(foo_bp_id)
86+
self.continue_to_next_stop() # foo_bp2
87+
self.continue_to_breakpoint(main_bp_id)
88+
self.continue_to_exit()
8989

90-
verified_breakpoint_ids = []
91-
unverified_breakpoint_ids = []
92-
for breakpoint_event in self.dap_server.wait_for_breakpoint_events():
93-
breakpoint = breakpoint_event["body"]["breakpoint"]
94-
id = breakpoint["id"]
95-
if breakpoint["verified"]:
96-
verified_breakpoint_ids.append(id)
97-
else:
98-
unverified_breakpoint_ids.append(id)
90+
bp_events = [e for e in self.dap_server.events if e["event"] == "breakpoint"]
9991

100-
self.assertIn(main_bp_id, unverified_breakpoint_ids)
101-
self.assertIn(foo_bp_id, unverified_breakpoint_ids)
92+
main_bp_events = [
93+
e for e in bp_events if e["body"]["breakpoint"]["id"] == main_bp_id
94+
]
95+
foo_bp_events = [
96+
e for e in bp_events if e["body"]["breakpoint"]["id"] == foo_bp_id
97+
]
10298

103-
self.assertIn(main_bp_id, verified_breakpoint_ids)
104-
self.assertIn(foo_bp_id, verified_breakpoint_ids)
99+
self.assertTrue(main_bp_events)
100+
self.assertTrue(foo_bp_events)

lldb/test/API/tools/lldb-dap/launch/TestDAP_launch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def test_debuggerRoot(self):
156156
self.build_and_launch(
157157
program, debuggerRoot=program_parent_dir, initCommands=commands
158158
)
159+
self.continue_to_exit()
159160
output = self.get_console()
160161
self.assertTrue(output and len(output) > 0, "expect console output")
161162
lines = output.splitlines()
@@ -171,7 +172,6 @@ def test_debuggerRoot(self):
171172
% (program_parent_dir, line[len(prefix) :]),
172173
)
173174
self.assertTrue(found, "verified lldb-dap working directory")
174-
self.continue_to_exit()
175175

176176
def test_sourcePath(self):
177177
"""

0 commit comments

Comments
 (0)