diff --git a/src/python_testing/TC_IDM_3_2.py b/src/python_testing/TC_IDM_3_2.py index c41ee927b3c1cf..b26687bcbb43bb 100644 --- a/src/python_testing/TC_IDM_3_2.py +++ b/src/python_testing/TC_IDM_3_2.py @@ -60,6 +60,12 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.endpoint = 0 + # This test can take some time to run in heavily congested test environments, adding a longer timeout. + + @property + def default_timeout(self) -> int: + return 300 + def steps_TC_IDM_3_2(self) -> list[TestStep]: return [ TestStep(0, "Commissioning, already done", is_commissioning=True), @@ -148,9 +154,10 @@ async def test_TC_IDM_3_2(self): f"Write to unsupported cluster should return UNSUPPORTED_CLUSTER, got {write_status}") self.step(3) - # Write an unsupported attribute to DUT - unsupported_endpoint = None - unsupported_attribute = None + # Write an unsupported attribute to DUT. + # Build a flat list of all candidate (endpoint_id, attr_class) pairs first so we can + # iterate without nested-loop break complexity. + unsupported_candidates = [] for endpoint_id, endpoint in self.endpoints.items(): for cluster_type, cluster_data in endpoint.items(): if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: @@ -159,31 +166,38 @@ async def test_TC_IDM_3_2(self): all_attrs = set(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys()) dut_attrs = set(cluster_data.get(cluster_type.Attributes.AttributeList, [])) - unsupported = [ - attr_id for attr_id in (all_attrs - dut_attrs) - if global_attribute_ids.attribute_id_type(attr_id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal - ] - if unsupported: - unsupported_attribute = ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]] - unsupported_endpoint = endpoint_id - log.info(f"Found unsupported attribute: {unsupported_attribute} in cluster {cluster_type.id}") - break - if unsupported_attribute: - log.info(f"Unsupported attribute: {unsupported_attribute}") + for attr_id in all_attrs - dut_attrs: + if global_attribute_ids.attribute_id_type(attr_id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: + unsupported_candidates.append( + (endpoint_id, ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][attr_id]) + ) + + # Iterate candidates until we find one whose type can be serialized with a simple 0 value. + # Some attribute types are structs/lists and will raise TypeError when passed 0 during TLV + # encoding; skip those and try the next candidate. + write_status2 = None + found_unsupported_attr = False + for endpoint_id, candidate_attr in unsupported_candidates: + try: + write_status2 = await self.write_single_attribute( + attribute_value=candidate_attr(0), + endpoint_id=endpoint_id, + expect_success=False + ) + log.info(f"Testing unsupported attribute: {candidate_attr}") + found_unsupported_attr = True break + except TypeError as e: + log.info(f"Attribute {candidate_attr} not serializable with value 0, trying next candidate: {e}") - if not unsupported_attribute: - log.warning("No unsupported attributes found - this may be OK for non-commissionable devices") - else: - write_status2 = await self.write_single_attribute( - attribute_value=unsupported_attribute(0), - endpoint_id=unsupported_endpoint, - expect_success=False - ) - log.info(f"Writing unsupported attribute: {unsupported_attribute}") + if found_unsupported_attr: asserts.assert_equal(write_status2, Status.UnsupportedAttribute, f"Write to unsupported attribute should return UNSUPPORTED_ATTRIBUTE, got {write_status2}") + else: + self.skip_step(3) + log.info("No unsupported attributes found to test") + self.skip_step(4) # Currently skipping step 4 as we have removed support in the python framework for this functionality currently. # This is now contained in the SuppressResponse test module PR referenced below for TC_IDM_3_2, once this test module merges that PR can then be merged