Skip to content

Commit f24ab3a

Browse files
authored
cli: Add a new command for downloading program without running it.
Sometimes, it's useful to upload the program to hubs without running it, especially when hubs are in complex environments and require manual start for safety and convenience purposes.
1 parent 5f44290 commit f24ab3a

File tree

5 files changed

+633
-24
lines changed

5 files changed

+633
-24
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
### Added
10+
- Added `pybricksdev download` command to download Python scripts to hubs without running them.
11+
Supports BLE, USB, and SSH connections. ([pybricksdev#107])
12+
13+
[pybricksdev#107]: https://github.com/pybricks/pybricksdev/issues/107
14+
915
## [1.0.1] - 2025-02-20
1016

1117
### Fixed

pybricksdev/cli/__init__.py

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,100 @@ def is_pybricks_usb(dev):
236236
await hub.disconnect()
237237

238238

239+
class Download(Tool):
240+
def add_parser(self, subparsers: argparse._SubParsersAction):
241+
parser = subparsers.add_parser(
242+
"download",
243+
help="upload a Pybricks program without running it",
244+
)
245+
parser.tool = self
246+
parser.add_argument(
247+
"conntype",
248+
metavar="<connection type>",
249+
help="connection type: %(choices)s",
250+
choices=["ble", "usb", "ssh"],
251+
)
252+
parser.add_argument(
253+
"file",
254+
metavar="<file>",
255+
help="path to a MicroPython script or `-` for stdin",
256+
type=argparse.FileType(),
257+
)
258+
parser.add_argument(
259+
"-n",
260+
"--name",
261+
metavar="<name>",
262+
required=False,
263+
help="hostname or IP address for SSH connection; "
264+
"Bluetooth device name or Bluetooth address for BLE connection; "
265+
"serial port name for USB connection",
266+
)
267+
268+
async def run(self, args: argparse.Namespace):
269+
# Pick the right connection
270+
if args.conntype == "ssh":
271+
from pybricksdev.connections.ev3dev import EV3Connection
272+
273+
# So it's an ev3dev
274+
if args.name is None:
275+
print("--name is required for SSH connections", file=sys.stderr)
276+
exit(1)
277+
278+
device_or_address = socket.gethostbyname(args.name)
279+
hub = EV3Connection(device_or_address)
280+
elif args.conntype == "ble":
281+
from pybricksdev.ble import find_device as find_ble
282+
from pybricksdev.connections.pybricks import PybricksHubBLE
283+
284+
# It is a Pybricks Hub with BLE. Device name or address is given.
285+
print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
286+
device_or_address = await find_ble(args.name)
287+
hub = PybricksHubBLE(device_or_address)
288+
elif args.conntype == "usb":
289+
from usb.core import find as find_usb
290+
291+
from pybricksdev.connections.pybricks import PybricksHubUSB
292+
from pybricksdev.usb import (
293+
LEGO_USB_VID,
294+
MINDSTORMS_INVENTOR_USB_PID,
295+
SPIKE_ESSENTIAL_USB_PID,
296+
SPIKE_PRIME_USB_PID,
297+
)
298+
299+
def is_pybricks_usb(dev):
300+
return (
301+
(dev.idVendor == LEGO_USB_VID)
302+
and (
303+
dev.idProduct
304+
in [
305+
SPIKE_PRIME_USB_PID,
306+
SPIKE_ESSENTIAL_USB_PID,
307+
MINDSTORMS_INVENTOR_USB_PID,
308+
]
309+
)
310+
and dev.product.endswith("Pybricks")
311+
)
312+
313+
device_or_address = find_usb(custom_match=is_pybricks_usb)
314+
315+
if device_or_address is not None:
316+
hub = PybricksHubUSB(device_or_address)
317+
else:
318+
from pybricksdev.connections.lego import REPLHub
319+
320+
hub = REPLHub()
321+
else:
322+
raise ValueError(f"Unknown connection type: {args.conntype}")
323+
324+
# Connect to the address and upload the script without running it
325+
await hub.connect()
326+
try:
327+
with _get_script_path(args.file) as script_path:
328+
await hub.download(script_path)
329+
finally:
330+
await hub.disconnect()
331+
332+
239333
class Flash(Tool):
240334
def add_parser(self, subparsers: argparse._SubParsersAction):
241335
parser = subparsers.add_parser(
@@ -459,7 +553,7 @@ def main():
459553
help="the tool to use",
460554
)
461555

462-
for tool in Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev():
556+
for tool in Compile(), Run(), Download(), Flash(), DFU(), OAD(), LWP3(), Udev():
463557
tool.add_parser(subparsers)
464558

465559
argcomplete.autocomplete(parser)

pybricksdev/connections/pybricks.py

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,50 @@ async def stop_user_program(self) -> None:
466466
response=True,
467467
)
468468

469+
async def download(self, script_path: str) -> None:
470+
"""
471+
Downloads a script to the hub without running it.
472+
473+
This method handles both compilation and downloading of the script.
474+
For Pybricks hubs, it compiles the script to MPY format and downloads it
475+
using the Pybricks protocol.
476+
477+
Args:
478+
script_path: Path to the Python script to download.
479+
480+
Raises:
481+
RuntimeError: If the hub is not connected or if the hub type is not supported.
482+
ValueError: If the compiled program is too large to fit on the hub.
483+
"""
484+
if self.connection_state_observable.value != ConnectionState.CONNECTED:
485+
raise RuntimeError("not connected")
486+
487+
# since Pybricks profile v1.2.0, the hub will tell us which file format(s) it supports
488+
if not (
489+
self._capability_flags
490+
& (
491+
HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6
492+
| HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6_1_NATIVE
493+
)
494+
):
495+
raise RuntimeError(
496+
"Hub is not compatible with any of the supported file formats"
497+
)
498+
499+
# no support for native modules unless one of the flags below is set
500+
abi = 6
501+
502+
if (
503+
self._capability_flags
504+
& HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6_1_NATIVE
505+
):
506+
abi = (6, 1)
507+
508+
# Compile the script to mpy format
509+
mpy = await compile_multi_file(script_path, abi)
510+
# Download without running
511+
await self.download_user_program(mpy)
512+
469513
async def run(
470514
self,
471515
py_path: Optional[str] = None,
@@ -506,31 +550,11 @@ async def run(
506550
await self._legacy_run(py_path, wait)
507551
return
508552

509-
# since Pybricks profile v1.2.0, the hub will tell us which file format(s) it supports
510-
if not (
511-
self._capability_flags
512-
& (
513-
HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6
514-
| HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6_1_NATIVE
515-
)
516-
):
517-
raise RuntimeError(
518-
"Hub is not compatible with any of the supported file formats"
519-
)
520-
521-
# no support for native modules unless one of the flags below is set
522-
abi = 6
523-
524-
if (
525-
self._capability_flags
526-
& HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6_1_NATIVE
527-
):
528-
abi = (6, 1)
529-
553+
# Download the program if a path is provided
530554
if py_path is not None:
531-
mpy = await compile_multi_file(py_path, abi)
532-
await self.download_user_program(mpy)
555+
await self.download(py_path)
533556

557+
# Start the program
534558
await self.start_user_program()
535559

536560
if wait:

tests/connections/test_pybricks.py

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
"""Tests for the pybricks connection module."""
2+
3+
import asyncio
4+
import contextlib
5+
import os
6+
import tempfile
7+
from unittest.mock import AsyncMock, PropertyMock, patch
8+
9+
import pytest
10+
from reactivex.subject import Subject
11+
12+
from pybricksdev.connections.pybricks import (
13+
ConnectionState,
14+
HubCapabilityFlag,
15+
HubKind,
16+
PybricksHubBLE,
17+
StatusFlag,
18+
)
19+
20+
21+
class TestPybricksHub:
22+
"""Tests for the PybricksHub base class functionality."""
23+
24+
@pytest.mark.asyncio
25+
async def test_download_modern_protocol(self):
26+
"""Test downloading with modern protocol and capability flags."""
27+
hub = PybricksHubBLE("mock_device")
28+
hub._mpy_abi_version = 6
29+
hub._client = AsyncMock()
30+
hub.get_capabilities = AsyncMock(return_value={"pybricks": {"mpy": True}})
31+
hub.download_user_program = AsyncMock()
32+
type(hub.connection_state_observable).value = PropertyMock(
33+
return_value=ConnectionState.CONNECTED
34+
)
35+
hub._capability_flags = HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6
36+
37+
with contextlib.ExitStack() as stack:
38+
# Create and manage temporary file
39+
temp = stack.enter_context(
40+
tempfile.NamedTemporaryFile(suffix=".py", mode="w+", delete=False)
41+
)
42+
temp.write("print('test')")
43+
temp_path = temp.name
44+
stack.callback(os.unlink, temp_path)
45+
46+
await hub.download(temp_path)
47+
hub.download_user_program.assert_called_once()
48+
49+
@pytest.mark.asyncio
50+
async def test_download_legacy_firmware(self):
51+
"""Test downloading with legacy firmware."""
52+
hub = PybricksHubBLE("mock_device")
53+
hub._mpy_abi_version = None # Legacy firmware
54+
hub._client = AsyncMock()
55+
hub.download_user_program = AsyncMock()
56+
hub.hub_kind = HubKind.BOOST
57+
type(hub.connection_state_observable).value = PropertyMock(
58+
return_value=ConnectionState.CONNECTED
59+
)
60+
hub._capability_flags = HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6
61+
62+
with contextlib.ExitStack() as stack:
63+
# Create and manage temporary file
64+
temp = stack.enter_context(
65+
tempfile.NamedTemporaryFile(suffix=".py", mode="w+", delete=False)
66+
)
67+
temp.write("print('test')")
68+
temp_path = temp.name
69+
stack.callback(os.unlink, temp_path)
70+
71+
await hub.download(temp_path)
72+
hub.download_user_program.assert_called_once()
73+
74+
@pytest.mark.asyncio
75+
async def test_download_unsupported_capabilities(self):
76+
"""Test downloading when hub doesn't support required capabilities."""
77+
hub = PybricksHubBLE("mock_device")
78+
hub._mpy_abi_version = 6
79+
hub._client = AsyncMock()
80+
hub.get_capabilities = AsyncMock(return_value={"pybricks": {"mpy": False}})
81+
type(hub.connection_state_observable).value = PropertyMock(
82+
return_value=ConnectionState.CONNECTED
83+
)
84+
hub._capability_flags = 0
85+
86+
with contextlib.ExitStack() as stack:
87+
# Create and manage temporary file
88+
temp = stack.enter_context(
89+
tempfile.NamedTemporaryFile(suffix=".py", mode="w+", delete=False)
90+
)
91+
temp.write("print('test')")
92+
temp_path = temp.name
93+
stack.callback(os.unlink, temp_path)
94+
95+
with pytest.raises(
96+
RuntimeError,
97+
match="Hub is not compatible with any of the supported file formats",
98+
):
99+
await hub.download(temp_path)
100+
101+
@pytest.mark.asyncio
102+
async def test_download_compile_error(self):
103+
"""Test handling compilation errors."""
104+
hub = PybricksHubBLE("mock_device")
105+
hub._mpy_abi_version = 6
106+
hub._client = AsyncMock()
107+
hub.get_capabilities = AsyncMock(return_value={"pybricks": {"mpy": True}})
108+
type(hub.connection_state_observable).value = PropertyMock(
109+
return_value=ConnectionState.CONNECTED
110+
)
111+
hub._capability_flags = HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6
112+
hub._max_user_program_size = 1000 # Set a reasonable size limit
113+
114+
with contextlib.ExitStack() as stack:
115+
# Create and manage temporary file
116+
temp = stack.enter_context(
117+
tempfile.NamedTemporaryFile(suffix=".py", mode="w+", delete=False)
118+
)
119+
temp.write("print('test' # Missing closing parenthesis")
120+
temp_path = temp.name
121+
stack.callback(os.unlink, temp_path)
122+
123+
# Mock compile_multi_file to raise SyntaxError
124+
stack.enter_context(
125+
patch(
126+
"pybricksdev.connections.pybricks.compile_multi_file",
127+
side_effect=SyntaxError("invalid syntax"),
128+
)
129+
)
130+
131+
with pytest.raises(SyntaxError, match="invalid syntax"):
132+
await hub.download(temp_path)
133+
134+
@pytest.mark.asyncio
135+
async def test_run_modern_protocol(self):
136+
"""Test running a program with modern protocol."""
137+
hub = PybricksHubBLE("mock_device")
138+
hub._mpy_abi_version = None # Use modern protocol
139+
hub._client = AsyncMock()
140+
hub.client = AsyncMock()
141+
hub.get_capabilities = AsyncMock(return_value={"pybricks": {"mpy": True}})
142+
hub.download_user_program = AsyncMock()
143+
hub.start_user_program = AsyncMock()
144+
hub.write_gatt_char = AsyncMock()
145+
type(hub.connection_state_observable).value = PropertyMock(
146+
return_value=ConnectionState.CONNECTED
147+
)
148+
hub._capability_flags = HubCapabilityFlag.USER_PROG_MULTI_FILE_MPY6
149+
hub.hub_kind = HubKind.BOOST
150+
151+
# Mock the status observable to simulate program start and stop
152+
status_subject = Subject()
153+
hub.status_observable = status_subject
154+
hub._stdout_line_queue = asyncio.Queue()
155+
hub._enable_line_handler = True
156+
157+
with contextlib.ExitStack() as stack:
158+
# Create and manage temporary file
159+
temp = stack.enter_context(
160+
tempfile.NamedTemporaryFile(suffix=".py", mode="w+", delete=False)
161+
)
162+
temp.write("print('test')")
163+
temp_path = temp.name
164+
stack.callback(os.unlink, temp_path)
165+
166+
# Start the run task
167+
run_task = asyncio.create_task(hub.run(temp_path))
168+
169+
# Simulate program start
170+
await asyncio.sleep(0.1)
171+
status_subject.on_next(StatusFlag.USER_PROGRAM_RUNNING)
172+
173+
# Simulate program stop after a short delay
174+
await asyncio.sleep(0.1)
175+
status_subject.on_next(0) # Clear all flags
176+
177+
# Wait for run task to complete
178+
await run_task
179+
180+
# Verify the expected calls were made
181+
hub.download_user_program.assert_called_once()
182+
hub.start_user_program.assert_called_once()

0 commit comments

Comments
 (0)