1010import subprocess
1111import signal
1212import sys
13- import threading
1413import warnings
14+ import selectors
1515import time
1616from typing import (
1717 Any ,
@@ -139,35 +139,6 @@ def dump_memory(base_addr, data, num_per_line, outfile):
139139 outfile .write ("\n " )
140140
141141
142- def read_packet (
143- f : IO [bytes ], trace_file : Optional [IO [str ]] = None
144- ) -> Optional [ProtocolMessage ]:
145- """Decode a JSON packet that starts with the content length and is
146- followed by the JSON bytes from a file 'f'. Returns None on EOF.
147- """
148- line = f .readline ().decode ("utf-8" )
149- if len (line ) == 0 :
150- return None # EOF.
151-
152- # Watch for line that starts with the prefix
153- prefix = "Content-Length: "
154- if line .startswith (prefix ):
155- # Decode length of JSON bytes
156- length = int (line [len (prefix ) :])
157- # Skip empty line
158- separator = f .readline ().decode ()
159- if separator != "" :
160- Exception ("malformed DAP content header, unexpected line: " + separator )
161- # Read JSON bytes
162- json_str = f .read (length ).decode ()
163- if trace_file :
164- trace_file .write ("from adapter:\n %s\n " % (json_str ))
165- # Decode the JSON bytes into a python dictionary
166- return json .loads (json_str )
167-
168- raise Exception ("unexpected malformed message from lldb-dap: " + line )
169-
170-
171142def packet_type_is (packet , packet_type ):
172143 return "type" in packet and packet ["type" ] == packet_type
173144
@@ -199,16 +170,8 @@ def __init__(
199170 self .log_file = log_file
200171 self .send = send
201172 self .recv = recv
202-
203- # Packets that have been received and processed but have not yet been
204- # requested by a test case.
205- self ._pending_packets : List [Optional [ProtocolMessage ]] = []
206- # Received packets that have not yet been processed.
207- self ._recv_packets : List [Optional [ProtocolMessage ]] = []
208- # Used as a mutex for _recv_packets and for notify when _recv_packets
209- # changes.
210- self ._recv_condition = threading .Condition ()
211- self ._recv_thread = threading .Thread (target = self ._read_packet_thread )
173+ self .selector = selectors .DefaultSelector ()
174+ self .selector .register (recv , selectors .EVENT_READ )
212175
213176 # session state
214177 self .init_commands = init_commands
@@ -234,9 +197,6 @@ def __init__(
234197 # keyed by breakpoint id
235198 self .resolved_breakpoints : dict [str , Breakpoint ] = {}
236199
237- # trigger enqueue thread
238- self ._recv_thread .start ()
239-
240200 @classmethod
241201 def encode_content (cls , s : str ) -> bytes :
242202 return ("Content-Length: %u\r \n \r \n %s" % (len (s ), s )).encode ("utf-8" )
@@ -252,17 +212,46 @@ def validate_response(cls, command, response):
252212 f"seq mismatch in response { command ['seq' ]} != { response ['request_seq' ]} "
253213 )
254214
255- def _read_packet_thread (self ):
256- try :
257- while True :
258- packet = read_packet (self .recv , trace_file = self .trace_file )
259- # `packet` will be `None` on EOF. We want to pass it down to
260- # handle_recv_packet anyway so the main thread can handle unexpected
261- # termination of lldb-dap and stop waiting for new packets.
262- if not self ._handle_recv_packet (packet ):
263- break
264- finally :
265- dump_dap_log (self .log_file )
215+ def _read_packet (
216+ self ,
217+ timeout : float = DEFAULT_TIMEOUT ,
218+ ) -> Optional [ProtocolMessage ]:
219+ """Decode a JSON packet that starts with the content length and is
220+ followed by the JSON bytes from self.recv. Returns None on EOF.
221+ """
222+
223+ ready = self .selector .select (timeout )
224+ if not ready :
225+ warnings .warn (
226+ "timeout occurred waiting for a packet, check if the test has a"
227+ " negative assertion and see if it can be inverted." ,
228+ stacklevel = 4 ,
229+ )
230+ return None # timeout
231+
232+ line = self .recv .readline ().decode ("utf-8" )
233+ if len (line ) == 0 :
234+ return None # EOF.
235+
236+ # Watch for line that starts with the prefix
237+ prefix = "Content-Length: "
238+ if line .startswith (prefix ):
239+ # Decode length of JSON bytes
240+ length = int (line [len (prefix ) :])
241+ # Skip empty line
242+ separator = self .recv .readline ().decode ()
243+ if separator != "" :
244+ Exception ("malformed DAP content header, unexpected line: " + separator )
245+ # Read JSON bytes
246+ json_str = self .recv .read (length ).decode ()
247+ if self .trace_file :
248+ self .trace_file .write (
249+ "%s from adapter:\n %s\n " % (time .time (), json_str )
250+ )
251+ # Decode the JSON bytes into a python dictionary
252+ return json .loads (json_str )
253+
254+ raise Exception ("unexpected malformed message from lldb-dap: " + line )
266255
267256 def get_modules (
268257 self , start_module : Optional [int ] = None , module_count : Optional [int ] = None
@@ -310,34 +299,6 @@ def collect_output(
310299 output += self .get_output (category , clear = clear )
311300 return output
312301
313- def _enqueue_recv_packet (self , packet : Optional [ProtocolMessage ]):
314- with self .recv_condition :
315- self .recv_packets .append (packet )
316- self .recv_condition .notify ()
317-
318- def _handle_recv_packet (self , packet : Optional [ProtocolMessage ]) -> bool :
319- """Handles an incoming packet.
320-
321- Called by the read thread that is waiting for all incoming packets
322- to store the incoming packet in "self._recv_packets" in a thread safe
323- way. This function will then signal the "self._recv_condition" to
324- indicate a new packet is available.
325-
326- Args:
327- packet: A new packet to store.
328-
329- Returns:
330- True if the caller should keep calling this function for more
331- packets.
332- """
333- with self ._recv_condition :
334- self ._recv_packets .append (packet )
335- self ._recv_condition .notify ()
336- # packet is None on EOF
337- return packet is not None and not (
338- packet ["type" ] == "response" and packet ["command" ] == "disconnect"
339- )
340-
341302 def _recv_packet (
342303 self ,
343304 * ,
@@ -361,46 +322,34 @@ def _recv_packet(
361322 The first matching packet for the given predicate, if specified,
362323 otherwise None.
363324 """
364- assert (
365- threading .current_thread != self ._recv_thread
366- ), "Must not be called from the _recv_thread"
367-
368- def process_until_match ():
369- self ._process_recv_packets ()
370- for i , packet in enumerate (self ._pending_packets ):
371- if packet is None :
372- # We need to return a truthy value to break out of the
373- # wait_for, use `EOFError` as an indicator of EOF.
374- return EOFError ()
375- if predicate and predicate (packet ):
376- self ._pending_packets .pop (i )
377- return packet
378-
379- with self ._recv_condition :
380- packet = self ._recv_condition .wait_for (process_until_match , timeout )
381- return None if isinstance (packet , EOFError ) else packet
382-
383- def _process_recv_packets (self ) -> None :
325+ deadline = time .time () + timeout
326+
327+ while time .time () < deadline :
328+ packet = self ._read_packet (timeout = deadline - time .time ())
329+ if packet is None :
330+ return None
331+ self ._process_recv_packet (packet )
332+ if not predicate or predicate (packet ):
333+ return packet
334+
335+ def _process_recv_packet (self , packet ) -> None :
384336 """Process received packets, updating the session state."""
385- with self ._recv_condition :
386- for packet in self ._recv_packets :
387- if packet and ("seq" not in packet or packet ["seq" ] == 0 ):
388- warnings .warn (
389- f"received a malformed packet, expected 'seq != 0' for { packet !r} "
390- )
391- # Handle events that may modify any stateful properties of
392- # the DAP session.
393- if packet and packet ["type" ] == "event" :
394- self ._handle_event (packet )
395- elif packet and packet ["type" ] == "request" :
396- # Handle reverse requests and keep processing.
397- self ._handle_reverse_request (packet )
398- # Move the packet to the pending queue.
399- self ._pending_packets .append (packet )
400- self ._recv_packets .clear ()
337+ if packet and ("seq" not in packet or packet ["seq" ] == 0 ):
338+ warnings .warn (
339+ f"received a malformed packet, expected 'seq != 0' for { packet !r} "
340+ )
341+ # Handle events that may modify any stateful properties of
342+ # the DAP session.
343+ if packet and packet ["type" ] == "event" :
344+ self ._handle_event (packet )
345+ elif packet and packet ["type" ] == "request" :
346+ # Handle reverse requests and keep processing.
347+ self ._handle_reverse_request (packet )
401348
402349 def _handle_event (self , packet : Event ) -> None :
403350 """Handle any events that modify debug session state we track."""
351+ self .events .append (packet )
352+
404353 event = packet ["event" ]
405354 body : Optional [Dict ] = packet .get ("body" , None )
406355
@@ -453,6 +402,8 @@ def _handle_event(self, packet: Event) -> None:
453402 self .invalidated_event = packet
454403 elif event == "memory" :
455404 self .memory_event = packet
405+ elif event == "module" :
406+ self .module_events .append (packet )
456407
457408 def _handle_reverse_request (self , request : Request ) -> None :
458409 if request in self .reverse_requests :
@@ -521,18 +472,14 @@ def send_packet(self, packet: ProtocolMessage) -> int:
521472
522473 Returns the seq number of the request.
523474 """
524- # Set the seq for requests.
525- if packet ["type" ] == "request" :
526- packet ["seq" ] = self .sequence
527- self .sequence += 1
528- else :
529- packet ["seq" ] = 0
475+ packet ["seq" ] = self .sequence
476+ self .sequence += 1
530477
531478 # Encode our command dictionary as a JSON string
532479 json_str = json .dumps (packet , separators = ("," , ":" ))
533480
534481 if self .trace_file :
535- self .trace_file .write ("to adapter:\n %s\n " % (json_str ))
482+ self .trace_file .write ("%s to adapter:\n %s\n " % (time . time (), json_str ))
536483
537484 length = len (json_str )
538485 if length > 0 :
@@ -913,6 +860,8 @@ def request_restart(self, restartArguments=None):
913860 if restartArguments :
914861 command_dict ["arguments" ] = restartArguments
915862
863+ # Clear state, the process is about to restart...
864+ self ._process_continued (True )
916865 response = self ._send_recv (command_dict )
917866 # Caller must still call wait_for_stopped.
918867 return response
@@ -1479,8 +1428,10 @@ def request_testGetTargetBreakpoints(self):
14791428
14801429 def terminate (self ):
14811430 self .send .close ()
1482- if self ._recv_thread .is_alive ():
1483- self ._recv_thread .join ()
1431+ self .recv .close ()
1432+ self .selector .close ()
1433+ if self .log_file :
1434+ dump_dap_log (self .log_file )
14841435
14851436 def request_setInstructionBreakpoints (self , memory_reference = []):
14861437 breakpoints = []
@@ -1577,6 +1528,7 @@ def launch(
15771528 stdout = subprocess .PIPE ,
15781529 stderr = sys .stderr ,
15791530 env = adapter_env ,
1531+ bufsize = 0 ,
15801532 )
15811533
15821534 if connection is None :
0 commit comments