-
|
Up until pybricksdev 1.0.0a24 I've been using this loop to process communication from the hub: async def output_loop(self):
print("starting output handler loop")
while self.hub.program_running:
while self.hub.output:
raw = self.hub.output.pop(0)
await self.handle_output(raw)
await asyncio.sleep(0.05)
print("output loop finished!")However, after upgrading to 1.0.0a25 it seems the |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
|
It doesn't look like we have created a separate event stream (observable) for "user program output". We just have import asyncio
import os
import tempfile
from pybricksdev.ble import find_device
from pybricksdev.connections.pybricks import PybricksHub
PROGRAM = """
from pybricks import version
from pybricks.tools import wait
print(version)
wait(1000)
print('bye')
"""
async def main():
device = await find_device()
hub = PybricksHub()
def handle_output():
while hub.output:
line = hub.output.pop(0)
print("received:", line)
def handle_nus(ignored):
# the call to handle_output() has to be deferred until after the
# internal line_handler has run, otherwise we will miss the last
# line of output
asyncio.get_running_loop().call_soon(handle_output)
await hub.connect(device)
try:
# this subscribes to all "raw" Nordic UART Service events, which may
# be checksums during download and run or the output from a user program
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
try:
f.write(PROGRAM)
f.close()
with hub.nus_observable.subscribe(handle_nus):
# running with output printing disabled lets us handle the output
await hub.run(f.name, print_output=False)
finally:
os.unlink(f.name)
finally:
await hub.disconnect()
asyncio.run(main())The nicer API that we don't have yet might look something like this: ...
async def main():
device = await find_device()
hub = PybricksHub()
def handle_output(line):
print("received:", line)
await hub.connect(device)
try:
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
try:
f.write(PROGRAM)
f.close()
with hub.output_line_observable.subscribe(handle_output):
# running with output printing disabled lets us handle the output
await hub.run(f.name, print_output=False)
finally:
os.unlink(f.name)
finally:
await hub.disconnect()
asyncio.run(main()) |
Beta Was this translation helpful? Give feedback.
It doesn't look like we have created a separate event stream (observable) for "user program output". We just have
nus_observablefor now, which also includes checksum data while downloading the program. But for now, you can do something like this: