Skip to content

Fix SDO truncation of unknown datatypes #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions canopen/objectdictionary/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,16 @@ def decode_raw(self, data: bytes) -> Union[int, float, str, bytes, bytearray]:
# Strip any trailing NUL characters from C-based systems
return data.decode("utf_16_le", errors="ignore").rstrip("\x00")
elif self.data_type in self.STRUCT_TYPES:
size = self.STRUCT_TYPES[self.data_type].size
logger.warning(f"Decoding data {data!r} of type 0x%X at %s",
self.data_type, pretty_index(self.index, self.subindex))
logger.warning(f"Size: {size} bytes, data length: {len(data)} bytes")
Comment on lines +433 to +435
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely too much logging, both by number of messages and at a too high severity level.

And please try to stick to the previously discussed rule: logging methods use % formats and parameters, and f-strings for everything else.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree. These are left-overs from my own debugging. My bad. I'll remove them.

if len(data) > size:
logger.warning("Excessive data in %s. Data type 0x%X expects %s bytes, got %s",
pretty_index(self.index, self.subindex), self.data_type,
size, len(data))
# Truncate the data to the expected size
data = data[:size]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's way easier switching to the .unpack_from() method which simply discards trailing extra data. I don't think this severe warning is generally desired.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to remove the warning altogether.

try:
value, = self.STRUCT_TYPES[self.data_type].unpack(data)
return value
Expand Down
18 changes: 1 addition & 17 deletions canopen/sdo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,7 @@ def upload(self, index: int, subindex: int) -> bytes:
When node responds with an error.
"""
with self.open(index, subindex, buffering=0) as fp:
response_size = fp.size
data = fp.read()

# If size is available through variable in OD, then use the smaller of the two sizes.
# Some devices send U32/I32 even if variable is smaller in OD
var = self.od.get_variable(index, subindex)
if var is not None:
# Found a matching variable in OD
# If this is a data type (string, domain etc) the size is
# unknown anyway so keep the data as is
if var.data_type not in objectdictionary.DATA_TYPES:
# Get the size in bytes for this variable
var_size = len(var) // 8
if response_size is None or var_size < response_size:
# Truncate the data to specified size
data = data[0:var_size]
return data
return fp.read()

def download(
self,
Expand Down
6 changes: 3 additions & 3 deletions test/test_sdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ def test_size_not_specified(self):
]
# Make sure the size of the data is 1 byte
data = self.network[2].sdo.upload(0x1400, 2)
self.assertEqual(data, b'\xfe')
self.assertEqual(data, b'\xfe\x00\x00\x00')
data = self.network[2].object_dictionary[0x1400][2].decode_raw(data)
self.assertEqual(data, 254)
self.assertTrue(self.message_sent)

def test_expedited_download(self):
Expand Down Expand Up @@ -814,7 +816,6 @@ def test_unknown_od_112(self):

def test_unknown_datatype32(self):
"""Test an unknown datatype, but known OD, of 32 bits (4 bytes)."""
return # FIXME: Disabled temporarily until datatype conditionals are fixed, see #436
# Add fake entry 0x2100 to OD, using fake datatype 0xFF
if 0x2100 not in self.node.object_dictionary:
fake_var = ODVariable("Fake", 0x2100)
Expand All @@ -829,7 +830,6 @@ def test_unknown_datatype32(self):

def test_unknown_datatype112(self):
"""Test an unknown datatype, but known OD, of 112 bits (14 bytes)."""
return # FIXME: Disabled temporarily until datatype conditionals are fixed, see #436
# Add fake entry 0x2100 to OD, using fake datatype 0xFF
if 0x2100 not in self.node.object_dictionary:
fake_var = ODVariable("Fake", 0x2100)
Expand Down
Loading