|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +""" |
| 4 | +This is a helper script to run test binaries in parallel with two probes. |
| 5 | +
|
| 6 | +Build the binaries before running this script:: |
| 7 | +
|
| 8 | + cargo test -p testsuite --target thumbv7em-none-eabi --bins --no-run |
| 9 | +""" |
| 10 | + |
| 11 | +from typing import List, NamedTuple, Optional, Tuple |
| 12 | +import argparse |
| 13 | +import asyncio |
| 14 | +import os |
| 15 | +import sys |
| 16 | +import time |
| 17 | + |
| 18 | + |
| 19 | +class TestResult(NamedTuple): |
| 20 | + rc: int |
| 21 | + elf_abs_path: str |
| 22 | + probe: str |
| 23 | + elapsed: float |
| 24 | + |
| 25 | + def elf_filename(self) -> str: |
| 26 | + return os.path.basename(self.elf_abs_path) |
| 27 | + |
| 28 | + def test_name(self) -> str: |
| 29 | + # assumes the ELF is in the form of "rtc-1794a848b0c66d5d" |
| 30 | + return self.elf_filename().split("-")[0] |
| 31 | + |
| 32 | + def status(self) -> str: |
| 33 | + return "PASS" if self.passed() else "FAIL" |
| 34 | + |
| 35 | + def passed(self) -> bool: |
| 36 | + return self.rc == 0 |
| 37 | + |
| 38 | + |
| 39 | +def find_elfs(elf_dir: str) -> Tuple[List[str], str]: |
| 40 | + """ |
| 41 | + ``cargo`` does not output stable names for the test binaries. |
| 42 | +
|
| 43 | + This scans for ELF files, returning a tuple of |
| 44 | + ``(test_elf_files, subghz_test_elf_file)``. |
| 45 | + """ |
| 46 | + test_elf_files: List[str] = [] |
| 47 | + subghz_test_elf_file: Optional[str] = None |
| 48 | + |
| 49 | + for file in os.listdir(elf_dir): |
| 50 | + # runnable ELFs will not have an extension |
| 51 | + if os.path.splitext(file)[1] != "": |
| 52 | + continue |
| 53 | + |
| 54 | + file_abs_path = os.path.join(elf_dir, file) |
| 55 | + with open(file_abs_path, "rb") as f: |
| 56 | + magic = f.read(4) |
| 57 | + |
| 58 | + if magic == b"\x7FELF": |
| 59 | + if "subghz" in file: |
| 60 | + if subghz_test_elf_file is not None: |
| 61 | + raise Exception( |
| 62 | + "subghz_test_elf_file already set " |
| 63 | + f"subghz_test_elf_file={subghz_test_elf_file}" |
| 64 | + ) |
| 65 | + |
| 66 | + subghz_test_elf_file = file_abs_path |
| 67 | + else: |
| 68 | + test_elf_files.append(file_abs_path) |
| 69 | + |
| 70 | + if subghz_test_elf_file is None: |
| 71 | + raise Exception("subghz test ELF file not found") |
| 72 | + |
| 73 | + return (test_elf_files, subghz_test_elf_file) |
| 74 | + |
| 75 | + |
| 76 | +async def probe_run(elf_path: str, probe: str, log_prefix: str) -> TestResult: |
| 77 | + start = time.monotonic() |
| 78 | + proc = await asyncio.create_subprocess_exec( |
| 79 | + "probe-run", |
| 80 | + "--chip", |
| 81 | + "STM32WLE5JCIx", |
| 82 | + "--connect-under-reset", |
| 83 | + elf_path, |
| 84 | + "--probe", |
| 85 | + probe, |
| 86 | + stdout=asyncio.subprocess.PIPE, |
| 87 | + stderr=asyncio.subprocess.STDOUT, |
| 88 | + ) |
| 89 | + rc = await proc.wait() |
| 90 | + elapsed = time.monotonic() - start |
| 91 | + |
| 92 | + for line in ( |
| 93 | + (await proc.stdout.read()) |
| 94 | + .decode(encoding="UTF-8", errors="backslashreplace") |
| 95 | + .splitlines() |
| 96 | + ): |
| 97 | + print(f"[{log_prefix}] {line.rstrip()}") |
| 98 | + |
| 99 | + return TestResult(rc=rc, elf_abs_path=elf_path, probe=probe, elapsed=elapsed) |
| 100 | + |
| 101 | + |
| 102 | +async def probe_worker( |
| 103 | + test_elf_queue: asyncio.Queue, probe: str, log_prefix: str |
| 104 | +) -> List[TestResult]: |
| 105 | + print(f"[{log_prefix}] Using probe {probe}") |
| 106 | + |
| 107 | + ret = [] |
| 108 | + while True: |
| 109 | + try: |
| 110 | + test_elf_path = test_elf_queue.get_nowait() |
| 111 | + except asyncio.QueueEmpty: |
| 112 | + return ret |
| 113 | + else: |
| 114 | + result = await probe_run(test_elf_path, probe, log_prefix) |
| 115 | + ret.append(result) |
| 116 | + |
| 117 | + return ret |
| 118 | + |
| 119 | + |
| 120 | +def flatten(t: List[list]) -> list: |
| 121 | + return [item for sublist in t for item in sublist] |
| 122 | + |
| 123 | + |
| 124 | +async def main() -> int: |
| 125 | + this_dir = os.path.dirname(os.path.abspath(__file__)) |
| 126 | + repo_root = os.path.abspath(os.path.join(this_dir, "..")) |
| 127 | + default_elf_dir = os.path.join( |
| 128 | + repo_root, "target", "thumbv7em-none-eabi", "debug", "deps" |
| 129 | + ) |
| 130 | + parser = argparse.ArgumentParser(description="Helper to run all on-target tests") |
| 131 | + parser.add_argument( |
| 132 | + "-d", |
| 133 | + "--elf-dir", |
| 134 | + default=default_elf_dir, |
| 135 | + type=str, |
| 136 | + help="Path to directory containing ELFs to flash", |
| 137 | + ) |
| 138 | + parser.add_argument("probe", nargs=2, help="Serial number of the probes to use") |
| 139 | + args = parser.parse_args() |
| 140 | + |
| 141 | + (test_elf_files, subghz_test_elf_file) = find_elfs(args.elf_dir) |
| 142 | + |
| 143 | + print("Running subghz test") |
| 144 | + subghz_results = await asyncio.gather( |
| 145 | + probe_run(subghz_test_elf_file, args.probe[0], "A"), |
| 146 | + probe_run(subghz_test_elf_file, args.probe[1], "B"), |
| 147 | + ) |
| 148 | + |
| 149 | + num_tests: int = len(test_elf_files) |
| 150 | + print(f"Running {num_tests} tests") |
| 151 | + |
| 152 | + test_elf_queue = asyncio.Queue() |
| 153 | + for test in test_elf_files: |
| 154 | + test_elf_queue.put_nowait(test) |
| 155 | + |
| 156 | + test_results = await asyncio.gather( |
| 157 | + probe_worker(test_elf_queue, args.probe[0], "A"), |
| 158 | + probe_worker(test_elf_queue, args.probe[1], "B"), |
| 159 | + ) |
| 160 | + test_results = flatten(test_results) |
| 161 | + test_results.extend(subghz_results) |
| 162 | + |
| 163 | + fail: bool = False |
| 164 | + for result in test_results: |
| 165 | + if not result.passed(): |
| 166 | + fail = True |
| 167 | + |
| 168 | + print( |
| 169 | + f"{result.status()} " |
| 170 | + f"{result.test_name()} " |
| 171 | + f"{result.elapsed:.3f}s " |
| 172 | + f"{result.probe}" |
| 173 | + ) |
| 174 | + |
| 175 | + if fail: |
| 176 | + return 1 |
| 177 | + else: |
| 178 | + return 0 |
| 179 | + |
| 180 | + |
| 181 | +if __name__ == "__main__": |
| 182 | + sys.exit(asyncio.run(main())) |
0 commit comments