Skip to content

Commit 343f9d7

Browse files
committed
Refactor python _read_message, fix bugs
Previously the DeviceBus._read_message function had bugs with consecutive empty messages (should rarely happen, but should still be handled appropriately) or with two delimeters at the end of a buffer (happens when one message ends almost at the end of the buffer and another begins at the last byte of the buffer, which should be rare but is more common with subscriptions). This refactor should be more robust, and I believe also faster but I haven't benchmarked it. In the process, make the entire function work on bytearrays instead of converting to string early; this shouldn't make a difference with the JSON being all ASCII, but can if the JSON is ever serialized in UTF-8 instead of using escape sequences.
1 parent cac3692 commit 343f9d7

1 file changed

Lines changed: 50 additions & 34 deletions

File tree

src/main/scripts/lib/micropython/devices.py

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,18 @@ def __str__(self):
5858

5959

6060
class DeviceBus:
61-
MESSAGE_DELIMITER = "\0"
61+
MESSAGE_DELIMITER = b'\0'
6262

6363
def __init__(self, path):
6464
self.file = io.open(path, "+b")
6565
os.system("stty -F %s raw -echo" % path)
6666
self.poll = select.poll()
6767
self.poll.register(self.file.fileno(), select.POLLIN)
68-
self.buffer = None
69-
self.buffer_pos = 0
68+
self._clear_buffer()
7069

7170
def close(self):
7271
self.file.close()
73-
self.buffer = None
72+
self._clear_buffer()
7473

7574
def flush(self):
7675
self._clear_buffer()
@@ -118,47 +117,64 @@ def _write_message(self, data):
118117
self.file.write(self.MESSAGE_DELIMITER + json.dumps(data) + self.MESSAGE_DELIMITER)
119118

120119
def _read_message(self, expected_type):
121-
message = ""
120+
'''Read a message from the bus, blocking if necessary
121+
122+
@param expected_type: The type of message we expect to see (eg. results)
123+
'''
124+
125+
message = b""
126+
127+
# Skip leading delimiters
122128
while True:
123-
if self.buffer is None:
129+
if self._buffer_remaining() == 0:
124130
self._fill_buffer()
125131

126-
value = self.buffer.decode()[self.buffer_pos:]
132+
while self._buffer_remaining() and self._buffer[self._buffer_pos] in self.MESSAGE_DELIMITER:
133+
self._buffer_pos += 1
134+
135+
if self._buffer_remaining():
136+
break
127137

128-
if len(message) == 0 and value[0] == self.MESSAGE_DELIMITER:
129-
self.buffer_pos += 1
130-
value = value[1:]
138+
# Rest of the buffer should have at least one non-delim byte
139+
# Read full message
140+
while (next_delim_pos := self._buffer.find(self.MESSAGE_DELIMITER, self._buffer_pos)) == -1:
141+
message += self._read_buffer()
142+
self._fill_buffer()
131143

132-
if value.find(self.MESSAGE_DELIMITER) != -1:
133-
value = value[:value.find(self.MESSAGE_DELIMITER) + 1]
134-
self.buffer_pos += len(value)
135-
if self.buffer_pos >= len(self.buffer):
136-
self._clear_buffer()
144+
message += self._read_buffer(next_delim_pos)
145+
146+
# parse message
147+
data = json.loads(message)
148+
if data["type"] == expected_type:
149+
if "data" in data:
150+
return data["data"]
137151
else:
138-
self._clear_buffer()
139-
140-
message += value
141-
142-
if message[-1] == self.MESSAGE_DELIMITER:
143-
data = json.loads(message)
144-
if data["type"] == expected_type:
145-
if "data" in data:
146-
return data["data"]
147-
else:
148-
return
149-
elif data["type"] == "error":
150-
raise Exception(data["data"])
151-
else:
152-
raise Exception("unexpected message type: %s" % data["type"])
152+
return
153+
elif data["type"] == "error":
154+
raise Exception(data["data"])
155+
else:
156+
raise Exception("unexpected message type: %s" % data["type"])
157+
158+
def _buffer_remaining(self):
159+
return len(self._buffer) - self._buffer_pos
160+
161+
def _read_buffer(self, end=None):
162+
if end is None:
163+
end = len(self._buffer)
164+
165+
old_pos = self._buffer_pos
166+
self._buffer_pos = end
167+
return self._buffer[old_pos:end]
153168

154169
def _clear_buffer(self):
155-
self.buffer = None
156-
self.buffer_pos = 0
170+
self._buffer = bytearray()
171+
self._buffer_pos = 0
157172

158173
def _fill_buffer(self):
174+
assert self._buffer_remaining() == 0
159175
self.poll.poll() # Blocking wait until we have some data.
160-
self.buffer = self._read(1024)
161-
self.buffer_pos = 0
176+
self._buffer = self._read(1024)
177+
self._buffer_pos = 0
162178

163179
def _read(self, limit):
164180
# This is horrible, but don't know how to know how many bytes are available,

0 commit comments

Comments
 (0)