This repository was archived by the owner on Jan 5, 2024. It is now read-only.
forked from mayeranalytics/pyUBX
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathUBXMessage.py
More file actions
executable file
·297 lines (253 loc) · 11.1 KB
/
UBXMessage.py
File metadata and controls
executable file
·297 lines (253 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#!/usr/bin/env python3
"""TODO."""
import struct
import inspect
from enum import Enum
import sys
from itertools import accumulate
class MessageClass(Enum):
"""UBX Class IDs."""
NAV = b'\x01' # Navigation Results Messages: Position, Speed, Time, Acceleration, Heading, DOP, SVs used
RXM = b'\x02' # Receiver Manager Messages: Satellite Status, RTC Status
INF = b'\x04' # Information Messages: Printf-Style Messages, with IDs such as Error, Warning, Notice
ACK = b'\x05' # Ack/nak if configuration messages
UPD = b'\x09' # Firmware Update Messages: Memory/Flash erase/write, Reboot, Flash identification, etc.
MON = b'\x0A' # Monitoring messages
AID = b'\x0B' # AssistNow Aiding Messages: Ephemeris, Almanac, other A-GPS data input
TIM = b'\x0D' # Timing Messages: Time Pulse Output, Time Mark Results
ESF = b'\x10' # External Sensor Fusion Messages: External Sensor Measurements and Status Information
MGA = b'\x13' # Multiple GNSS Assistance Messages: Assistance data for various GNSS
LOG = b'\x21' # Logging Messages: Log creation, deletion, info and retrieval
SEC = b'\x27' # Security Feature Messages
HNR = b'\x28' # High Rate Navigation Results Messages: High rate time, position, speed, heading
def _byte(i):
"""Helper function: convert int 0..255 to bytestring byte."""
return bytes([i])
class UBXMessage(object):
"""Base class for UBX messages."""
sync_char_1 = b'\xb5'
sync_char_2 = b'\x62'
def __init__(self, msgClass, msgId, payload):
"""Instantiate UBXMessage from MessageClass, messageId and payload.
msgClass and msgId should be ints.
"""
self._class = msgClass
self._id = msgId
self._payload = payload
@staticmethod
def make(msgClass, msgId, payload):
"""Return a proper UBX message from the given class, id and payload."""
msg = struct.pack('cc', UBXMessage.sync_char_1, UBXMessage.sync_char_2)
msg += struct.pack('cc', _byte(msgClass), _byte(msgId))
msg += struct.pack('<h', len(payload))
msg += payload
msg += struct.pack('>H', UBXMessage.Checksum(msg[2:]).get())
return msg
@staticmethod
def extract(msg):
"""Return msgClass, msgId, payload from given message.
This is the inverse of make(), used mostly for debugging purposes.
"""
(sync1, sync2) = struct.unpack('cc', msg[0:2])
if sync1 != UBXMessage.sync_char_1 or sync2 != UBXMessage.sync_char_2:
raise Exception("Sync chars not correct.")
msgClass, msgId = struct.unpack('cc', msg[2:4])
lenPayload = struct.unpack('<h', msg[4:6])[0]
payload = msg[6:(6+lenPayload)]
trueCksum = UBXMessage.Checksum(msg[2:(len(msg)-2)]).get()
msgCksum = struct.unpack('>H', msg[6+lenPayload:])[0]
if trueCksum != msgCksum:
raise Exception(
"Calculated checksum 0x{:02x} does not match 0x{:02x}."
.format(msgCksum, trueCksum)
)
return ord(msgClass), ord(msgId), payload
def serialize(self):
"""Serialize the UBXMessage."""
return UBXMessage.make(self._class, self._id, self._payload)
class Checksum:
"""Incrementally calculate UBX message checksums."""
def __init__(self, msg=None):
"""Instantiate object.
If msg is not None calculate the checksum of the message, otherwise
instantiate the checksums to zero.
"""
self.reset()
if msg is not None:
for i in msg:
self.update(bytes([i]))
def reset(self):
"""Reset the checksums to zero."""
self.a, self.b = 0x00, 0x00
def update(self, byte):
"""Update checksums with byte."""
i = ord(byte)
self.a += i
self.a &= 0xff
self.b += self.a
self.b &= 0xff
def get(self):
"""Return the checksum (a 16-bit integer, ck_a is the MSB)."""
return self.a * 256 + self.b
def _mkFieldInfo(Fields):
# The following is a list of (name, formatChar) tuples, such as
# [(1, 'clsID', U1), (2, 'msgID', U1)]
once = [
(v.ord, k, v)
for k, v in Fields.__dict__.items()
if not k.startswith('__') and k != 'Repeated'
]
once.sort()
repeated = Fields.__dict__.get('Repeated')
repeated = [] if repeated is None else _mkFieldInfo(repeated)
return {
'once': ([v for (o, k, v) in once], # list of types
[k for (o, k, v) in once]), # list of var names
'repeat': repeated
}
def _mkNamesAndTypes(fieldInfo, msgLength):
"""Make list of variable names and list of variable types.
Input is the fieldInfo as returned by _mkFieldInfo.
The number of repeated objects is deduced from the message length and
the types of variables in the 'once' and 'repeat' block of the input."""
varTypes, varNames = fieldInfo['once']
repeat = fieldInfo['repeat']
if repeat:
varTypesRepeat, varNamesRepeat = repeat['once'] # nest level 1 only
sizeOnce = sum([t._size for t in varTypes])
sizeRepeat = sum([t._size for t in varTypesRepeat])
N = (msgLength - sizeOnce) // sizeRepeat
sizeTotal = sizeOnce + N * sizeRepeat
if sizeTotal != msgLength:
errmsg = "message length {} does not match {}"\
.format(msgLength, sizeTotal)
raise Exception(errmsg)
varTypes += N * varTypesRepeat
varNames += _flatten(list(
map(lambda i: list(map(lambda s: s+"_"+str(i),
varNamesRepeat)
),
range(1, N+1))
)
)
return varNames, varTypes
def _flatten(l):
return [item for sublist in l for item in sublist]
def initMessageClass(cls):
"""Decorator for the python class representing a UBX message class.
It does the following in cls:
- add a dict with name _lookup that maps UBX message ID to python subclass.
In each subclass it does this:
- add an __init__ if it doesn't exist
- add a __str__ if it doesn't exist
Function __init__ instantiates the object from a message.
Function __str__ creates a human readable string from the object.
"""
cls_name = cls.__name__
subClasses = [c for c in cls.__dict__.values() if type(c) == type]
lookup = dict([(getattr(subcls, '_id'), subcls) for subcls in subClasses])
setattr(cls, "_lookup", lookup)
for sc in subClasses:
if sc.__dict__.get('Fields') is None: # 'Fields' must be present
raise Exception(
"Class {}.{} has no Fields"
.format(cls.__name__, sc.__name__)
)
# add __init__ to subclass if necessary
if sc.__dict__.get('__init__') is None:
def __init__(self, msg):
"""Instantiate object from message bytestring."""
fieldInfo = _mkFieldInfo(self.Fields)
varNames, varTypes = _mkNamesAndTypes(fieldInfo, len(msg))
if not varNames:
errmsg = 'No variables found in UBX.{}.{}.'\
.format(cls_name, sc.__name__)
errmsg += ' Is the \'Fields\' class empty?'
raise Exception(errmsg)
_len = len(msg) # msg will be consumed in the loop
for (varName, varType) in zip(varNames, varTypes):
val, msg = varType.parse(msg)
setattr(self, varName, val)
if len(msg) != 0:
clsName = "UBX.{}.{}".format(cls_name, sc.__name__)
raise Exception(
"Message not fully consumed while parsing a {}!"
.format(clsName)
)
self._len = _len
self._payload = msg
setattr(sc, "__init__", __init__)
# add __str__ to subclass if necessary
if sc.__dict__.get('__str__') is None:
def __str__(self):
"""Return human readable string."""
fieldInfo = _mkFieldInfo(self.Fields)
varNames, varTypes = _mkNamesAndTypes(fieldInfo, self._len)
s = "{}-{}:".format(cls_name, type(self).__name__)
for (varName, varType) in zip(varNames, varTypes):
s += "\n {}={}".format(
varName,
varType.toString(getattr(self, varName)) # prettify
)
return s
setattr(sc, "__str__", __str__)
# add serialize to subclass if necessary
if sc.__dict__.get('serialize') is None:
def serialize(self):
"""UBX-serialize this object."""
fieldInfo = _mkFieldInfo(self.Fields)
varNames, varTypes = _mkNamesAndTypes(fieldInfo, self._len)
payload = b''
for name, typ in zip(varNames, varTypes):
val = getattr(self, name)
payload += typ.serialize(val)
return UBXMessage.make(
self._class, self._id, payload
)
setattr(sc, "serialize", serialize)
# set the '_class' class variable in subclass
setattr(sc, '_class', cls._class)
return cls
def classFromMessageClass():
"""Look up the python class corresponding to a UBX message class.
The result is something like
[(5, UBX.ACK.ACK), (6, UBX.CFG.CFG), (10, UBX.MON.MON)]
"""
return dict([
(getattr(v, '_class'), v)
for (k, v) in inspect.getmembers(sys.modules["UBX"], inspect.isclass)
if v.__name__ not in [
"UBXMessage", "U1", "I1", "X1", "U2", "I2", "X2",
"U4", "I4", "X4", "R4", "R8", "CH", "U"
]
])
def parseUBXPayload(msgClass, msgId, payload):
"""Parse a UBX payload from message class, message ID and payload."""
Cls = classFromMessageClass().get(msgClass)
if Cls is None:
err = "Cannot parse message class {}.\n Available: {}"\
.format(msgClass, classFromMessageClass())
raise Exception(err)
Subcls = Cls._lookup.get(msgId)
if Subcls is None:
raise Exception(
"Cannot parse message ID {} of message class {}.\n Available: {}"
.format(msgId, msgClass, Cls._lookup))
return Subcls(payload)
def parseUBXMessage(msg):
"""Parse a UBX message."""
msgClass, msgId, payload = UBXMessage.extract(msg)
return parseUBXPayload(msgClass, msgId, payload)
def formatByteString(s):
"""Return a readable string of hex numbers."""
return " ".join('{:02x}'.format(x) for x in s)
def addGet(cls):
"""Decorator that adds a Get function to the subclass."""
class Get(UBXMessage):
def __init__(self):
# this only works because class and module have the same name!
import UBX
_class = eval(cls.__module__)._class
UBXMessage.__init__(self, _class, cls._id, b'')
setattr(cls, "Get", Get)
return cls