diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index 872df234..c1c54d78 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -1,7 +1,9 @@ -import re -import io -import logging import copy +import logging +import re + +from canopen.objectdictionary import datatypes + try: from configparser import RawConfigParser, NoOptionError, NoSectionError except ImportError: @@ -190,6 +192,28 @@ def import_from_node(node_id, network): return od +def _calc_bit_length(data_type): + if data_type == datatypes.INTEGER8: + return 8 + elif data_type == datatypes.INTEGER16: + return 16 + elif data_type == datatypes.INTEGER32: + return 32 + elif data_type == datatypes.INTEGER64: + return 64 + else: + raise ValueError(f"Invalid data_type '{data_type}', expecting a signed integer data_type.") + + +def _signed_int_from_hex(hex_str, bit_length): + number = int(hex_str, 0) + limit = ((1 << bit_length - 1) - 1) + if number > limit: + return limit - number + else: + return number + + def _convert_variable(node_id, var_type, value): if var_type in (objectdictionary.OCTET_STRING, objectdictionary.DOMAIN): return bytes.fromhex(value) @@ -251,12 +275,20 @@ def build_variable(eds, section, node_id, index, subindex=0): if eds.has_option(section, "LowLimit"): try: - var.min = int(eds.get(section, "LowLimit"), 0) + min_string = eds.get(section, "LowLimit") + if var.data_type in objectdictionary.SIGNED_TYPES: + var.min = _signed_int_from_hex(min_string, _calc_bit_length(var.data_type)) + else: + var.min = int(min_string, 0) except ValueError: pass if eds.has_option(section, "HighLimit"): try: - var.max = int(eds.get(section, "HighLimit"), 0) + max_string = eds.get(section, "HighLimit") + if var.data_type in objectdictionary.SIGNED_TYPES: + var.max = _signed_int_from_hex(max_string, _calc_bit_length(var.data_type)) + else: + var.max = int(max_string, 0) except ValueError: pass if eds.has_option(section, "DefaultValue"): diff --git a/canopen/sdo/client.py b/canopen/sdo/client.py index 7e0f58bf..777829b6 100644 --- a/canopen/sdo/client.py +++ b/canopen/sdo/client.py @@ -2,10 +2,8 @@ import logging import io import time -try: - import queue -except ImportError: - import Queue as queue +import queue +from typing import Union from ..network import CanError from .. import objectdictionary @@ -29,7 +27,7 @@ class SdoClient(SdoBase): #: Seconds to wait before sending a request, for rate limiting PAUSE_BEFORE_SEND = 0.0 - def __init__(self, rx_cobid, tx_cobid, od): + def __init__(self, rx_cobid : int, tx_cobid : int, od : objectdictionary.ObjectDictionary): """ :param int rx_cobid: COB-ID that the server receives on (usually 0x600 + node ID) @@ -41,10 +39,10 @@ def __init__(self, rx_cobid, tx_cobid, od): SdoBase.__init__(self, rx_cobid, tx_cobid, od) self.responses = queue.Queue() - def on_response(self, can_id, data, timestamp): + def on_response(self, can_id : int, data : bytes, timestamp : int) -> None: self.responses.put(bytes(data)) - def send_request(self, request): + def send_request(self, request : bytes) -> None: retries_left = self.MAX_RETRIES while True: try: @@ -61,11 +59,12 @@ def send_request(self, request): else: break - def read_response(self): + def read_response(self) -> bytes: try: response = self.responses.get( block=True, timeout=self.RESPONSE_TIMEOUT) except queue.Empty: + self.abort(0x05040000) raise SdoCommunicationError("No SDO response received") res_command, = struct.unpack_from("B", response) if res_command == RESPONSE_ABORTED: @@ -73,7 +72,7 @@ def read_response(self): raise SdoAbortedError(abort_code) return response - def request_response(self, sdo_request): + def request_response(self, sdo_request : bytes) -> bytes: retries_left = self.MAX_RETRIES if not self.responses.empty(): # logger.warning("There were unexpected messages in the queue") @@ -90,7 +89,7 @@ def request_response(self, sdo_request): raise logger.warning(str(e)) - def abort(self, abort_code=0x08000000): + def abort(self, abort_code=0x08000000) -> None: """Abort current transfer.""" request = bytearray(8) request[0] = REQUEST_ABORTED @@ -160,8 +159,18 @@ def download( fp.write(data) fp.close() - def open(self, index, subindex=0, mode="rb", encoding="ascii", - buffering=1024, size=None, block_transfer=False, force_segment=False, request_crc_support=True): + def open( + self, + index : int, + subindex : int =0, + mode : str ="rb", + encoding : str ="ascii", + buffering : int =1024, + size : Union[int,None] = None, + block_transfer : bool = False, + force_segment : bool = False, + request_crc_support : bool =True + ): """Open the data stream as a file like object. :param int index: @@ -228,9 +237,9 @@ class ReadableStream(io.RawIOBase): """File like object for reading from a variable.""" #: Total size of data or ``None`` if not specified - size = None + size : Union[int,None] = None - def __init__(self, sdo_client, index, subindex=0): + def __init__(self, sdo_client : SdoClient, index : int, subindex : int = 0): """ :param canopen.sdo.SdoClient sdo_client: The SDO client to use for reading. @@ -239,10 +248,10 @@ def __init__(self, sdo_client, index, subindex=0): :param int subindex: Object dictionary sub-index to read from. """ - self._done = False + self._done : bool = False self.sdo_client = sdo_client - self._toggle = 0 - self.pos = 0 + self._toggle : int = 0 + self.pos : int = 0 logger.debug("Reading 0x%X:%d from node %d", index, subindex, sdo_client.rx_cobid - 0x600) @@ -277,7 +286,7 @@ def __init__(self, sdo_client, index, subindex=0): else: logger.debug("Using segmented transfer") - def read(self, size=-1): + def read(self, size : int = -1) -> bytes: """Read one segment which may be up to 7 bytes. :param int size: @@ -301,8 +310,10 @@ def read(self, size=-1): response = self.sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command & 0xE0 != RESPONSE_SEGMENT_UPLOAD: + self.sdo_client.abort(0x05040001) raise SdoCommunicationError("Unexpected response 0x%02X" % res_command) if res_command & TOGGLE_BIT != self._toggle: + self.sdo_client.abort(0x05030000) raise SdoCommunicationError("Toggle bit mismatch") length = 7 - ((res_command >> 1) & 0x7) if res_command & NO_MORE_DATA: @@ -311,7 +322,7 @@ def read(self, size=-1): self.pos += length return response[1:length + 1] - def readinto(self, b): + def readinto(self, b) -> int: """ Read bytes into a pre-allocated, writable bytes-like object b, and return the number of bytes read. @@ -320,17 +331,24 @@ def readinto(self, b): b[:len(data)] = data return len(data) - def readable(self): + def readable(self) -> bool: return True - def tell(self): + def tell(self) -> int: return self.pos class WritableStream(io.RawIOBase): """File like object for writing to a variable.""" - def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False): + def __init__( + self, + sdo_client : SdoClient, + index : int, + subindex : int = 0, + size : Union[int,None] = None, + force_segment : bool = False + ): """ :param canopen.sdo.SdoClient sdo_client: The SDO client to use for communication. @@ -345,10 +363,10 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False """ self.sdo_client = sdo_client self.size = size - self.pos = 0 - self._toggle = 0 - self._exp_header = None - self._done = False + self.pos : int = 0 + self._toggle : int = 0 + self._exp_header : Union[bytes,None] = None + self._done : bool = False if size is None or size > 4 or force_segment: # Initiate segmented download @@ -361,6 +379,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False response = sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command != RESPONSE_DOWNLOAD: + self.sdo_client.abort(0x05040001) raise SdoCommunicationError( "Unexpected response 0x%02X" % res_command) else: @@ -370,7 +389,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False command |= (4 - size) << 2 self._exp_header = SDO_STRUCT.pack(command, index, subindex) - def write(self, b): + def write(self, b : bytes) -> int: """ Write the given bytes-like object, b, to the SDO server, and return the number of bytes written. This will be at most 7 bytes. @@ -389,6 +408,7 @@ def write(self, b): response = self.sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command & 0xE0 != RESPONSE_DOWNLOAD: + self.sdo_client.abort(0x05040001) raise SdoCommunicationError( "Unexpected response 0x%02X" % res_command) bytes_sent = len(b) @@ -413,6 +433,7 @@ def write(self, b): response = self.sdo_client.request_response(request) res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_DOWNLOAD: + self.sdo_client.abort(0x05040001) raise SdoCommunicationError( "Unexpected response 0x%02X (expected 0x%02X)" % (res_command, RESPONSE_SEGMENT_DOWNLOAD)) @@ -420,7 +441,7 @@ def write(self, b): self.pos += bytes_sent return bytes_sent - def close(self): + def close(self) -> None: """Closes the stream. An empty segmented SDO message may be sent saying there is no more data. @@ -437,10 +458,10 @@ def close(self): self.sdo_client.request_response(request) self._done = True - def writable(self): + def writable(self) -> bool: return True - def tell(self): + def tell(self) -> int: return self.pos @@ -448,13 +469,19 @@ class BlockUploadStream(io.RawIOBase): """File like object for reading from a variable using block upload.""" #: Total size of data or ``None`` if not specified - size = None + size : Union[int,None] = None - blksize = 127 + blksize : int = 127 - crc_supported = False + crc_supported : bool = False - def __init__(self, sdo_client, index, subindex=0, request_crc_support=True): + def __init__( + self, + sdo_client : SdoClient, + index : int , + subindex : int = 0, + request_crc_support : bool = True + ): """ :param canopen.sdo.SdoClient sdo_client: The SDO client to use for reading. @@ -465,12 +492,12 @@ def __init__(self, sdo_client, index, subindex=0, request_crc_support=True): :param bool request_crc_support: If crc calculation should be requested when using block transfer """ - self._done = False + self._done : bool = False self.sdo_client = sdo_client - self.pos = 0 + self.pos : int = 0 self._crc = sdo_client.crc_cls() - self._server_crc = None - self._ackseq = 0 + self._server_crc : Union[int,None] = None + self._ackseq : int = 0 logger.debug("Reading 0x%X:%d from node %d", index, subindex, sdo_client.rx_cobid - 0x600) @@ -484,6 +511,7 @@ def __init__(self, sdo_client, index, subindex=0, request_crc_support=True): response = sdo_client.request_response(request) res_command, res_index, res_subindex = SDO_STRUCT.unpack_from(response) if res_command & 0xE0 != RESPONSE_BLOCK_UPLOAD: + self.sdo_client.abort(0x05040001) raise SdoCommunicationError("Unexpected response 0x%02X" % res_command) # Check that the message is for us if res_index != index or res_subindex != subindex: @@ -500,7 +528,7 @@ def __init__(self, sdo_client, index, subindex=0, request_crc_support=True): request[0] = REQUEST_BLOCK_UPLOAD | START_BLOCK_UPLOAD sdo_client.send_request(request) - def read(self, size=-1): + def read(self, size : int = -1) -> bytes: """Read one segment which may be up to 7 bytes. :param int size: @@ -544,7 +572,7 @@ def read(self, size=-1): self.pos += len(data) return data - def _retransmit(self): + def _retransmit(self) -> None: logger.info("Only %d sequences were received. Requesting retransmission", self._ackseq) end_time = time.time() + self.sdo_client.RESPONSE_TIMEOUT @@ -557,9 +585,9 @@ def _retransmit(self): # We should be back in sync self._ackseq = seqno return response - raise SdoCommunicationError("Some data were lost and could not be retransmitted") + raise SdoCommunicationError("Some data was lost and could not be retransmitted") - def _ack_block(self): + def _ack_block(self) -> None: request = bytearray(8) request[0] = REQUEST_BLOCK_UPLOAD | BLOCK_TRANSFER_RESPONSE request[1] = self._ackseq @@ -568,7 +596,7 @@ def _ack_block(self): if self._ackseq == self.blksize: self._ackseq = 0 - def _end_upload(self): + def _end_upload(self) -> bytes: response = self.sdo_client.read_response() res_command, self._server_crc = struct.unpack_from("> 2) & 0x7 - def close(self): + def close(self) -> None: if self.closed: return super(BlockUploadStream, self).close() @@ -589,10 +617,10 @@ def close(self): request[0] = REQUEST_BLOCK_UPLOAD | END_BLOCK_TRANSFER self.sdo_client.send_request(request) - def tell(self): + def tell(self) -> int: return self.pos - def readinto(self, b): + def readinto(self, b : bytes) -> int: """ Read bytes into a pre-allocated, writable bytes-like object b, and return the number of bytes read. @@ -601,7 +629,7 @@ def readinto(self, b): b[:len(data)] = data return len(data) - def readable(self): + def readable(self) -> bool: return True diff --git a/doc/profiles.rst b/doc/profiles.rst index 1ef5ab58..9fdc1d29 100644 --- a/doc/profiles.rst +++ b/doc/profiles.rst @@ -66,7 +66,7 @@ class :attr:`.state` attribute can be read and set (command) by a string:: # command a state (an SDO message will be called) some_node.state = 'SWITCHED ON' # read the current state - some_node.state = 'SWITCHED ON' + some_node.state Available states: diff --git a/test/sample.eds b/test/sample.eds index bea6b9c3..671a559e 100644 --- a/test/sample.eds +++ b/test/sample.eds @@ -902,3 +902,39 @@ DataType=0x0008 AccessType=ro DefaultValue=0 PDOMapping=1 + +[3020] +ParameterName=INTEGER8 only positive values +ObjectType=0x7 +DataType=0x02 +AccessType=rw +HighLimit=0x7F +LowLimit=0x00 +PDOMapping=0 + +[3021] +ParameterName=UNSIGNED8 value range +2 to +10 +ObjectType=0x7 +DataType=0x05 +AccessType=rw +HighLimit=0x0A +LowLimit=0x02 +PDOMapping=0 + +[3030] +ParameterName=INTEGER32 only negative values +ObjectType=0x7 +DataType=0x04 +AccessType=rw +HighLimit=0x00000000 +LowLimit=0xFFFFFFFF +PDOMapping=0 + +[3040] +ParameterName=INTEGER64 value range -10 to +10 +ObjectType=0x7 +DataType=0x15 +AccessType=rw +HighLimit=0x000000000000000A +LowLimit=0x8000000000000009 +PDOMapping=0 diff --git a/test/test_eds.py b/test/test_eds.py index e5f6c89e..2a6d5098 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -4,6 +4,7 @@ EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') + class TestEDS(unittest.TestCase): def setUp(self): @@ -47,6 +48,20 @@ def test_record(self): self.assertEqual(var.data_type, canopen.objectdictionary.UNSIGNED32) self.assertEqual(var.access_type, 'ro') + def test_record_with_limits(self): + int8 = self.od[0x3020] + self.assertEqual(int8.min, 0) + self.assertEqual(int8.max, 127) + uint8 = self.od[0x3021] + self.assertEqual(uint8.min, 2) + self.assertEqual(uint8.max, 10) + int32 = self.od[0x3030] + self.assertEqual(int32.min, -2147483648) + self.assertEqual(int32.max, 0) + int64 = self.od[0x3040] + self.assertEqual(int64.min, -10) + self.assertEqual(int64.max, +10) + def test_array_compact_subobj(self): array = self.od[0x1003] self.assertIsInstance(array, canopen.objectdictionary.Array) @@ -98,18 +113,16 @@ def test_dummy_variable_undefined(self): def test_comments(self): self.assertEqual(self.od.comments, -""" + """ |-------------| | Don't panic | |-------------| -""".strip() - ) - +""".strip()) def test_export_eds(self): import tempfile for doctype in {"eds", "dcf"}: - with tempfile.NamedTemporaryFile(suffix="."+doctype, mode="w+") as tempeds: + with tempfile.NamedTemporaryFile(suffix="." + doctype, mode="w+") as tempeds: print("exporting %s to " % doctype + tempeds.name) canopen.export_od(self.od, tempeds, doc_type=doctype) tempeds.flush() @@ -117,54 +130,59 @@ def test_export_eds(self): for index in exported_od: self.assertIn(exported_od[index].name, self.od) - self.assertIn(index , self.od) + self.assertIn(index, self.od) for index in self.od: if index < 0x0008: # ignore dummies continue self.assertIn(self.od[index].name, exported_od) - self.assertIn(index , exported_od) + self.assertIn(index, exported_od) - actual_object = exported_od[index] - expected_object = self.od[index] + actual_object = exported_od[index] + expected_object = self.od[index] self.assertEqual(type(actual_object), type(expected_object)) self.assertEqual(actual_object.name, expected_object.name) if type(actual_object) is canopen.objectdictionary.Variable: expected_vars = [expected_object] - actual_vars = [actual_object ] - else : + actual_vars = [actual_object] + else: expected_vars = [expected_object[idx] for idx in expected_object] - actual_vars = [actual_object [idx] for idx in actual_object] + actual_vars = [actual_object[idx] for idx in actual_object] for prop in [ - "allowed_baudrates", - "vendor_name", - "vendor_number", - "product_name", - "product_number", - "revision_number", - "order_code", - "simple_boot_up_master", - "simple_boot_up_slave", - "granularity", - "dynamic_channels_supported", - "group_messaging", - "nr_of_RXPDO", - "nr_of_TXPDO", - "LSS_supported", + "allowed_baudrates", + "vendor_name", + "vendor_number", + "product_name", + "product_number", + "revision_number", + "order_code", + "simple_boot_up_master", + "simple_boot_up_slave", + "granularity", + "dynamic_channels_supported", + "group_messaging", + "nr_of_RXPDO", + "nr_of_TXPDO", + "LSS_supported", ]: - self.assertEqual(getattr(self.od.device_information, prop), getattr(exported_od.device_information, prop), f"prop {prop!r} mismatch on DeviceInfo") - - - for evar,avar in zip(expected_vars,actual_vars): - self. assertEqual(getattr(avar, "data_type" , None) , getattr(evar,"data_type" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) - self. assertEqual(getattr(avar, "default_raw", None) , getattr(evar,"default_raw",None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) - self. assertEqual(getattr(avar, "min" , None) , getattr(evar,"min" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) - self. assertEqual(getattr(avar, "max" , None) , getattr(evar,"max" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) + self.assertEqual(getattr(self.od.device_information, prop), + getattr(exported_od.device_information, prop), + f"prop {prop!r} mismatch on DeviceInfo") + + for evar, avar in zip(expected_vars, actual_vars): + self.assertEqual(getattr(avar, "data_type", None), getattr(evar, "data_type", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "default_raw", None), getattr(evar, "default_raw", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "min", None), getattr(evar, "min", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "max", None), getattr(evar, "max", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) if doctype == "dcf": - self.assertEqual(getattr(avar, "value" , None) , getattr(evar,"value" ,None) , " mismatch on %04X:%X"%(evar.index, evar.subindex)) + self.assertEqual(getattr(avar, "value", None), getattr(evar, "value", None), + " mismatch on %04X:%X" % (evar.index, evar.subindex)) self.assertEqual(self.od.comments, exported_od.comments) -