Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions src/python_testing/TC_IDM_3_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.endpoint = 0

# This test can take some time to run in heavily congested test environments, adding a longer timeout.

@property
def default_timeout(self) -> int:
    """Per-test timeout in seconds, raised to 5 minutes for slow or congested CI runs."""
    extended_timeout_seconds = 300
    return extended_timeout_seconds

def steps_TC_IDM_3_2(self) -> list[TestStep]:
return [
TestStep(0, "Commissioning, already done", is_commissioning=True),
Expand Down Expand Up @@ -148,9 +154,10 @@ async def test_TC_IDM_3_2(self):
f"Write to unsupported cluster should return UNSUPPORTED_CLUSTER, got {write_status}")

self.step(3)
# Write an unsupported attribute to DUT
unsupported_endpoint = None
unsupported_attribute = None
# Write an unsupported attribute to DUT.
# Build a flat list of all candidate (endpoint_id, attr_class) pairs first so we can
# iterate without nested-loop break complexity.
unsupported_candidates = []
for endpoint_id, endpoint in self.endpoints.items():
for cluster_type, cluster_data in endpoint.items():
if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard:
Expand All @@ -159,31 +166,38 @@ async def test_TC_IDM_3_2(self):
all_attrs = set(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys())
dut_attrs = set(cluster_data.get(cluster_type.Attributes.AttributeList, []))

unsupported = [
attr_id for attr_id in (all_attrs - dut_attrs)
if global_attribute_ids.attribute_id_type(attr_id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal
]
if unsupported:
unsupported_attribute = ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]]
unsupported_endpoint = endpoint_id
log.info(f"Found unsupported attribute: {unsupported_attribute} in cluster {cluster_type.id}")
break
if unsupported_attribute:
log.info(f"Unsupported attribute: {unsupported_attribute}")
for attr_id in all_attrs - dut_attrs:
if global_attribute_ids.attribute_id_type(attr_id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal:
unsupported_candidates.append(
(endpoint_id, ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][attr_id])
)

# Iterate candidates until we find one whose type can be serialized with a simple 0 value.
# Some attribute types are structs/lists and will raise TypeError when passed 0 during TLV
# encoding; skip those and try the next candidate.
write_status2 = None
found_unsupported_attr = False
for endpoint_id, candidate_attr in unsupported_candidates:
try:
write_status2 = await self.write_single_attribute(
attribute_value=candidate_attr(0),
endpoint_id=endpoint_id,
expect_success=False
)
log.info(f"Testing unsupported attribute: {candidate_attr}")
found_unsupported_attr = True
break
except TypeError as e:
log.info(f"Attribute {candidate_attr} not serializable with value 0, trying next candidate: {e}")

if not unsupported_attribute:
log.warning("No unsupported attributes found - this may be OK for non-commissionable devices")
else:
write_status2 = await self.write_single_attribute(
attribute_value=unsupported_attribute(0),
endpoint_id=unsupported_endpoint,
expect_success=False
)
log.info(f"Writing unsupported attribute: {unsupported_attribute}")
if found_unsupported_attr:
asserts.assert_equal(write_status2, Status.UnsupportedAttribute,
f"Write to unsupported attribute should return UNSUPPORTED_ATTRIBUTE, got {write_status2}")

else:
self.skip_step(3)
log.info("No unsupported attributes found to test")

self.skip_step(4)
# Currently skipping step 4, as we have removed support for this functionality from the python framework.
# This is now contained in the SuppressResponse test module PR referenced below for TC_IDM_3_2, once this test module merges that PR can then be merged
Expand Down
Loading