44# SPDX-License-Identifier: Apache-2.0
55
66import argparse
7- import subprocess
7+ import asyncio
88import sys
99import time
1010from pathlib import Path
1616BAUD_RATE : int = 1_000_000
1717TIMEOUT : int = 60
1818
19+ RUNNER : str = Path (__file__ ).name
1920
20- def load_fpga_test (test : Path , timeout : int = TIMEOUT ) -> None :
21+
22+ async def load_fpga_test (test : Path ) -> None :
2123 command = ["openFPGALoader" , "--spi" , "--offset" , str (BOOT_ROM_OFFSET ), "--write-flash" ]
22- command .append (test .with_suffix (".bin" ))
24+ command .append (str (test .with_suffix (".bin" )))
25+ # TODO: This is a workaround to send a reset and start the test, should be removed when we
26+ # are able to reset the SoC with the external reset.
27+ # The first invocation resets and load the binary, the second resets and the load is
28+ # ignored by the bootROM, thus we don't check the return error.
29+ p = await asyncio .create_subprocess_exec (* command )
30+ if await p .wait () != 0 :
31+ print (f"[{ RUNNER } ] SPI load command exited with non-zero exit code { p .returncode } " )
32+ sys .exit (1 )
33+ p = await asyncio .create_subprocess_exec (* command )
34+ await p .wait ()
35+
36+
37+ async def run_fpga_test (tty : str , test : Path ) -> bool :
38+ with serial .Serial (tty , BAUD_RATE , timeout = 0 ) as uart :
39+ load = asyncio .create_task (load_fpga_test (test ))
40+ poll = asyncio .create_task (poll_uart (uart ))
41+ success = await poll
42+ await load
43+ return success
44+
45+
46+ async def poll_uart (uart : serial .Serial ) -> bool :
2347 start = time .time ()
24- while time .time () - start < timeout :
25- try :
26- subprocess .run (command , capture_output = False , check = False )
27- except OSError as e :
28- print (f"[{ Path (__file__ ).name } ] Error: { e .strerror } " )
29- sys .exit (1 )
30-
31- # TODO: This is a workaround to send a reset and start the test, should be removed when we
32- # are able to reset the SoC with the external reset.
33- # The first invocation resets and load the binary, the second resets and the load is
34- # ignored by the bootROM, thus we don't check the return error.
35- subprocess .run (command , capture_output = True , check = False )
36- return
37-
38- print (f"[{ Path (__file__ ).name } ] Load FPGA test timeout" )
39- sys .exit (1 )
40-
41-
42- def run_fpga_test (tty : str , test : Path , timeout : int = 10 ) -> int :
43- print (f"Listening to { tty } " )
44- with serial .Serial (tty , BAUD_RATE , timeout = 1 ) as uart :
45- start = time .time ()
46- load_fpga_test (test )
47- while time .time () - start < timeout :
48- line = uart .readline ().decode ("utf-8" , errors = "ignore" )
49- print (line , end = "" )
50- if not line or "TEST RESULT" not in line :
51- continue
52-
53- if "PASSED" in line :
54- return 0
55-
56- if "FAILED" not in line :
57- print (f"[{ Path (__file__ ).name } ] Unknown test result: { line } " )
58-
59- return 1
60- print (f"[{ Path (__file__ ).name } ] Test timeout" )
61- return 1
48+ while time .time () - start < TIMEOUT :
49+ line = await asyncio .to_thread (uart .readline )
50+ line = line .decode ("utf-8" , errors = "ignore" )
51+ print (line , end = "" )
52+ if not line or "TEST RESULT" not in line :
53+ continue
54+ return "PASSED" in line
55+ print (f"[{ RUNNER } ] Test timeout" )
56+ return False
6257
6358
6459def find_uart (vid : int = 0x0403 , pid : int = 0x6001 ) -> str | None :
@@ -74,9 +69,13 @@ def main() -> None:
7469 args = parser .parse_args ()
7570
7671 if uart_tty := find_uart ():
77- sys .exit (run_fpga_test (uart_tty , args .test , TIMEOUT ))
72+ try :
73+ success = asyncio .run (run_fpga_test (uart_tty , args .test ))
74+ sys .exit (0 if success else 1 )
75+ except KeyboardInterrupt :
76+ sys .exit (1 )
7877
79- print (f"[{ Path ( __file__ ). name } ] Error: UART device not found" )
78+ print (f"[{ RUNNER } ] Error: UART device not found" )
8079 sys .exit (1 )
8180
8281
0 commit comments