Skip to content

Commit 9c91055

Browse files
nordic-baminordicjm
authored andcommitted
tests: drivers: uart: Add pytest based uart passthrough test
Verify the passtrough function with the automated two terminal test Signed-off-by: Bartosz Miller <[email protected]>
1 parent 153c932 commit 9c91055

File tree

8 files changed

+310
-0
lines changed

8 files changed

+310
-0
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#
2+
# Copyright (c) 2025 Nordic Semiconductor ASA
3+
#
4+
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
5+
#
6+
7+
cmake_minimum_required(VERSION 3.20.0)
8+
9+
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
10+
11+
project(uart_passtrough)
12+
13+
target_sources(app PRIVATE src/main.c)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2025 Nordic Semiconductor ASA
3+
*
4+
* SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
5+
*/
6+
7+
/ {
8+
chosen {
9+
uart,passthrough = &uart1;
10+
};
11+
};
12+
13+
&uart1 {
14+
compatible = "nordic,nrf-uarte";
15+
current-speed = <115200>;
16+
status = "okay";
17+
hw-flow-control;
18+
};
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright (c) 2025 Nordic Semiconductor ASA
3+
*
4+
* SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
5+
*/
6+
7+
/ {
8+
chosen {
9+
uart,passthrough = &uart135;
10+
};
11+
};
12+
13+
&uart135 {
14+
compatible = "nordic,nrf-uarte";
15+
current-speed = <115200>;
16+
status = "okay";
17+
memory-regions = <&cpuapp_dma_region>;
18+
hw-flow-control;
19+
};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
CONFIG_SERIAL=y
2+
CONFIG_UART_INTERRUPT_DRIVEN=y
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# Copyright (c) 2025 Nordic Semiconductor ASA
2+
#
3+
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
4+
5+
6+
import logging
7+
import time
8+
9+
import serial
10+
11+
logger = logging.getLogger("serial_port")
12+
logger.setLevel(logging.DEBUG)
13+
14+
15+
class SerialPort:
16+
"""
17+
Class for handling serial port
18+
Default settings:
19+
8 data bits, 1 stop bit, no parity
20+
"""
21+
22+
def __init__(self, serial_port: str, baudrate: int, timeout: int = 15):
23+
"""
24+
:param serial_port: full name of the serial device port
25+
:param baudrate: buadrate [bps]
26+
:param timeout: read/write timeout [s]
27+
"""
28+
self.port_name = serial_port
29+
self.baud_rate = baudrate
30+
self._timeout = timeout
31+
self.serial_port = self._instanitate_serial_port()
32+
33+
def _instanitate_serial_port(self) -> serial.Serial:
34+
"""
35+
Auxiliary method for handling serial port handler object instantiation
36+
"""
37+
try:
38+
port = serial.Serial(
39+
port=self.port_name, baudrate=self.baud_rate, timeout=self._timeout, rtscts=True
40+
)
41+
port.set_low_latency_mode(True)
42+
return port
43+
except serial.SerialException as exc:
44+
logger.error(f"Opening '{self.port_name}' failed with error: {exc}!")
45+
raise exc
46+
47+
def close(
48+
self,
49+
):
50+
"""
51+
Close serial port
52+
"""
53+
if self.serial_port:
54+
logger.debug("Closing serial port connection")
55+
self.serial_port.close()
56+
logger.debug("Serial port connection closed")
57+
self.serial_port = None
58+
59+
def open(self) -> None:
60+
"""
61+
Open serial port
62+
"""
63+
if not self.serial_port.is_open:
64+
logger.debug(f"Opening: '{self}'")
65+
self.serial_port.open()
66+
self.serial_port.reset_input_buffer()
67+
self.serial_port.reset_output_buffer()
68+
69+
def send(self, message: str, get_response: bool = False) -> str:
70+
"""
71+
Send one message (and optionally receive response)
72+
:param get_response: if True returns the received response
73+
:return: received response or empty string
74+
"""
75+
response: str = ""
76+
if self.serial_port.is_open is False:
77+
raise IOError("Serial port is closed")
78+
79+
try:
80+
time.sleep(0.25)
81+
self.serial_port.write_timeout = self._timeout
82+
logger.info(f"[{self.serial_port.port}] Serial --> {message}")
83+
self.serial_port.write((message + "\n").encode())
84+
if get_response:
85+
start = time.time()
86+
while (self.serial_port.in_waiting == 0) and (time.time() - start < self._timeout):
87+
# no data in buffer yet
88+
time.sleep(0.1)
89+
while (self.serial_port.in_waiting > 0) and (time.time() - start < self._timeout):
90+
time.sleep(0.1)
91+
response += self.serial_port.read_all().decode()
92+
logger.info(f"Serial <-- {response}")
93+
return response
94+
95+
except serial.SerialTimeoutException as exc:
96+
logger.error(f"Serial port read timeout: {exc}")
97+
98+
except serial.SerialException as exc:
99+
logger.error(f"Serial port read error: {exc}")
100+
101+
except UnicodeDecodeError as exc:
102+
logger.error(f"Decoding error: {exc}")
103+
104+
return response.rstrip("\n")
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Copyright (c) 2025 Nordic Semiconductor ASA
2+
#
3+
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
4+
5+
import logging
6+
7+
logger = logging.getLogger("uart_passtrough")
8+
logger.setLevel(logging.DEBUG)
9+
10+
from twister_harness import DeviceAdapter
11+
from serial_port import SerialPort
12+
13+
14+
def test_uart_passtrough(dut: DeviceAdapter):
15+
"""
16+
Verify the UART darta forwarding capability.
17+
Send message from 'passtrough' UART to 'console 'UART' and verify it,
18+
then send message from 'console' to 'passtrough' UART and verify it.
19+
"""
20+
21+
if "if00" in dut.device_config.serial:
22+
second_serial_port_name = dut.device_config.serial.replace("if00", "if02")
23+
elif "if02" in dut.device_config.serial:
24+
second_serial_port_name = dut.device_config.serial.replace("if02", "if00")
25+
26+
second_serial_port: SerialPort = SerialPort(
27+
serial_port=second_serial_port_name, baudrate=dut.device_config.baud
28+
)
29+
30+
dut.readlines_until(regex="Ready", print_output=True, timeout=2)
31+
second_serial_port.open()
32+
second_serial_port.send("con->pass")
33+
dut.readlines_until(regex="con->pass", print_output=True, timeout=2)
34+
dut.write("pass<-con".encode())
35+
second_port_data: str = second_serial_port.send("", get_response=True)
36+
assert "pass<-con" in second_port_data
37+
second_serial_port.close()
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright (c) 2025 Nordic Semiconductor ASA
3+
*
4+
* SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
5+
*/
6+
7+
#include <zephyr/kernel.h>
8+
#include <zephyr/device.h>
9+
#include <zephyr/drivers/uart.h>
10+
11+
const struct device *const console_dev = DEVICE_DT_GET(DT_CHOSEN(zephyr_console));
12+
const struct device *const passtrough_dev = DEVICE_DT_GET(DT_CHOSEN(uart_passthrough));
13+
14+
typedef struct {
15+
uint8_t *rx_buffer;
16+
uint8_t *tx_buffer;
17+
const struct device *other_dev;
18+
19+
} uart_pass;
20+
21+
/*
22+
* Note: this simple test uses one byte buffer,
23+
* CPU must be able to receive and send one character,
24+
* before another one is received
25+
* This limits the usable buadrates range
26+
*/
27+
28+
static uint8_t console_buf[1];
29+
static uint8_t passthrough_buf[1];
30+
31+
void uart_tx_interrupt_service(const struct device *dev, uart_pass *test_data)
32+
{
33+
uart_fifo_fill(dev, test_data->tx_buffer, 1);
34+
uart_irq_tx_disable(dev);
35+
}
36+
37+
void uart_rx_interrupt_service(const struct device *dev, uart_pass *test_data)
38+
{
39+
int rx_data_length = 0;
40+
41+
do {
42+
rx_data_length = uart_fifo_read(dev, test_data->rx_buffer, 1);
43+
} while (rx_data_length);
44+
uart_irq_tx_enable(test_data->other_dev);
45+
}
46+
47+
void uart_isr_handler(const struct device *dev, void *user_data)
48+
{
49+
uart_pass *test_data = (uart_pass *)user_data;
50+
51+
uart_irq_update(dev);
52+
while (uart_irq_is_pending(dev)) {
53+
if (uart_irq_rx_ready(dev)) {
54+
uart_rx_interrupt_service(dev, test_data);
55+
}
56+
if (uart_irq_tx_ready(dev)) {
57+
uart_tx_interrupt_service(dev, test_data);
58+
}
59+
}
60+
}
61+
62+
int main(void)
63+
{
64+
int err;
65+
66+
uart_pass console_dev_data;
67+
uart_pass passtrough_dev_data;
68+
69+
console_dev_data.rx_buffer = console_buf;
70+
console_dev_data.tx_buffer = passthrough_buf;
71+
console_dev_data.other_dev = passtrough_dev;
72+
73+
err = uart_irq_callback_set(console_dev, uart_isr_handler);
74+
printk("uart_irq_callback_set(console_dev) err=%d\n", err);
75+
err = uart_irq_callback_user_data_set(console_dev, uart_isr_handler,
76+
(void *)&console_dev_data);
77+
printk("uart_irq_callback_user_data_set(console_dev) err=%d\n", err);
78+
79+
passtrough_dev_data.rx_buffer = passthrough_buf;
80+
passtrough_dev_data.tx_buffer = console_buf;
81+
passtrough_dev_data.other_dev = console_dev;
82+
83+
err = uart_irq_callback_set(passtrough_dev, uart_isr_handler);
84+
printk("uart_irq_callback_set(passtrough_dev) err=%d\n", err);
85+
err = uart_irq_callback_user_data_set(passtrough_dev, uart_isr_handler,
86+
(void *)&passtrough_dev_data);
87+
printk("uart_irq_callback_user_data_set(passtrough_devle_dev) err=%d\n", err);
88+
89+
k_msleep(10);
90+
printk("Ready\n");
91+
92+
uart_irq_rx_enable(console_dev);
93+
uart_irq_rx_enable(passtrough_dev);
94+
95+
k_sleep(K_FOREVER);
96+
97+
return 0;
98+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
common:
2+
sysbuild: true
3+
depends_on: gpio
4+
tags:
5+
- drivers
6+
- uart
7+
- ci_tests_drivers_uart
8+
harness: pytest
9+
harness_config:
10+
pytest_dut_scope: session
11+
platform_allow:
12+
- nrf5340dk/nrf5340/cpuapp
13+
- nrf54h20dk/nrf54h20/cpuapp
14+
integration_platforms:
15+
- nrf5340dk/nrf5340/cpuapp
16+
- nrf54h20dk/nrf54h20/cpuapp
17+
18+
tests:
19+
drivers.uart_passtrough: {}

0 commit comments

Comments
 (0)