diff --git a/canopen/sdo/base.py b/canopen/sdo/base.py index ddc75ed9..53a093ec 100644 --- a/canopen/sdo/base.py +++ b/canopen/sdo/base.py @@ -148,7 +148,18 @@ def __init__(self, sdo_node: SdoBase, od: objectdictionary.ODVariable): variable.Variable.__init__(self, od) def get_data(self) -> bytes: - return self.sdo_node.upload(self.od.index, self.od.subindex) + data = self.sdo_node.upload(self.od.index, self.od.subindex) + response_size = len(data) + + # If size is available through variable in OD, then use the smaller of the two sizes. + # Some devices send U32/I32 even if variable is smaller in OD + if self.od.fixed_size: + # Get the size in bytes for this variable + var_size = len(self.od) // 8 + if response_size is None or var_size < response_size: + # Truncate the data to specified size + data = data[:var_size] + return data def set_data(self, data: bytes): force_segment = self.od.data_type == objectdictionary.DOMAIN diff --git a/canopen/sdo/client.py b/canopen/sdo/client.py index fd5fd99b..fef8bdae 100644 --- a/canopen/sdo/client.py +++ b/canopen/sdo/client.py @@ -105,6 +105,10 @@ def abort(self, abort_code=0x08000000): def upload(self, index: int, subindex: int) -> bytes: """May be called to make a read operation without an Object Dictionary. + No validation against the Object Dictionary is performed, even if an object description + would be available. The length of the returned data depends only on the transferred + amount, possibly truncated to the size indicated by the server. + :param index: Index of object to read. :param subindex: @@ -121,17 +125,8 @@ def upload(self, index: int, subindex: int) -> bytes: response_size = fp.size data = fp.read() - # If size is available through variable in OD, then use the smaller of the two sizes. - # Some devices send U32/I32 even if variable is smaller in OD - var = self.od.get_variable(index, subindex) - if var is not None: - # Found a matching variable in OD - if var.fixed_size: - # Get the size in bytes for this variable - var_size = len(var) // 8 - if response_size is None or var_size < response_size: - # Truncate the data to specified size - data = data[0:var_size] + if response_size and response_size < len(data): + data = data[:response_size] return data def download( diff --git a/test/test_sdo.py b/test/test_sdo.py index 78012a30..9764a6d3 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -90,8 +90,16 @@ def test_expedited_upload(self): # UNSIGNED8 without padded data part (see issue #5) self.data = [ - (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), - (RX, b'\x4f\x00\x14\x02\xfe') + (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), # upload initiate 0x1400:02 + (RX, b'\x4f\x00\x14\x02\xfe'), # expedited, size=1 + ] + trans_type = self.network[2].sdo[0x1400]['Transmission type RPDO 1'].raw + self.assertEqual(trans_type, 254) + + # Same with padding to a full SDO frame + self.data = [ + (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), # upload initiate 0x1400:02 + (RX, b'\x42\x00\x14\x02\xfe\x00\x00\x00'), # expedited, no size indicated ] trans_type = self.network[2].sdo[0x1400]['Transmission type RPDO 1'].raw self.assertEqual(trans_type, 254) @@ -102,9 +110,9 @@ def test_size_not_specified(self): (TX, b'\x40\x00\x14\x02\x00\x00\x00\x00'), (RX, b'\x42\x00\x14\x02\xfe\x00\x00\x00') ] - # Make sure the size of the data is 1 byte + # This method used to truncate to 1 byte, but returns raw content now data = self.network[2].sdo.upload(0x1400, 2) - self.assertEqual(data, b'\xfe') + self.assertEqual(data, b'\xfe\x00\x00\x00') self.assertTrue(self.message_sent) def test_expedited_download(self): @@ -131,6 +139,17 @@ def test_segmented_upload(self): device_name = self.network[2].sdo[0x1008].raw self.assertEqual(device_name, "Tiny Node - Mega Domains !") + def test_segmented_upload_too_much_data(self): + # Server sends 5 bytes, but indicated size 4 + self.data = [ + (TX, b'\x40\x08\x10\x00\x00\x00\x00\x00'), # upload initiate, 0x1008:00 + (RX, b'\x41\x08\x10\x00\x04\x00\x00\x00'), # segmented, size indicated, 4 bytes + (TX, b'\x60\x00\x00\x00\x00\x00\x00\x00'), # upload segment + (RX, b'\x05\x54\x69\x6E\x79\x20\x00\x00'), # segment complete, 5 bytes + ] + device_name = self.network[2].sdo[0x1008].raw + self.assertEqual(device_name, "Tiny") + def test_segmented_download(self): self.data = [ (TX, b'\x21\x00\x20\x00\x0d\x00\x00\x00'),