Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions doc/develop/test/twister.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,25 @@ In this case you can run twister with the following options:
The script is user-defined and handles delivering the messages which can be
used by twister to determine the test execution status.

To support devices that communicate via RTT, use the ``--device-rtt`` option. Twister
will connect to the device with ``west rtt`` command and capture the log messages.
In this case you can run twister with the following options:

.. tabs::

.. group-tab:: Linux

.. code-block:: bash

scripts/twister --device-testing --device-rtt \
-p nrf7002dk/nrf5340/cpuapp -T tests/kernel

.. group-tab:: Windows

.. note::

Not supported on Windows OS

The ``--device-flash-timeout`` option allows to set explicit timeout on the
device flash operation, for example when device flashing takes significantly
large time.
Expand Down Expand Up @@ -1480,6 +1499,23 @@ work. It is equivalent to following west and twister commands.
manually according to above example. This is because the serial port
of the PTY is not fixed and being allocated in the system at runtime.

RTT support using ``--device-rtt`` can also be used in the
hardware map:

.. code-block:: yaml

- connected: true
id: 001050795550
platform: nrf7002dk/nrf5340/cpuapp
product: J-Link
runner: nrfutil
rtt: true
rtt_runner: jlink

If a different runner should be used for RTT connection than for flashing, specify
``rtt_runner`` field, like shown above. If a different runner is not needed, then
``rtt_runner`` can be omitted.

If west is not available or does not know how to flash your system, a custom
flash command can be specified using the ``flash-command`` flag. The script is
called with a ``--build-dir`` with the path of the current build, as well as a
Expand Down
6 changes: 6 additions & 0 deletions samples/subsys/testsuite/pytest/shell/testcase.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ tests:
harness: pytest
extra_configs:
- CONFIG_SHELL_VT100_COLORS=n
sample.pytest.rtt:
harness: pytest
extra_configs:
- CONFIG_USE_SEGGER_RTT=y
- CONFIG_LOG_MODE_DEFERRED=y
- CONFIG_SHELL_BACKEND_RTT=y
sample.harness.shell:
harness: shell
harness_config:
Expand Down
7 changes: 7 additions & 0 deletions scripts/pylib/pytest-twister-harness/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ Run exemplary test shell application by Twister:
# hardware
./scripts/twister -p nrf52840dk/nrf52840 --device-testing --device-serial /dev/ttyACM0 -T samples/subsys/testsuite/pytest/shell

# hardware over RTT
./scripts/twister -p nrf52840dk/nrf52840 --device-testing --device-rtt -T samples/subsys/testsuite/pytest/shell --test sample.pytest.rtt

or build shell application by west and call pytest directly:

.. code-block:: sh
Expand All @@ -45,3 +48,7 @@ or build shell application by west and call pytest directly:
# hardware
west build -p -b nrf52840dk/nrf52840
pytest --twister-harness --device-type=hardware --device-serial=/dev/ttyACM0 --build-dir=build -p twister_harness.plugin

# hardware over RTT
west build -p -b nrf52840dk/nrf52840 . -T sample.pytest.rtt
pytest --twister-harness --device-type=hardware --device-rtt=True --build-dir=build -p twister_harness.plugin
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ def launch(self) -> None:
self._start_reader_thread()

if self.device_config.flash_before:
# For hardware devices with shared USB or software USB, connect after flashing.
# Retry for up to 10 seconds for USB-CDC based devices to enumerate.
# For hardware devices with shared USB or software USB or RTT, connect after
# flashing. Retry for up to 10 seconds for USB-CDC based devices to
# enumerate.
self._flash_and_run()
self.connect(retry_s = 10)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, device_config: DeviceConfig) -> None:
super().__init__(device_config)
self._flashing_timeout: float = device_config.flash_timeout
self._serial_connection: serial.Serial | None = None
self._serial_pty_proc: subprocess.Popen | None = None
self._proc: subprocess.Popen | None = None
self._serial_buffer: bytearray = bytearray()

self.device_log_path: Path = device_config.build_dir / 'device.log'
Expand Down Expand Up @@ -76,6 +76,25 @@ def generate_command(self) -> None:
command.extend(command_extra_args)
self.command = command

def generate_rtt_command(self):
"""Return command to connect to the device via RTT."""

command = ["west", "rtt", "--skip-rebuild", "-d",
str(self.device_config.build_dir)]

rtt_runner = self.device_config.rtt_runner
if rtt_runner:
command.append("--runner")
command.append(rtt_runner)

if rtt_runner in ("jlink", "pyocd"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probe-rs runner supports west rtt as well as --dev-id

Suggested change
if rtt_runner in ("jlink", "pyocd"):
if rtt_runner in ("jlink", "pyocd", "probe-rs"):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for noticing, I have added probe-rs.

command.append("--dev-id")
command.append(self.device_config.id)

# Since _start_pty expects a string, we need to convert the command list into
# one.
return " ".join(command)

def _prepare_runner_args(self) -> tuple[list[str], list[str]]:
base_args: list[str] = []
extra_args: list[str] = []
Expand Down Expand Up @@ -154,7 +173,13 @@ def _flash_and_run(self) -> None:
raise TwisterHarnessException(msg)

def _connect_device(self) -> None:
serial_name = self._open_serial_pty() or self.device_config.serial
if self.device_config.serial_pty:
serial_name = self._open_pty(self.device_config.serial_pty)
elif self.device_config.use_rtt:
serial_name = self._open_pty(self.generate_rtt_command())
else:
serial_name = self.device_config.serial

logger.debug('Opening serial connection for %s', serial_name)
try:
self._serial_connection = serial.Serial(
Expand All @@ -167,17 +192,15 @@ def _connect_device(self) -> None:
)
except serial.SerialException as exc:
logger.exception('Cannot open connection: %s', exc)
self._close_serial_pty()
self._close_pty()
raise

self._serial_connection.flush()
self._serial_connection.reset_input_buffer()
self._serial_connection.reset_output_buffer()

def _open_serial_pty(self) -> str | None:
"""Open a pty pair, run process and return tty name"""
if not self.device_config.serial_pty:
return None
def _open_pty(self, command: str) -> str | None:
"""Open a pty pair, run process with command and return tty name."""

try:
master, slave = pty.openpty()
Expand All @@ -186,32 +209,37 @@ def _open_serial_pty(self) -> str | None:
raise exc

try:
self._serial_pty_proc = subprocess.Popen(
re.split(',| ', self.device_config.serial_pty),
self._proc = subprocess.Popen(
re.split(',| ', command),
stdout=master,
stdin=master,
stderr=master
)
except subprocess.CalledProcessError as exc:
logger.exception('Failed to run subprocess %s, error %s', self.device_config.serial_pty, str(exc))
logger.exception('Failed to run subprocess %s, error %s', command, str(exc))
raise
return os.ttyname(slave)

def _disconnect_device(self) -> None:
if self._serial_connection:
serial_name = self._serial_connection.port
self._serial_connection.close()
# self._serial_connection = None
logger.debug('Closed serial connection for %s', serial_name)
self._close_serial_pty()
self._close_pty()

def _close_pty(self) -> None:
"""Terminate the process opened for serial pty script or RTT."""
if self._proc:
terminate_process(self._proc)
self._proc.communicate(timeout=self.base_timeout)

if self.device_config.serial_pty:
proc = self.device_config.serial_pty
else:
proc = "west rtt"

def _close_serial_pty(self) -> None:
"""Terminate the process opened for serial pty script"""
if self._serial_pty_proc:
self._serial_pty_proc.terminate()
self._serial_pty_proc.communicate(timeout=self.base_timeout)
logger.debug('Process %s terminated', self.device_config.serial_pty)
self._serial_pty_proc = None
logger.debug('Process %s terminated', proc)
self._proc = None

def _close_device(self) -> None:
if self.device_config.post_script:
Expand Down Expand Up @@ -282,7 +310,7 @@ def _flush_device_output(self) -> None:
def _clear_internal_resources(self) -> None:
super()._clear_internal_resources()
self._serial_connection = None
self._serial_pty_proc = None
self._proc = None
self._serial_buffer.clear()

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ def dut(request: pytest.FixtureRequest, device_object: DeviceAdapter) -> Generat
def shell(dut: DeviceAdapter) -> Shell:
"""Return ready to use shell interface"""
shell = Shell(dut, timeout=20.0)

symbol = 'CONFIG_SHELL_PROMPT_RTT' if dut.device_config.use_rtt else 'CONFIG_SHELL_PROMPT_UART'

if prompt := find_in_config(Path(dut.device_config.app_build_dir) / 'zephyr' / '.config',
'CONFIG_SHELL_PROMPT_UART'):
symbol):
shell.prompt = prompt
logger.info('Wait for prompt')
if not shell.wait_for_prompt():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,30 @@ def exec_command(
self, command: str, timeout: float | None = None, print_output: bool = True
) -> list[str]:
"""
Send shell command to a device and return response. Passed command
is extended by double enter sings - first one to execute this command
on a device, second one to receive next prompt what is a signal that
execution was finished. Method returns printout of the executed command.
Send shell command to a device and return response. Method returns printout of
the executed command.
"""
timeout = timeout or self.base_timeout
command_ext = f'{command}\n\n'
command_ext = f'{command}\n'
regex_prompt = re.escape(self.prompt)
regex_command = f'.*{re.escape(command)}'

# Execute command
self._device.clear_buffer()
self._device.write(command_ext.encode())
lines: list[str] = []
# wait for device command print - it should be done immediately after sending command to device
# wait for device command print - it should be done immediately after sending
# command to device.
lines.extend(
self._device.readlines_until(
regex=regex_command, timeout=1.0, print_output=print_output
)
)

# Send single enter to get next prompt after command execution, as it signals
# that execution finished.
self._device.write(b"\n")

# wait for device command execution
lines.extend(
self._device.readlines_until(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,17 @@ def pytest_addoption(parser: pytest.Parser):
)
twister_harness_group.addoption(
'--runner',
help='Use the specified west runner (pyocd, nrfjprog, etc.).'
help='Use the specified west runner (pyocd, nrfjprog, etc.) when running west flash.'
)
twister_harness_group.addoption(
'--runner-params',
action='append',
help='Use the specified west runner params.'
)
twister_harness_group.addoption(
'--rtt-runner',
help='Use the specified west runner (pyocd, nrfjprog, etc.) when running west rtt.'
)
twister_harness_group.addoption(
'--device-id',
help='ID of connected hardware device (for example 000682459367).'
Expand All @@ -89,6 +93,11 @@ def pytest_addoption(parser: pytest.Parser):
'--device-serial-pty',
help='Script for controlling pseudoterminal.'
)
twister_harness_group.addoption(
'--device-rtt',
type=bool,
help='Use RTT as communication transport.',
)
twister_harness_group.addoption(
'--flash-before',
type=bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class DeviceConfig:
baud: int = 115200
runner: str = ''
runner_params: list[str] = field(default_factory=list, repr=False)
rtt_runner: str = ''
use_rtt: bool = False
id: str = ''
product: str = ''
serial_pty: str = ''
Expand Down Expand Up @@ -68,6 +70,10 @@ def create(cls, config: pytest.Config) -> TwisterHarnessConfig:
runner_params: list[str] = []
if config.option.runner_params:
runner_params = [w.strip() for w in config.option.runner_params]
if config.option.device_rtt:
flash_before = True
else:
flash_before = bool(config.option.flash_before)
device_from_cli = DeviceConfig(
type=config.option.device_type,
build_dir=_cast_to_path(config.option.build_dir),
Expand All @@ -78,10 +84,12 @@ def create(cls, config: pytest.Config) -> TwisterHarnessConfig:
baud=config.option.device_serial_baud,
runner=config.option.runner,
runner_params=runner_params,
rtt_runner=config.option.rtt_runner,
use_rtt=config.option.device_rtt,
id=config.option.device_id,
product=config.option.device_product,
serial_pty=config.option.device_serial_pty,
flash_before=bool(config.option.flash_before),
flash_before=flash_before,
west_flash_extra_args=west_flash_extra_args,
flash_command=flash_command,
pre_script=_cast_to_path(config.option.pre_script),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def test_if_hardware_adapter_uses_serial_pty(

monkeypatch.setattr('twister_harness.device.hardware_adapter.pty.openpty', lambda: (123, 456))
monkeypatch.setattr('twister_harness.device.hardware_adapter.os.ttyname', lambda x: f'/pty/ttytest/{x}')
monkeypatch.setattr('twister_harness.device.hardware_adapter.terminate_process', lambda x: None)

serial_mock = mock.Mock()
serial_mock.port = '/pty/ttytest/456'
Expand All @@ -235,7 +236,7 @@ def test_if_hardware_adapter_uses_serial_pty(
device._device_run.set()
device.connect()
assert device._serial_connection.port == '/pty/ttytest/456' # type: ignore[union-attr]
assert device._serial_pty_proc
assert device._proc
patched_popen.assert_called_with(
['script.py'],
stdout=123,
Expand All @@ -244,7 +245,7 @@ def test_if_hardware_adapter_uses_serial_pty(
)

device.disconnect()
assert not device._serial_pty_proc
assert not device._proc


def test_if_hardware_adapter_properly_send_data_to_subprocess(
Expand Down
Loading
Loading