Skip to content

Commit e517d53

Browse files
committed
cli: add new oad list command
This adds a new command for Powered Up hubs that use the TI OAD profile for firmware updates. This includes a list command that will connect to the hub and list any interesting info that can be read from the OAD service.
1 parent dd2b4d1 commit e517d53

File tree

6 files changed

+394
-2
lines changed

6 files changed

+394
-2
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
### Added
10+
- Added `pybricksdev oad info` command.
11+
912
## [1.0.0-alpha.50] - 2024-07-01
1013

1114
### Changed

pybricksdev/ble/oad/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# SPDX-License-Identifier: MIT
2+
# Copyright (c) 2024 The Pybricks Authors
3+
4+
"""
5+
Package for TI OAD (Over-the-Air Download) support.
6+
7+
https://software-dl.ti.com/lprf/sdg-latest/html/oad-ble-stack-3.x/oad_profile.html
8+
"""
9+
10+
from ._common import oad_uuid
11+
12+
__all__ = ["OAD_SERVICE_UUID"]
13+
14+
OAD_SERVICE_UUID = oad_uuid(0xFFC0)
15+
"""OAD service UUID."""

pybricksdev/ble/oad/_common.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# SPDX-License-Identifier: MIT
2+
# Copyright (c) 2024 The Pybricks Authors
3+
4+
5+
def oad_uuid(uuid16: int) -> str:
6+
"""
7+
Converts a 16-bit UUID to the TI OAD 128-bit UUID format.
8+
"""
9+
return "f000{:04x}-0451-4000-b000-000000000000".format(uuid16)
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
# SPDX-License-Identifier: MIT
2+
# Copyright (c) 2024 The Pybricks Authors
3+
4+
import asyncio
5+
import struct
6+
from enum import IntEnum
7+
8+
from bleak import BleakClient
9+
10+
from ._common import oad_uuid
11+
12+
__all__ = ["OADControlPoint"]
13+
14+
15+
OAD_CONTROL_POINT_CHAR_UUID = oad_uuid(0xFFC5)
16+
"""OAD Control Point characteristic UUID."""
17+
18+
19+
class CmdId(IntEnum):
20+
GET_OAD_BLOCK_SIZE = 0x01
21+
SET_IMAGE_COUNT = 0x02
22+
START_OAD_PROCESS = 0x03
23+
ENABLE_OAD_IMAGE = 0x04
24+
CANCEL_OAD = 0x05
25+
DISABLE_OAD_IMAGE_BLOCK_WRITE = 0x06
26+
GET_SOFTWARE_VERSION = 0x07
27+
GET_OAD_IMAGE_STATUS = 0x08
28+
GET_PROFILE_VERSION = 0x09
29+
GET_DEVICE_TYPE = 0x10
30+
IMAGE_BLOCK_WRITE_CHAR = 0x12
31+
ERASE_ALL_BONDS = 0x13
32+
33+
34+
class OADReturn(IntEnum):
35+
SUCCESS = 0
36+
"""OAD succeeded"""
37+
CRC_ERR = 1
38+
"""The downloaded image’s CRC doesn’t match the one expected from the metadata"""
39+
FLASH_ERR = 2
40+
"""Flash function failure such as flashOpen/flashRead/flash write/flash erase"""
41+
BUFFER_OFL = 3
42+
"""The block number of the received packet doesn’t match the one requested, an overflow has occurred."""
43+
ALREADY_STARTED = 4
44+
"""OAD start command received, while OAD is already is progress"""
45+
NOT_STARTED = 5
46+
"""OAD data block received with OAD start process"""
47+
DL_NOT_COMPLETE = 6
48+
"""OAD enable command received without complete OAD image download"""
49+
NO_RESOURCES = 7
50+
"""Memory allocation fails/ used only for backward compatibility"""
51+
IMAGE_TOO_BIG = 8
52+
"""Image is too big"""
53+
INCOMPATIBLE_IMAGE = 9
54+
"""Stack and flash boundary mismatch, program entry mismatch"""
55+
INVALID_FILE = 10
56+
"""Invalid image ID received"""
57+
INCOMPATIBLE_FILE = 11
58+
"""BIM/image header/firmware version mismatch"""
59+
AUTH_FAIL = 12
60+
"""Start OAD process / Image Identify message/image payload authentication/validation fail"""
61+
EXT_NOT_SUPPORTED = 13
62+
"""Data length extension or OAD control point characteristic not supported"""
63+
DL_COMPLETE = 14
64+
"""OAD image payload download complete"""
65+
CCCD_NOT_ENABLED = 15
66+
"""Internal (target side) error code used to halt the process if a CCCD has not been enabled"""
67+
IMG_ID_TIMEOUT = 16
68+
"""OAD Image ID has been tried too many times and has timed out. Device will disconnect."""
69+
70+
71+
def _decode_version(v: int) -> int:
72+
return (v >> 4) * 10 + (v & 0x0F)
73+
74+
75+
class OADControlPoint:
76+
def __init__(self, client: BleakClient):
77+
self._client = client
78+
self._queue = asyncio.Queue[bytes]()
79+
80+
async def __aenter__(self):
81+
await self._client.start_notify(
82+
OAD_CONTROL_POINT_CHAR_UUID, self._notification_handler
83+
)
84+
return self
85+
86+
async def __aexit__(self, *exc_info):
87+
await self._client.stop_notify(OAD_CONTROL_POINT_CHAR_UUID)
88+
89+
def _notification_handler(self, sender, data):
90+
self._queue.put_nowait(data)
91+
92+
async def _send_command(self, cmd_id: CmdId, payload: bytes = b""):
93+
await self._client.write_gatt_char(
94+
OAD_CONTROL_POINT_CHAR_UUID, bytes([cmd_id]) + payload
95+
)
96+
rsp = await self._queue.get()
97+
98+
if rsp[0] != cmd_id:
99+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
100+
101+
return rsp[1:]
102+
103+
async def get_oad_block_size(self) -> int:
104+
"""
105+
Get the OAD block size.
106+
107+
Returns: OAD_BLOCK_SIZE
108+
"""
109+
rsp = await self._send_command(CmdId.GET_OAD_BLOCK_SIZE)
110+
111+
if len(rsp) != 2:
112+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
113+
114+
return int.from_bytes(rsp, "little")
115+
116+
async def set_image_count(self, count: int) -> OADReturn:
117+
"""
118+
Set the number of images to be downloaded.
119+
120+
Args:
121+
count: Number of images to be downloaded.
122+
123+
Returns: Status
124+
"""
125+
rsp = await self._send_command(CmdId.SET_IMAGE_COUNT, bytes([count]))
126+
127+
if len(rsp) != 1:
128+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
129+
130+
return OADReturn(rsp[0])
131+
132+
async def start_oad_process(self) -> int:
133+
"""
134+
Start the OAD process.
135+
136+
Returns: Block Number
137+
"""
138+
rsp = await self._send_command(CmdId.START_OAD_PROCESS)
139+
140+
if len(rsp) != 4:
141+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
142+
143+
return int.from_bytes(rsp, "little")
144+
145+
async def enable_oad_image(self) -> OADReturn:
146+
"""
147+
Enable the OAD image.
148+
149+
Returns: Status
150+
"""
151+
# REVISIT: this command can also take an optional payload
152+
rsp = await self._send_command(CmdId.ENABLE_OAD_IMAGE)
153+
154+
if len(rsp) != 1:
155+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
156+
157+
return OADReturn(rsp[0])
158+
159+
async def cancel_oad(self) -> OADReturn:
160+
"""
161+
Cancel the OAD process.
162+
163+
Returns: Status
164+
"""
165+
rsp = await self._send_command(CmdId.CANCEL_OAD)
166+
167+
if len(rsp) != 1:
168+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
169+
170+
return OADReturn(rsp[0])
171+
172+
async def disable_oad_image_block_write(self) -> OADReturn:
173+
"""
174+
Disable OAD image block write.
175+
176+
Returns: Status
177+
"""
178+
rsp = await self._send_command(CmdId.DISABLE_OAD_IMAGE_BLOCK_WRITE)
179+
180+
if len(rsp) != 1:
181+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
182+
183+
return OADReturn(rsp[0])
184+
185+
async def get_software_version(self) -> tuple[tuple[int, int], tuple[int, int]]:
186+
"""
187+
Get the software version.
188+
189+
Returns: Software Version (tuple of Application and Stack version tuples)
190+
"""
191+
rsp = await self._send_command(CmdId.GET_SOFTWARE_VERSION)
192+
193+
if len(rsp) != 4:
194+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
195+
196+
return (
197+
(_decode_version(rsp[0]), _decode_version(rsp[1])),
198+
(_decode_version(rsp[2]), _decode_version(rsp[3])),
199+
)
200+
201+
async def get_oad_image_status(self) -> OADReturn:
202+
"""
203+
Get the OAD image status.
204+
205+
Returns: Status
206+
"""
207+
rsp = await self._send_command(CmdId.GET_OAD_IMAGE_STATUS)
208+
209+
if len(rsp) != 1:
210+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
211+
212+
return OADReturn(rsp[0])
213+
214+
async def get_profile_version(self) -> int:
215+
"""
216+
Get the profile version.
217+
218+
Returns: Version of OAD profile supported by target
219+
"""
220+
rsp = await self._send_command(CmdId.GET_PROFILE_VERSION)
221+
222+
if len(rsp) != 1:
223+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
224+
225+
return rsp[0]
226+
227+
async def get_device_type(self) -> int:
228+
"""
229+
Get the device type.
230+
231+
Returns: Value of Device ID register
232+
"""
233+
rsp = await self._send_command(CmdId.GET_DEVICE_TYPE)
234+
235+
if len(rsp) != 4:
236+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
237+
238+
return int.from_bytes(rsp, "little")
239+
240+
async def image_block_write(self, prev_status: int, block_num: int) -> None:
241+
"""
242+
Write an image block.
243+
244+
Args:
245+
prev_status: Status of the previous block received
246+
block_num: Block number
247+
"""
248+
rsp = await self._send_command(
249+
CmdId.IMAGE_BLOCK_WRITE_CHAR, struct.pack("<BI", prev_status, block_num)
250+
)
251+
252+
if len(rsp) != 0:
253+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
254+
255+
async def erase_all_bonds(self) -> OADReturn:
256+
"""
257+
Erase all bonds.
258+
259+
Returns: Status
260+
"""
261+
rsp = await self._send_command(CmdId.ERASE_ALL_BONDS)
262+
263+
if len(rsp) != 1:
264+
raise RuntimeError(f"Unexpected response: {rsp.hex(':')}")
265+
266+
return OADReturn(rsp[0])

pybricksdev/cli/__init__.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# SPDX-License-Identifier: MIT
2-
# Copyright (c) 2019-2022 The Pybricks Authors
2+
# Copyright (c) 2019-2024 The Pybricks Authors
33

44
"""Command line wrapper around pybricksdev library."""
55

@@ -290,6 +290,43 @@ def run(self, args: argparse.Namespace):
290290
return self.subparsers.choices[args.action].tool.run(args)
291291

292292

293+
class OADInfo(Tool):
294+
def add_parser(self, subparsers: argparse._SubParsersAction):
295+
parser = subparsers.add_parser(
296+
"info",
297+
help="get information about firmware on a LEGO Powered Up device using TI OAD",
298+
)
299+
parser.tool = self
300+
301+
async def run(self, args: argparse.Namespace):
302+
from .oad import dump_oad_info
303+
304+
await dump_oad_info()
305+
306+
307+
class OAD(Tool):
308+
def add_parser(self, subparsers: argparse._SubParsersAction):
309+
self.parser = subparsers.add_parser(
310+
"oad",
311+
help="update firmware on a LEGO Powered Up device using TI OAD",
312+
)
313+
self.parser.tool = self
314+
self.subparsers = self.parser.add_subparsers(
315+
metavar="<action>", dest="action", help="the action to perform"
316+
)
317+
318+
for tool in (OADInfo(),):
319+
tool.add_parser(self.subparsers)
320+
321+
def run(self, args: argparse.Namespace):
322+
if args.action not in self.subparsers.choices:
323+
self.parser.error(
324+
f'Missing name of action: {"|".join(self.subparsers.choices.keys())}'
325+
)
326+
327+
return self.subparsers.choices[args.action].tool.run(args)
328+
329+
293330
class LWP3Repl(Tool):
294331
def add_parser(self, subparsers: argparse._SubParsersAction):
295332
parser = subparsers.add_parser(
@@ -372,7 +409,7 @@ def main():
372409
help="the tool to use",
373410
)
374411

375-
for tool in Compile(), Run(), Flash(), DFU(), LWP3(), Udev():
412+
for tool in Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev():
376413
tool.add_parser(subparsers)
377414

378415
argcomplete.autocomplete(parser)

0 commit comments

Comments
 (0)