Skip to content

Commit 0dd7836

Browse files
committed
cli: move flash logic to separate file
Ideally, the __init__.py file just contains command line arg parsing and all of the actual run logic is defined elsewhere to prevent the file from getting too big.
1 parent 4d6b57f commit 0dd7836

File tree

2 files changed

+87
-70
lines changed

2 files changed

+87
-70
lines changed

pybricksdev/cli/__init__.py

Lines changed: 12 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,25 @@
11
# SPDX-License-Identifier: MIT
2-
# Copyright (c) 2019-2021 The Pybricks Authors
2+
# Copyright (c) 2019-2022 The Pybricks Authors
33

44
"""Command line wrapper around pybricksdev library."""
55

66
import argparse
77
import asyncio
88
import contextlib
9-
import json
109
import logging
1110
import os
1211
import sys
13-
from tempfile import NamedTemporaryFile
14-
from typing import ContextManager, TextIO
15-
import validators
16-
import zipfile
17-
1812
from abc import ABC, abstractmethod
1913
from os import PathLike, path
14+
from tempfile import NamedTemporaryFile
15+
from typing import ContextManager, TextIO
2016

2117
import argcomplete
18+
import validators
2219
from argcomplete.completers import FilesCompleter
2320

24-
from .. import __name__ as MODULE_NAME, __version__ as MODULE_VERSION
25-
from ..ble.lwp3 import LWP3_BOOTLOADER_SERVICE_UUID
26-
from ..ble.lwp3.bytecodes import HubKind
27-
21+
from .. import __name__ as MODULE_NAME
22+
from .. import __version__ as MODULE_VERSION
2823

2924
PROG_NAME = (
3025
f"{path.basename(sys.executable)} -m {MODULE_NAME}"
@@ -170,7 +165,7 @@ def add_parser(self, subparsers: argparse._SubParsersAction):
170165

171166
async def run(self, args: argparse.Namespace):
172167
from ..ble import find_device
173-
from ..connections import PybricksHub, EV3Connection, REPLHub
168+
from ..connections import EV3Connection, PybricksHub, REPLHub
174169

175170
# Pick the right connection
176171
if args.conntype == "ssh":
@@ -223,64 +218,10 @@ def add_parser(self, subparsers: argparse._SubParsersAction):
223218
"-n", "--name", metavar="<name>", type=str, help="a custom name for the hub"
224219
)
225220

226-
async def run(self, args: argparse.Namespace):
227-
from ..flash import create_firmware
228-
229-
print("Creating firmware")
230-
firmware, metadata = await create_firmware(args.firmware, args.name)
231-
232-
if metadata["device-id"] in (HubKind.TECHNIC_SMALL, HubKind.TECHNIC_LARGE):
233-
from ..dfu import flash_dfu
234-
from ..connections import REPLHub
235-
236-
try:
237-
# Connect to the hub and exit the runtime.
238-
hub = REPLHub()
239-
await hub.connect()
240-
await hub.reset_hub()
241-
242-
# Upload installation script.
243-
archive = zipfile.ZipFile(args.firmware)
244-
await hub.exec_line("import uos; uos.mkdir('_firmware')")
245-
await hub.upload_file(
246-
"_firmware/install_pybricks.py",
247-
bytearray(archive.open("install_pybricks.py").read()),
248-
)
249-
250-
# Upload metadata.
251-
await hub.upload_file(
252-
"_firmware/firmware.metadata.json",
253-
json.dumps(metadata, indent=4).encode(),
254-
)
255-
256-
# Upload Pybricks firmware
257-
await hub.upload_file("_firmware/firmware.bin", firmware)
258-
259-
# Run installation script
260-
print("Installing firmware")
261-
await hub.exec_line("from _firmware.install_pybricks import install")
262-
await hub.exec_paste_mode("install()")
263-
264-
except OSError:
265-
print("Could not find hub in standard firmware mode. Trying DFU.")
266-
flash_dfu(firmware, metadata)
267-
else:
268-
from ..ble import find_device
269-
from ..flash import BootloaderConnection
270-
271-
print("Searching for LEGO Bootloader...")
272-
273-
try:
274-
device = await find_device(service=LWP3_BOOTLOADER_SERVICE_UUID)
275-
except asyncio.TimeoutError:
276-
print("timed out")
277-
return
221+
def run(self, args: argparse.Namespace):
222+
from .flash import flash_firmware
278223

279-
print("Found:", device)
280-
updater = BootloaderConnection()
281-
await updater.connect(device)
282-
print("Erasing flash and starting update")
283-
await updater.flash(firmware, metadata)
224+
return flash_firmware(args.firmware, args.name)
284225

285226

286227
class DFUBackup(Tool):
@@ -352,7 +293,7 @@ def add_parser(self, subparsers: argparse._SubParsersAction):
352293
parser.tool = self
353294

354295
def run(self, args: argparse.Namespace):
355-
from .lwp3.repl import setup_repl_logging, repl
296+
from .lwp3.repl import repl, setup_repl_logging
356297

357298
setup_repl_logging()
358299
return repl()
@@ -387,6 +328,7 @@ def add_parser(self, subparsers: argparse._SubParsersAction):
387328

388329
async def run(self, args: argparse.Namespace):
389330
from importlib.resources import read_text
331+
390332
from .. import resources
391333

392334
print(read_text(resources, resources.UDEV_RULES))

pybricksdev/cli/flash.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# SPDX-License-Identifier: MIT
2+
# Copyright (c) 2019-2022 The Pybricks Authors
3+
4+
import asyncio
5+
import json
6+
import zipfile
7+
from typing import BinaryIO, Optional
8+
9+
from ..ble import find_device
10+
from ..ble.lwp3 import LWP3_BOOTLOADER_SERVICE_UUID
11+
from ..ble.lwp3.bytecodes import HubKind
12+
from ..connections import REPLHub
13+
from ..dfu import flash_dfu
14+
from ..flash import BootloaderConnection, create_firmware
15+
16+
17+
async def flash_firmware(firmware_zip: BinaryIO, hub_name: Optional[str]) -> None:
18+
"""
19+
Command line tool for flasing firmware.
20+
21+
Args:
22+
firmware_zip: The path to the ``firmware.zip`` file.
23+
hub_name: Optional custom hub name.
24+
"""
25+
26+
print("Creating firmware...")
27+
28+
firmware, metadata = await create_firmware(firmware_zip, hub_name)
29+
30+
if metadata["device-id"] in (HubKind.TECHNIC_SMALL, HubKind.TECHNIC_LARGE):
31+
try:
32+
# Connect to the hub and exit the runtime.
33+
hub = REPLHub()
34+
await hub.connect()
35+
await hub.reset_hub()
36+
37+
# Upload installation script.
38+
archive = zipfile.ZipFile(firmware)
39+
await hub.exec_line("import uos; uos.mkdir('_firmware')")
40+
await hub.upload_file(
41+
"_firmware/install_pybricks.py",
42+
bytearray(archive.open("install_pybricks.py").read()),
43+
)
44+
45+
# Upload metadata.
46+
await hub.upload_file(
47+
"_firmware/firmware.metadata.json",
48+
json.dumps(metadata, indent=4).encode(),
49+
)
50+
51+
# Upload Pybricks firmware
52+
await hub.upload_file("_firmware/firmware.bin", firmware)
53+
54+
# Run installation script
55+
print("Installing firmware")
56+
await hub.exec_line("from _firmware.install_pybricks import install")
57+
await hub.exec_paste_mode("install()")
58+
59+
except OSError:
60+
print("Could not find hub in standard firmware mode. Trying DFU.")
61+
flash_dfu(firmware, metadata)
62+
else:
63+
print("Searching for LEGO Bootloader...")
64+
65+
try:
66+
device = await find_device(service=LWP3_BOOTLOADER_SERVICE_UUID)
67+
except asyncio.TimeoutError:
68+
print("timed out")
69+
return
70+
71+
print("Found:", device)
72+
updater = BootloaderConnection()
73+
await updater.connect(device)
74+
print("Erasing flash and starting update")
75+
await updater.flash(firmware, metadata)

0 commit comments

Comments
 (0)