|
6 | 6 | import sys |
7 | 7 | import time |
8 | 8 | from pathlib import Path |
| 9 | +from typing import Optional |
9 | 10 |
|
10 | 11 | from loguru import logger as log |
11 | 12 | from rich.progress import track |
12 | 13 |
|
13 | 14 | from .flash_uf2_boardid import get_board_id |
14 | | -from .uf2disk import UF2Disk |
15 | 15 |
|
16 | 16 |
|
17 | | -def get_uf2_drives(): |
18 | | - """ |
19 | | - Get a list of all the (un)mounted UF2 drives |
20 | | - """ |
21 | | - if sys.platform != "linux": |
22 | | - log.error("pumount only works on Linux") |
23 | | - return |
24 | | - # import blkinfo only on linux |
25 | | - from blkinfo import BlkDiskInfo |
26 | | - |
27 | | - myblkd = BlkDiskInfo() |
28 | | - filters = { |
29 | | - "tran": "usb", |
30 | | - } |
31 | | - usb_disks = myblkd.get_disks(filters) |
32 | | - for disk in usb_disks: |
33 | | - if disk["fstype"] == "vfat": |
34 | | - uf2_part = disk |
35 | | - # unpartioned usb disk or partition (e.g. /dev/sdb ) |
36 | | - # SEEED WIO Terminal is unpartioned |
37 | | - # print( json.dumps(uf2_part, indent=4)) |
38 | | - uf2 = UF2Disk() |
39 | | - uf2.device_path = "/dev/" + uf2_part["name"] |
40 | | - uf2.label = uf2_part["label"] |
41 | | - uf2.mountpoint = uf2_part["mountpoint"] |
42 | | - yield uf2 |
43 | | - elif disk["type"] == "disk" and disk.get("children") and len(disk.get("children")) > 0: |
44 | | - if disk.get("children")[0]["type"] == "part" and disk.get("children")[0]["fstype"] == "vfat": |
45 | | - uf2_part = disk.get("children")[0] |
46 | | - # print( json.dumps(uf2_part, indent=4)) |
47 | | - uf2 = UF2Disk() |
48 | | - uf2.device_path = "/dev/" + uf2_part["name"] |
49 | | - uf2.label = uf2_part["label"] |
50 | | - uf2.mountpoint = uf2_part["mountpoint"] |
51 | | - yield uf2 |
52 | | - |
53 | | - |
54 | | -def wait_for_UF2_macos(s_max: int = 10): |
55 | | - destination = "" |
56 | | - wait = 10 |
57 | | - uf2_drives = [] |
58 | | - # while not destination and wait > 0: |
| 17 | +def wait_for_UF2_macos(s_max: int = 10) -> Optional[Path]: |
| 18 | + """Wait for the MCU to mount as a drive""" |
| 19 | + if s_max < 1: |
| 20 | + s_max = 10 |
| 21 | + destination = None |
59 | 22 | for _ in track( |
60 | 23 | range(s_max), description="Waiting for mcu to mount as a drive", transient=True, refresh_per_second=2 |
61 | 24 | ): |
62 | | - # log.info(f"Waiting for mcu to mount as a drive : {wait} seconds left") |
63 | | - uf2_drives += list(get_uf2_drives()) |
64 | | - for drive in get_uf2_drives(): |
65 | | - time.sleep(1) |
| 25 | + # log.info(f"Waiting for mcu to mount as a drive : {n} seconds left") |
| 26 | + vol_mounts = Path("/Volumes").iterdir() |
| 27 | + for vol in vol_mounts: |
66 | 28 | try: |
67 | | - if Path(drive.mountpoint, "INFO_UF2.TXT").exists(): |
68 | | - board_id = get_board_id(Path(drive.mountpoint)) # type: ignore |
69 | | - destination = Path(drive.mountpoint) |
| 29 | + if Path(vol, "INFO_UF2.TXT").exists(): |
| 30 | + destination = Path(vol) |
70 | 31 | break |
71 | | - except PermissionError: |
72 | | - log.debug(f"Permission error on {drive.mountpoint}") |
73 | | - continue |
| 32 | + except OSError: |
| 33 | + pass |
74 | 34 | if destination: |
75 | 35 | break |
76 | 36 | time.sleep(1) |
77 | | - wait -= 1 |
78 | 37 | return destination |
0 commit comments