|
1 | 1 | # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD |
2 | 2 | # SPDX-License-Identifier: Unlicense OR CC0-1.0 |
3 | | -import json |
4 | | -import logging |
5 | | -import os |
6 | | -import signal |
7 | 3 | import time |
8 | | -from telnetlib import Telnet |
9 | | -from typing import Any |
10 | | -from typing import Optional |
| 4 | +import typing |
11 | 5 |
|
12 | | -import pexpect |
13 | 6 | import pytest |
14 | | -from pytest_embedded.utils import to_bytes |
15 | | -from pytest_embedded.utils import to_str |
16 | 7 | from pytest_embedded_idf import IdfDut |
17 | 8 | from pytest_embedded_idf.utils import idf_parametrize |
18 | 9 |
|
19 | | -MAX_RETRIES = 3 |
20 | | -RETRY_DELAY = 1 |
21 | | -TELNET_PORT = 4444 |
| 10 | +if typing.TYPE_CHECKING: |
| 11 | + from conftest import OpenOCD |
22 | 12 |
|
23 | 13 |
|
24 | | -class OpenOCD: |
25 | | - def __init__(self, dut: 'IdfDut'): |
26 | | - self.dut = dut |
27 | | - self.telnet: Optional[Telnet] = None |
28 | | - self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') |
29 | | - self.proc: Optional[pexpect.spawn] = None |
30 | | - |
31 | | - def run(self) -> Optional['OpenOCD']: |
32 | | - desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') |
33 | | - |
34 | | - try: |
35 | | - with open(desc_path, 'r') as f: |
36 | | - project_desc = json.load(f) |
37 | | - except FileNotFoundError: |
38 | | - logging.error('Project description file not found at %s', desc_path) |
39 | | - return None |
40 | | - |
41 | | - openocd_scripts = os.getenv('OPENOCD_SCRIPTS') |
42 | | - if not openocd_scripts: |
43 | | - logging.error('OPENOCD_SCRIPTS environment variable is not set.') |
44 | | - return None |
45 | | - |
46 | | - debug_args = project_desc.get('debug_arguments_openocd') |
47 | | - if not debug_args: |
48 | | - logging.error("'debug_arguments_openocd' key is missing in project_description.json") |
49 | | - return None |
50 | | - |
51 | | - # For debug purposes, make the value '4' |
52 | | - ocd_env = os.environ.copy() |
53 | | - ocd_env['LIBUSB_DEBUG'] = '1' |
54 | | - |
55 | | - for _ in range(1, MAX_RETRIES + 1): |
56 | | - try: |
57 | | - self.proc = pexpect.spawn( |
58 | | - command='openocd', |
59 | | - args=['-s', openocd_scripts] + debug_args.split(), |
60 | | - timeout=5, |
61 | | - encoding='utf-8', |
62 | | - codec_errors='ignore', |
63 | | - env=ocd_env, |
64 | | - ) |
65 | | - if self.proc and self.proc.isalive(): |
66 | | - self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) |
67 | | - return self |
68 | | - except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e: |
69 | | - logging.error('Error running OpenOCD: %s', str(e)) |
70 | | - if self.proc and self.proc.isalive(): |
71 | | - self.proc.terminate() |
72 | | - time.sleep(RETRY_DELAY) |
73 | | - |
74 | | - logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES) |
75 | | - return None |
76 | | - |
77 | | - def connect_telnet(self) -> None: |
78 | | - for attempt in range(1, MAX_RETRIES + 1): |
79 | | - try: |
80 | | - self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5) |
81 | | - break |
82 | | - except ConnectionRefusedError as e: |
83 | | - logging.error('Error telnet connection: %s in attempt:%d', e, attempt) |
84 | | - time.sleep(1) |
85 | | - else: |
86 | | - raise ConnectionRefusedError |
87 | | - |
88 | | - def write(self, s: str) -> Any: |
89 | | - if self.telnet is None: |
90 | | - logging.error('Telnet connection is not established.') |
91 | | - return '' |
92 | | - resp = self.telnet.read_very_eager() |
93 | | - self.telnet.write(to_bytes(s, '\n')) |
94 | | - resp += self.telnet.read_until(b'>') |
95 | | - return to_str(resp) |
96 | | - |
97 | | - def apptrace_wait_stop(self, timeout: int = 30) -> None: |
98 | | - stopped = False |
99 | | - end_before = time.time() + timeout |
100 | | - while not stopped: |
101 | | - cmd_out = self.write('esp apptrace status') |
102 | | - for line in cmd_out.splitlines(): |
103 | | - if line.startswith('Tracing is STOPPED.'): |
104 | | - stopped = True |
105 | | - break |
106 | | - if not stopped and time.time() > end_before: |
107 | | - raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') |
108 | | - time.sleep(1) |
109 | | - |
110 | | - def kill(self) -> None: |
111 | | - # Check if the process is still running |
112 | | - if self.proc and self.proc.isalive(): |
113 | | - self.proc.terminate() |
114 | | - self.proc.kill(signal.SIGKILL) |
115 | | - |
116 | | - |
117 | | -def _test_examples_app_trace_basic(dut: IdfDut) -> None: |
| 14 | +def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: |
118 | 15 | dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5) |
119 | | - openocd = OpenOCD(dut).run() |
120 | | - assert openocd |
121 | | - try: |
122 | | - openocd.connect_telnet() |
123 | | - openocd.write('log_output {}'.format(openocd.log_file)) |
| 16 | + with openocd.run() as openocd: |
124 | 17 | openocd.write('reset run') |
125 | 18 | dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5) |
126 | 19 | time.sleep(1) # wait for APPTRACE_INIT semihosting call |
@@ -151,17 +44,15 @@ def _test_examples_app_trace_basic(dut: IdfDut) -> None: |
151 | 44 | break |
152 | 45 | if found is not True: |
153 | 46 | raise RuntimeError('"{}" could not be found in {}'.format(log_str, 'apptrace.log')) |
154 | | - finally: |
155 | | - openocd.kill() |
156 | 47 |
|
157 | 48 |
|
158 | 49 | @pytest.mark.jtag |
159 | 50 | @idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) |
160 | | -def test_examples_app_trace_basic(dut: IdfDut) -> None: |
161 | | - _test_examples_app_trace_basic(dut) |
| 51 | +def test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: |
| 52 | + _test_examples_app_trace_basic(openocd, dut) |
162 | 53 |
|
163 | 54 |
|
164 | 55 | @pytest.mark.usb_serial_jtag |
165 | 56 | @idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target']) |
166 | | -def test_examples_app_trace_basic_usj(dut: IdfDut) -> None: |
167 | | - _test_examples_app_trace_basic(dut) |
| 57 | +def test_examples_app_trace_basic_usj(openocd: 'OpenOCD', dut: IdfDut) -> None: |
| 58 | + _test_examples_app_trace_basic(openocd, dut) |
0 commit comments