Skip to content

Commit 0eb74ff

Browse files
committed
ci: OpenOCD class as fixture
1 parent c9cf11c commit 0eb74ff

File tree

5 files changed

+172
-495
lines changed

5 files changed

+172
-495
lines changed

conftest.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,21 @@
1919

2020
import glob
2121
import io
22+
import json
2223
import logging
2324
import os
2425
import re
26+
import signal
27+
import time
2528
import typing as t
2629
import zipfile
2730
from copy import deepcopy
31+
from telnetlib import Telnet
2832
from urllib.parse import quote
2933

3034
import common_test_methods # noqa: F401
3135
import gitlab_api
36+
import pexpect
3237
import pytest
3338
import requests
3439
import yaml
@@ -53,6 +58,8 @@
5358
from idf_pytest.utils import format_case_id
5459
from pytest_embedded.plugin import multi_dut_argument
5560
from pytest_embedded.plugin import multi_dut_fixture
61+
from pytest_embedded.utils import to_bytes
62+
from pytest_embedded.utils import to_str
5663
from pytest_embedded_idf.dut import IdfDut
5764
from pytest_embedded_idf.unity_tester import CaseTester
5865

@@ -148,6 +155,113 @@ def download_app(self, app_build_path: str, artifact_type: t.Optional[ArtifactTy
148155
super().download_app(app_build_path, artifact_type)
149156

150157

158+
class OpenOCD:
159+
def __init__(self, dut: 'IdfDut'):
160+
self.MAX_RETRIES = 3
161+
self.RETRY_DELAY = 1
162+
self.TELNET_PORT = 4444
163+
self.dut = dut
164+
self.telnet: t.Optional[Telnet] = None
165+
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
166+
self.proc: t.Optional[pexpect.spawn] = None
167+
168+
def __enter__(self) -> t.Optional['OpenOCD']:
169+
return self.run()
170+
171+
def __exit__(self, exception_type: t.Any, exception_value: t.Any, exception_traceback: t.Any) -> None:
172+
self.kill()
173+
174+
def run(self) -> t.Optional['OpenOCD']:
175+
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
176+
177+
try:
178+
with open(desc_path, 'r') as f:
179+
project_desc = json.load(f)
180+
except FileNotFoundError:
181+
logging.error('Project description file not found at %s', desc_path)
182+
raise
183+
184+
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
185+
if not openocd_scripts:
186+
raise RuntimeError('OPENOCD_SCRIPTS environment variable is not set.')
187+
188+
debug_args = project_desc.get('debug_arguments_openocd')
189+
if not debug_args:
190+
raise KeyError("'debug_arguments_openocd' key is missing in project_description.json")
191+
192+
# For debug purposes, make the value '4'
193+
ocd_env = os.environ.copy()
194+
ocd_env['LIBUSB_DEBUG'] = '1'
195+
196+
for _ in range(1, self.MAX_RETRIES + 1):
197+
try:
198+
self.proc = pexpect.spawn(
199+
command='openocd',
200+
args=['-s', openocd_scripts] + debug_args.split(),
201+
timeout=5,
202+
encoding='utf-8',
203+
codec_errors='ignore',
204+
env=ocd_env,
205+
)
206+
if self.proc and self.proc.isalive():
207+
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
208+
self.connect_telnet()
209+
self.write('log_output {}'.format(self.log_file))
210+
return self
211+
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT, ConnectionRefusedError) as e:
212+
logging.error('Error running OpenOCD: %s', str(e))
213+
self.kill()
214+
time.sleep(self.RETRY_DELAY)
215+
216+
raise RuntimeError('Failed to run OpenOCD after %d attempts.', self.MAX_RETRIES)
217+
218+
def connect_telnet(self) -> None:
219+
for attempt in range(1, self.MAX_RETRIES + 1):
220+
try:
221+
self.telnet = Telnet('127.0.0.1', self.TELNET_PORT, 5)
222+
break
223+
except ConnectionRefusedError as e:
224+
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
225+
time.sleep(1)
226+
else:
227+
raise ConnectionRefusedError
228+
229+
def write(self, s: str) -> t.Any:
230+
if self.telnet is None:
231+
logging.error('Telnet connection is not established.')
232+
return ''
233+
resp = self.telnet.read_very_eager()
234+
self.telnet.write(to_bytes(s, '\n'))
235+
resp += self.telnet.read_until(b'>')
236+
return to_str(resp)
237+
238+
def apptrace_wait_stop(self, timeout: int = 30) -> None:
239+
stopped = False
240+
end_before = time.time() + timeout
241+
while not stopped:
242+
cmd_out = self.write('esp apptrace status')
243+
for line in cmd_out.splitlines():
244+
if line.startswith('Tracing is STOPPED.'):
245+
stopped = True
246+
break
247+
if not stopped and time.time() > end_before:
248+
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
249+
time.sleep(1)
250+
251+
def kill(self) -> None:
252+
# Check if the process is still running
253+
if self.proc and self.proc.isalive():
254+
self.proc.terminate()
255+
self.proc.kill(signal.SIGKILL)
256+
257+
258+
@pytest.fixture
259+
def openocd(dut: IdfDut) -> OpenOCD:
260+
if isinstance(dut, tuple):
261+
raise ValueError('Multi-DUT support is not implemented yet')
262+
return OpenOCD(dut)
263+
264+
151265
@pytest.fixture(scope='session')
152266
def app_downloader(pipeline_id: t.Optional[str]) -> t.Optional[AppDownloader]:
153267
if not pipeline_id:
Lines changed: 9 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -1,126 +1,19 @@
11
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
22
# SPDX-License-Identifier: Unlicense OR CC0-1.0
3-
import json
4-
import logging
5-
import os
6-
import signal
73
import time
8-
from telnetlib import Telnet
9-
from typing import Any
10-
from typing import Optional
4+
import typing
115

12-
import pexpect
136
import pytest
14-
from pytest_embedded.utils import to_bytes
15-
from pytest_embedded.utils import to_str
167
from pytest_embedded_idf import IdfDut
178
from pytest_embedded_idf.utils import idf_parametrize
189

19-
MAX_RETRIES = 3
20-
RETRY_DELAY = 1
21-
TELNET_PORT = 4444
10+
if typing.TYPE_CHECKING:
11+
from conftest import OpenOCD
2212

2313

24-
class OpenOCD:
25-
def __init__(self, dut: 'IdfDut'):
26-
self.dut = dut
27-
self.telnet: Optional[Telnet] = None
28-
self.log_file = os.path.join(self.dut.logdir, 'ocd.txt')
29-
self.proc: Optional[pexpect.spawn] = None
30-
31-
def run(self) -> Optional['OpenOCD']:
32-
desc_path = os.path.join(self.dut.app.binary_path, 'project_description.json')
33-
34-
try:
35-
with open(desc_path, 'r') as f:
36-
project_desc = json.load(f)
37-
except FileNotFoundError:
38-
logging.error('Project description file not found at %s', desc_path)
39-
return None
40-
41-
openocd_scripts = os.getenv('OPENOCD_SCRIPTS')
42-
if not openocd_scripts:
43-
logging.error('OPENOCD_SCRIPTS environment variable is not set.')
44-
return None
45-
46-
debug_args = project_desc.get('debug_arguments_openocd')
47-
if not debug_args:
48-
logging.error("'debug_arguments_openocd' key is missing in project_description.json")
49-
return None
50-
51-
# For debug purposes, make the value '4'
52-
ocd_env = os.environ.copy()
53-
ocd_env['LIBUSB_DEBUG'] = '1'
54-
55-
for _ in range(1, MAX_RETRIES + 1):
56-
try:
57-
self.proc = pexpect.spawn(
58-
command='openocd',
59-
args=['-s', openocd_scripts] + debug_args.split(),
60-
timeout=5,
61-
encoding='utf-8',
62-
codec_errors='ignore',
63-
env=ocd_env,
64-
)
65-
if self.proc and self.proc.isalive():
66-
self.proc.expect_exact('Info : Listening on port 3333 for gdb connections', timeout=5)
67-
return self
68-
except (pexpect.exceptions.EOF, pexpect.exceptions.TIMEOUT) as e:
69-
logging.error('Error running OpenOCD: %s', str(e))
70-
if self.proc and self.proc.isalive():
71-
self.proc.terminate()
72-
time.sleep(RETRY_DELAY)
73-
74-
logging.error('Failed to run OpenOCD after %d attempts.', MAX_RETRIES)
75-
return None
76-
77-
def connect_telnet(self) -> None:
78-
for attempt in range(1, MAX_RETRIES + 1):
79-
try:
80-
self.telnet = Telnet('127.0.0.1', TELNET_PORT, 5)
81-
break
82-
except ConnectionRefusedError as e:
83-
logging.error('Error telnet connection: %s in attempt:%d', e, attempt)
84-
time.sleep(1)
85-
else:
86-
raise ConnectionRefusedError
87-
88-
def write(self, s: str) -> Any:
89-
if self.telnet is None:
90-
logging.error('Telnet connection is not established.')
91-
return ''
92-
resp = self.telnet.read_very_eager()
93-
self.telnet.write(to_bytes(s, '\n'))
94-
resp += self.telnet.read_until(b'>')
95-
return to_str(resp)
96-
97-
def apptrace_wait_stop(self, timeout: int = 30) -> None:
98-
stopped = False
99-
end_before = time.time() + timeout
100-
while not stopped:
101-
cmd_out = self.write('esp apptrace status')
102-
for line in cmd_out.splitlines():
103-
if line.startswith('Tracing is STOPPED.'):
104-
stopped = True
105-
break
106-
if not stopped and time.time() > end_before:
107-
raise pexpect.TIMEOUT('Failed to wait for apptrace stop!')
108-
time.sleep(1)
109-
110-
def kill(self) -> None:
111-
# Check if the process is still running
112-
if self.proc and self.proc.isalive():
113-
self.proc.terminate()
114-
self.proc.kill(signal.SIGKILL)
115-
116-
117-
def _test_examples_app_trace_basic(dut: IdfDut) -> None:
14+
def _test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None:
11815
dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5)
119-
openocd = OpenOCD(dut).run()
120-
assert openocd
121-
try:
122-
openocd.connect_telnet()
123-
openocd.write('log_output {}'.format(openocd.log_file))
16+
with openocd.run() as openocd:
12417
openocd.write('reset run')
12518
dut.expect_exact('example: Waiting for OpenOCD connection', timeout=5)
12619
time.sleep(1) # wait for APPTRACE_INIT semihosting call
@@ -151,17 +44,15 @@ def _test_examples_app_trace_basic(dut: IdfDut) -> None:
15144
break
15245
if found is not True:
15346
raise RuntimeError('"{}" could not be found in {}'.format(log_str, 'apptrace.log'))
154-
finally:
155-
openocd.kill()
15647

15748

15849
@pytest.mark.jtag
15950
@idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target'])
160-
def test_examples_app_trace_basic(dut: IdfDut) -> None:
161-
_test_examples_app_trace_basic(dut)
51+
def test_examples_app_trace_basic(openocd: 'OpenOCD', dut: IdfDut) -> None:
52+
_test_examples_app_trace_basic(openocd, dut)
16253

16354

16455
@pytest.mark.usb_serial_jtag
16556
@idf_parametrize('target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2'], indirect=['target'])
166-
def test_examples_app_trace_basic_usj(dut: IdfDut) -> None:
167-
_test_examples_app_trace_basic(dut)
57+
def test_examples_app_trace_basic_usj(openocd: 'OpenOCD', dut: IdfDut) -> None:
58+
_test_examples_app_trace_basic(openocd, dut)

0 commit comments

Comments
 (0)