diff --git a/setup.py b/setup.py index 8bd96fd..45053ae 100644 --- a/setup.py +++ b/setup.py @@ -63,8 +63,7 @@ author='SparkFun Electronics', author_email='info@sparkfun.com', - install_requires=[ - ], + install_requires=["pyserial", ], # Choose your license license='MIT', diff --git a/ublox_gps/core.py b/ublox_gps/core.py index 4741788..a55af56 100644 --- a/ublox_gps/core.py +++ b/ublox_gps/core.py @@ -2,9 +2,13 @@ """The core structure definitions""" import struct +import time from collections import namedtuple +from serial import SerialException from typing import List, Iterator, Union +from serial.serialutil import SerialException + __all__ = ['PadByte', 'Field', 'Flag', 'BitField', 'RepeatedBlock', 'Message', 'Cls', 'Parser', ] @@ -435,6 +439,15 @@ def _read_until(stream, terminator: bytes, size=None): """ term_len = len(terminator) line = bytearray() + + def check_timeout(start_time) -> bool: + """Check to see if the timeout has been reached.""" + if not hasattr(stream, "timeout"): + return False + + return (time.time() - start_time) > stream.timeout + + _read_start_time = time.time() while True: c = stream.read(1) if c: @@ -443,6 +456,10 @@ def _read_until(stream, terminator: bytes, size=None): break if size is not None and len(line) >= size: break + elif check_timeout(_read_start_time): + raise SerialException( + "Failed to find terminator before timeout." + ) else: break diff --git a/ublox_gps/sparkfun_predefines.py b/ublox_gps/sparkfun_predefines.py index cfd2f27..3dc1e3d 100644 --- a/ublox_gps/sparkfun_predefines.py +++ b/ublox_gps/sparkfun_predefines.py @@ -3,7 +3,7 @@ from . import core __all__ = ['ACK_CLS', 'CFG_CLS', 'ESF_CLS', 'INF_CLS', 'MGA_CLS', 'MON_CLS', - 'NAV_CLS', 'TIM_CLS', ] + 'NAV_CLS', 'TIM_CLS', 'RXM_CLS', ] ACK_CLS = core.Cls(0x05, 'ACK', [ core.Message(0x01, 'ACK', [ @@ -1130,3 +1130,48 @@ ]), ]), ]) + +# From the u-blox M8 protocol spec +# https://www.u-blox.com/sites/default/files/products/documents/u-blox8-M8_ReceiverDescrProtSpec_%28UBX-13003221%29.pdf +RXM_CLS = core.Cls(0x02, "RXM", [ # 32.18 UBX-RXM (0x02) + core.Message(0x15, "RAWX", [ # 32.18.4 UBX-RXM-RAWX (0x02 0x15) + core.Field("rcvTow", "R8"), + core.Field("week", "U2"), + core.Field("leapS", "I1"), + core.Field("numMeas", "U1"), + core.BitField("recStat", "X1", [ + core.Flag("leapSec", 0, 1), + core.Flag("clkReset", 1, 2), + ]), + core.Field("version", "U1"), + core.Field("reserved1a", "U1"), # Try U2? + core.Field("reserved1b", "U1"), + core.RepeatedBlock("RB", [ + core.Field("prMeas", "R8"), + core.Field("cpMeas", "R8"), + core.Field("doMeas", "R4"), + core.Field("gnssId", "U1"), + core.Field("svId", "U1"), + core.Field("sigId", "U1"), + core.Field("freqId", "U1"), + core.Field("locktime", "U2"), + core.Field("cno", "U1"), + core.BitField("prStdev", "X1", [ + core.Flag("prStd", 0, 4), + ]), + core.BitField("cpStdev", "X1", [ + core.Flag("cpStd", 0, 4), + ]), + core.BitField("doStdev", "X1", [ + core.Flag("doStd", 0, 4), + ]), + core.BitField("trkStat", "X1", [ + core.Flag("prValid", 0, 1), + core.Flag("coValid", 1, 2), + core.Flag("halfCyc", 2, 3), + core.Flag("subHalfCyc", 3, 4), + ]), + core.Field("reserved2", "U1"), + ]), + ]), +]) \ No newline at end of file diff --git a/ublox_gps/ublox_gps.py b/ublox_gps/ublox_gps.py index 5e62942..104ad50 100644 --- a/ublox_gps/ublox_gps.py +++ b/ublox_gps/ublox_gps.py @@ -45,11 +45,23 @@ import struct import serial -import spidev from . import sparkfun_predefines as sp from . import core +try: + import spidev + SPI_AVAILABLE = True +except ModuleNotFoundError as err: + import sys + + # If the platform is MacOS or Windows + if sys.platform in ["darwin", "win32", ]: + print("spidev only available for linux") + SPI_AVAILABLE = False + else: + raise err + class UbloxGps(object): """ UbloxGps @@ -58,7 +70,7 @@ class UbloxGps(object): :param hard_port: The port to use to communicate with the module, this can be a serial or SPI port. If no port is given, then the library - assumes serial at a 38400 baud rate. + assumes serial0 at a 38400 baud rate. :return: The UbloxGps object. :rtype: Object @@ -67,12 +79,17 @@ class UbloxGps(object): def __init__(self, hard_port = None): if hard_port is None: self.hard_port = serial.Serial("/dev/serial0/", 38400, timeout=1) - elif type(hard_port) == spidev.SpiDev: - sfeSpi = sfeSpiWrapper(hard_port) - self.hard_port = sfeSpi - else: + elif isinstance(hard_port, serial.Serial): self.hard_port = hard_port + if SPI_AVAILABLE: + if type(hard_port) == spidev.SpiDev: + sfeSpi = sfeSpiWrapper(hard_port) + self.hard_port = sfeSpi + + if not hasattr(self, "hard_port"): + raise IOError("Unable to connect to port: {}".format(hard_port)) + # Class message values self.ack_ms= { 'ACK':0x01, 'NAK':0x00 @@ -113,6 +130,9 @@ def __init__(self, hard_port = None): self.time_ms= { 'TM2':0x03, 'TP':0x01, 'VRFY':0x06 } + self.rxm_ms= { + "RAWX":0x15, + } def send_message(self, ubx_class, ubx_id, ubx_payload = None): """ @@ -356,6 +376,21 @@ def esf_status(self): cls_name, msg_name, payload = parse_tool.receive_from(self.hard_port) return payload + def rawx_measurements(self): + """ + Sends a poll request for the RXM class with the RAWX measurements and + parses ublox messages for the response. The payload is extracted from + the response which is then passed to the user. + + :return: The payload of the RXM Class and RAWX Message ID + :rtype: namedtuple + """ + self.send_message(sp.RXM_CLS, self.rxm_ms.get('RAWX')) + parse_tool = core.Parser([sp.RXM_CLS]) + cls_name, msg_name, payload = parse_tool.receive_from(self.hard_port) + s_payload = self.scale_RXM_RAWX(payload) + return s_payload + def port_settings(self): """ Sends a poll request for the MON class with the COMMS Message ID and @@ -511,6 +546,23 @@ def module_software_version(self): msg = parse_tool.receive_from(self.hard_port) return msg + def get_ubx_rxm_rawx(self): + """ + Sends a poll request for the RXM class that contains the raw + pseudorange, carrier phase, and doppler information for each + satellite, among other information. This payload is extracted from the + response, scaled, and then passed to the user. + + :return: The payload of the RXM Class and RAWX measurements + :rtype: namedtuples + """ + self.send_message(sp.RXM_CLS, self.rxm_ms.get("RAWX")) + parse_tool = core.Parser([sp.RXM_CLS]) + cls_name, msg_name, payload = parse_tool.receive_from(self.hard_port) + s_payload = self.scale_RXM_RAWX(payload) + return s_payload + + def scale_NAV_ATT(self, nav_payload): """ This takes the UBX-NAV-ATT payload and scales the relevant fields @@ -715,60 +767,83 @@ def scale_NAV_VALNED(self, nav_payload): nav_payload = nav_payload._replace(heading= att_head * (10**-5)) return nav_payload + + def scale_RXM_RAWX(self, nav_payload): + """ + This takes the UBX-RXM-RAWX payload and scales the relevant fields as + they're described in the datasheet. + :return: Scaled version of the given payload. + :rtype: namedtyple + """ + for _i, _rb in enumerate(nav_payload.RB): + _pr_std = _rb.prStdev + _pr_std = _pr_std._replace(prStd=_pr_std.prStd * 0.01 * (2 ** 4)) + _cp_std = _rb.cpStdev + _cp_std = _cp_std._replace(cpStd=_cp_std.cpStd * 0.004) + _do_std = _rb.doStdev + _do_std = _do_std._replace(doStd=_do_std.doStd * 0.002 * (2 ** 4)) -class sfeSpiWrapper(object): - """ - sfeSpiWrapper - - Initialize the library with the given port. + nav_payload.RB[_i] = nav_payload.RB[_i]._replace(prStdev=_pr_std) + nav_payload.RB[_i] = nav_payload.RB[_i]._replace(cpStdev=_cp_std) + nav_payload.RB[_i] = nav_payload.RB[_i]._replace(doStdev=_do_std) - :param spi_port: This library simply provides some ducktyping for spi so - that the ubxtranslator library doesn't complain. It - takes a spi port and then sets it to the ublox module's - specifications. + return nav_payload - :return: The sfeSpiWrapper object. - :rtype: Object - """ - def __init__(self, spi_port = None): +if SPI_AVAILABLE: + class sfeSpiWrapper(object): + """ + sfeSpiWrapper - if spi_port is None: - self.spi_port = spidev.SpiDev() - else: - self.spi_port = spi_port + Initialize the library with the given port. - self.spi_port.open(0,0) - self.spi_port.max_speed_hz = 5500 #Hz - self.spi_port.mode = 0b00 + :param spi_port: This library simply provides some ducktyping for spi so + that the ubxtranslator library doesn't complain. It + takes a spi port and then sets it to the ublox module's + specifications. - def read(self, read_data = 1): + :return: The sfeSpiWrapper object. + :rtype: Object """ - Reads a byte or bytes of data from the SPI port. The bytes are - converted to a bytes object before being returned. - :return: The requested bytes - :rtype: bytes - """ + def __init__(self, spi_port = None): - data = self.spi_port.readbytes(read_data) - byte_data = bytes([]) - for d in data: - byte_data = byte_data + bytes([d]) - return byte_data + if spi_port is None: + self.spi_port = spidev.SpiDev() + else: + self.spi_port = spi_port - def write(self, data): - """ - Writes a byte or bytes of data to the SPI port. + self.spi_port.open(0,0) + self.spi_port.max_speed_hz = 5500 #Hz + self.spi_port.mode = 0b00 - :return: True on completion - :rtype: boolean - """ - self.spi_port.xfer2(list(data)) + def read(self, read_data = 1): + """ + Reads a byte or bytes of data from the SPI port. The bytes are + converted to a bytes object before being returned. - return True + :return: The requested bytes + :rtype: bytes + """ + + data = self.spi_port.readbytes(read_data) + byte_data = bytes([]) + for d in data: + byte_data = byte_data + bytes([d]) + return byte_data + + def write(self, data): + """ + Writes a byte or bytes of data to the SPI port. + + :return: True on completion + :rtype: boolean + """ + self.spi_port.xfer2(list(data)) + + return True