1010import subprocess
1111import signal
1212import sys
13+ import threading
1314import warnings
14- import selectors
1515import time
1616from typing import (
1717 Any ,
@@ -139,6 +139,35 @@ def dump_memory(base_addr, data, num_per_line, outfile):
139139 outfile .write ("\n " )
140140
141141
142+ def read_packet (
143+ f : IO [bytes ], trace_file : Optional [IO [str ]] = None
144+ ) -> Optional [ProtocolMessage ]:
145+ """Decode a JSON packet that starts with the content length and is
146+ followed by the JSON bytes from a file 'f'. Returns None on EOF.
147+ """
148+ line = f .readline ().decode ("utf-8" )
149+ if len (line ) == 0 :
150+ return None # EOF.
151+
152+ # Watch for line that starts with the prefix
153+ prefix = "Content-Length: "
154+ if line .startswith (prefix ):
155+ # Decode length of JSON bytes
156+ length = int (line [len (prefix ) :])
157+ # Skip empty line
158+ separator = f .readline ().decode ()
159+ if separator != "" :
160+ Exception ("malformed DAP content header, unexpected line: " + separator )
161+ # Read JSON bytes
162+ json_str = f .read (length ).decode ()
163+ if trace_file :
164+ trace_file .write ("from adapter:\n %s\n " % (json_str ))
165+ # Decode the JSON bytes into a python dictionary
166+ return json .loads (json_str )
167+
168+ raise Exception ("unexpected malformed message from lldb-dap: " + line )
169+
170+
142171def packet_type_is (packet , packet_type ):
143172 return "type" in packet and packet ["type" ] == packet_type
144173
@@ -170,8 +199,16 @@ def __init__(
170199 self .log_file = log_file
171200 self .send = send
172201 self .recv = recv
173- self .selector = selectors .DefaultSelector ()
174- self .selector .register (recv , selectors .EVENT_READ )
202+
203+ # Packets that have been received and processed but have not yet been
204+ # requested by a test case.
205+ self ._pending_packets : List [Optional [ProtocolMessage ]] = []
206+ # Received packets that have not yet been processed.
207+ self ._recv_packets : List [Optional [ProtocolMessage ]] = []
208+ # Used as a mutex for _recv_packets and for notify when _recv_packets
209+ # changes.
210+ self ._recv_condition = threading .Condition ()
211+ self ._recv_thread = threading .Thread (target = self ._read_packet_thread )
175212
176213 # session state
177214 self .init_commands = init_commands
@@ -197,6 +234,9 @@ def __init__(
197234 # keyed by breakpoint id
198235 self .resolved_breakpoints : dict [str , Breakpoint ] = {}
199236
237+ # trigger enqueue thread
238+ self ._recv_thread .start ()
239+
200240 @classmethod
201241 def encode_content (cls , s : str ) -> bytes :
202242 return ("Content-Length: %u\r \n \r \n %s" % (len (s ), s )).encode ("utf-8" )
@@ -212,46 +252,17 @@ def validate_response(cls, command, response):
212252 f"seq mismatch in response { command ['seq' ]} != { response ['request_seq' ]} "
213253 )
214254
215- def _read_packet (
216- self ,
217- timeout : float = DEFAULT_TIMEOUT ,
218- ) -> Optional [ProtocolMessage ]:
219- """Decode a JSON packet that starts with the content length and is
220- followed by the JSON bytes from self.recv. Returns None on EOF.
221- """
222-
223- ready = self .selector .select (timeout )
224- if not ready :
225- warnings .warn (
226- "timeout occurred waiting for a packet, check if the test has a"
227- " negative assertion and see if it can be inverted." ,
228- stacklevel = 4 ,
229- )
230- return None # timeout
231-
232- line = self .recv .readline ().decode ("utf-8" )
233- if len (line ) == 0 :
234- return None # EOF.
235-
236- # Watch for line that starts with the prefix
237- prefix = "Content-Length: "
238- if line .startswith (prefix ):
239- # Decode length of JSON bytes
240- length = int (line [len (prefix ) :])
241- # Skip empty line
242- separator = self .recv .readline ().decode ()
243- if separator != "" :
244- Exception ("malformed DAP content header, unexpected line: " + separator )
245- # Read JSON bytes
246- json_str = self .recv .read (length ).decode ()
247- if self .trace_file :
248- self .trace_file .write (
249- "%s from adapter:\n %s\n " % (time .time (), json_str )
250- )
251- # Decode the JSON bytes into a python dictionary
252- return json .loads (json_str )
253-
254- raise Exception ("unexpected malformed message from lldb-dap: " + line )
255+ def _read_packet_thread (self ):
256+ try :
257+ while True :
258+ packet = read_packet (self .recv , trace_file = self .trace_file )
259+ # `packet` will be `None` on EOF. We want to pass it down to
260+ # handle_recv_packet anyway so the main thread can handle unexpected
261+ # termination of lldb-dap and stop waiting for new packets.
262+ if not self ._handle_recv_packet (packet ):
263+ break
264+ finally :
265+ dump_dap_log (self .log_file )
255266
256267 def get_modules (
257268 self , start_module : Optional [int ] = None , module_count : Optional [int ] = None
@@ -299,6 +310,34 @@ def collect_output(
299310 output += self .get_output (category , clear = clear )
300311 return output
301312
313+ def _enqueue_recv_packet (self , packet : Optional [ProtocolMessage ]):
314+ with self .recv_condition :
315+ self .recv_packets .append (packet )
316+ self .recv_condition .notify ()
317+
318+ def _handle_recv_packet (self , packet : Optional [ProtocolMessage ]) -> bool :
319+ """Handles an incoming packet.
320+
321+ Called by the read thread that is waiting for all incoming packets
322+ to store the incoming packet in "self._recv_packets" in a thread safe
323+ way. This function will then signal the "self._recv_condition" to
324+ indicate a new packet is available.
325+
326+ Args:
327+ packet: A new packet to store.
328+
329+ Returns:
330+ True if the caller should keep calling this function for more
331+ packets.
332+ """
333+ with self ._recv_condition :
334+ self ._recv_packets .append (packet )
335+ self ._recv_condition .notify ()
336+ # packet is None on EOF
337+ return packet is not None and not (
338+ packet ["type" ] == "response" and packet ["command" ] == "disconnect"
339+ )
340+
302341 def _recv_packet (
303342 self ,
304343 * ,
@@ -322,34 +361,46 @@ def _recv_packet(
322361 The first matching packet for the given predicate, if specified,
323362 otherwise None.
324363 """
325- deadline = time .time () + timeout
326-
327- while time .time () < deadline :
328- packet = self ._read_packet (timeout = deadline - time .time ())
329- if packet is None :
330- return None
331- self ._process_recv_packet (packet )
332- if not predicate or predicate (packet ):
333- return packet
334-
335- def _process_recv_packet (self , packet ) -> None :
364+ assert (
365+ threading .current_thread != self ._recv_thread
366+ ), "Must not be called from the _recv_thread"
367+
368+ def process_until_match ():
369+ self ._process_recv_packets ()
370+ for i , packet in enumerate (self ._pending_packets ):
371+ if packet is None :
372+ # We need to return a truthy value to break out of the
373+ # wait_for, use `EOFError` as an indicator of EOF.
374+ return EOFError ()
375+ if predicate and predicate (packet ):
376+ self ._pending_packets .pop (i )
377+ return packet
378+
379+ with self ._recv_condition :
380+ packet = self ._recv_condition .wait_for (process_until_match , timeout )
381+ return None if isinstance (packet , EOFError ) else packet
382+
383+ def _process_recv_packets (self ) -> None :
336384 """Process received packets, updating the session state."""
337- if packet and ("seq" not in packet or packet ["seq" ] == 0 ):
338- warnings .warn (
339- f"received a malformed packet, expected 'seq != 0' for { packet !r} "
340- )
341- # Handle events that may modify any stateful properties of
342- # the DAP session.
343- if packet and packet ["type" ] == "event" :
344- self ._handle_event (packet )
345- elif packet and packet ["type" ] == "request" :
346- # Handle reverse requests and keep processing.
347- self ._handle_reverse_request (packet )
385+ with self ._recv_condition :
386+ for packet in self ._recv_packets :
387+ if packet and ("seq" not in packet or packet ["seq" ] == 0 ):
388+ warnings .warn (
389+ f"received a malformed packet, expected 'seq != 0' for { packet !r} "
390+ )
391+ # Handle events that may modify any stateful properties of
392+ # the DAP session.
393+ if packet and packet ["type" ] == "event" :
394+ self ._handle_event (packet )
395+ elif packet and packet ["type" ] == "request" :
396+ # Handle reverse requests and keep processing.
397+ self ._handle_reverse_request (packet )
398+ # Move the packet to the pending queue.
399+ self ._pending_packets .append (packet )
400+ self ._recv_packets .clear ()
348401
349402 def _handle_event (self , packet : Event ) -> None :
350403 """Handle any events that modify debug session state we track."""
351- self .events .append (packet )
352-
353404 event = packet ["event" ]
354405 body : Optional [Dict ] = packet .get ("body" , None )
355406
@@ -402,8 +453,6 @@ def _handle_event(self, packet: Event) -> None:
402453 self .invalidated_event = packet
403454 elif event == "memory" :
404455 self .memory_event = packet
405- elif event == "module" :
406- self .module_events .append (packet )
407456
408457 def _handle_reverse_request (self , request : Request ) -> None :
409458 if request in self .reverse_requests :
@@ -472,14 +521,18 @@ def send_packet(self, packet: ProtocolMessage) -> int:
472521
473522 Returns the seq number of the request.
474523 """
475- packet ["seq" ] = self .sequence
476- self .sequence += 1
524+ # Set the seq for requests.
525+ if packet ["type" ] == "request" :
526+ packet ["seq" ] = self .sequence
527+ self .sequence += 1
528+ else :
529+ packet ["seq" ] = 0
477530
478531 # Encode our command dictionary as a JSON string
479532 json_str = json .dumps (packet , separators = ("," , ":" ))
480533
481534 if self .trace_file :
482- self .trace_file .write ("%s to adapter:\n %s\n " % (time . time (), json_str ))
535+ self .trace_file .write ("to adapter:\n %s\n " % (json_str ))
483536
484537 length = len (json_str )
485538 if length > 0 :
@@ -860,8 +913,6 @@ def request_restart(self, restartArguments=None):
860913 if restartArguments :
861914 command_dict ["arguments" ] = restartArguments
862915
863- # Clear state, the process is about to restart...
864- self ._process_continued (True )
865916 response = self ._send_recv (command_dict )
866917 # Caller must still call wait_for_stopped.
867918 return response
@@ -1428,10 +1479,8 @@ def request_testGetTargetBreakpoints(self):
14281479
14291480 def terminate (self ):
14301481 self .send .close ()
1431- self .recv .close ()
1432- self .selector .close ()
1433- if self .log_file :
1434- dump_dap_log (self .log_file )
1482+ if self ._recv_thread .is_alive ():
1483+ self ._recv_thread .join ()
14351484
14361485 def request_setInstructionBreakpoints (self , memory_reference = []):
14371486 breakpoints = []
@@ -1528,7 +1577,6 @@ def launch(
15281577 stdout = subprocess .PIPE ,
15291578 stderr = sys .stderr ,
15301579 env = adapter_env ,
1531- bufsize = 0 ,
15321580 )
15331581
15341582 if connection is None :
0 commit comments