Skip to content

Commit 0bec30d

Browse files
committed
Added async support and test cases.
1 parent d131ead commit 0bec30d

File tree

5 files changed

+222
-9
lines changed

5 files changed

+222
-9
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,3 @@ Nothing in this package is very complicated, please have a crack and help me to
6767
- Add the ability to pack messages into packets for two way communications
6868
- Add more and better tests
6969
- Add Field type RU1_3
70-
- Add async support

ubxtranslator/core.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -392,13 +392,6 @@ def register_cls(self, cls: Cls):
392392
"""Register a message class."""
393393
self.classes[cls.id_] = cls
394394

395-
async def async_receive_from(self, stream) -> namedtuple:
396-
"""Receive a message from a stream and return as a namedtuple.
397-
raise IOError in case of errors due to insufficient data.
398-
raise ValueError in case of errors due to sufficient but invalid data.
399-
"""
400-
pass
401-
402395
def receive_from(self, stream) -> namedtuple:
403396
"""Receive a message from a stream and return as a namedtuple.
404397
raise IOError in case of errors due to insufficient data.
@@ -448,6 +441,44 @@ def receive_from(self, stream) -> namedtuple:
448441

449442
return self.classes[msg_cls].parse(msg_id, buff[4:])
450443

444+
async def receive_from_async(self, stream) -> namedtuple:
445+
"""Async version of receive_from."""
446+
while True:
447+
# Search for the prefix
448+
buff = await self._read_until_async(stream, terminator=Parser.PREFIX)
449+
if buff[-2:] == Parser.PREFIX:
450+
break
451+
452+
# read the first four bytes
453+
buff = await stream.readexactly(4)
454+
455+
if len(buff) != 4:
456+
raise IOError("A stream read returned {} bytes, expected 4 bytes".format(len(buff)))
457+
458+
# convert them into the packet descriptors
459+
msg_cls, msg_id, length = struct.unpack("BBH", buff)
460+
461+
# check the packet validity
462+
if msg_cls not in self.classes:
463+
raise ValueError("Received unsupported message class of {:x}".format(msg_cls))
464+
if msg_id not in self.classes[msg_cls]:
465+
raise ValueError("Received unsupported message id of {:x} in class {:x}".format(msg_id, msg_cls))
466+
467+
# Read the payload
468+
payload = await stream.readexactly(length)
469+
full_msg_for_checksum = buff + payload
470+
471+
# Read the checksum
472+
checksum_sup = await stream.readexactly(2)
473+
checksum_cal = self._generate_fletcher_checksum(full_msg_for_checksum)
474+
475+
if checksum_cal != checksum_sup:
476+
raise ValueError("Checksum mismatch. Calculated {:x} {:x}, received {:x} {:x}".format(
477+
checksum_cal[0], checksum_cal[1], checksum_sup[0], checksum_sup[1]
478+
))
479+
480+
return self.classes[msg_cls].parse(msg_id, payload)
481+
451482
@staticmethod
452483
def _read_until(stream, terminator: bytes, size=None):
453484
"""Read from the stream until the terminator byte/s are read.
@@ -468,6 +499,24 @@ def _read_until(stream, terminator: bytes, size=None):
468499

469500
return bytes(line)
470501

502+
@staticmethod
503+
async def _read_until_async(stream, terminator: bytes, size=None):
504+
"""Async version of Parser._read_until."""
505+
term_len = len(terminator)
506+
line = bytearray()
507+
while True:
508+
c = await stream.read(1)
509+
if c:
510+
line += c
511+
if line[-term_len:] == terminator:
512+
break
513+
if size is not None and len(line) >= size:
514+
break
515+
else:
516+
break
517+
518+
return bytes(line)
519+
471520
@staticmethod
472521
def _generate_fletcher_checksum(payload: bytes) -> bytes:
473522
"""Return the checksum for the provided payload"""

ubxtranslator/examples/async.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
"""An example of a state class with an async parser using pyserial-asyncio.
2+
3+
You will need to change the port name to that of the port you want to connect to. Also make sure that the baud rate is
4+
correct and that the device has been setup to output UBX messages protocol to your desired port!
5+
"""
6+
7+
import asyncio
8+
from asyncio import Event
9+
10+
import serial_asyncio
11+
12+
from ubxtranslator.core import Parser
13+
from ubxtranslator.predefined import NAV_CLS, ACK_CLS
14+
15+
16+
class GnssService:
17+
def __init__(self, port, baud_rate):
18+
self.parser = Parser([NAV_CLS, ACK_CLS])
19+
self.port = port
20+
self.baud_rate = baud_rate
21+
self.last_message = None
22+
self.new_message_event = Event()
23+
24+
async def read_serial(self):
25+
"""Read from serial port asynchronously using pyserial-asyncio."""
26+
while True:
27+
try:
28+
reader, _ = await serial_asyncio.open_serial_connection(
29+
url=self.port, baudrate=self.baud_rate
30+
)
31+
print(f"Starting to listen for UBX packets on {self.port}")
32+
while True:
33+
try:
34+
msg = await self.parser.receive_from_async(reader)
35+
if msg:
36+
self.last_message = msg
37+
self.new_message_event.set()
38+
39+
except (ValueError, IOError, asyncio.IncompleteReadError) as e:
40+
print(f"Error parsing UBX message: {e}")
41+
continue
42+
43+
except Exception as e:
44+
print(f"Could not open serial port {self.port}: {e}")
45+
# Wait before trying to reconnect
46+
await asyncio.sleep(5)
47+
48+
async def print_last_message(self):
49+
"""An event based approach to consumption."""
50+
while True:
51+
await self.new_message_event.wait()
52+
print(self.last_message)
53+
self.new_message_event.clear()
54+
55+
async def run(self):
56+
"""Run the service."""
57+
await asyncio.gather(self.read_serial(), self.print_last_message())
58+

ubxtranslator/tests/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import unittest
22

3-
from . import test_core, test_fields
3+
from . import test_core, test_fields, test_async
44

55

66
def suite():
@@ -17,6 +17,9 @@ def suite():
1717
suite.addTest(test_core.UbxClsTester())
1818
suite.addTest(test_core.UbxParserTester())
1919

20+
# test async
21+
suite.addTest(test_async.UbxAsyncParserTester())
22+
2023
return suite
2124

2225

ubxtranslator/tests/test_async.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import unittest
2+
import asyncio
3+
from ubxtranslator.core import *
4+
5+
6+
class MockStreamReader:
7+
def __init__(self, data):
8+
self.data = data
9+
self.pos = 0
10+
11+
async def read(self, n):
12+
chunk = self.data[self.pos:self.pos + n]
13+
self.pos += len(chunk)
14+
return chunk
15+
16+
async def readexactly(self, n):
17+
chunk = self.data[self.pos:self.pos + n]
18+
if len(chunk) < n:
19+
raise asyncio.IncompleteReadError(chunk, n)
20+
self.pos += n
21+
return chunk
22+
23+
24+
class UbxAsyncParserTester(unittest.IsolatedAsyncioTestCase):
25+
async def test_parser_async(self):
26+
cls = Cls(1, 'TEST', [
27+
Message(1, 'TEST', [
28+
PadByte(),
29+
Field('F1', 'U1'),
30+
Field('F2', 'I1'),
31+
PadByte(),
32+
Field('F3', 'U1'),
33+
BitField('F4', 'X1', [
34+
Flag('SF1', 0, 4),
35+
Flag('SF2', 4, 8)
36+
])
37+
])
38+
])
39+
40+
parser = Parser([cls])
41+
42+
with self.subTest(msg='Test correct async usage'):
43+
test_packet = bytes([1, 1, 6, 0, 0, 1, 2, 3, 4, 5])
44+
test_packet = parser.PREFIX + test_packet + parser._generate_fletcher_checksum(test_packet)
45+
46+
test_stream = MockStreamReader(test_packet)
47+
48+
cls_name, msg_name, msg = await parser.receive_from_async(test_stream)
49+
50+
self.assertEqual(msg.F1, 1)
51+
self.assertEqual(msg.F2, 2)
52+
self.assertEqual(msg.F3, 4)
53+
self.assertEqual(msg.F4.SF1, 5)
54+
self.assertEqual(msg.F4.SF2, 0)
55+
56+
with self.subTest(msg='Test async bad class id'):
57+
test_packet = bytes([2, 1, 6, 0, 0, 1, 2, 3, 4, 5])
58+
test_packet = parser.PREFIX + test_packet + parser._generate_fletcher_checksum(test_packet)
59+
test_stream = MockStreamReader(test_packet)
60+
with self.assertRaises(ValueError):
61+
await parser.receive_from_async(test_stream)
62+
63+
with self.subTest(msg='Test async bad msg'):
64+
test_packet = bytes([1, 2, 6, 0, 0, 1, 2, 3, 4, 5])
65+
test_packet = parser.PREFIX + test_packet + parser._generate_fletcher_checksum(test_packet)
66+
test_stream = MockStreamReader(test_packet)
67+
with self.assertRaises(ValueError):
68+
await parser.receive_from_async(test_stream)
69+
70+
with self.subTest(msg='Test async bad checksum'):
71+
test_packet = bytes([1, 1, 6, 0, 0, 1, 2, 3, 4, 5])
72+
test_packet = parser.PREFIX + test_packet + bytes([0, 1])
73+
test_stream = MockStreamReader(test_packet)
74+
with self.assertRaises(ValueError):
75+
await parser.receive_from_async(test_stream)
76+
77+
with self.subTest(msg='Test async insufficient data'):
78+
test_packet = bytes([1, 1, 6, 0, 0, 1, 2, 3, 5])
79+
test_packet = parser.PREFIX + test_packet + parser._generate_fletcher_checksum(test_packet)
80+
test_stream = MockStreamReader(test_packet)
81+
with self.assertRaises(asyncio.IncompleteReadError):
82+
await parser.receive_from_async(test_stream)
83+
84+
async def test_read_until_async(self):
85+
data = b"some junk" + Parser.PREFIX + b"more data"
86+
stream = MockStreamReader(data)
87+
88+
# Test reading until PREFIX
89+
result = await Parser._read_until_async(stream, terminator=Parser.PREFIX)
90+
self.assertEqual(result, b"some junk" + Parser.PREFIX)
91+
92+
# Test reading with size limit
93+
stream = MockStreamReader(b"1234567890")
94+
result = await Parser._read_until_async(stream, terminator=b"XYZ", size=5)
95+
self.assertEqual(result, b"12345")
96+
97+
# Test reading until EOF
98+
stream = MockStreamReader(b"eof reach")
99+
result = await Parser._read_until_async(stream, terminator=b"NOT_HERE")
100+
self.assertEqual(result, b"eof reach")
101+
102+
103+
if __name__ == '__main__':
104+
unittest.main()

0 commit comments

Comments
 (0)