Skip to content

Commit c654fdf

Browse files
committed
More tests
1 parent 715af27 commit c654fdf

File tree

1 file changed

+191
-58
lines changed

1 file changed

+191
-58
lines changed

Lib/test/test_remote_pdb.py

Lines changed: 191 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -252,39 +252,48 @@ def setUp(self):
252252
self.server_sock.listen(1)
253253
self.port = self.server_sock.getsockname()[1]
254254

255+
def _create_script(self, script=None):
255256
# Create a file for subprocess script
257+
if script is None:
258+
script = textwrap.dedent(
259+
f"""
260+
import pdb
261+
import sys
262+
import time
263+
264+
def foo():
265+
x = 42
266+
return bar()
267+
268+
def bar():
269+
return 42
270+
271+
def connect_to_debugger():
272+
# Create a frame to debug
273+
def dummy_function():
274+
x = 42
275+
# Call connect to establish connection
276+
# with the test server
277+
frame = sys._getframe() # Get the current frame
278+
pdb._connect(
279+
host='127.0.0.1',
280+
port={self.port},
281+
frame=frame,
282+
commands="",
283+
version=pdb._PdbServer.protocol_version(),
284+
)
285+
return x # This line won't be reached in debugging
286+
287+
return dummy_function()
288+
289+
result = connect_to_debugger()
290+
foo()
291+
print(f"Function returned: {{result}}")
292+
""")
293+
256294
self.script_path = TESTFN + "_connect_test.py"
257295
with open(self.script_path, 'w') as f:
258-
f.write(
259-
textwrap.dedent(
260-
f"""
261-
import pdb
262-
import sys
263-
import time
264-
265-
def connect_to_debugger():
266-
# Create a frame to debug
267-
def dummy_function():
268-
x = 42
269-
# Call connect to establish connection
270-
# with the test server
271-
frame = sys._getframe() # Get the current frame
272-
pdb._connect(
273-
host='127.0.0.1',
274-
port={self.port},
275-
frame=frame,
276-
commands="",
277-
version=pdb._PdbServer.protocol_version(),
278-
)
279-
return x # This line won't be reached in debugging
280-
281-
return dummy_function()
282-
283-
result = connect_to_debugger()
284-
print(f"Function returned: {{result}}")
285-
"""
286-
)
287-
)
296+
f.write(script)
288297

289298
def tearDown(self):
290299
self.server_sock.close()
@@ -293,21 +302,62 @@ def tearDown(self):
293302
except OSError:
294303
pass
295304

296-
def test_connect_and_basic_commands(self):
297-
"""Test connecting to a remote debugger and sending basic commands."""
305+
def _connect_and_get_client_file(self):
306+
"""Helper to start subprocess and get connected client file."""
298307
# Start the subprocess that will connect to our socket
299-
with subprocess.Popen(
308+
process = subprocess.Popen(
300309
[sys.executable, self.script_path],
301310
stdout=subprocess.PIPE,
302311
stderr=subprocess.PIPE,
303312
text=True
304-
) as process:
305-
# Accept the connection from the subprocess
306-
client_sock, _ = self.server_sock.accept()
307-
client_file = client_sock.makefile('rwb')
308-
self.addCleanup(client_file.close)
309-
self.addCleanup(client_sock.close)
313+
)
314+
315+
# Accept the connection from the subprocess
316+
client_sock, _ = self.server_sock.accept()
317+
client_file = client_sock.makefile('rwb')
318+
self.addCleanup(client_file.close)
319+
self.addCleanup(client_sock.close)
320+
321+
return process, client_file
322+
323+
def _read_until_prompt(self, client_file):
324+
"""Helper to read messages until a prompt is received."""
325+
messages = []
326+
while True:
327+
data = client_file.readline()
328+
if not data:
329+
break
330+
msg = json.loads(data.decode())
331+
messages.append(msg)
332+
if 'prompt' in msg:
333+
break
334+
return messages
335+
336+
def _send_command(self, client_file, command):
337+
"""Helper to send a command to the debugger."""
338+
client_file.write(json.dumps({"reply": command}).encode() + b"\n")
339+
client_file.flush()
340+
341+
def _send_interrupt(self, pid):
342+
"""Helper to send an interrupt signal to the debugger."""
343+
# with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
344+
interrupt_script = TESTFN + "_interrupt_script.py"
345+
with open(interrupt_script, 'w') as f:
346+
f.write(
347+
'import pdb, sys\n'
348+
'print("Hello, world!")\n'
349+
'if inst := pdb.Pdb._last_pdb_instance:\n'
350+
' inst.set_trace(sys._getframe(1))\n'
351+
)
352+
sys.remote_exec(pid, interrupt_script)
353+
self.addCleanup(unlink, interrupt_script)
310354

355+
def test_connect_and_basic_commands(self):
356+
"""Test connecting to a remote debugger and sending basic commands."""
357+
self._create_script()
358+
process, client_file = self._connect_and_get_client_file()
359+
360+
with process:
311361
# We should receive initial data from the debugger
312362
data = client_file.readline()
313363
initial_data = json.loads(data.decode())
@@ -326,25 +376,15 @@ def test_connect_and_basic_commands(self):
326376
self.assertEqual(prompt_data['state'], 'pdb')
327377

328378
# Send 'bt' (backtrace) command
329-
client_file.write(json.dumps({"reply": "bt"}).encode() + b"\n")
330-
client_file.flush()
379+
self._send_command(client_file, "bt")
331380

332381
# Check for response - we should get some stack frames
333-
# We may get multiple messages so we need to read until we get a new prompt
334-
got_stack_info = False
335-
text_msg = []
336-
while True:
337-
data = client_file.readline()
338-
if not data:
339-
break
340-
341-
msg = json.loads(data.decode())
342-
if 'message' in msg and 'connect_to_debugger' in msg['message']:
343-
got_stack_info = True
344-
text_msg.append(msg['message'])
345-
346-
if 'prompt' in msg:
347-
break
382+
messages = self._read_until_prompt(client_file)
383+
384+
# Extract text messages containing stack info
385+
text_msg = [msg['message'] for msg in messages
386+
if 'message' in msg and 'connect_to_debugger' in msg['message']]
387+
got_stack_info = bool(text_msg)
348388

349389
expected_stacks = [
350390
"<module>",
@@ -357,8 +397,7 @@ def test_connect_and_basic_commands(self):
357397
self.assertTrue(got_stack_info, "Should have received stack trace information")
358398

359399
# Send 'c' (continue) command to let the program finish
360-
client_file.write(json.dumps({"reply": "c"}).encode() + b"\n")
361-
client_file.flush()
400+
self._send_command(client_file, "c")
362401

363402
# Wait for process to finish
364403
stdout, _ = process.communicate(timeout=5)
@@ -367,6 +406,100 @@ def test_connect_and_basic_commands(self):
367406
self.assertIn("Function returned: 42", stdout)
368407
self.assertEqual(process.returncode, 0)
369408

409+
def test_breakpoints(self):
410+
"""Test setting and hitting breakpoints."""
411+
self._create_script()
412+
process, client_file = self._connect_and_get_client_file()
413+
with process:
414+
# Skip initial messages until we get to the prompt
415+
self._read_until_prompt(client_file)
416+
417+
# Set a breakpoint at the return statement
418+
self._send_command(client_file, "break bar")
419+
messages = self._read_until_prompt(client_file)
420+
bp_msg = next(msg['message'] for msg in messages if 'message' in msg)
421+
self.assertIn("Breakpoint", bp_msg)
422+
423+
# Continue execution until breakpoint
424+
self._send_command(client_file, "c")
425+
messages = self._read_until_prompt(client_file)
426+
427+
# Verify we hit the breakpoint
428+
hit_msg = next(msg['message'] for msg in messages if 'message' in msg)
429+
self.assertIn("bar()", hit_msg)
430+
431+
# Check breakpoint list
432+
self._send_command(client_file, "b")
433+
messages = self._read_until_prompt(client_file)
434+
list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
435+
self.assertIn("1 breakpoint", list_msg)
436+
self.assertIn("breakpoint already hit 1 time", list_msg)
437+
438+
# Clear breakpoint
439+
self._send_command(client_file, "clear 1")
440+
messages = self._read_until_prompt(client_file)
441+
clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
442+
self.assertIn("Deleted breakpoint", clear_msg)
443+
444+
# Continue to end
445+
self._send_command(client_file, "c")
446+
stdout, _ = process.communicate(timeout=5)
447+
448+
self.assertIn("Function returned: 42", stdout)
449+
self.assertEqual(process.returncode, 0)
450+
451+
def test_keyboard_interrupt(self):
452+
"""Test that sending keyboard interrupt breaks into pdb."""
453+
script = f"""
454+
import time
455+
import sys
456+
import pdb
457+
def bar():
458+
frame = sys._getframe() # Get the current frame
459+
pdb._connect(
460+
host='127.0.0.1',
461+
port={self.port},
462+
frame=frame,
463+
commands="",
464+
version=pdb._PdbServer.protocol_version(),
465+
)
466+
print("Connected to debugger")
467+
iterations = 10
468+
while iterations > 0:
469+
print("Iteration", iterations)
470+
time.sleep(1)
471+
iterations -= 1
472+
return 42
473+
474+
if __name__ == "__main__":
475+
print("Function returned:", bar())
476+
"""
477+
self._create_script(script=script)
478+
process, client_file = self._connect_and_get_client_file()
479+
480+
with process:
481+
482+
# Skip initial messages until we get to the prompt
483+
self._read_until_prompt(client_file)
484+
485+
# Continue execution
486+
self._send_command(client_file, "c")
487+
488+
# Send keyboard interrupt signal
489+
self._send_command(client_file, json.dumps({"signal": "INT"}))
490+
self._send_interrupt(process.pid)
491+
messages = self._read_until_prompt(client_file)
492+
493+
# Verify we got the keyboard interrupt message
494+
interrupt_msg = next(msg['message'] for msg in messages if 'message' in msg)
495+
self.assertIn("bar()", interrupt_msg)
496+
497+
# Continue to end
498+
self._send_command(client_file, "iterations = 0")
499+
self._send_command(client_file, "c")
500+
stdout, _ = process.communicate(timeout=5)
501+
self.assertIn("Function returned: 42", stdout)
502+
self.assertEqual(process.returncode, 0)
370503

371504
if __name__ == "__main__":
372505
unittest.main()

0 commit comments

Comments
 (0)