|
1 | 1 | # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD |
2 | 2 | # SPDX-License-Identifier: Unlicense OR CC0-1.0 |
| 3 | +import json |
| 4 | +import logging |
| 5 | +import os.path |
3 | 6 | import re |
| 7 | +import signal |
4 | 8 | import time |
| 9 | +from telnetlib import Telnet |
| 10 | +from typing import Any |
| 11 | +from typing import Optional |
5 | 12 |
|
6 | | -import pexpect.fdpexpect |
| 13 | +import pexpect |
7 | 14 | import pytest |
| 15 | +from pytest_embedded.utils import to_bytes |
| 16 | +from pytest_embedded.utils import to_str |
8 | 17 | from pytest_embedded_idf import IdfDut |
9 | 18 | from pytest_embedded_idf.utils import idf_parametrize |
10 | 19 |
|
| 20 | +MAX_RETRIES = 3 |
| 21 | +RETRY_DELAY = 1 |
| 22 | +TELNET_PORT = 4444 |
| 23 | + |
| 24 | + |
| 25 | +class OpenOCD: |
| 26 | + def __init__(self, dut: 'IdfDut'): |
| 27 | + self.dut = dut |
| 28 | + self.telnet: Optional[Telnet] = None |
| 29 | + self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') |
| 30 | + self.proc: Optional[pexpect.spawn] = None |
| 31 | + |
| 32 | + def run(self) -> Optional['OpenOCD']: |
| 33 | + desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') |
| 34 | + |
| 35 | + try: |
| 36 | + with open(desc_path, 'r') as f: |
| 37 | + project_desc = json.load(f) |
| 38 | + except FileNotFoundError: |
| 39 | + logging.error('Project description file not found at %s', desc_path) |
| 40 | + return None |
| 41 | + |
| 42 | + openocd_scripts = os.getenv('OPENOCD_SCRIPTS') |
| 43 | + if not openocd_scripts: |
| 44 | + logging.error('OPENOCD_SCRIPTS environment variable is not set.') |
| 45 | + return None |
| 46 | + |
| 47 | + debug_args = project_desc.get('debug_arguments_openocd') |
| 48 | + if not debug_args: |
| 49 | + logging.error("'debug_arguments_openocd' key is missing in project_description.json") |
| 50 | + return None |
| 51 | + |
| 52 | + # For debug purposes, make the value '4' |
| 53 | + ocd_env = os.environ.copy() |
| 54 | + ocd_env['LIBUSB_DEBUG'] = '1' |
| 55 | + |
| 56 | + for _ in range(1, MAX_RETRIES + 1): |
| 57 | + try: |
| 58 | + self.proc = pexpect.spawn( |
| 59 | + command='openocd', |
| 60 | + args=['-s', openocd_scripts] + debug_args.split(), |
| 61 | + timeout=5, |
| 62 | + encoding='utf-8', |
| 63 | + codec_errors='ignore', |
| 64 | + env=ocd_env, |
| 65 | + ) |
| 66 | + if self.proc and self.proc.isalive(): |
| 67 | + self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) |
| 68 | + return self |
| 69 | + except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e: |
| 70 | + logging.error('Error running OpenOCD: %s', str(e)) |
| 71 | + if self.proc and self.proc.isalive(): |
| 72 | + self.proc.terminate() |
| 73 | + time.sleep(RETRY_DELAY) |
| 74 | + |
| 75 | + logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES) |
| 76 | + return None |
| 77 | + |
| 78 | + def connect_telnet(self) -> None: |
| 79 | + for attempt in range(1, MAX_RETRIES + 1): |
| 80 | + try: |
| 81 | + self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5) |
| 82 | + break |
| 83 | + except ConnectionRefusedError as e: |
| 84 | + logging.error('Error telnet connection: %s in attempt:%d', e, attempt) |
| 85 | + time.sleep(1) |
| 86 | + else: |
| 87 | + raise ConnectionRefusedError |
| 88 | + |
| 89 | + def write(self, s: str) -> Any: |
| 90 | + if self.telnet is None: |
| 91 | + logging.error('Telnet connection is not established.') |
| 92 | + return '' |
| 93 | + resp = self.telnet.read_very_eager() |
| 94 | + self.telnet.write(to_bytes(s, '\n')) |
| 95 | + resp += self.telnet.read_until(b'>') |
| 96 | + return to_str(resp) |
| 97 | + |
| 98 | + def apptrace_wait_stop(self, timeout: int = 30) -> None: |
| 99 | + stopped = False |
| 100 | + end_before = time.time() + timeout |
| 101 | + while not stopped: |
| 102 | + cmd_out = self.write('esp apptrace status') |
| 103 | + for line in cmd_out.splitlines(): |
| 104 | + if line.startswith('Tracing is STOPPED.'): |
| 105 | + stopped = True |
| 106 | + break |
| 107 | + if not stopped and time.time() > end_before: |
| 108 | + raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') |
| 109 | + time.sleep(1) |
| 110 | + |
| 111 | + def kill(self) -> None: |
| 112 | + # Check if the process is still running |
| 113 | + if self.proc and self.proc.isalive(): |
| 114 | + self.proc.terminate() |
| 115 | + self.proc.kill(signal.SIGKILL) |
| 116 | + |
| 117 | + |
| 118 | +def _test_examples_sysview_tracing(dut: IdfDut) -> None: |
| 119 | + # Construct trace log paths |
| 120 | + trace_log = [ |
| 121 | + os.path.join(dut.logdir, 'sys_log0.svdat') # pylint: disable=protected-access |
| 122 | + ] |
| 123 | + if not dut.app.sdkconfig.get('ESP_SYSTEM_SINGLE_CORE_MODE') or dut.target == 'esp32s3': |
| 124 | + trace_log.append(os.path.join(dut.logdir, 'sys_log1.svdat')) # pylint: disable=protected-access |
| 125 | + trace_files = ' '.join([f'file://{log}' for log in trace_log]) |
| 126 | + |
| 127 | + # Prepare gdbinit file |
| 128 | + gdb_logfile = os.path.join(dut.logdir, 'gdb.txt') |
| 129 | + gdbinit_orig = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'gdbinit') |
| 130 | + gdbinit = os.path.join(dut.logdir, 'gdbinit') |
| 131 | + with open(gdbinit_orig, 'r') as f_r, open(gdbinit, 'w') as f_w: |
| 132 | + for line in f_r: |
| 133 | + if line.startswith('mon esp sysview start'): |
| 134 | + f_w.write(f'mon esp sysview start {trace_files}\n') |
| 135 | + else: |
| 136 | + f_w.write(line) |
| 137 | + |
| 138 | + def dut_expect_task_event() -> None: |
| 139 | + dut.expect(re.compile(rb'example: Task\[0x[0-9A-Fa-f]+\]: received event \d+'), timeout=30) |
| 140 | + |
| 141 | + dut.expect_exact('example: Ready for OpenOCD connection', timeout=5) |
| 142 | + openocd = OpenOCD(dut).run() |
| 143 | + assert openocd |
| 144 | + try: |
| 145 | + openocd.connect_telnet() |
| 146 | + openocd.write('log_output {}'.format(openocd.log_file)) |
| 147 | + |
| 148 | + with open(gdb_logfile, 'w') as gdb_log, pexpect.spawn( |
| 149 | + f'idf.py -B {dut.app.binary_path} gdb --batch -x {gdbinit}', |
| 150 | + timeout=60, |
| 151 | + logfile=gdb_log, |
| 152 | + encoding='utf-8', |
| 153 | + codec_errors='ignore', |
| 154 | + ) as p: |
| 155 | + p.expect_exact('hit Breakpoint 1, app_main ()') |
| 156 | + dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect() |
| 157 | + dut_expect_task_event() |
| 158 | + |
| 159 | + # Do a sleep while sysview samples are captured. |
| 160 | + time.sleep(3) |
| 161 | + openocd.write('esp sysview stop') |
| 162 | + finally: |
| 163 | + openocd.kill() |
| 164 | + |
11 | 165 |
|
12 | 166 | @pytest.mark.jtag |
13 | | -@pytest.mark.parametrize( |
14 | | - 'embedded_services', |
15 | | - [ |
16 | | - 'esp,idf,jtag', |
17 | | - ], |
18 | | - indirect=True, |
19 | | -) |
20 | | -@idf_parametrize('target', ['esp32'], indirect=['target']) |
| 167 | +@idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) |
21 | 168 | def test_examples_sysview_tracing(dut: IdfDut) -> None: |
22 | | - def dut_expect_task_event() -> None: |
23 | | - dut.expect(re.compile(rb'example: Task\[0x3[0-9A-Fa-f]+\]: received event \d+'), timeout=30) |
24 | | - |
25 | | - dut.gdb.write('mon reset halt') |
26 | | - dut.gdb.write('maintenance flush register-cache') |
27 | | - dut.gdb.write('b app_main') |
28 | | - |
29 | | - dut.gdb.write('commands', non_blocking=True) |
30 | | - dut.gdb.write( |
31 | | - 'mon esp sysview start file:///tmp/sysview_example0.svdat file:///tmp/sysview_example1.svdat', non_blocking=True |
32 | | - ) |
33 | | - dut.gdb.write('c', non_blocking=True) |
34 | | - dut.gdb.write('end') |
35 | | - |
36 | | - dut.gdb.write('c', non_blocking=True) |
37 | | - time.sleep(1) # to avoid EOF file error |
38 | | - with open(dut.gdb._logfile, encoding='utf-8') as fr: # pylint: disable=protected-access |
39 | | - gdb_pexpect_proc = pexpect.fdpexpect.fdspawn(fr.fileno()) |
40 | | - gdb_pexpect_proc.expect('Thread 2 "main" hit Breakpoint 1, app_main ()') |
41 | | - |
42 | | - dut.expect('example: Created task') # dut has been restarted by gdb since the last dut.expect() |
43 | | - dut_expect_task_event() |
44 | | - |
45 | | - # Do a sleep while sysview samples are captured. |
46 | | - time.sleep(3) |
47 | | - # GDB isn't responding now to any commands, therefore, the following command is issued to openocd |
48 | | - dut.openocd.write('esp sysview stop') |
| 169 | + _test_examples_sysview_tracing(dut) |
| 170 | + |
| 171 | + |
| 172 | +@pytest.mark.usb_serial_jtag |
| 173 | +@idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target']) |
| 174 | +def test_examples_sysview_tracing_usj(dut: IdfDut) -> None: |
| 175 | + _test_examples_sysview_tracing(dut) |
0 commit comments