|
8 | 8 | import struct |
9 | 9 | import sys |
10 | 10 | import zipfile |
| 11 | +import zlib |
11 | 12 | from tempfile import NamedTemporaryFile |
12 | 13 | from typing import BinaryIO, Dict, Optional |
13 | 14 |
|
14 | 15 | from bleak import BleakClient, BleakScanner |
15 | 16 | from bleak.backends.device import BLEDevice |
16 | 17 | from bleak.backends.scanner import AdvertisementData |
17 | 18 | from packaging.version import Version |
| 19 | +from tqdm.auto import tqdm |
| 20 | +from tqdm.contrib.logging import logging_redirect_tqdm |
18 | 21 |
|
19 | 22 | from ..ble.lwp3 import ( |
20 | 23 | LEGO_CID, |
@@ -364,6 +367,55 @@ async def flash_nxt(firmware: bytes) -> None: |
364 | 367 | s.close() |
365 | 368 |
|
366 | 369 |
|
| 370 | +async def flash_ev3(firmware: bytes) -> None: |
| 371 | + """ |
| 372 | + Flashes firmware to EV3. |
| 373 | +
|
| 374 | + Args: |
| 375 | + firmware: |
| 376 | + A firmware blob. |
| 377 | + """ |
| 378 | + from ..connections.ev3 import EV3Bootloader |
| 379 | + |
| 380 | + # TODO: nice error message and exit(1) if EV3 is not found |
| 381 | + with EV3Bootloader() as bootloader: |
| 382 | + fw, hw = await bootloader.get_version() |
| 383 | + print(f"hwid: {hw}") |
| 384 | + |
| 385 | + ERASE_TICKS = 60 |
| 386 | + |
| 387 | + # Erasing doesn't have any feedback so we just use time for the progress |
| 388 | + # bar. The operation runs on the EV3, so the time is the same for everyone. |
| 389 | + async def tick(callback): |
| 390 | + for _ in range(ERASE_TICKS): |
| 391 | + await asyncio.sleep(1) |
| 392 | + callback(1) |
| 393 | + |
| 394 | + print("Erasing memory...") |
| 395 | + with logging_redirect_tqdm(), tqdm(total=ERASE_TICKS) as pbar: |
| 396 | + await asyncio.gather(bootloader.erase_chip(), tick(pbar.update)) |
| 397 | + |
| 398 | + print("Downloading firmware...") |
| 399 | + with logging_redirect_tqdm(), tqdm( |
| 400 | + total=len(firmware), unit="B", unit_scale=True |
| 401 | + ) as pbar: |
| 402 | + await bootloader.download(0, firmware, pbar.update) |
| 403 | + |
| 404 | + print("Verifying...", end="", flush=True) |
| 405 | + checksum = await bootloader.get_checksum(0, len(firmware)) |
| 406 | + expected_checksum = zlib.crc32(firmware) |
| 407 | + |
| 408 | + if checksum != expected_checksum: |
| 409 | + print("Bad checksum!") |
| 410 | + exit(1) |
| 411 | + |
| 412 | + print("OK.") |
| 413 | + |
| 414 | + print("Restarting EV3...", end="", flush=True) |
| 415 | + await bootloader.start_app() |
| 416 | + print("Done.") |
| 417 | + |
| 418 | + |
367 | 419 | async def flash_firmware(firmware_zip: BinaryIO, new_name: Optional[str]) -> None: |
368 | 420 | """ |
369 | 421 | Command line tool for flashing firmware. |
@@ -420,5 +472,7 @@ async def flash_firmware(firmware_zip: BinaryIO, new_name: Optional[str]) -> Non |
420 | 472 | await flash_ble(hub_kind, firmware, metadata) |
421 | 473 | elif hub_kind == HubKind.NXT: |
422 | 474 | await flash_nxt(firmware) |
| 475 | + elif hub_kind == HubKind.EV3: |
| 476 | + await flash_ev3(firmware) |
423 | 477 | else: |
424 | 478 | raise ValueError(f"unsupported hub kind: {hub_kind}") |
0 commit comments