Skip to content
Merged
110 changes: 109 additions & 1 deletion pybricksdev/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,114 @@ def is_pybricks_usb(dev):
await hub.disconnect()


class Push(Tool):
def add_parser(self, subparsers: argparse._SubParsersAction):
parser = subparsers.add_parser(
"push",
help="upload a Pybricks program without running it",
)
parser.tool = self
parser.add_argument(
"conntype",
metavar="<connection type>",
help="connection type: %(choices)s",
choices=["ble", "usb", "ssh"],
)
parser.add_argument(
"file",
metavar="<file>",
help="path to a MicroPython script or `-` for stdin",
type=argparse.FileType(),
)
parser.add_argument(
"-n",
"--name",
metavar="<name>",
required=False,
help="hostname or IP address for SSH connection; "
"Bluetooth device name or Bluetooth address for BLE connection; "
"serial port name for USB connection",
)

async def run(self, args: argparse.Namespace):
# Pick the right connection
if args.conntype == "ssh":
from pybricksdev.connections.ev3dev import EV3Connection

# So it's an ev3dev
if args.name is None:
print("--name is required for SSH connections", file=sys.stderr)
exit(1)

device_or_address = socket.gethostbyname(args.name)
hub = EV3Connection(device_or_address)
elif args.conntype == "ble":
from pybricksdev.ble import find_device as find_ble
from pybricksdev.connections.pybricks import PybricksHubBLE

# It is a Pybricks Hub with BLE. Device name or address is given.
print(f"Searching for {args.name or 'any hub with Pybricks service'}...")
device_or_address = await find_ble(args.name)
hub = PybricksHubBLE(device_or_address)
elif args.conntype == "usb":
from usb.core import find as find_usb

from pybricksdev.connections.pybricks import PybricksHubUSB
from pybricksdev.usb import (
LEGO_USB_VID,
MINDSTORMS_INVENTOR_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
SPIKE_PRIME_USB_PID,
)

def is_pybricks_usb(dev):
return (
(dev.idVendor == LEGO_USB_VID)
and (
dev.idProduct
in [
SPIKE_PRIME_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
MINDSTORMS_INVENTOR_USB_PID,
]
)
and dev.product.endswith("Pybricks")
)

device_or_address = find_usb(custom_match=is_pybricks_usb)

if device_or_address is not None:
hub = PybricksHubUSB(device_or_address)
else:
from pybricksdev.connections.lego import REPLHub

hub = REPLHub()
else:
raise ValueError(f"Unknown connection type: {args.conntype}")

# Connect to the address and upload the script without running it
await hub.connect()
try:
with _get_script_path(args.file) as script_path:
# For PybricksHub, we use download_user_program to just upload without running
if args.conntype in ["ble", "usb"]:
from pybricksdev.compile import compile_multi_file

# Compile the script to mpy format
mpy = await compile_multi_file(
script_path, hub._mpy_abi_version or 6
)
# Upload without running
await hub.download_user_program(mpy)
# For EV3Connection, we just use download
elif args.conntype == "ssh":
await hub.download(script_path)
else:
raise RuntimeError("Unsupported hub type for push command")
finally:
await hub.disconnect()


class Flash(Tool):
def add_parser(self, subparsers: argparse._SubParsersAction):
parser = subparsers.add_parser(
Expand Down Expand Up @@ -459,7 +567,7 @@ def main():
help="the tool to use",
)

for tool in Compile(), Run(), Flash(), DFU(), OAD(), LWP3(), Udev():
for tool in Compile(), Run(), Push(), Flash(), DFU(), OAD(), LWP3(), Udev():
tool.add_parser(subparsers)

argcomplete.autocomplete(parser)
Expand Down
Loading