Skip to content

Commit 60e962b

Browse files
committed
cli: modify flash command to reboot in update mode
This adds a feature the the CLI `flash` command to scan for hubs not in bootloader mode running official LEGO firmware or Pybricks firmware. It will reboot these hubs in bootloader mode and proceed with flashing the firmware.
1 parent 0e20265 commit 60e962b

File tree

2 files changed

+278
-17
lines changed

2 files changed

+278
-17
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212

1313
### Changed
1414
- Moved ``Bootloader*`` to new ``ble.lwp3.bootloader`` module.
15+
- ``pybricksdev flash`` will now discover hubs running official LEGO firmware
16+
or Pybricks firmware and reboot in bootloader mode automatically.
1517

1618
## [1.0.0-alpha.21] - 2022-01-12
1719

pybricksdev/cli/flash.py

Lines changed: 276 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,285 @@
33

44
import asyncio
55
import json
6+
import logging
7+
import sys
68
import zipfile
9+
from tempfile import NamedTemporaryFile
710
from typing import BinaryIO, Optional
811

9-
from ..ble import find_device
10-
from ..ble.lwp3 import LWP3_BOOTLOADER_SERVICE_UUID
11-
from ..ble.lwp3.bytecodes import HubKind
12+
from bleak import BleakClient, BleakScanner
13+
from bleak.backends.device import BLEDevice
14+
from bleak.backends.scanner import AdvertisementData
15+
16+
from ..ble.lwp3 import (
17+
LEGO_CID,
18+
LWP3_BOOTLOADER_SERVICE_UUID,
19+
LWP3_HUB_CHARACTERISTIC_UUID,
20+
LWP3_HUB_SERVICE_UUID,
21+
)
22+
from ..ble.lwp3 import AdvertisementData as HubAdvertisementData
23+
from ..ble.lwp3.bootloader import BootloaderAdvertisementData
24+
from ..ble.lwp3.bytecodes import HubKind, HubProperty
25+
from ..ble.lwp3.messages import (
26+
FirmwareUpdateMessage,
27+
HubPropertyRequestUpdate,
28+
HubPropertyUpdate,
29+
parse_message,
30+
)
31+
from ..ble.nus import NUS_RX_UUID, NUS_TX_UUID
32+
from ..ble.pybricks import (
33+
FW_REV_UUID,
34+
PNP_ID_UUID,
35+
PYBRICKS_SERVICE_UUID,
36+
unpack_pnp_id,
37+
)
38+
from ..compile import compile_file
1239
from ..connections import REPLHub
1340
from ..dfu import flash_dfu
1441
from ..flash import BootloaderConnection, create_firmware
42+
from ..tools import chunk
43+
from ..tools.checksum import xor_bytes
44+
45+
logger = logging.getLogger(__name__)
46+
47+
48+
REBOOT_SCRIPT = """
49+
from pybricks.hubs import ThisHub
50+
from pybricks.tools import wait
51+
52+
hub = ThisHub()
53+
54+
# without delay, hub will reboot before we receive last checksum
55+
wait(500)
56+
hub.system.reset(2)
57+
"""
58+
59+
60+
def match_hub(
61+
hub_kind: HubKind, hub_name: Optional[str], adv: AdvertisementData
62+
) -> bool:
63+
"""
64+
Advertisement data matching function for filtering supported hubs.
65+
66+
Args:
67+
hub_kind: The hub type ID to match.
68+
hub_name: An optional name to match to the "local name" in the
69+
advertisement data.
70+
adv: The advertisemet data to check.
71+
72+
Returns:
73+
``True`` if *adv* matches the criteria, otherwise ``False``.
74+
"""
75+
# hub name filtering is optional
76+
if hub_name and adv.local_name != hub_name:
77+
return False
78+
79+
# LEGO firmware uses manufacturer-specific data
80+
81+
lego_data = adv.manufacturer_data.get(LEGO_CID)
82+
83+
if lego_data:
84+
if LWP3_BOOTLOADER_SERVICE_UUID in adv.service_uuids:
85+
bl_data = BootloaderAdvertisementData(lego_data)
86+
return bl_data.hub_kind == hub_kind
87+
88+
if LWP3_HUB_SERVICE_UUID in adv.service_uuids:
89+
hub_data = HubAdvertisementData(lego_data)
90+
return hub_data.hub_kind == hub_kind
91+
92+
# Pybricks firmware uses Device Information service data
93+
94+
pnp_id_data = adv.service_data.get(PNP_ID_UUID)
95+
96+
if pnp_id_data and PYBRICKS_SERVICE_UUID in adv.service_uuids:
97+
_, _, pid, _ = unpack_pnp_id(pnp_id_data)
98+
return pid == hub_kind
99+
100+
return False
101+
102+
103+
async def download_and_run(client: BleakClient, script: str) -> None:
104+
"""
105+
Downloads and runs a script on a hub running Pybricks firmware.
106+
107+
Args:
108+
client: The Bluetooth connection to the hub.
109+
script: The script to be compiled and run.
110+
"""
111+
with NamedTemporaryFile("w", suffix=".py") as temp:
112+
temp.write(script)
113+
114+
# file has to be closed so mpy-cross can open it
115+
temp.file.close()
116+
117+
mpy = await compile_file(temp.name)
118+
119+
recv_queue = asyncio.Queue()
120+
121+
def on_notify(_h, data: bytes):
122+
recv_queue.put_nowait(data)
123+
124+
# BOOST Move hub has hardware limit of MTU == 23 so it has to have data
125+
# split into smaller chunks
126+
write_size = 20 if client.mtu_size < 100 else 100
127+
128+
async def write_chunk(data: bytes):
129+
"""
130+
Writes a chunk of data and waits for a checksum reply.
131+
132+
Args:
133+
data: The data.
134+
135+
Raises:
136+
RuntimeError: If the returned checksum did not match.
137+
asyncio.TimeoutError: If no reply was received.
138+
"""
139+
checksum = xor_bytes(data, 0)
140+
141+
for c in chunk(data, write_size):
142+
await client.write_gatt_char(NUS_RX_UUID, c)
143+
144+
reply: bytes = await asyncio.wait_for(recv_queue.get(), 1)
145+
146+
if reply[0] != checksum:
147+
raise RuntimeError("bad checksum")
148+
149+
await client.start_notify(NUS_TX_UUID, on_notify)
150+
151+
# communication protocol is write file size, then send file in 100 byte chunks
152+
try:
153+
await write_chunk(len(mpy).to_bytes(4, "little"))
154+
155+
for c in chunk(mpy, 100):
156+
await write_chunk(c)
157+
158+
finally:
159+
await client.stop_notify(NUS_TX_UUID)
160+
161+
162+
async def reboot_official_to_bootloader(hub_kind: HubKind, device: BLEDevice) -> None:
163+
"""
164+
Connects to a hub running official LEGO firmware and sends a message to
165+
reboot in firmware update mode.
166+
"""
167+
async with BleakClient(device) as client:
168+
169+
# give bluetooth stack time to settle
170+
await asyncio.sleep(1)
171+
172+
fw_ver_future = asyncio.get_running_loop().create_future()
173+
174+
def on_notify(_h, data: bytes):
175+
msg = parse_message(data)
176+
177+
logger.debug("%s", str(msg))
178+
179+
if (
180+
isinstance(msg, HubPropertyUpdate)
181+
and msg.prop == HubProperty.FW_VERSION
182+
):
183+
fw_ver_future.set_result(msg.value)
184+
185+
await client.start_notify(LWP3_HUB_CHARACTERISTIC_UUID, on_notify)
186+
await client.write_gatt_char(
187+
LWP3_HUB_CHARACTERISTIC_UUID,
188+
HubPropertyRequestUpdate(HubProperty.FW_VERSION),
189+
# work around city hub bluetooth bug on linux
190+
response=hub_kind == HubKind.CITY,
191+
)
192+
193+
fw_ver = await asyncio.wait_for(fw_ver_future, 5)
194+
print(f"Hub is running firmware v{fw_ver}.")
195+
196+
print("Rebooting in update mode...")
197+
198+
await client.write_gatt_char(
199+
LWP3_HUB_CHARACTERISTIC_UUID, FirmwareUpdateMessage()
200+
)
201+
202+
203+
async def reboot_pybricks_to_bootloader(hub_kind: HubKind, device: BLEDevice) -> None:
204+
"""
205+
Connects to a hub running Pybricks firmware and sends a message to
206+
reboot in firmware update mode.
207+
"""
208+
async with BleakClient(device) as client:
209+
# Work around BlueZ limitation.
210+
if client.__class__.__name__ == "BleakClientBlueZDBus":
211+
client._mtu_size = 23 if hub_kind == HubKind.BOOST else 158
212+
213+
# give bluetooth stack time to settle
214+
await asyncio.sleep(1)
215+
216+
fw_ver = await client.read_gatt_char(FW_REV_UUID)
217+
fw_ver = fw_ver.decode()
218+
print(f"Hub is running firmware v{fw_ver}.")
219+
220+
print("Rebooting in update mode...")
221+
222+
await download_and_run(client, REBOOT_SCRIPT)
223+
224+
225+
async def flash_ble(
226+
hub_kind: HubKind, hub_name: Optional[str], firmware: bytes, metadata: dict
227+
):
228+
"""
229+
Flashes firmware to the hub using Bluetooth Low Energy.
230+
231+
The hub has to be advertising and can be running official LEGO firmware,
232+
Pybricks firmware or be in bootloader mode.
233+
234+
Args:
235+
hub_kind: The hub type ID. Only hubs matching this ID will be discovered.
236+
hub_name: If given, only hubs that advertise this name will be disovered.
237+
firmware: The raw firmware binary blob.
238+
metadata: The firmware metadata from the firmware.zip file.
239+
"""
240+
241+
print(f"Searching for {repr(hub_name) if hub_name else hub_kind.name} hub...")
242+
243+
# scan for hubs in bootloader mode, running official LEGO firmware or
244+
# running Pybricks firmware
245+
246+
device = await BleakScanner.find_device_by_filter(
247+
lambda _d, a: match_hub(hub_kind, hub_name, a),
248+
service_uuids=[
249+
LWP3_BOOTLOADER_SERVICE_UUID,
250+
LWP3_HUB_SERVICE_UUID,
251+
PYBRICKS_SERVICE_UUID,
252+
],
253+
)
254+
255+
if device is None:
256+
print("timed out", file=sys.stderr)
257+
return
258+
259+
# if not already in bootlaoder mode, we need to reboot into bootloader mode
260+
if LWP3_HUB_SERVICE_UUID in device.metadata["uuids"]:
261+
print("Found hub running official LEGO firmare.")
262+
await reboot_official_to_bootloader(hub_kind, device)
263+
elif PYBRICKS_SERVICE_UUID in device.metadata["uuids"]:
264+
print("Found hub running Pybricks firmare.")
265+
await reboot_pybricks_to_bootloader(hub_kind, device)
266+
267+
# if not previously in bootlaoder mode, scan again, this time only for bootloader
268+
if LWP3_BOOTLOADER_SERVICE_UUID not in device.metadata["uuids"]:
269+
device = await BleakScanner.find_device_by_filter(
270+
lambda _d, a: match_hub(hub_kind, hub_name, a),
271+
service_uuids=[
272+
LWP3_BOOTLOADER_SERVICE_UUID,
273+
],
274+
)
275+
276+
if device is None:
277+
print("timed out", file=sys.stderr)
278+
return
279+
280+
print("Found:", device)
281+
updater = BootloaderConnection()
282+
await updater.connect(device)
283+
print("Erasing flash and starting update")
284+
await updater.flash(firmware, metadata)
15285

16286

17287
async def flash_firmware(firmware_zip: BinaryIO, hub_name: Optional[str]) -> None:
@@ -26,8 +296,9 @@ async def flash_firmware(firmware_zip: BinaryIO, hub_name: Optional[str]) -> Non
26296
print("Creating firmware...")
27297

28298
firmware, metadata = await create_firmware(firmware_zip, hub_name)
299+
hub_kind = HubKind(metadata["device-id"])
29300

30-
if metadata["device-id"] in (HubKind.TECHNIC_SMALL, HubKind.TECHNIC_LARGE):
301+
if hub_kind in (HubKind.TECHNIC_SMALL, HubKind.TECHNIC_LARGE):
31302
try:
32303
# Connect to the hub and exit the runtime.
33304
hub = REPLHub()
@@ -60,16 +331,4 @@ async def flash_firmware(firmware_zip: BinaryIO, hub_name: Optional[str]) -> Non
60331
print("Could not find hub in standard firmware mode. Trying DFU.")
61332
flash_dfu(firmware, metadata)
62333
else:
63-
print("Searching for LEGO Bootloader...")
64-
65-
try:
66-
device = await find_device(service=LWP3_BOOTLOADER_SERVICE_UUID)
67-
except asyncio.TimeoutError:
68-
print("timed out")
69-
return
70-
71-
print("Found:", device)
72-
updater = BootloaderConnection()
73-
await updater.connect(device)
74-
print("Erasing flash and starting update")
75-
await updater.flash(firmware, metadata)
334+
await flash_ble(hub_kind, hub_name, firmware, metadata)

0 commit comments

Comments
 (0)