Skip to content

Commit d408f33

Browse files
committed
connections: use rxpy to monitor user program running status
This move the logic for monitoring the user program running status for waiting for a program to finish to the run() method. This fixes a race condition where a hub status message was received after the program was downloaded but before the user program running status flag was set. Fixes: #28
1 parent a0c676f commit d408f33

File tree

2 files changed

+46
-25
lines changed

2 files changed

+46
-25
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
## Fixed
10+
- Fixed race condition with `pybricksdev run ble` not waiting for program to
11+
finish before disconnecting ([pybricksdev#28]).
12+
13+
[pybricksdev#28]: https://github.com/pybricks/pybricksdev/issues/28
14+
915
## [1.0.0-alpha.24] - 2022-01-25
1016

1117
### Fixed

pybricksdev/connections.py

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88

99
import asyncssh
1010
import semver
11+
import rx.operators as op
1112
from bleak import BleakClient
1213
from bleak.backends.device import BLEDevice
1314
from serial.tools import list_ports
1415
from serial import Serial
1516
from tqdm.auto import tqdm
1617
from tqdm.contrib.logging import logging_redirect_tqdm
17-
from rx.subject import Subject
18+
from rx.subject import Subject, BehaviorSubject
1819

1920
from .ble.lwp3.bytecodes import HubKind
2021
from .ble.nus import NUS_RX_UUID, NUS_TX_UUID
@@ -152,6 +153,7 @@ class PybricksHub:
152153
EOL = b"\r\n" # MicroPython EOL
153154

154155
def __init__(self):
156+
self.status_observable = BehaviorSubject(StatusFlag(0))
155157
self.nus_observable = Subject()
156158
self.stream_buf = bytearray()
157159
self.output = []
@@ -162,10 +164,6 @@ def __init__(self):
162164

163165
# indicates is we are currently downloading a program
164166
self.loading = False
165-
# indicates that the user program is running
166-
self.program_running = False
167-
# used to notify when the user program has ended
168-
self.user_program_stopped = asyncio.Event()
169167

170168
self.hub_kind: HubKind
171169
self.hub_variant: int
@@ -246,19 +244,8 @@ def nus_handler(self, sender, data):
246244
def pybricks_service_handler(self, _: int, data: bytes) -> None:
247245
if data[0] == Event.STATUS_REPORT:
248246
# decode the payload
249-
(flags,) = struct.unpack("<I", data[1:])
250-
program_running_now = bool(flags & StatusFlag.USER_PROGRAM_RUNNING)
251-
252-
# If we are currently downloading a program, we must ignore user
253-
# program running state changes, otherwise the checksum will be
254-
# sent to the terminal instead of being handled by the download
255-
# algorithm
256-
if not self.loading:
257-
if self.program_running != program_running_now:
258-
logger.info(f"Program running: {program_running_now}")
259-
self.program_running = program_running_now
260-
if not program_running_now:
261-
self.user_program_stopped.set()
247+
(flags,) = struct.unpack_from("<I", data, 1)
248+
self.status_observable.on_next(StatusFlag(flags))
262249

263250
async def connect(self, device: BLEDevice):
264251
"""Connects to a device that was discovered with :meth:`pybricksdev.ble.find_device`
@@ -327,7 +314,6 @@ async def run(self, py_path, wait=True, print_output=True):
327314

328315
try:
329316
self.loading = True
330-
self.user_program_stopped.clear()
331317

332318
queue = asyncio.Queue()
333319
subscription = self.nus_observable.subscribe(
@@ -377,12 +363,41 @@ async def send_block(data: bytes) -> None:
377363
self.loading = False
378364

379365
if wait:
380-
await self.user_program_stopped.wait()
381-
# sleep is a hack to receive all output from user program since
382-
# the firmware currently doesn't flush the buffer before clearing
383-
# the user program running status flag
384-
# https://github.com/pybricks/support/issues/305
385-
await asyncio.sleep(0.3)
366+
user_program_running = asyncio.Queue()
367+
368+
with self.status_observable.pipe(
369+
op.map(lambda s: s & StatusFlag.USER_PROGRAM_RUNNING),
370+
op.distinct_until_changed(),
371+
).subscribe(lambda s: user_program_running.put_nowait(s)):
372+
373+
# The first item in the queue is the current status. The status
374+
# could change before or after the last checksum is received,
375+
# so this could be truthy or falsy.
376+
is_running = await user_program_running.get()
377+
378+
if not is_running:
379+
# if the program has not already started, wait a short time
380+
# for it to start
381+
try:
382+
await asyncio.wait_for(user_program_running.get(), 0.2)
383+
except asyncio.TimeoutError:
384+
# if it doesn't start, assume it was a very short lived
385+
# program and we just missed the status message
386+
return
387+
388+
# At this point, we know the user program is running, so the
389+
# next item in the queue must indicate that the user program
390+
# has stopped.
391+
is_running = await user_program_running.get()
392+
393+
# maybe catch mistake if the code is changed
394+
assert not is_running
395+
396+
# sleep is a hack to receive all output from user program since
397+
# the firmware currently doesn't flush the buffer before clearing
398+
# the user program running status flag
399+
# https://github.com/pybricks/support/issues/305
400+
await asyncio.sleep(0.3)
386401

387402

388403
FILE_PACKET_SIZE = 1024

0 commit comments

Comments
 (0)