|
1 | 1 | # SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD |
2 | 2 | # SPDX-License-Identifier: Unlicense OR CC0-1.0 |
| 3 | +import json |
| 4 | +import logging |
| 5 | +import os |
| 6 | +import signal |
3 | 7 | import time |
4 | | -import typing |
| 8 | +from telnetlib import Telnet |
| 9 | +from typing import Any |
| 10 | +from typing import Optional |
5 | 11 |
|
| 12 | +import pexpect |
6 | 13 | import pytest |
| 14 | +from pytest_embedded.utils import to_bytes |
| 15 | +from pytest_embedded.utils import to_str |
7 | 16 | from pytest_embedded_idf import IdfDut |
8 | 17 | from pytest_embedded_idf.utils import idf_parametrize |
9 | 18 |
|
10 | | -if typing.TYPE_CHECKING: |
11 | | - from conftest import OpenOCD |
| 19 | +MAX_RETRIES = 3 |
| 20 | +RETRY_DELAY = 1 |
| 21 | +TELNET_PORT = 4444 |
12 | 22 |
|
13 | 23 |
|
14 | | -def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: |
| 24 | +class OpenOCD: |
| 25 | + def __init__(self, dut: 'IdfDut'): |
| 26 | + self.dut = dut |
| 27 | + self.telnet: Optional[Telnet] = None |
| 28 | + self.log_file = os.path.join(self.dut.logdir, 'ocd.txt') |
| 29 | + self.proc: Optional[pexpect.spawn] = None |
| 30 | + |
| 31 | + def run(self) -> Optional['OpenOCD']: |
| 32 | + desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json') |
| 33 | + |
| 34 | + try: |
| 35 | + with open(desc_path, 'r') as f: |
| 36 | + project_desc = json.load(f) |
| 37 | + except FileNotFoundError: |
| 38 | + logging.error('Project description file not found at %s', desc_path) |
| 39 | + return None |
| 40 | + |
| 41 | + openocd_scripts = os.getenv('OPENOCD_SCRIPTS') |
| 42 | + if not openocd_scripts: |
| 43 | + logging.error('OPENOCD_SCRIPTS environment variable is not set.') |
| 44 | + return None |
| 45 | + |
| 46 | + debug_args = project_desc.get('debug_arguments_openocd') |
| 47 | + if not debug_args: |
| 48 | + logging.error("'debug_arguments_openocd' key is missing in project_description.json") |
| 49 | + return None |
| 50 | + |
| 51 | + # For debug purposes, make the value '4' |
| 52 | + ocd_env = os.environ.copy() |
| 53 | + ocd_env['LIBUSB_DEBUG'] = '1' |
| 54 | + |
| 55 | + for _ in range(1, MAX_RETRIES + 1): |
| 56 | + try: |
| 57 | + self.proc = pexpect.spawn( |
| 58 | + command='openocd', |
| 59 | + args=['-s', openocd_scripts] + debug_args.split(), |
| 60 | + timeout=5, |
| 61 | + encoding='utf-8', |
| 62 | + codec_errors='ignore', |
| 63 | + env=ocd_env, |
| 64 | + ) |
| 65 | + if self.proc and self.proc.isalive(): |
| 66 | + self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5) |
| 67 | + return self |
| 68 | + except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e: |
| 69 | + logging.error('Error running OpenOCD: %s', str(e)) |
| 70 | + if self.proc and self.proc.isalive(): |
| 71 | + self.proc.terminate() |
| 72 | + time.sleep(RETRY_DELAY) |
| 73 | + |
| 74 | + logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES) |
| 75 | + return None |
| 76 | + |
| 77 | + def connect_telnet(self) -> None: |
| 78 | + for attempt in range(1, MAX_RETRIES + 1): |
| 79 | + try: |
| 80 | + self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5) |
| 81 | + break |
| 82 | + except ConnectionRefusedError as e: |
| 83 | + logging.error('Error telnet connection: %s in attempt:%d', e, attempt) |
| 84 | + time.sleep(1) |
| 85 | + else: |
| 86 | + raise ConnectionRefusedError |
| 87 | + |
| 88 | + def write(self, s: str) -> Any: |
| 89 | + if self.telnet is None: |
| 90 | + logging.error('Telnet connection is not established.') |
| 91 | + return '' |
| 92 | + resp = self.telnet.read_very_eager() |
| 93 | + self.telnet.write(to_bytes(s, '\n')) |
| 94 | + resp += self.telnet.read_until(b'>') |
| 95 | + return to_str(resp) |
| 96 | + |
| 97 | + def apptrace_wait_stop(self, timeout: int = 30) -> None: |
| 98 | + stopped = False |
| 99 | + end_before = time.time() + timeout |
| 100 | + while not stopped: |
| 101 | + cmd_out = self.write('esp apptrace status') |
| 102 | + for line in cmd_out.splitlines(): |
| 103 | + if line.startswith('Tracing is STOPPED.'): |
| 104 | + stopped = True |
| 105 | + break |
| 106 | + if not stopped and time.time() > end_before: |
| 107 | + raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') |
| 108 | + time.sleep(1) |
| 109 | + |
| 110 | + def kill(self) -> None: |
| 111 | + # Check if the process is still running |
| 112 | + if self.proc and self.proc.isalive(): |
| 113 | + self.proc.terminate() |
| 114 | + self.proc.kill(signal.SIGKILL) |
| 115 | + |
| 116 | + |
| 117 | +def _test_examples_app_trace_basic(dut: IdfDut) -> None: |
15 | 118 | dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5) |
16 | | - with openocd.run() as openocd: |
| 119 | + openocd = OpenOCD(dut).run() |
| 120 | + assert openocd |
| 121 | + try: |
| 122 | + openocd.connect_telnet() |
| 123 | + openocd.write('log_output {}'.format(openocd.log_file)) |
17 | 124 | openocd.write('reset run') |
18 | 125 | dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5) |
19 | 126 | time.sleep(1) # wait for APPTRACE_INIT semihosting call |
@@ -44,15 +151,17 @@ def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: |
44 | 151 | break |
45 | 152 | if found is not True: |
46 | 153 | raise RuntimeError('"{}" could not be found in {}'.format(log_str, 'apptrace.log')) |
| 154 | + finally: |
| 155 | + openocd.kill() |
47 | 156 |
|
48 | 157 |
|
49 | 158 | @pytest.mark.jtag |
50 | 159 | @idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) |
51 | | -def test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None: |
52 | | - _test_examples_app_trace_basic(openocd, dut) |
| 160 | +def test_examples_app_trace_basic(dut: IdfDut) -> None: |
| 161 | + _test_examples_app_trace_basic(dut) |
53 | 162 |
|
54 | 163 |
|
55 | 164 | @pytest.mark.usb_serial_jtag |
56 | 165 | @idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target']) |
57 | | -def test_examples_app_trace_basic_usj(openocd: 'OpenOCD', dut: IdfDut) -> None: |
58 | | - _test_examples_app_trace_basic(openocd, dut) |
| 166 | +def test_examples_app_trace_basic_usj(dut: IdfDut) -> None: |
| 167 | + _test_examples_app_trace_basic(dut) |
0 commit comments