Skip to content

Commit fc70b18

Browse files
committed
scripts: pytest: add RTT support in pytest-twister-harness
This commit introduces --device-rtt flag in the Pytest Twister harness, allowing RTT as a communication protocol between the host PC and the DUT. To demonstrate this, a new build configuration was added to the pytest/shell sample. By following the newly added instructions in the scripts/pylib/pytest-twister-harness/README.rst, users can first build the pytest/shell sample with the RTT shell and then run the pytest with specific flags to run the tests via the RTT. Two additional notes related to the changes: - The way the pty process was terminated had to be changed. west rtt command spawns two subprocesses, and the previous implementation only terminated a single one. The already provided terminate_process function was used to terminate all child processes that pty process started. - Previously, exec_command sent out a command string, followed by two newline characters. The first one was meant to execute the command on the device, the second one was meant to produce the next prompt, which would signal that execution of the command finished. In some instances, when RTT was used, the device ignored the second newline, and thus the prompt was not emitted, bringing the function to a halt, until the timeout expired. This was noticed with the `kernel version` command, the one that is used in one of the tests in pytest/shell. Weirdly enough, that was the only instance of the "newline ignoring" that could be found. Testing `kernel uptime`, for example, worked fine. The workaround was to send a command string with a single newline, wait for the command print and then send the second newline. It was confirmed that this approach works both for the UART and RTT. Signed-off-by: Marko Sagadin <[email protected]>
1 parent deb47c7 commit fc70b18

File tree

9 files changed

+94
-32
lines changed

9 files changed

+94
-32
lines changed

samples/subsys/testsuite/pytest/shell/testcase.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ tests:
1818
harness: pytest
1919
extra_configs:
2020
- CONFIG_SHELL_VT100_COLORS=n
21+
sample.pytest.rtt:
22+
harness: pytest
23+
extra_configs:
24+
- CONFIG_USE_SEGGER_RTT=y
25+
- CONFIG_LOG_MODE_DEFERRED=y
26+
- CONFIG_SHELL_BACKEND_RTT=y
2127
sample.harness.shell:
2228
harness: shell
2329
harness_config:

scripts/pylib/pytest-twister-harness/README.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,7 @@ or build shell application by west and call pytest directly:
4545
# hardware
4646
west build -p -b nrf52840dk/nrf52840
4747
pytest --twister-harness --device-type=hardware --device-serial=/dev/ttyACM0 --build-dir=build -p twister_harness.plugin
48+
49+
# hardware over RTT
50+
west build -p -b nrf52840dk/nrf52840 . -T sample.pytest.rtt
51+
pytest --twister-harness --device-type=hardware --device-rtt=True --build-dir=build -p twister_harness.plugin

scripts/pylib/pytest-twister-harness/src/twister_harness/device/device_adapter.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ def launch(self) -> None:
8181
self._start_reader_thread()
8282

8383
if self.device_config.flash_before:
84-
# For hardware devices with shared USB or software USB, connect after flashing.
85-
# Retry for up to 10 seconds for USB-CDC based devices to enumerate.
84+
# For hardware devices with shared USB or software USB or RTT, connect after
85+
# flashing. Retry for up to 10 seconds for USB-CDC based devices to
86+
# enumerate.
8687
self._flash_and_run()
8788
self.connect(retry_s = 10)
8889
else:

scripts/pylib/pytest-twister-harness/src/twister_harness/device/hardware_adapter.py

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self, device_config: DeviceConfig) -> None:
3232
super().__init__(device_config)
3333
self._flashing_timeout: float = device_config.flash_timeout
3434
self._serial_connection: serial.Serial | None = None
35-
self._serial_pty_proc: subprocess.Popen | None = None
35+
self._proc: subprocess.Popen | None = None
3636
self._serial_buffer: bytearray = bytearray()
3737

3838
self.device_log_path: Path = device_config.build_dir / 'device.log'
@@ -76,6 +76,25 @@ def generate_command(self) -> None:
7676
command.extend(command_extra_args)
7777
self.command = command
7878

79+
def generate_rtt_command(self):
80+
"""Return command to connect to the device via RTT."""
81+
82+
command = ["west", "rtt", "--skip-rebuild", "-d",
83+
str(self.device_config.build_dir)]
84+
85+
rtt_runner = self.device_config.rtt_runner
86+
if rtt_runner:
87+
command.append("--runner")
88+
command.append(rtt_runner)
89+
90+
if rtt_runner in ("jlink", "pyocd"):
91+
command.append("--dev-id")
92+
command.append(self.device_config.id)
93+
94+
# Since _start_pty expects a string, we need to convert the command list into
95+
# one.
96+
return " ".join(command)
97+
7998
def _prepare_runner_args(self) -> tuple[list[str], list[str]]:
8099
base_args: list[str] = []
81100
extra_args: list[str] = []
@@ -154,7 +173,13 @@ def _flash_and_run(self) -> None:
154173
raise TwisterHarnessException(msg)
155174

156175
def _connect_device(self) -> None:
157-
serial_name = self._open_serial_pty() or self.device_config.serial
176+
if self.device_config.serial_pty:
177+
serial_name = self._open_pty(self.device_config.serial_pty)
178+
elif self.device_config.use_rtt:
179+
serial_name = self._open_pty(self.generate_rtt_command())
180+
else:
181+
serial_name = self.device_config.serial
182+
158183
logger.debug('Opening serial connection for %s', serial_name)
159184
try:
160185
self._serial_connection = serial.Serial(
@@ -167,17 +192,15 @@ def _connect_device(self) -> None:
167192
)
168193
except serial.SerialException as exc:
169194
logger.exception('Cannot open connection: %s', exc)
170-
self._close_serial_pty()
195+
self._close_pty()
171196
raise
172197

173198
self._serial_connection.flush()
174199
self._serial_connection.reset_input_buffer()
175200
self._serial_connection.reset_output_buffer()
176201

177-
def _open_serial_pty(self) -> str | None:
178-
"""Open a pty pair, run process and return tty name"""
179-
if not self.device_config.serial_pty:
180-
return None
202+
def _open_pty(self, command: str) -> str | None:
203+
"""Open a pty pair, run process with command and return tty name."""
181204

182205
try:
183206
master, slave = pty.openpty()
@@ -186,32 +209,37 @@ def _open_serial_pty(self) -> str | None:
186209
raise exc
187210

188211
try:
189-
self._serial_pty_proc = subprocess.Popen(
190-
re.split(',| ', self.device_config.serial_pty),
212+
self._proc = subprocess.Popen(
213+
re.split(',| ', command),
191214
stdout=master,
192215
stdin=master,
193216
stderr=master
194217
)
195218
except subprocess.CalledProcessError as exc:
196-
logger.exception('Failed to run subprocess %s, error %s', self.device_config.serial_pty, str(exc))
219+
logger.exception('Failed to run subprocess %s, error %s', command, str(exc))
197220
raise
198221
return os.ttyname(slave)
199222

200223
def _disconnect_device(self) -> None:
201224
if self._serial_connection:
202225
serial_name = self._serial_connection.port
203226
self._serial_connection.close()
204-
# self._serial_connection = None
205227
logger.debug('Closed serial connection for %s', serial_name)
206-
self._close_serial_pty()
228+
self._close_pty()
229+
230+
def _close_pty(self) -> None:
231+
"""Terminate the process opened for serial pty script or RTT."""
232+
if self._proc:
233+
terminate_process(self._proc)
234+
self._proc.communicate(timeout=self.base_timeout)
235+
236+
if self.device_config.serial_pty:
237+
proc = self.device_config.serial_pty
238+
else:
239+
proc = "west rtt"
207240

208-
def _close_serial_pty(self) -> None:
209-
"""Terminate the process opened for serial pty script"""
210-
if self._serial_pty_proc:
211-
self._serial_pty_proc.terminate()
212-
self._serial_pty_proc.communicate(timeout=self.base_timeout)
213-
logger.debug('Process %s terminated', self.device_config.serial_pty)
214-
self._serial_pty_proc = None
241+
logger.debug('Process %s terminated', proc)
242+
self._proc = None
215243

216244
def _close_device(self) -> None:
217245
if self.device_config.post_script:
@@ -282,7 +310,7 @@ def _flush_device_output(self) -> None:
282310
def _clear_internal_resources(self) -> None:
283311
super()._clear_internal_resources()
284312
self._serial_connection = None
285-
self._serial_pty_proc = None
313+
self._proc = None
286314
self._serial_buffer.clear()
287315

288316
@staticmethod

scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,11 @@ def dut(request: pytest.FixtureRequest, device_object: DeviceAdapter) -> Generat
7373
def shell(dut: DeviceAdapter) -> Shell:
7474
"""Return ready to use shell interface"""
7575
shell = Shell(dut, timeout=20.0)
76+
77+
symbol = 'CONFIG_SHELL_PROMPT_RTT' if dut.device_config.use_rtt else 'CONFIG_SHELL_PROMPT_UART'
78+
7679
if prompt := find_in_config(Path(dut.device_config.app_build_dir) / 'zephyr' / '.config',
77-
'CONFIG_SHELL_PROMPT_UART'):
80+
symbol):
7881
shell.prompt = prompt
7982
logger.info('Wait for prompt')
8083
if not shell.wait_for_prompt():

scripts/pylib/pytest-twister-harness/src/twister_harness/helpers/shell.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,24 +54,30 @@ def exec_command(
5454
self, command: str, timeout: float | None = None, print_output: bool = True
5555
) -> list[str]:
5656
"""
57-
Send shell command to a device and return response. Passed command
58-
is extended by double enter sings - first one to execute this command
59-
on a device, second one to receive next prompt what is a signal that
60-
execution was finished. Method returns printout of the executed command.
57+
Send shell command to a device and return response. Method returns printout of
58+
the executed command.
6159
"""
6260
timeout = timeout or self.base_timeout
63-
command_ext = f'{command}\n\n'
61+
command_ext = f'{command}\n'
6462
regex_prompt = re.escape(self.prompt)
6563
regex_command = f'.*{re.escape(command)}'
64+
65+
# Execute command
6666
self._device.clear_buffer()
6767
self._device.write(command_ext.encode())
6868
lines: list[str] = []
69-
# wait for device command print - it should be done immediately after sending command to device
69+
# wait for device command print - it should be done immediately after sending
70+
# command to device.
7071
lines.extend(
7172
self._device.readlines_until(
7273
regex=regex_command, timeout=1.0, print_output=print_output
7374
)
7475
)
76+
77+
# Send single enter to get next prompt after command execution, as it signals
78+
# that execution finished.
79+
self._device.write(b"\n")
80+
7581
# wait for device command execution
7682
lines.extend(
7783
self._device.readlines_until(

scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,17 @@ def pytest_addoption(parser: pytest.Parser):
7070
)
7171
twister_harness_group.addoption(
7272
'--runner',
73-
help='Use the specified west runner (pyocd, nrfjprog, etc.).'
73+
help='Use the specified west runner (pyocd, nrfjprog, etc.) when running west flash.'
7474
)
7575
twister_harness_group.addoption(
7676
'--runner-params',
7777
action='append',
7878
help='Use the specified west runner params.'
7979
)
80+
twister_harness_group.addoption(
81+
'--rtt-runner',
82+
help='Use the specified west runner (pyocd, nrfjprog, etc.) when running west rtt.'
83+
)
8084
twister_harness_group.addoption(
8185
'--device-id',
8286
help='ID of connected hardware device (for example 000682459367).'
@@ -89,6 +93,11 @@ def pytest_addoption(parser: pytest.Parser):
8993
'--device-serial-pty',
9094
help='Script for controlling pseudoterminal.'
9195
)
96+
twister_harness_group.addoption(
97+
'--device-rtt',
98+
type=bool,
99+
help='Use RTT as communication transport.',
100+
)
92101
twister_harness_group.addoption(
93102
'--flash-before',
94103
type=bool,

scripts/pylib/pytest-twister-harness/src/twister_harness/twister_harness_config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class DeviceConfig:
2626
baud: int = 115200
2727
runner: str = ''
2828
runner_params: list[str] = field(default_factory=list, repr=False)
29+
rtt_runner: str = ''
30+
use_rtt: bool = False
2931
id: str = ''
3032
product: str = ''
3133
serial_pty: str = ''
@@ -78,6 +80,8 @@ def create(cls, config: pytest.Config) -> TwisterHarnessConfig:
7880
baud=config.option.device_serial_baud,
7981
runner=config.option.runner,
8082
runner_params=runner_params,
83+
rtt_runner=config.option.rtt_runner,
84+
use_rtt=config.option.device_rtt,
8185
id=config.option.device_id,
8286
product=config.option.device_product,
8387
serial_pty=config.option.device_serial_pty,

scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ def test_if_hardware_adapter_uses_serial_pty(
227227

228228
monkeypatch.setattr('twister_harness.device.hardware_adapter.pty.openpty', lambda: (123, 456))
229229
monkeypatch.setattr('twister_harness.device.hardware_adapter.os.ttyname', lambda x: f'/pty/ttytest/{x}')
230+
monkeypatch.setattr('twister_harness.device.hardware_adapter.terminate_process', lambda x: None)
230231

231232
serial_mock = mock.Mock()
232233
serial_mock.port = '/pty/ttytest/456'
@@ -235,7 +236,7 @@ def test_if_hardware_adapter_uses_serial_pty(
235236
device._device_run.set()
236237
device.connect()
237238
assert device._serial_connection.port == '/pty/ttytest/456' # type: ignore[union-attr]
238-
assert device._serial_pty_proc
239+
assert device._proc
239240
patched_popen.assert_called_with(
240241
['script.py'],
241242
stdout=123,
@@ -244,7 +245,7 @@ def test_if_hardware_adapter_uses_serial_pty(
244245
)
245246

246247
device.disconnect()
247-
assert not device._serial_pty_proc
248+
assert not device._proc
248249

249250

250251
def test_if_hardware_adapter_properly_send_data_to_subprocess(

0 commit comments

Comments
 (0)