From 9c022262b2731b092a48d8e2213561a2919084c0 Mon Sep 17 00:00:00 2001 From: Lukasz Mrugala Date: Thu, 16 May 2024 07:41:19 +0000 Subject: [PATCH] scripts: twister: Fix Unit Tests on Windows Unit tests were failing on Windows, indicating that current Twister code is not truly multiplatform. Linux-specific code parts were changed into multiplatform ones. Paths were deemed the main culprit, so this commit introduces a new TPath, that aims to fix the Windows limitation of supporting only up to 260 char paths by default. Signed-off-by: Lukasz Mrugala --- .../tests/device/hardware_adapter_test.py | 2 + .../tests/device/qemu_adapter_test.py | 2 + .../pylib/twister/twisterlib/config_parser.py | 2 +- scripts/pylib/twister/twisterlib/coverage.py | 2 + .../pylib/twister/twisterlib/environment.py | 64 ++++--- scripts/pylib/twister/twisterlib/handlers.py | 22 ++- scripts/pylib/twister/twisterlib/reports.py | 19 ++- scripts/pylib/twister/twisterlib/size_calc.py | 4 +- .../pylib/twister/twisterlib/testinstance.py | 2 +- scripts/pylib/twister/twisterlib/testplan.py | 11 +- scripts/pylib/twister/twisterlib/testsuite.py | 27 ++- .../pylib/twister/twisterlib/twister_main.py | 11 +- .../pylib/twister/twisterlib/twister_path.py | 96 +++++++++++ .../pytest_integration/test_harness_pytest.py | 64 ++++--- scripts/tests/twister/test_environment.py | 66 +++++--- scripts/tests/twister/test_errors.py | 3 +- scripts/tests/twister/test_handlers.py | 159 +++++++++++++----- scripts/tests/twister/test_hardwaremap.py | 26 +-- scripts/tests/twister/test_harness.py | 2 +- scripts/tests/twister/test_runner.py | 20 +-- scripts/tests/twister/test_testinstance.py | 72 +++++--- scripts/tests/twister/test_testplan.py | 82 ++++----- scripts/tests/twister/test_testsuite.py | 44 +++-- 23 files changed, 530 insertions(+), 272 deletions(-) create mode 100644 scripts/pylib/twister/twisterlib/twister_path.py diff --git a/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py index 589124f7c22c4..87201f75a1130 100644 --- a/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py +++ b/scripts/pylib/pytest-twister-harness/tests/device/hardware_adapter_test.py @@ -205,6 +205,7 @@ def test_device_log_correct_error_handle(patched_popen, device: HardwareAdapter, assert 'flashing error' in file.readlines() +@pytest.mark.skipif(os.name == 'nt', reason='PTY is not used on Windows.') @mock.patch('twister_harness.device.hardware_adapter.subprocess.Popen') @mock.patch('twister_harness.device.hardware_adapter.serial.Serial') def test_if_hardware_adapter_uses_serial_pty( @@ -238,6 +239,7 @@ def test_if_hardware_adapter_uses_serial_pty( assert not device._serial_pty_proc +@pytest.mark.skipif(os.name == 'nt', reason='PTY is not used on Windows.') def test_if_hardware_adapter_properly_send_data_to_subprocess( device: HardwareAdapter, shell_simulator_path: str ) -> None: diff --git a/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py b/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py index 42406cb702d14..0f441c03387f1 100755 --- a/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py +++ b/scripts/pylib/pytest-twister-harness/tests/device/qemu_adapter_test.py @@ -32,6 +32,7 @@ def test_if_generate_command_creates_proper_command(patched_which, device: QemuA assert device.command == ['west', 'build', '-d', 'build_dir', '-t', 'run'] +@pytest.mark.skipif(os.name == 'nt', reason='QEMU FIFO fails on Windows.') def test_if_qemu_adapter_runs_without_errors(resources, device: QemuAdapter) -> None: fifo_file_path = str(device.device_config.build_dir / 'qemu-fifo') script_path = resources.joinpath('fifo_mock.py') @@ -46,6 +47,7 @@ def test_if_qemu_adapter_runs_without_errors(resources, device: QemuAdapter) -> assert file_lines[-2:] == lines[-2:] +@pytest.mark.skipif(os.name == 'nt', reason='QEMU FIFO fails on Windows.') def test_if_qemu_adapter_raise_exception_due_to_no_fifo_connection(device: QemuAdapter) -> None: device.base_timeout = 0.3 device.command = ['sleep', '1'] diff --git a/scripts/pylib/twister/twisterlib/config_parser.py b/scripts/pylib/twister/twisterlib/config_parser.py index b330e1395d5fa..2405dc0895b47 100644 --- a/scripts/pylib/twister/twisterlib/config_parser.py +++ b/scripts/pylib/twister/twisterlib/config_parser.py @@ -96,7 +96,7 @@ def __init__(self, filename, schema): self.common = {} def load(self): - data = scl.yaml_load_verify(self.filename, self.schema) + data = scl.yaml_load_verify(str(self.filename), self.schema) self.data = data if 'tests' in self.data: diff --git a/scripts/pylib/twister/twisterlib/coverage.py b/scripts/pylib/twister/twisterlib/coverage.py index dd647c60ddc34..6bfe195e94eff 100644 --- a/scripts/pylib/twister/twisterlib/coverage.py +++ b/scripts/pylib/twister/twisterlib/coverage.py @@ -236,6 +236,7 @@ def run_command(self, cmd, coveragelog): "--ignore-errors", "mismatch,mismatch", ] + cmd = [str(c) for c in cmd] cmd_str = " ".join(cmd) logger.debug(f"Running {cmd_str}...") return subprocess.call(cmd, stdout=coveragelog) @@ -363,6 +364,7 @@ def _generate(self, outdir, coveragelog): "--gcov-executable", self.gcov_tool, "-e", "tests/*"] cmd += excludes + mode_options + ["--json", "-o", coveragefile, outdir] + cmd = [str(c) for c in cmd] cmd_str = " ".join(cmd) logger.debug(f"Running {cmd_str}...") subprocess.call(cmd, stdout=coveragelog) diff --git a/scripts/pylib/twister/twisterlib/environment.py b/scripts/pylib/twister/twisterlib/environment.py index 0cf1ea18ce6f7..7cb9cc6fc0772 100644 --- a/scripts/pylib/twister/twisterlib/environment.py +++ b/scripts/pylib/twister/twisterlib/environment.py @@ -18,13 +18,13 @@ from collections.abc import Generator from datetime import datetime, timezone from importlib import metadata -from pathlib import Path import zephyr_module from twisterlib.constants import SUPPORTED_SIMS from twisterlib.coverage import supported_coverage_formats from twisterlib.error import TwisterRuntimeError from twisterlib.log_helper import log_command +from twisterlib.twister_path import TPath logger = logging.getLogger('twister') logger.setLevel(logging.DEBUG) @@ -133,7 +133,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: ) case_select.add_argument( - "-T", "--testsuite-root", action="append", default=[], type = norm_path, + "-T", "--testsuite-root", action="append", default=[], type = TPath, help="Base directory to recursively search for test cases. All " "testcase.yaml files under here will be processed. May be " "called multiple times. Defaults to the 'samples/' and " @@ -248,7 +248,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: and global timeout multiplier (this parameter)""") test_xor_subtest.add_argument( - "-s", "--test", "--scenario", action="append", type = norm_path, + "-s", "--test", "--scenario", action="append", type = TPath, help="""Run only the specified test suite scenario. These are named by 'path/relative/to/Zephyr/base/section.subsection_in_testcase_yaml', or just 'section.subsection' identifier. With '--testsuite-root' option @@ -293,16 +293,19 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: # Start of individual args place them in alpha-beta order - board_root_list = [f"{ZEPHYR_BASE}/boards", f"{ZEPHYR_BASE}/subsys/testsuite/boards"] + board_root_list = [ + TPath(f"{ZEPHYR_BASE}/boards"), + TPath(f"{ZEPHYR_BASE}/subsys/testsuite/boards") + ] modules = zephyr_module.parse_modules(ZEPHYR_BASE) for module in modules: board_root = module.meta.get("build", {}).get("settings", {}).get("board_root") if board_root: - board_root_list.append(os.path.join(module.project, board_root, "boards")) + board_root_list.append(TPath(os.path.join(module.project, board_root, "boards"))) parser.add_argument( - "-A", "--board-root", action="append", default=board_root_list, + "-A", "--board-root", action="append", default=board_root_list, type=TPath, help="""Directory to search for board configuration files. All .yaml files in the directory will be processed. The directory should have the same structure in the main Zephyr tree: boards///""") @@ -349,7 +352,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: "--cmake-only", action="store_true", help="Only run cmake, do not build or run.") - parser.add_argument("--coverage-basedir", default=ZEPHYR_BASE, + parser.add_argument("--coverage-basedir", default=ZEPHYR_BASE, type=TPath, help="Base source directory for coverage report.") parser.add_argument("--coverage-platform", action="append", default=[], @@ -374,7 +377,8 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: parser.add_argument( "--test-config", action="store", - default=os.path.join(ZEPHYR_BASE, "tests", "test_config.yaml"), + type=TPath, + default=TPath(os.path.join(ZEPHYR_BASE, "tests", "test_config.yaml")), help="Path to file with plans and test configurations." ) @@ -431,7 +435,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: help="Do not filter based on toolchain, use the set " " toolchain unconditionally") - parser.add_argument("--gcov-tool", type=Path, default=None, + parser.add_argument("--gcov-tool", type=TPath, default=None, help="Path to the gcov tool to use for code coverage " "reports") @@ -513,6 +517,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: "-z", "--size", action="append", metavar='FILENAME', + type=TPath, help="Ignore all other command line options and just produce a report to " "stdout with ROM/RAM section sizes on the specified binary images.") @@ -543,7 +548,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: test_plan_report_xor.add_argument("--list-tags", action="store_true", help="List all tags occurring in selected tests.") - parser.add_argument("--log-file", metavar="FILENAME", action="store", + parser.add_argument("--log-file", metavar="FILENAME", action="store", type=TPath, help="Specify a file where to save logs.") parser.add_argument( @@ -604,15 +609,15 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: ) parser.add_argument( - "-O", "--outdir", - default=os.path.join(os.getcwd(), "twister-out"), + "-O", "--outdir", type=TPath, + default=TPath(os.path.join(os.getcwd(), "twister-out")), help="Output directory for logs and binaries. " "Default is 'twister-out' in the current directory. " "This directory will be cleaned unless '--no-clean' is set. " "The '--clobber-output' option controls what cleaning does.") parser.add_argument( - "-o", "--report-dir", + "-o", "--report-dir", type=TPath, help="""Output reports containing results of the test run into the specified directory. The output will be both in JSON and JUNIT format @@ -663,6 +668,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: "--quarantine-list", action="append", metavar="FILENAME", + type=TPath, help="Load list of test scenarios under quarantine. The entries in " "the file need to correspond to the test scenarios names as in " "corresponding tests .yaml files. These scenarios " @@ -831,7 +837,7 @@ def add_parse_arguments(parser = None) -> argparse.ArgumentParser: parser.add_argument("extra_test_args", nargs=argparse.REMAINDER, help="Additional args following a '--' are passed to the test binary") - parser.add_argument("--alt-config-root", action="append", default=[], + parser.add_argument("--alt-config-root", action="append", default=[], type=TPath, help="Alternative test configuration root/s. When a test is found, " "Twister will check if a test configuration file exist in any of " "the alternative test configuration root folders. For example, " @@ -878,8 +884,8 @@ def parse_arguments( # check again and make sure we have something set if not options.testsuite_root: - options.testsuite_root = [os.path.join(ZEPHYR_BASE, "tests"), - os.path.join(ZEPHYR_BASE, "samples")] + options.testsuite_root = [TPath(os.path.join(ZEPHYR_BASE, "tests")), + TPath(os.path.join(ZEPHYR_BASE, "samples"))] if options.last_metrics or options.compare_report: options.enable_size_report = True @@ -1019,34 +1025,38 @@ def __init__(self, options : argparse.Namespace, default_options=None) -> None: self.test_roots = options.testsuite_root - if not isinstance(options.board_root, list): - self.board_roots = [options.board_root] + if options: + if not isinstance(options.board_root, list): + self.board_roots = [self.options.board_root] + else: + self.board_roots = self.options.board_root + self.outdir = TPath(os.path.abspath(options.outdir)) else: self.board_roots = options.board_root self.outdir = os.path.abspath(options.outdir) - self.snippet_roots = [Path(ZEPHYR_BASE)] + self.snippet_roots = [TPath(ZEPHYR_BASE)] modules = zephyr_module.parse_modules(ZEPHYR_BASE) for module in modules: snippet_root = module.meta.get("build", {}).get("settings", {}).get("snippet_root") if snippet_root: - self.snippet_roots.append(Path(module.project) / snippet_root) + self.snippet_roots.append(TPath(module.project) / snippet_root) - self.soc_roots = [Path(ZEPHYR_BASE), Path(ZEPHYR_BASE) / 'subsys' / 'testsuite'] - self.dts_roots = [Path(ZEPHYR_BASE)] - self.arch_roots = [Path(ZEPHYR_BASE)] + self.soc_roots = [TPath(ZEPHYR_BASE), TPath(ZEPHYR_BASE) / 'subsys' / 'testsuite'] + self.dts_roots = [TPath(ZEPHYR_BASE)] + self.arch_roots = [TPath(ZEPHYR_BASE)] for module in modules: soc_root = module.meta.get("build", {}).get("settings", {}).get("soc_root") if soc_root: - self.soc_roots.append(Path(module.project) / Path(soc_root)) + self.soc_roots.append(TPath(module.project) / TPath(soc_root)) dts_root = module.meta.get("build", {}).get("settings", {}).get("dts_root") if dts_root: - self.dts_roots.append(Path(module.project) / Path(dts_root)) + self.dts_roots.append(TPath(module.project) / TPath(dts_root)) arch_root = module.meta.get("build", {}).get("settings", {}).get("arch_root") if arch_root: - self.arch_roots.append(Path(module.project) / Path(arch_root)) + self.arch_roots.append(TPath(module.project) / TPath(arch_root)) self.hwm = None @@ -1143,7 +1153,7 @@ def run_cmake_script(args=None): return results def get_toolchain(self): - toolchain_script = Path(ZEPHYR_BASE) / Path('cmake/verify-toolchain.cmake') + toolchain_script = TPath(ZEPHYR_BASE) / TPath('cmake/verify-toolchain.cmake') result = self.run_cmake_script([toolchain_script, "FORMAT=json"]) try: diff --git a/scripts/pylib/twister/twisterlib/handlers.py b/scripts/pylib/twister/twisterlib/handlers.py index eb7d90ddd3ebf..dcbb2f930f874 100755 --- a/scripts/pylib/twister/twisterlib/handlers.py +++ b/scripts/pylib/twister/twisterlib/handlers.py @@ -59,13 +59,19 @@ def terminate_process(proc): so we need to use try_kill_process_by_pid. """ - for child in psutil.Process(proc.pid).children(recursive=True): + parent = psutil.Process(proc.pid) + to_terminate = parent.children(recursive=True) + to_terminate.append(parent) + + for p in to_terminate: + with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess): + p.terminate() + _, alive = psutil.wait_procs(to_terminate, timeout=1) + + for p in alive: with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess): - os.kill(child.pid, signal.SIGTERM) - proc.terminate() - # sleep for a while before attempting to kill - time.sleep(0.5) - proc.kill() + p.kill() + _, alive = psutil.wait_procs(to_terminate, timeout=1) class Handler: @@ -203,8 +209,8 @@ def try_kill_process_by_pid(self): pid = int(pid_file.read()) os.unlink(self.pid_fn) self.pid_fn = None # clear so we don't try to kill the binary twice - with contextlib.suppress(ProcessLookupError, psutil.NoSuchProcess): - os.kill(pid, signal.SIGKILL) + p = psutil.Process(pid) + terminate_process(p) def _output_reader(self, proc): self.line = proc.stdout.readline() diff --git a/scripts/pylib/twister/twisterlib/reports.py b/scripts/pylib/twister/twisterlib/reports.py index cbf605542fbbf..e267b27830fd6 100644 --- a/scripts/pylib/twister/twisterlib/reports.py +++ b/scripts/pylib/twister/twisterlib/reports.py @@ -11,10 +11,11 @@ import xml.etree.ElementTree as ET from datetime import datetime from enum import Enum -from pathlib import Path, PosixPath +from pathlib import PosixPath, WindowsPath from colorama import Fore from twisterlib.statuses import TwisterStatus +from twisterlib.twister_path import TPath logger = logging.getLogger('twister') logger.setLevel(logging.DEBUG) @@ -171,7 +172,7 @@ def xunit_report_suites(self, json_file, filename): runnable = suite.get('runnable', 0) duration += float(handler_time) ts_status = TwisterStatus(suite.get('status')) - classname = Path(suite.get("name","")).name + classname = TPath(suite.get("name","")).name for tc in suite.get("testcases", []): status = TwisterStatus(tc.get('status')) reason = tc.get('reason', suite.get('reason', 'Unknown')) @@ -253,7 +254,7 @@ def xunit_report(self, json_file, filename, selected_platform=None, full_report= ): continue if full_report: - classname = Path(ts.get("name","")).name + classname = TPath(ts.get("name","")).name for tc in ts.get("testcases", []): status = TwisterStatus(tc.get('status')) reason = tc.get('reason', ts.get('reason', 'Unknown')) @@ -295,8 +296,14 @@ def json_report(self, filename, version="NA", platform=None, filters=None): report_options = self.env.non_default_options() # Resolve known JSON serialization problems. - for k,v in report_options.items(): - report_options[k] = str(v) if type(v) in [PosixPath] else v + for k, v in report_options.items(): + pathlikes = [PosixPath, WindowsPath, TPath] + value = v + if type(v) in pathlikes: + value = os.fspath(v) + if type(v) in [list]: + value = [os.fspath(x) if type(x) in pathlikes else x for x in v] + report_options[k] = value report = {} report["environment"] = {"os": os.name, @@ -342,7 +349,7 @@ def json_report(self, filename, version="NA", platform=None, filters=None): "name": instance.testsuite.name, "arch": instance.platform.arch, "platform": instance.platform.name, - "path": instance.testsuite.source_dir_rel + "path": os.fspath(instance.testsuite.source_dir_rel) } if instance.run_id: suite['run_id'] = instance.run_id diff --git a/scripts/pylib/twister/twisterlib/size_calc.py b/scripts/pylib/twister/twisterlib/size_calc.py index 8034dca44eed3..3f4175e446db7 100644 --- a/scripts/pylib/twister/twisterlib/size_calc.py +++ b/scripts/pylib/twister/twisterlib/size_calc.py @@ -205,7 +205,7 @@ def _check_is_xip(self) -> None: # Search for CONFIG_XIP in the ELF's list of symbols using NM and AWK. # GREP can not be used as it returns an error if the symbol is not # found. - is_xip_command = "nm " + self.elf_filename + \ + is_xip_command = "nm " + str(self.elf_filename) + \ " | awk '/CONFIG_XIP/ { print $3 }'" is_xip_output = subprocess.check_output( is_xip_command, shell=True, stderr=subprocess.STDOUT).decode( @@ -221,7 +221,7 @@ def _check_is_xip(self) -> None: def _get_info_elf_sections(self) -> None: """Calculate RAM and ROM usage and information about issues by section""" - objdump_command = "objdump -h " + self.elf_filename + objdump_command = "objdump -h " + str(self.elf_filename) objdump_output = subprocess.check_output( objdump_command, shell=True).decode("utf-8").splitlines() diff --git a/scripts/pylib/twister/twisterlib/testinstance.py b/scripts/pylib/twister/twisterlib/testinstance.py index 80a371f929631..a19ac3a946b94 100644 --- a/scripts/pylib/twister/twisterlib/testinstance.py +++ b/scripts/pylib/twister/twisterlib/testinstance.py @@ -72,7 +72,7 @@ def __init__(self, testsuite, platform, outdir): else: # if suite is not in zephyr, # keep only the part after ".." in reconstructed dir structure - source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1] + source_dir_rel = testsuite.source_dir_rel.get_rel_after_dots() self.build_dir = os.path.join( outdir, platform.normalized_name, diff --git a/scripts/pylib/twister/twisterlib/testplan.py b/scripts/pylib/twister/twisterlib/testplan.py index 638794fb9d51b..07966802322b6 100755 --- a/scripts/pylib/twister/twisterlib/testplan.py +++ b/scripts/pylib/twister/twisterlib/testplan.py @@ -36,6 +36,7 @@ from twisterlib.statuses import TwisterStatus from twisterlib.testinstance import TestInstance from twisterlib.testsuite import TestSuite, scan_testsuite_path +from twisterlib.twister_path import TPath from zephyr_module import parse_modules logger = logging.getLogger('twister') @@ -447,7 +448,9 @@ def add_configurations(self): # Note, internally in twister a board root includes the `boards` folder # but in Zephyr build system, the board root is without the `boards` in folder path. board_roots = [Path(os.path.dirname(root)) for root in self.env.board_roots] - lb_args = Namespace(arch_roots=self.env.arch_roots, soc_roots=self.env.soc_roots, + arch_roots = [Path(root) for root in self.env.arch_roots] + soc_roots = [Path(root) for root in self.env.soc_roots] + lb_args = Namespace(arch_roots=arch_roots, soc_roots=soc_roots, board_roots=board_roots, board=None, board_dir=None) known_boards = list_boards.find_v2_boards(lb_args) @@ -587,13 +590,13 @@ def add_testsuites(self, testsuite_filter=None): logger.debug("Found possible testsuite in " + dirpath) - suite_yaml_path = os.path.join(dirpath, filename) + suite_yaml_path = TPath(os.path.join(dirpath, filename)) suite_path = os.path.dirname(suite_yaml_path) for alt_config_root in self.env.alt_config_root: - alt_config = os.path.join(os.path.abspath(alt_config_root), + alt_config = TPath(os.path.join(os.path.abspath(alt_config_root), os.path.relpath(suite_path, root), - filename) + filename)) if os.path.exists(alt_config): logger.info( f"Using alternative configuration from {os.path.normpath(alt_config)}" diff --git a/scripts/pylib/twister/twisterlib/testsuite.py b/scripts/pylib/twister/twisterlib/testsuite.py index a99e9c88e091a..2114e4afbac30 100644 --- a/scripts/pylib/twister/twisterlib/testsuite.py +++ b/scripts/pylib/twister/twisterlib/testsuite.py @@ -16,6 +16,7 @@ from twisterlib.error import StatusAttributeError, TwisterException, TwisterRuntimeError from twisterlib.mixins import DisablePyTestCollectionMixin from twisterlib.statuses import TwisterStatus +from twisterlib.twister_path import TPath logger = logging.getLogger('twister') logger.setLevel(logging.DEBUG) @@ -294,9 +295,9 @@ def find_c_files_in(path: str, extensions: list = None) -> list: filenames = [] for ext in extensions: # glob.glob('**/*.c') does not pick up the base directory - filenames += [os.path.join(path, x) for x in glob.glob(f'*.{ext}')] + filenames += [TPath(path, x) for x in glob.glob(f'*.{ext}')] # glob matches in subdirectories too - filenames += [os.path.join(path, x) for x in glob.glob(f'**/*.{ext}')] + filenames += [TPath(path, x) for x in glob.glob(f'**/*.{ext}')] # restore previous CWD os.chdir(oldpwd) @@ -367,10 +368,10 @@ def _find_src_dir_path(test_dir_path): optimization reasons it is placed in upper directory. """ src_dir_name = "src" - src_dir_path = os.path.join(test_dir_path, src_dir_name) + src_dir_path = TPath(test_dir_path, src_dir_name) if os.path.isdir(src_dir_path): return src_dir_path - src_dir_path = os.path.join(test_dir_path, "..", src_dir_name) + src_dir_path = TPath(os.path.join(test_dir_path, "..", src_dir_name)) if os.path.isdir(src_dir_path): return src_dir_path return "" @@ -436,7 +437,7 @@ def __init__(self, suite_root, suite_path, name, data=None, detailed_test_id=Tru the testcase.yaml defines multiple tests """ - workdir = os.path.relpath(suite_path, suite_root) + workdir = TPath(os.path.relpath(suite_path, suite_root)) assert self.check_suite_name(name, suite_root, workdir) self.detailed_test_id = detailed_test_id @@ -444,8 +445,8 @@ def __init__(self, suite_root, suite_path, name, data=None, detailed_test_id=Tru self.id = name self.source_dir = suite_path - self.source_dir_rel = os.path.relpath( - os.path.realpath(suite_path), start=canonical_zephyr_base + self.source_dir_rel = TPath( + os.path.relpath(os.path.realpath(suite_path), start=canonical_zephyr_base) ) self.yamlfile = suite_path self.testcases = [] @@ -517,20 +518,18 @@ def add_testcase(self, name, freeform=False): @staticmethod def get_unique(testsuite_root, workdir, name): - canonical_testsuite_root = os.path.realpath(testsuite_root) + canonical_testsuite_root = TPath(os.path.realpath(testsuite_root)) if Path(canonical_zephyr_base) in Path(canonical_testsuite_root).parents: # This is in ZEPHYR_BASE, so include path in name for uniqueness # FIXME: We should not depend on path of test for unique names. - relative_ts_root = os.path.relpath(canonical_testsuite_root, - start=canonical_zephyr_base) + relative_ts_root = TPath(os.path.relpath(canonical_testsuite_root, + start=canonical_zephyr_base)) else: relative_ts_root = "" # workdir can be "." - unique = os.path.normpath( - os.path.join(relative_ts_root, workdir, name) - ).replace(os.sep, '/') - return unique + unique = TPath(os.path.normpath(os.path.join(relative_ts_root, workdir, name))) + return unique.get_rel_after_dots_str() @staticmethod def check_suite_name(name, testsuite_root, workdir): diff --git a/scripts/pylib/twister/twisterlib/twister_main.py b/scripts/pylib/twister/twisterlib/twister_main.py index 9d4485c885ea0..67c14e18b0c57 100644 --- a/scripts/pylib/twister/twisterlib/twister_main.py +++ b/scripts/pylib/twister/twisterlib/twister_main.py @@ -20,6 +20,7 @@ from twisterlib.runner import TwisterRunner from twisterlib.statuses import TwisterStatus from twisterlib.testplan import TestPlan +from twisterlib.twister_path import TPath logger = logging.getLogger("twister") logger.setLevel(logging.DEBUG) @@ -30,7 +31,7 @@ def setup_logging(outdir, log_file, log_level, timestamps): if log_file: fh = logging.FileHandler(log_file) else: - fh = logging.FileHandler(os.path.join(outdir, "twister.log")) + fh = logging.FileHandler(outdir / "twister.log") fh.setLevel(logging.DEBUG) @@ -79,7 +80,7 @@ def main(options: argparse.Namespace, default_options: argparse.Namespace): if os.path.exists(options.outdir): print("Keeping artifacts untouched") elif options.last_metrics: - ls = os.path.join(options.outdir, "twister.json") + ls = options.outdir / "twister.json" if os.path.exists(ls): with open(ls) as fp: previous_results = fp.read() @@ -91,7 +92,7 @@ def main(options: argparse.Namespace, default_options: argparse.Namespace): shutil.rmtree(options.outdir) else: for i in range(1, 100): - new_out = options.outdir + f".{i}" + new_out = TPath(str(options.outdir) + f".{i}") if not os.path.exists(new_out): print(f"Renaming output directory to {new_out}") shutil.move(options.outdir, new_out) @@ -103,7 +104,7 @@ def main(options: argparse.Namespace, default_options: argparse.Namespace): previous_results_file = None os.makedirs(options.outdir, exist_ok=True) if options.last_metrics and previous_results: - previous_results_file = os.path.join(options.outdir, "baseline.json") + previous_results_file = options.outdir / "baseline.json" with open(previous_results_file, "w") as fp: fp.write(previous_results) @@ -151,7 +152,7 @@ def main(options: argparse.Namespace, default_options: argparse.Namespace): ) report = Reporting(tplan, env) - plan_file = os.path.join(options.outdir, "testplan.json") + plan_file = options.outdir / "testplan.json" if not os.path.exists(plan_file): report.json_report(plan_file, env.version) diff --git a/scripts/pylib/twister/twisterlib/twister_path.py b/scripts/pylib/twister/twisterlib/twister_path.py new file mode 100644 index 0000000000000..93b3f311903f7 --- /dev/null +++ b/scripts/pylib/twister/twisterlib/twister_path.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# vim: set syntax=python ts=4 : +# +# Copyright (c) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +from pathlib import Path, PosixPath + +ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") +canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE) + + +class TPath(os.PathLike): + def __init__(self, path, *args): + self.path = Path(path) + + for p in args: + self.path = self._joinpath(p) + + def get_longpath(self): + normalised = os.fspath(self.path.resolve()) + + if isinstance(self.path, PosixPath): + return Path(normalised) + + # On Windows, without this prefix, there is 260-character path length limit. + if not normalised.startswith('\\\\?\\'): + normalised = '\\\\?\\' + normalised + return Path(normalised) + + def _joinpath(self, other): + return self.path.joinpath(str(other)) + + def joinpath(self, other): + res = TPath(self._joinpath(other)) + return res + + def get_rel_str(self): + str_path = os.path.relpath(str(self.path), start=canonical_zephyr_base) + return str_path + + def get_rel_after_dots(self): + str_path = str(self.path) + str_path = str_path.rsplit(os.pardir+os.path.sep, 1)[-1] + return TPath(str_path) + + def get_rel_after_dots_str(self): + str_path = str(self.path) + if os.path.isabs(str_path): + return str_path + str_path = os.path.relpath(str(self.path), start=canonical_zephyr_base) + str_path = str_path.rsplit(os.pardir+os.path.sep, 1)[-1] + return str_path + + def is_dir(self): + return self.path.is_dir() + + def __hash__(self): + return hash(os.path.abspath(os.fspath(self))) + + def __eq__(self, other): + try: + return os.path.abspath(os.fspath(self)) == os.path.abspath(os.fspath(other)) + except TypeError: + return False + + def __lt__(self, other): + return str(self) < str(other) + + def __truediv__(self, other): + return self.joinpath(other) + + def __rtruediv__(self, other): + return self.joinpath(other) + + def __add__(self, other): + return self.joinpath(other) + + def __radd__(self, other): + return self.joinpath(other) + + def __iadd__(self, other): + self.path = self._joinpath(other) + + def __str__(self): + return str(self.get_longpath()) + + def __repr__(self): + return '' + + def __fspath__(self): + return os.fspath(self.path) + + def __format__(self, format_spec): + return self.__str__().format(format_spec) diff --git a/scripts/tests/twister/pytest_integration/test_harness_pytest.py b/scripts/tests/twister/pytest_integration/test_harness_pytest.py index 6ffb928c63a42..8401d3d853bdf 100644 --- a/scripts/tests/twister/pytest_integration/test_harness_pytest.py +++ b/scripts/tests/twister/pytest_integration/test_harness_pytest.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations +import os import pytest import textwrap @@ -16,7 +17,7 @@ @pytest.fixture def testinstance() -> TestInstance: - testsuite = TestSuite('.', 'samples/hello', 'unit.test') + testsuite = TestSuite('.', os.path.join('samples', 'hello'), 'unit.test') testsuite.harness_config = {} testsuite.ignore_faults = False testsuite.sysbuild = False @@ -41,9 +42,9 @@ def test_pytest_command(testinstance: TestInstance, device_type): testinstance.handler.type_str = device_type ref_command = [ 'pytest', - 'samples/hello/pytest', + os.path.join('samples', 'hello', 'pytest'), f'--build-dir={testinstance.build_dir}', - f'--junit-xml={testinstance.build_dir}/report.xml', + f'--junit-xml={os.path.join(testinstance.build_dir, "report.xml")}', f'--device-type={device_type}', '--twister-fixture=fixture1:option1', '--twister-fixture=fixture2' @@ -100,39 +101,52 @@ def test_pytest_command_extra_args_in_options(testinstance: TestInstance): ('pytest_root', 'expected'), [ ( - ['pytest/test_shell_help.py'], - ['samples/hello/pytest/test_shell_help.py'] + [os.path.join('pytest', 'test_shell_help.py')], + [os.path.join('samples', 'hello', 'pytest', 'test_shell_help.py')] ), ( - ['pytest/test_shell_help.py', 'pytest/test_shell_version.py', 'test_dir'], - ['samples/hello/pytest/test_shell_help.py', - 'samples/hello/pytest/test_shell_version.py', - 'samples/hello/test_dir'] + [ + os.path.join('pytest', 'test_shell_help.py'), + os.path.join('pytest', 'test_shell_version.py'), + os.path.join('test_dir') + ], + [ + os.path.join('samples', 'hello', 'pytest', 'test_shell_help.py'), + os.path.join('samples', 'hello', 'pytest', 'test_shell_version.py'), + os.path.join('samples', 'hello', 'test_dir') + ] ), ( - ['../shell/pytest/test_shell.py'], - ['samples/shell/pytest/test_shell.py'] + [os.path.join('..', 'shell', 'pytest', 'test_shell.py')], + [os.path.join('samples', 'shell', 'pytest', 'test_shell.py')] ), ( - ['/tmp/test_temp.py'], - ['/tmp/test_temp.py'] + [os.path.abspath(os.path.join(os.sep, 'tmp', 'test_temp.py'))], + [os.path.abspath(os.path.join(os.sep, 'tmp', 'test_temp.py'))] ), ( - ['~/tmp/test_temp.py'], - ['/home/joe/tmp/test_temp.py'] + [os.path.join('~', 'tmp', 'test_temp.py')], + [os.path.abspath(os.path.join(os.sep, 'home', 'joe', 'tmp', 'test_temp.py'))] ), ( - ['$ZEPHYR_BASE/samples/subsys/testsuite/pytest/shell/pytest'], - ['/zephyr_base/samples/subsys/testsuite/pytest/shell/pytest'] + [os.path.join('$ZEPHYR_BASE', 'samples', 'subsys', 'testsuite', + 'pytest', 'shell', 'pytest')], + [os.path.abspath(os.path.join(os.sep, 'zephyr_base', 'samples', 'subsys', 'testsuite', + 'pytest', 'shell', 'pytest'))] ), ( - ['pytest/test_shell_help.py::test_A', 'pytest/test_shell_help.py::test_B'], - ['samples/hello/pytest/test_shell_help.py::test_A', - 'samples/hello/pytest/test_shell_help.py::test_B'] + [ + os.path.join('pytest', 'test_shell_help.py::test_A'), + os.path.join('pytest', 'test_shell_help.py::test_B') + ], + [ + os.path.join('samples', 'hello', 'pytest', 'test_shell_help.py::test_A'), + os.path.join('samples', 'hello', 'pytest', 'test_shell_help.py::test_B') + ] ), ( - ['pytest/test_shell_help.py::test_A[param_a]'], - ['samples/hello/pytest/test_shell_help.py::test_A[param_a]'] + [os.path.join('pytest', 'test_shell_help.py::test_A[param_a]')], + [os.path.join('samples', 'hello', 'pytest', 'test_shell_help.py::test_A[param_a]')] ) ], ids=[ @@ -147,12 +161,14 @@ def test_pytest_command_extra_args_in_options(testinstance: TestInstance): ] ) def test_pytest_handle_source_list(testinstance: TestInstance, monkeypatch, pytest_root, expected): - monkeypatch.setenv('ZEPHYR_BASE', '/zephyr_base') - monkeypatch.setenv('HOME', '/home/joe') + monkeypatch.setenv('ZEPHYR_BASE', os.path.abspath(os.path.join(os.sep, 'zephyr_base'))) + monkeypatch.setenv('HOME', os.path.abspath(os.path.join(os.sep, 'home', 'joe'))) + monkeypatch.setenv('USERPROFILE', os.path.abspath(os.path.join(os.sep, 'home', 'joe'))) testinstance.testsuite.harness_config['pytest_root'] = pytest_root pytest_harness = Pytest() pytest_harness.configure(testinstance) command = pytest_harness.generate_command() + for pytest_src in expected: assert pytest_src in command diff --git a/scripts/tests/twister/test_environment.py b/scripts/tests/twister/test_environment.py index 3a4e090598b04..291feeb5ee206 100644 --- a/scripts/tests/twister/test_environment.py +++ b/scripts/tests/twister/test_environment.py @@ -25,13 +25,6 @@ ['--short-build-path', '-k'], '--short-build-path requires Ninja to be enabled' ), - ( - 'nt', - None, - None, - ['--device-serial-pty', 'dummy'], - '--device-serial-pty is not supported on Windows OS' - ), ( None, None, @@ -129,24 +122,39 @@ ), ] +TESTIDS_1 = [ + 'short build path without ninja', + 'west runner without west flash', + 'west-flash without device-testing', + 'valgrind without executable', + 'device serial without platform', + 'device serial with multiple platforms', + 'device flash with test without device testing', + 'shuffle-tests without subset', + 'shuffle-tests-seed without shuffle-tests', + 'unrecognised argument', + 'pytest-twister-harness installed' +] + +if os.name == 'nt': + + TESTDATA_1 += [ + ( + 'nt', + None, + None, + ['--device-serial-pty', 'dummy'], + '--device-serial-pty is not supported on Windows OS' + ) + ] + + TESTIDS_1 += ['device-serial-pty on Windows'] + @pytest.mark.parametrize( 'os_name, which_dict, pytest_plugin, args, expected_error', TESTDATA_1, - ids=[ - 'short build path without ninja', - 'device-serial-pty on Windows', - 'west runner without west flash', - 'west-flash without device-testing', - 'valgrind without executable', - 'device serial without platform', - 'device serial with multiple platforms', - 'device flash with test without device testing', - 'shuffle-tests without subset', - 'shuffle-tests-seed without shuffle-tests', - 'unrecognised argument', - 'pytest-twister-harness installed' - ] + ids=TESTIDS_1 ) def test_parse_arguments_errors( caplog, @@ -245,8 +253,11 @@ def test_parse_arguments(zephyr_base, additional_args): options = twisterlib.environment.parse_arguments(parser, args) - assert os.path.join(zephyr_base, 'tests') in options.testsuite_root - assert os.path.join(zephyr_base, 'samples') in options.testsuite_root + expected_tests_path = os.path.realpath(os.path.join(zephyr_base, 'tests')) + expected_samples_path = os.path.realpath(os.path.join(zephyr_base, 'samples')) + + assert expected_tests_path in options.testsuite_root + assert expected_samples_path in options.testsuite_root assert options.enable_size_report @@ -345,7 +356,7 @@ def mocked_abspath(path): if path == 'dummy_abspath': return 'dummy_abspath' elif isinstance(path, mock.Mock): - return None + return '' else: return original_abspath(path) @@ -449,7 +460,7 @@ def mocked_abspath(path): if path == 'dummy_abspath': return 'dummy_abspath' elif isinstance(path, mock.Mock): - return None + return '' else: return original_abspath(path) @@ -458,8 +469,7 @@ def mocked_abspath(path): with mock.patch('subprocess.run', mock.Mock(side_effect=mock_run)): twister_env.check_zephyr_version() - print(expected_logs) - print(caplog.text) + assert twister_env.version == expected_version assert twister_env.commit_date == expected_commit_date assert all([expected_log in caplog.text for expected_log in expected_logs]) @@ -577,7 +587,7 @@ def mocked_abspath(path): if path == 'dummy_abspath': return 'dummy_abspath' elif isinstance(path, mock.Mock): - return None + return '' else: return original_abspath(path) diff --git a/scripts/tests/twister/test_errors.py b/scripts/tests/twister/test_errors.py index 0396ee109f8f4..2b168e3497b78 100644 --- a/scripts/tests/twister/test_errors.py +++ b/scripts/tests/twister/test_errors.py @@ -8,6 +8,7 @@ import os import pytest +import re from pathlib import Path from twisterlib.error import StatusAttributeError @@ -21,7 +22,7 @@ def test_configurationerror(): expected_err = f'{os.path.join("some", "path")}: dummy message' - with pytest.raises(ConfigurationError, match=expected_err): + with pytest.raises(ConfigurationError, match=re.escape(expected_err)): raise ConfigurationError(cfile, message) diff --git a/scripts/tests/twister/test_handlers.py b/scripts/tests/twister/test_handlers.py index e4710060a35e3..c735dfbd118e7 100644 --- a/scripts/tests/twister/test_handlers.py +++ b/scripts/tests/twister/test_handlers.py @@ -32,6 +32,7 @@ BinaryHandler, DeviceHandler, QEMUHandler, + QEMUWinHandler, SimulationHandler ) from twisterlib.hardwaremap import ( @@ -77,19 +78,21 @@ def time(self): return Counter() - +TESTIDS_1 = ['import pty nt', 'import serial+pty posix'] TESTDATA_1 = [ - (True, False, 'posix', ['Install pyserial python module with pip to use' \ - ' --device-testing option.'], None), (False, True, 'nt', [], None), (True, True, 'posix', ['Install pyserial python module with pip to use' \ ' --device-testing option.'], ImportError), ] +if sys.platform == 'linux': + TESTDATA_1.append((True, False, 'posix', ['Install pyserial python module with pip to use --device-testing option.'], None)) + TESTIDS_1.append('import serial') + @pytest.mark.parametrize( 'fail_serial, fail_pty, os_name, expected_outs, expected_error', TESTDATA_1, - ids=['import serial', 'import pty nt', 'import serial+pty posix'] + ids=TESTIDS_1 ) def test_imports( capfd, @@ -107,8 +110,8 @@ def find_spec(self, fullname, path, target=None): raise ImportError() modules_mock = sys.modules.copy() - modules_mock['serial'] = None if fail_serial else modules_mock['serial'] - modules_mock['pty'] = None if fail_pty else modules_mock['pty'] + modules_mock['serial'] = None if fail_serial else modules_mock.get('serial') + modules_mock['pty'] = None if fail_pty else modules_mock.get('pty') meta_path_mock = sys.meta_path[:] meta_path_mock.insert(0, ImportRaiser()) @@ -214,65 +217,108 @@ def test_handler_missing_suite_name(mocked_instance): def test_handler_terminate(mocked_instance): - def mock_kill_function(pid, sig): + def mock_kill_function(pid): if pid < 0: raise ProcessLookupError + def mock_process_init(pid): + return mock.Mock( + pid = pid, + kill = mock.Mock(side_effect=lambda: mock_kill_function(pid)), + terminate = mock.Mock(side_effect=lambda: mock_kill_function(pid)), + children = mock.Mock(return_value=[]), + wait = mock.Mock() + ) + + def mock_wait_procs(proc_list, timeout=None): + gone = [] + alive = [] + for p in proc_list: + if p.pid > 0: + gone.append(p) + else: + if p.pid == 0 and len(p.kill.call_args_list) > 0: + gone.append(p) + else: + alive.append(p) + + return gone, alive + + mock_parent = mock_process_init(0) + mock_child_neg1 = mock_process_init(-1) + mock_child1 = mock_process_init(1) + mock_child2 = mock_process_init(2) + mock_parent.children = mock.Mock(return_value=[mock_child1, mock_child2]) + + def mock_process_get(pid): + if pid == -1: + return mock_child_neg1 + if pid == 0: + return mock_parent + if pid == 1: + return mock_child1 + if pid == 2: + return mock_child2 + raise ProcessLookupError + instance = mocked_instance handler = Handler(instance, 'build', mock.Mock()) - mock_process = mock.Mock() - mock_child1 = mock.Mock(pid=1) - mock_child2 = mock.Mock(pid=2) - mock_process.children = mock.Mock(return_value=[mock_child1, mock_child2]) - mock_proc = mock.Mock(pid=0) mock_proc.terminate = mock.Mock(return_value=None) mock_proc.kill = mock.Mock(return_value=None) - with mock.patch('psutil.Process', return_value=mock_process), \ - mock.patch( - 'os.kill', - mock.Mock(side_effect=mock_kill_function) - ) as mock_kill: + with mock.patch('psutil.Process', side_effect=mock_process_get), \ + mock.patch('psutil.wait_procs', side_effect=mock_wait_procs): handler.terminate(mock_proc) assert handler.terminated - mock_proc.terminate.assert_called_once() - mock_proc.kill.assert_called_once() - mock_kill.assert_has_calls( - [mock.call(1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] - ) + mock_parent.terminate.assert_called_once() + mock_child1.terminate.assert_called_once() + mock_child2.terminate.assert_called_once() + mock_parent.kill.assert_called_once() - mock_child_neg1 = mock.Mock(pid=-1) - mock_process.children = mock.Mock( + mock_parent.children = mock.Mock( return_value=[mock_child_neg1, mock_child2] ) handler.terminated = False - mock_kill.reset_mock() + mock_parent.reset_mock() + mock_parent.terminate.reset_mock() + mock_parent.kill.reset_mock() + mock_child2.reset_mock() + mock_child2.terminate.reset_mock() + mock_child2.kill.reset_mock() handler.terminate(mock_proc) - mock_kill.assert_has_calls( - [mock.call(-1, signal.SIGTERM), mock.call(2, signal.SIGTERM)] - ) + mock_parent.terminate.assert_called_once() + mock_child_neg1.terminate.assert_called_once() + mock_child2.terminate.assert_called_once() + mock_child_neg1.kill.assert_called_once() + mock_parent.kill.assert_called_once() def test_binaryhandler_try_kill_process_by_pid(mocked_instance): - def mock_kill_function(pid, sig): + def mock_kill_function(pid): if pid < 0: raise ProcessLookupError + def mock_process_init(pid): + return mock.Mock( + pid = pid, + kill = mock.Mock(side_effect=lambda: mock_kill_function(pid)), + terminate = mock.Mock(side_effect=lambda: mock_kill_function(pid)), + children = mock.Mock(return_value=[]), + wait = mock.Mock() + ) + instance = mocked_instance handler = BinaryHandler(instance, 'build', mock.Mock()) handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') - with mock.patch( - 'os.kill', - mock.Mock(side_effect=mock_kill_function) - ) as mock_kill, \ + with mock.patch('psutil.Process', mock.Mock(side_effect=mock_process_init)) as mock_proc, \ mock.patch('os.unlink', mock.Mock()) as mock_unlink: with mock.patch('builtins.open', mock.mock_open(read_data='1')): handler.try_kill_process_by_pid() @@ -280,10 +326,10 @@ def mock_kill_function(pid, sig): mock_unlink.assert_called_once_with( os.path.join('dummy', 'path', 'to', 'pid.pid') ) - mock_kill.assert_called_once_with(1, signal.SIGKILL) + mock_proc.assert_called_with(1) mock_unlink.reset_mock() - mock_kill.reset_mock() + mock_proc.reset_mock() handler.pid_fn = os.path.join('dummy', 'path', 'to', 'pid.pid') with mock.patch('builtins.open', mock.mock_open(read_data='-1')): @@ -292,7 +338,7 @@ def mock_kill_function(pid, sig): mock_unlink.assert_called_once_with( os.path.join('dummy', 'path', 'to', 'pid.pid') ) - mock_kill.assert_called_once_with(-1, signal.SIGKILL) + mock_proc.assert_called_with(-1) TESTDATA_3 = [ @@ -422,7 +468,7 @@ def wait(self, *args, **kwargs): '--log-file=build_dir/valgrind.log', '--track-origins=yes', 'generator']), (False, True, False, 123, None, ['generator', '-C', 'build_dir', 'run', '--seed=123']), - (False, False, False, None, ['ex1', 'ex2'], ['build_dir/zephyr/zephyr.exe', 'ex1', 'ex2']), + (False, False, False, None, ['ex1', 'ex2'], [os.path.join('build_dir', 'zephyr', 'zephyr.exe'), 'ex1', 'ex2']), ] @pytest.mark.parametrize( @@ -1307,16 +1353,22 @@ def mock_serial(*args, **kwargs): ser_pty_process.communicate.assert_called_once() +TESTIDS_16 = ['no pty'] TESTDATA_16 = [ - ('dummy1 dummy2', None, 'slave name'), - ('dummy1,dummy2', CalledProcessError, None), (None, None, 'dummy hardware serial'), ] +if os.name != 'nt': + TESTIDS_16.extend(['pty', 'pty process error']) + TESTDATA_16.extend([ + ('dummy1 dummy2', None, 'slave name'), + ('dummy1,dummy2', CalledProcessError, None), + ]) + @pytest.mark.parametrize( 'serial_pty, popen_exception, expected_device', TESTDATA_16, - ids=['pty', 'pty process error', 'no pty'] + ids=TESTIDS_16 ) def test_devicehandler_get_serial_device( mocked_instance, @@ -1338,8 +1390,8 @@ def mock_popen(command, *args, **kwargs): ttyname_mock = mock.Mock(side_effect=lambda x: x + ' name') with mock.patch('subprocess.Popen', popen_mock), \ - mock.patch('pty.openpty', openpty_mock), \ - mock.patch('os.ttyname', ttyname_mock): + mock.patch('pty.openpty', openpty_mock) if os.name != 'nt' else nullcontext(), \ + mock.patch('os.ttyname', ttyname_mock) if os.name != 'nt' else nullcontext(): result = handler._get_serial_device(serial_pty, hardware_serial) if popen_exception: @@ -1506,6 +1558,7 @@ def mock_popen(command, *args, **kwargs): (False, False, False), ] +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') @pytest.mark.parametrize( 'ignore_qemu_crash, expected_ignore_crash, expected_ignore_unexpected_eof', TESTDATA_18, @@ -1525,6 +1578,7 @@ def test_qemuhandler_init( assert handler.ignore_unexpected_eof == expected_ignore_unexpected_eof +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') def test_qemuhandler_get_cpu_time(): def mock_process(pid): return mock.Mock( @@ -1557,6 +1611,7 @@ def mock_process(pid): ), ] +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') @pytest.mark.parametrize( 'self_sysbuild, self_build_dir, build_dir, expected', TESTDATA_19, @@ -1599,6 +1654,7 @@ def test_qemuhandler_get_default_domain_build_dir( ), ] +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') @pytest.mark.parametrize( 'self_log, self_pid_fn, sysbuild_build_dir, exists_pid_fn', TESTDATA_20, @@ -1634,6 +1690,7 @@ def test_qemuhandler_set_qemu_filenames( os.path.sep + 'qemu.pid') +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') def test_qemuhandler_create_command(mocked_instance): sysbuild_build_dir = os.path.join('sysbuild', 'dummy_dir') @@ -1698,6 +1755,7 @@ def test_qemuhandler_create_command(mocked_instance): ), ] +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') @pytest.mark.parametrize( 'self_returncode, self_ignore_qemu_crash,' \ ' self_instance_reason, harness_status, is_timeout,' \ @@ -1735,6 +1793,7 @@ def test_qemuhandler_update_instance_info( ) +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') def test_qemuhandler_thread_get_fifo_names(): fifo_fn = 'dummy' @@ -1752,6 +1811,7 @@ def test_qemuhandler_thread_get_fifo_names(): (TwisterStatus.NONE, None, TwisterStatus.NONE, 'Unknown'), ] +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') @pytest.mark.parametrize( '_status, _reason, expected_status, expected_reason', TESTDATA_24, @@ -1855,6 +1915,7 @@ def test_qemuhandler_thread_update_instance_info( ), ] +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') @pytest.mark.parametrize( 'content, timeout, pid, harness_statuses, cputime, capture_coverage,' \ ' expected_status, expected_reason, expected_log_calls', @@ -1967,6 +2028,7 @@ def mocked_open(filename, *args, **kwargs): (False, True, TwisterStatus.FAIL, False, ['return code from QEMU (None): 1']), ] +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') @pytest.mark.parametrize( 'isatty, do_timeout, harness_status, exists_pid_fn, expected_logs', TESTDATA_26, @@ -2044,10 +2106,21 @@ def mock_filenames(sysbuild_build_dir): assert all([expected_log in caplog.text for expected_log in expected_logs]) -def test_qemuhandler_get_fifo(mocked_instance): +@pytest.mark.skipif(os.name == 'nt', reason='QEMUWinHandler is used on Windows, not QEMUHandler.') +def test_qemuhandler_get_fifo_linux(mocked_instance): handler = QEMUHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) handler.fifo_fn = 'fifo_fn' result = handler.get_fifo() assert result == 'fifo_fn' + + +@pytest.mark.skipif(os.name != 'nt', reason='QEMUWinHandler is used only on Windows.') +def test_qemuhandler_get_fifo_windows(mocked_instance): + handler = QEMUWinHandler(mocked_instance, 'build', mock.Mock(timeout_multiplier=1)) + handler.fifo_fn = 'fifo_fn' + + result = handler.get_fifo() + + assert result == 'fifo_fn' diff --git a/scripts/tests/twister/test_hardwaremap.py b/scripts/tests/twister/test_hardwaremap.py index 5dc61dc1d9ec0..99ed7228e12f5 100644 --- a/scripts/tests/twister/test_hardwaremap.py +++ b/scripts/tests/twister/test_hardwaremap.py @@ -7,6 +7,7 @@ """ import mock +import platform import pytest import sys @@ -339,16 +340,8 @@ def mock_open(*args, **kwargs): assert all([getattr(dut, k) == v for k, v in expected[dut.id].items()]) +TESTIDS_4 = ['no map (not linux)', 'no map (nonpersistent)'] TESTDATA_4 = [ - ( - True, - 'Linux', - ['', '', '', - '', '', - '', - '', - ''] - ), ( True, 'nt', @@ -369,11 +362,24 @@ def mock_open(*args, **kwargs): ) ] +if platform.system == 'Linux': + TESTDATA_4.append( + ( + True, + 'Linux', + ['', '', '', + '', '', + '', + '', + ''] + ) + ) + TESTIDS_4.append('linux persistent map') @pytest.mark.parametrize( 'persistent, system, expected_reprs', TESTDATA_4, - ids=['linux persistent map', 'no map (not linux)', 'no map (nonpersistent)'] + ids=TESTIDS_4 ) def test_hardwaremap_scan( caplog, diff --git a/scripts/tests/twister/test_harness.py b/scripts/tests/twister/test_harness.py index 1e18e29915eb2..3a48393b36fac 100644 --- a/scripts/tests/twister/test_harness.py +++ b/scripts/tests/twister/test_harness.py @@ -1131,7 +1131,7 @@ def test_bsim_build(monkeypatch, tmp_path): build_dir = tmp_path / "build_dir" os.makedirs(build_dir) mocked_instance.build_dir = str(build_dir) - mocked_instance.name = "platform_name/test/dummy.test" + mocked_instance.name = os.path.join('platform_name', 'test', 'dummy.test') mocked_instance.testsuite.harness_config = {} harness = Bsim() diff --git a/scripts/tests/twister/test_runner.py b/scripts/tests/twister/test_runner.py index 00a95cb97080f..6f194777af4aa 100644 --- a/scripts/tests/twister/test_runner.py +++ b/scripts/tests/twister/test_runner.py @@ -114,7 +114,7 @@ class MockHandler: ["extra_overlay.conf"], ["x.overlay;y.overlay", "z.overlay"], ["cmake1=foo", "cmake2=bar"], - "/builddir/", + os.path.join('', 'builddir', ''), ) == [ "-DCONFIG_t=\"test\"", "-Dcmake1=foo", "-Dcmake2=bar", @@ -122,8 +122,8 @@ class MockHandler: "-Dhandler_arg1", "-Dhandler_arg2", "-DCONF_FILE=a.conf;b.conf;c.conf", "-DDTC_OVERLAY_FILE=x.overlay;y.overlay;z.overlay", - "-DOVERLAY_CONFIG=extra_overlay.conf " - "/builddir/twister/testsuite_extra.conf", + "-DOVERLAY_CONFIG=extra_overlay.conf " + \ + os.path.join('', 'builddir', 'twister', 'testsuite_extra.conf'), ]) @@ -1940,7 +1940,7 @@ def mock_exists(fname): ( { 'CMakeCache.txt': mock.mock_open( - read_data='canonical/zephyr/base/dummy.file: ERROR' + read_data=f'{os.path.join("canonical", "zephyr", "base", "dummy.file")}: ERROR' ) }, { @@ -1950,7 +1950,7 @@ def mock_exists(fname): ( { os.path.join('zephyr', 'runners.yaml'): mock.mock_open( - read_data='There was canonical/zephyr/base/dummy.file here' + read_data=f'There was {os.path.join("canonical", "zephyr", "base", "dummy.file")} here' ) }, { @@ -1969,7 +1969,7 @@ def test_projectbuilder_sanitize_zephyr_base_from_files( text_mocks, expected_write_texts ): - build_dir_path = 'canonical/zephyr/base/build_dir/' + build_dir_path = os.path.join('canonical', 'zephyr', 'base', 'build_dir', '') def mock_exists(fname): if not fname.startswith(build_dir_path): @@ -1990,7 +1990,7 @@ def mock_open(fname, *args, **kwargs): with mock.patch('os.path.exists', mock_exists), \ mock.patch('builtins.open', mock_open), \ mock.patch('twisterlib.runner.canonical_zephyr_base', - 'canonical/zephyr/base'): + os.path.join('canonical', 'zephyr', 'base', '')): pb._sanitize_zephyr_base_from_files() for fname, fhandler in text_mocks.items(): @@ -2478,12 +2478,13 @@ def test_projectbuilder_calc_size( ('linux', '???', {'jobs': 4}, False, 4, 'JobClient'), ] +@pytest.mark.skipif(sys.platform != 'linux', reason='JobClient family only works on Linux.') @pytest.mark.parametrize( 'platform, os_name, options, jobclient_from_environ, expected_jobs,' \ ' expected_jobserver', TESTDATA_17, ids=['GNUMakeJobClient', 'GNUMakeJobServer', - 'JobClient', 'Jobclient+options'] + 'JobClient', 'JobClient+options'] ) def test_twisterrunner_run( caplog, @@ -2751,9 +2752,6 @@ def mock_get_cmake_filter_stages(filter, keys): if retry_build_errors: tr.get_cmake_filter_stages.assert_any_call('some', mock.ANY) - print(pipeline_mock.put.call_args_list) - print([mock.call(el) for el in expected_pipeline_elements]) - assert pipeline_mock.put.call_args_list == \ [mock.call(el) for el in expected_pipeline_elements] diff --git a/scripts/tests/twister/test_testinstance.py b/scripts/tests/twister/test_testinstance.py index 07cb72fbd2cc9..31950b3b40003 100644 --- a/scripts/tests/twister/test_testinstance.py +++ b/scripts/tests/twister/test_testinstance.py @@ -21,7 +21,7 @@ from twisterlib.testinstance import TestInstance from twisterlib.error import BuildError from twisterlib.runner import TwisterRunner -from twisterlib.handlers import QEMUHandler +from twisterlib.handlers import QEMUHandler, QEMUWinHandler from expr_parser import reserved @@ -35,8 +35,11 @@ (False, True, "sensor", "native", "", True, [], (True, False)), ] @pytest.mark.parametrize( - "build_only, slow, harness, platform_type, platform_sim, device_testing,fixture, expected", - TESTDATA_PART_1 + "build_only, slow, harness, platform_type, platform_sim, device_testing, fixture, expected", + TESTDATA_PART_1, + ids=['console, na, qemu', 'console, native, qemu', 'console, native, nsim, build only', + 'console, native, renode, build_only, slow', 'sensor, native', + 'sensor, na', 'sensor, native, slow, device_testing'] ) def test_check_build_or_run( class_testplan, @@ -56,9 +59,9 @@ def test_check_build_or_run( Scenario 2: Test if build_only is enabled when the OS is Windows""" class_testplan.testsuites = all_testsuites_dict - testsuite = class_testplan.testsuites.get('scripts/tests/twister/test_data/testsuites/tests/' - 'test_a/test_a.check_1') - print(testsuite) + testsuite_path = 'scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1' + testsuite_path = testsuite_path.replace('/', os.sep) + testsuite = class_testplan.testsuites.get(testsuite_path) class_testplan.platforms = platforms_list platform = class_testplan.get_platform("demo_board_2") @@ -78,9 +81,11 @@ def test_check_build_or_run( sim_name=platform_sim ) ) - run = testinstance.check_runnable(env.options) - _, r = expected - assert run == r + + with mock.patch('os.name', 'posix'): + run = testinstance.check_runnable(env.options) + _, r = expected + assert run == r with mock.patch('os.name', 'nt'): # path to QEMU binary is not in QEMU_BIN_PATH environment variable @@ -95,15 +100,15 @@ def test_check_build_or_run( TESTDATA_PART_2 = [ - (True, True, True, ["demo_board_2/unit_testing"], "native", + (True, True, True, ["demo_board_2"], "native", None, '\nCONFIG_COVERAGE=y\nCONFIG_COVERAGE_DUMP=y\nCONFIG_ASAN=y\nCONFIG_UBSAN=y'), - (True, False, True, ["demo_board_2/unit_testing"], "native", + (True, False, True, ["demo_board_2"], "native", None, '\nCONFIG_COVERAGE=y\nCONFIG_COVERAGE_DUMP=y\nCONFIG_ASAN=y'), - (False, False, True, ["demo_board_2/unit_testing"], 'native', + (False, False, True, ["demo_board_2"], 'native', None, '\nCONFIG_COVERAGE=y\nCONFIG_COVERAGE_DUMP=y'), - (True, False, True, ["demo_board_2/unit_testing"], 'mcu', + (True, False, True, ["demo_board_2"], 'mcu', None, '\nCONFIG_COVERAGE=y\nCONFIG_COVERAGE_DUMP=y'), - (False, False, False, ["demo_board_2/unit_testing"], 'native', None, ''), + (False, False, False, ["demo_board_2"], 'native', None, ''), (False, False, True, ['demo_board_1'], 'native', None, ''), (True, False, False, ["demo_board_2"], 'native', None, '\nCONFIG_ASAN=y'), (False, True, False, ["demo_board_2"], 'native', None, '\nCONFIG_UBSAN=y'), @@ -114,7 +119,7 @@ def test_check_build_or_run( (False, False, False, ["demo_board_2"], 'native', ["arch:arm:CONFIG_LOG=y"], ''), (False, False, False, ["demo_board_2"], 'native', - ["platform:demo_board_2/unit_testing:CONFIG_LOG=y"], 'CONFIG_LOG=y'), + ["platform:demo_board_2:CONFIG_LOG=y"], ''), (False, False, False, ["demo_board_2"], 'native', ["platform:demo_board_1:CONFIG_LOG=y"], ''), ] @@ -138,14 +143,15 @@ def test_create_overlay( ): """Test correct content is written to testcase_extra.conf based on if conditions.""" class_testplan.testsuites = all_testsuites_dict - testcase = class_testplan.testsuites.get('scripts/tests/twister/test_data/testsuites/samples/' - 'test_app/sample_test.app') + testcase_path = 'scripts/tests/twister/test_data/testsuites/samples/test_app/sample_test.app' + testcase_path = testcase_path.replace('/', os.sep) + testcase = class_testplan.testsuites.get(testcase_path) if extra_configs: testcase.extra_configs = extra_configs class_testplan.platforms = platforms_list - platform = class_testplan.get_platform("demo_board_2") + platform = class_testplan.get_platform("demo_board_2/unit_testing") testinstance = TestInstance(testcase, platform, class_testplan.env.outdir) platform.type = platform_type @@ -154,8 +160,9 @@ def test_create_overlay( def test_calculate_sizes(class_testplan, all_testsuites_dict, platforms_list): """ Test Calculate sizes method for zephyr elf""" class_testplan.testsuites = all_testsuites_dict - testcase = class_testplan.testsuites.get('scripts/tests/twister/test_data/testsuites/samples/' - 'test_app/sample_test.app') + testcase_path = 'scripts/tests/twister/test_data/testsuites/samples/test_app/sample_test.app' + testcase_path = testcase_path.replace('/', os.sep) + testcase = class_testplan.testsuites.get(testcase_path) class_testplan.platforms = platforms_list platform = class_testplan.get_platform("demo_board_2") testinstance = TestInstance(testcase, platform, class_testplan.env.outdir) @@ -204,6 +211,7 @@ def sample_testinstance(all_testsuites_dict, class_testplan, platforms_list, req testsuite_path += '/samples/test_app/sample_test.app' elif request.param['testsuite_kind'] == 'tests': testsuite_path += '/tests/test_a/test_a.check_1' + testsuite_path = testsuite_path.replace('/', os.sep) class_testplan.testsuites = all_testsuites_dict testsuite = class_testplan.testsuites.get(testsuite_path) @@ -221,19 +229,22 @@ def sample_testinstance(all_testsuites_dict, class_testplan, platforms_list, req @pytest.mark.parametrize('detailed_test_id', TESTDATA_1) def test_testinstance_init(all_testsuites_dict, class_testplan, platforms_list, detailed_test_id): - testsuite_path = 'scripts/tests/twister/test_data/testsuites/samples/test_app/sample_test.app' + testsuite_rel_path = 'scripts/tests/twister/test_data/testsuites/samples/test_app/sample_test.app' + testsuite_path = testsuite_rel_path.replace('/', os.sep) class_testplan.testsuites = all_testsuites_dict testsuite = class_testplan.testsuites.get(testsuite_path) testsuite.detailed_test_id = detailed_test_id class_testplan.platforms = platforms_list - platform = class_testplan.get_platform("demo_board_2/unit_testing") + platform = class_testplan.get_platform("demo_board_2") testinstance = TestInstance(testsuite, platform, class_testplan.env.outdir) if detailed_test_id: - assert testinstance.build_dir == os.path.join(class_testplan.env.outdir, platform.normalized_name, testsuite_path) + expected_path = os.path.join(class_testplan.env.outdir, platform.normalized_name, testsuite_rel_path) else: - assert testinstance.build_dir == os.path.join(class_testplan.env.outdir, platform.normalized_name, testsuite.source_dir_rel, testsuite.name) + expected_path = os.path.join(class_testplan.env.outdir, platform.normalized_name, testsuite.source_dir_rel, testsuite.name) + expected_path = os.path.abspath(os.fspath(expected_path)) + assert testinstance.build_dir == expected_path @pytest.mark.parametrize('testinstance', [{'testsuite_kind': 'sample'}], indirect=True) @@ -256,8 +267,6 @@ def test_testinstance_record(testinstance): ) as mock_writerows: testinstance.record(recording) - print(mock_file.mock_calls) - mock_file.assert_called_with( os.path.join(testinstance.build_dir, 'recording.csv'), 'w' @@ -282,6 +291,7 @@ def test_testinstance_add_filter(testinstance): def test_testinstance_init_cases(all_testsuites_dict, class_testplan, platforms_list): testsuite_path = 'scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1' + testsuite_path = testsuite_path.replace('/', os.sep) class_testplan.testsuites = all_testsuites_dict testsuite = class_testplan.testsuites.get(testsuite_path) class_testplan.platforms = platforms_list @@ -336,10 +346,12 @@ def test_testinstance_add_missing_case_status(testinstance, reason, expected_rea def test_testinstance_dunders(all_testsuites_dict, class_testplan, platforms_list): testsuite_path = 'scripts/tests/twister/test_data/testsuites/samples/test_app/sample_test.app' + testsuite_name = os.path.normpath(testsuite_path) + testsuite_path = testsuite_path.replace('/', os.sep) class_testplan.testsuites = all_testsuites_dict testsuite = class_testplan.testsuites.get(testsuite_path) class_testplan.platforms = platforms_list - platform = class_testplan.get_platform("demo_board_2") + platform = class_testplan.get_platform("demo_board_2/unit_testing") testinstance = TestInstance(testsuite, platform, class_testplan.env.outdir) testinstance_copy = TestInstance(testsuite, platform, class_testplan.env.outdir) @@ -440,7 +452,9 @@ def test_testinstance_testsuite_runnable( expected_can_run ): testsuite_path = 'scripts/tests/twister/test_data/testsuites/samples/test_app/sample_test.app' + testsuite_path = testsuite_path.replace('/', os.sep) class_testplan.testsuites = all_testsuites_dict + print(class_testplan.testsuites) testsuite = class_testplan.testsuites.get(testsuite_path) testsuite.harness = harness @@ -467,7 +481,7 @@ def test_testinstance_testsuite_runnable( ' expected_handler_type, expected_handler_args, expected_handler_ready', TESTDATA_4, ids=['preexisting handler', 'device testing', 'qemu simulation', - 'non-qemu simulation with exec', 'unit teting', 'no handler'] + 'non-qemu simulation with exec', 'unit testing', 'no handler'] ) @pytest.mark.parametrize('testinstance', [{'testsuite_kind': 'tests'}], indirect=True) def test_testinstance_setup_handler( @@ -492,12 +506,14 @@ def test_testinstance_setup_handler( ) with mock.patch.object(QEMUHandler, 'get_fifo', return_value=1), \ + mock.patch.object(QEMUWinHandler, 'get_fifo', return_value=1), \ mock.patch('shutil.which', return_value=True): testinstance.setup_handler(env) if expected_handler_type: assert testinstance.handler.type_str == expected_handler_type assert testinstance.handler.ready == expected_handler_ready + assert all([arg in testinstance.handler.args for arg in expected_handler_args]) diff --git a/scripts/tests/twister/test_testplan.py b/scripts/tests/twister/test_testplan.py index a885d541f1558..bc01daa480b70 100644 --- a/scripts/tests/twister/test_testplan.py +++ b/scripts/tests/twister/test_testplan.py @@ -49,8 +49,10 @@ def test_testplan_add_testsuites_short(class_testplan): assert sorted(testsuite_list) == sorted(expected_testsuites) # Test 2 : Assert Testcase name is expected & all testsuites values are testcase class objects - suite = class_testplan.testsuites.get(tests_rel_dir + 'test_a/test_a.check_1') - assert suite.name == tests_rel_dir + 'test_a/test_a.check_1' + testsuite_rel_path = tests_rel_dir + 'test_a/test_a.check_1' + testsuite_rel_path = testsuite_rel_path.replace('/', os.sep) + suite = class_testplan.testsuites.get(testsuite_rel_path) + assert suite.name == testsuite_rel_path assert all(isinstance(n, TestSuite) for n in class_testplan.testsuites.values()) @pytest.mark.parametrize("board_root_dir", [("board_config_file_not_exist"), ("board_config")]) @@ -268,34 +270,34 @@ def test_add_instances_short(tmp_path, class_env, all_testsuites_dict, platforms instance_list.append(instance) plan.add_instances(instance_list) assert list(plan.instances.keys()) == \ - [platform.name + '/' + s for s in list(all_testsuites_dict.keys())] + [os.path.join(platform.name, s) for s in list(all_testsuites_dict.keys())] assert all(isinstance(n, TestInstance) for n in list(plan.instances.values())) assert list(plan.instances.values()) == instance_list QUARANTINE_BASIC = { - 'demo_board_1/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1' : 'a1 on board_1 and board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1' : 'a1 on board_1 and board_3' + 'demo_board_1/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1'.replace('/', os.sep) : 'a1 on board_1 and board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1'.replace('/', os.sep) : 'a1 on board_1 and board_3' } QUARANTINE_WITH_REGEXP = { - 'demo_board_2/unit_testing/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_2' : 'a2 and c2 on x86', - 'demo_board_1/unit_testing/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1' : 'all test_d', - 'demo_board_3/unit_testing/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1' : 'all test_d', - 'demo_board_2/unit_testing/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1' : 'all test_d', - 'demo_board_2/unit_testing/scripts/tests/twister/test_data/testsuites/tests/test_c/test_c.check_2' : 'a2 and c2 on x86' + 'demo_board_2/unit_testing' + '/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_2'.replace('/', os.sep) : 'a2 and c2 on x86', + 'demo_board_1/unit_testing' + '/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1'.replace('/', os.sep) : 'all test_d', + 'demo_board_3/unit_testing' + '/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1'.replace('/', os.sep) : 'all test_d', + 'demo_board_2/unit_testing' + '/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1'.replace('/', os.sep) : 'all test_d', + 'demo_board_2/unit_testing' + '/scripts/tests/twister/test_data/testsuites/tests/test_c/test_c.check_2'.replace('/', os.sep) : 'a2 and c2 on x86' } QUARANTINE_PLATFORM = { - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_2' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_b/test_b.check_1' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_b/test_b.check_2' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_c/test_c.check_1' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_c/test_c.check_2' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_e/test_e.check_1' : 'all on board_3', - 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_config/test_config.main' : 'all on board_3' + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_1'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_a/test_a.check_2'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_d/test_d.check_1'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_b/test_b.check_1'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_b/test_b.check_2'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_c/test_c.check_1'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_c/test_c.check_2'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_e/test_e.check_1'.replace('/', os.sep) : 'all on board_3', + 'demo_board_3/scripts/tests/twister/test_data/testsuites/tests/test_config/test_config.main'.replace('/', os.sep) : 'all on board_3' } QUARANTINE_MULTIFILES = { @@ -353,12 +355,9 @@ def test_quarantine_short(class_testplan, platforms_list, test_data, TESTDATA_PART4 = [ - (os.path.join('test_d', 'test_d.check_1'), ['dummy'], - None, 'Snippet not supported'), - (os.path.join('test_c', 'test_c.check_1'), ['cdc-acm-console'], - 0, None), - (os.path.join('test_d', 'test_d.check_1'), ['dummy', 'cdc-acm-console'], - 2, 'Snippet not supported'), + ('test_d/test_d.check_1', ['dummy'], None, 'Snippet not supported'), + ('test_c/test_c.check_1', ['cdc-acm-console'], 0, None), + ('test_d/test_d.check_1', ['dummy', 'cdc-acm-console'], 2, 'Snippet not supported'), ] @pytest.mark.parametrize( @@ -377,9 +376,9 @@ def test_required_snippets_short( ): """ Testing required_snippets function of TestPlan class in Twister """ plan = class_testplan - testpath = os.path.join('scripts', 'tests', 'twister', 'test_data', - 'testsuites', 'tests', testpath) - testsuite = class_testplan.testsuites.get(testpath) + testsuite_rel_path = f'scripts/tests/twister/test_data/testsuites/tests/{testpath}' + testsuite_rel_path = testsuite_rel_path.replace('/', os.sep) + testsuite = class_testplan.testsuites.get(testsuite_rel_path) plan.platforms = platforms_list print(platforms_list) plan.platform_names = [p.name for p in platforms_list] @@ -1155,7 +1154,7 @@ def test_testplan_add_configurations( - name: unit_testing """ p1e1_yamlfile = tmp_p1_dir / 'board.yml' - p1e1_yamlfile.write_text(p1e1_bs_yaml) + p1e1_yamlfile.write_text(p1e1_bs_yaml, encoding='utf-8') p1e1_yaml = """\ identifier: p1e1 @@ -1168,7 +1167,7 @@ def test_testplan_add_configurations( twister: False """ p1e1_yamlfile = tmp_p1_dir / 'p1e1.yaml' - p1e1_yamlfile.write_text(p1e1_yaml) + p1e1_yamlfile.write_text(p1e1_yaml, encoding='utf-8') p1e2_yaml = """\ identifier: p1e2 @@ -1180,7 +1179,7 @@ def test_testplan_add_configurations( - zephyr """ p1e2_yamlfile = tmp_p1_dir / 'p1e2.yaml' - p1e2_yamlfile.write_text(p1e2_yaml) + p1e2_yamlfile.write_text(p1e2_yaml, encoding='utf-8') tmp_p2_dir = tmp_vend1_dir / 'p2' tmp_p2_dir.mkdir() @@ -1198,7 +1197,7 @@ def test_testplan_add_configurations( - name: unit_testing """ p2_yamlfile = tmp_p2_dir / 'board.yml' - p2_yamlfile.write_text(p2_bs_yaml) + p2_yamlfile.write_text(p2_bs_yaml, encoding='utf-8') p2_yaml = """\ identifier: p2/unit_testing @@ -1212,7 +1211,7 @@ def test_testplan_add_configurations( default: True """ p2_yamlfile = tmp_p2_dir / 'p2.yaml' - p2_yamlfile.write_text(p2_yaml) + p2_yamlfile.write_text(p2_yaml, encoding='utf-8') p2_2_yaml = """\ @@ -1227,7 +1226,7 @@ def test_testplan_add_configurations( - zephyr """ p2_2_yamlfile = tmp_p2_dir / 'p2-2.yaml' - p2_2_yamlfile.write_text(p2_2_yaml) + p2_2_yamlfile.write_text(p2_2_yaml, encoding='utf-8') tmp_vend2_dir = tmp_board_root_dir / 'arm' tmp_vend2_dir.mkdir() @@ -1243,7 +1242,7 @@ def test_testplan_add_configurations( - name: unit_testing """ p3_yamlfile = tmp_p3_dir / 'board.yml' - p3_yamlfile.write_text(p3_bs_yaml) + p3_yamlfile.write_text(p3_bs_yaml, encoding='utf-8') p3_yaml = """\ identifier: p3 @@ -1257,7 +1256,9 @@ def test_testplan_add_configurations( default: True """ p3_yamlfile = tmp_p3_dir / 'p3.yaml' - p3_yamlfile.write_text(p3_yaml) + p3_yamlfile.write_text(p3_yaml, encoding='utf-8') + p3_yamlfile = tmp_p3_dir / 'p3_B.conf' + p3_yamlfile.write_text('', encoding='utf-8') env = mock.Mock(board_roots=[tmp_board_root_dir],soc_roots=[tmp_path], arch_roots=[tmp_path]) @@ -1338,6 +1339,7 @@ def test_testplan_get_all_tests(): ) def test_testplan_add_testsuites(tmp_path, testsuite_filter, use_alt_root, detailed_id, expected_errors, expected_suite_count): + testsuite_filter = [f.replace('/', os.sep) for f in testsuite_filter] # tmp_path # ├ tests <- test root # │ ├ good_test @@ -1629,7 +1631,7 @@ def get_platform(name): testplan.load_from_file('dummy.yaml', filter_platform) expected_instances = { - 'Platform 1/TestSuite 1': { + str(os.path.join('Platform 1', 'TestSuite 1')): { 'metrics': { 'handler_time': 60.0, 'used_ram': 4096, @@ -1647,7 +1649,7 @@ def get_platform(name): } } }, - 'Platform 1/TestSuite 2': { + str(os.path.join('Platform 1', 'TestSuite 2')): { 'metrics': { 'handler_time': 0, 'used_ram': 0, @@ -1658,7 +1660,7 @@ def get_platform(name): 'retries': 0, 'testcases': [] }, - 'Platform 1/TestSuite 3': { + str(os.path.join('Platform 1', 'TestSuite 3')): { 'metrics': { 'handler_time': 360.0, 'used_ram': 4096, @@ -1682,7 +1684,7 @@ def get_platform(name): } } }, - 'Platform 1/TestSuite 4': { + str(os.path.join('Platform 1', 'TestSuite 4')): { 'metrics': { 'handler_time': 360.0, 'used_ram': 4096, diff --git a/scripts/tests/twister/test_testsuite.py b/scripts/tests/twister/test_testsuite.py index 83231c0f48ada..7ebb978204dd6 100644 --- a/scripts/tests/twister/test_testsuite.py +++ b/scripts/tests/twister/test_testsuite.py @@ -326,13 +326,7 @@ def test_get_unique_exception(testsuite_root, workdir, name, exception): 'test_a', 'test_a.check_1' ), - os.path.join( - os.sep, - TEST_DATA_REL_PATH, - 'tests', - 'test_a', - 'test_a.check_1' - ), + os.path.join(os.sep, TEST_DATA_REL_PATH, 'tests', 'test_a', 'test_a.check_1') ), ( ZEPHYR_BASE, @@ -367,7 +361,14 @@ def test_get_unique_exception(testsuite_root, workdir, name, exception): @pytest.mark.parametrize( 'testsuite_root, suite_path, name, expected', - TESTDATA_5 + TESTDATA_5, + ids=[ + 'test base, single test folder, case absolute path', + 'zephyr base, zephyr base, case', + 'zephyr base, single test folder, case absolute path', + 'tests folder, tests folder, case relative path', + 'zephyr base, zephyr base, subcase' + ] ) def test_get_unique(testsuite_root, suite_path, name, expected): """ @@ -443,8 +444,9 @@ def test_get_search_area_boundary( TESTDATA_7 = [ - (True, [os.path.join('', 'home', 'user', 'dummy_path', 'dummy.c'), - os.path.join('', 'home', 'user', 'dummy_path', 'dummy.cpp')]), + (True, [os.path.join(os.sep, 'home', 'user', 'dummy_path', 'dummy.c'), + os.path.join(os.sep, 'home', 'user', 'dummy_path', 'dummy.cpp')] + ), (False, []) ] @@ -454,8 +456,11 @@ def test_get_search_area_boundary( ids=['valid', 'not a directory'] ) def test_find_c_files_in(isdir, expected): - old_dir = os.path.join('', 'home', 'user', 'dummy_base_dir') - new_path = os.path.join('', 'home', 'user', 'dummy_path') + if os.name == 'nt': + expected = ['\\\\?\\C:' + p for p in expected] + + old_dir = os.path.join(os.sep, 'home', 'user', 'dummy_base_dir') + new_path = os.path.join(os.sep, 'home', 'user', 'dummy_path') cur_dir = old_dir def mock_chdir(path, *args, **kwargs): @@ -516,9 +521,9 @@ def mock_glob(fmt, *args, **kwargs): mock.patch('os.getcwd', return_value=cur_dir), \ mock.patch('glob.glob', mock_glob), \ mock.patch('os.chdir', side_effect=mock_chdir) as chdir_mock: - filenames = find_c_files_in(new_path) + tfilenames = find_c_files_in(new_path) - assert sorted(filenames) == sorted(expected) + assert sorted([str(f) for f in tfilenames]) == sorted(expected) assert chdir_mock.call_args is None or \ chdir_mock.call_args == mock.call(old_dir) @@ -683,9 +688,13 @@ def mock_stat(filename, *args, **kwargs): TESTDATA_9 = [ - ('dummy/path', 'dummy/path/src', 'dummy/path/src'), - ('dummy/path', 'dummy/src', 'dummy/src'), - ('dummy/path', 'another/path', '') + ( + os.path.join('dummy', 'path'), + os.path.join('dummy', 'path', 'src'), + os.path.join('dummy', 'path', 'src') + ), + (os.path.join('dummy', 'path'), os.path.join('dummy', 'src'), os.path.join('dummy', 'src')), + (os.path.join('dummy', 'path'), os.path.join('another', 'path'), '') ] @@ -903,5 +912,4 @@ def test_testcase_dunders(): def test_get_no_detailed_test_id(testsuite_root, suite_path, name, expected): '''Test to check if the name without path is given for each testsuite''' suite = TestSuite(testsuite_root, suite_path, name, detailed_test_id=False) - print(suite.name) assert suite.name == expected