-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Expand file tree
/
Copy pathTC_IDM_3_2.py
More file actions
404 lines (343 loc) · 21.9 KB
/
TC_IDM_3_2.py
File metadata and controls
404 lines (343 loc) · 21.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
# app: ${ALL_CLUSTERS_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --PICS src/app/tests/suites/certification/ci-pics-values
# factory-reset: true
# quiet: true
# === END CI TEST ARGUMENTS ===
import logging
from mobly import asserts
from support_modules.idm_support import IDMBaseTest
import matter.clusters as Clusters
from matter.clusters import ClusterObjects as ClusterObjects
# from matter.exceptions import ChipStackError
from matter.interaction_model import InteractionModelError, Status
from matter.testing import global_attribute_ids
from matter.testing.basic_composition import BasicCompositionTests
from matter.testing.decorators import async_test_body
from matter.testing.runner import TestStep, default_matter_test_main
log = logging.getLogger(__name__)
class TC_IDM_3_2(IDMBaseTest, BasicCompositionTests):
"""Test case for IDM-3.2: Write Response Action from DUT to TH. [{DUT_Server}]"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.endpoint = 0
# This test can take some time to run in heavily congested test environments, adding a longer timeout.
@property
def default_timeout(self) -> int:
return 300
def steps_TC_IDM_3_2(self) -> list[TestStep]:
return [
TestStep(0, "Commissioning, already done", is_commissioning=True),
TestStep(1, "TH sends the WriteRequestMessage to the DUT to write any attribute on an unsupported Endpoint. DUT responds with the Write Response action",
"Verify on the TH that the DUT sends the status code UNSUPPORTED_ENDPOINT"),
TestStep(2, "TH sends the WriteRequestMessage to the DUT to write any attribute on an unsupported cluster. DUT responds with the Write Response action",
"Verify on the TH that the DUT sends the status code UNSUPPORTED_CLUSTER"),
TestStep(3, "TH sends the WriteRequestMessage to the DUT to write an unsupported attribute. DUT responds with the Write Response action",
"Verify on the TH that the DUT sends the status code UNSUPPORTED_ATTRIBUTE"),
TestStep(4, "TH sends the WriteRequestMessage to the DUT to modify the value of one attribute and Set SuppressResponse to True.",
"On the TH verify that the DUT does not send a Write Response message with a success back to the TH."),
TestStep(5, "TH sends a ReadRequest message to the DUT to read any attribute on any cluster. DUT returns with a report data action with the attribute values and the dataversion of the cluster. TH sends a WriteRequestMessage to the DUT to modify the value of one attribute with the DataVersion field set to the one received in the prior step.",
"Verify that the DUT sends a Write Response message with a success back to the TH. Verify by sending a ReadRequest that the Write Action on DUT was successful."),
TestStep(6, "TH sends a ReadRequest message to the DUT to read any attribute on any cluster. DUT returns with a report data action with the attribute values and the dataversion of the cluster. TH sends a WriteRequestMessage to the DUT to modify the value of one attribute no DataVersion indicated. TH sends a second WriteRequestMessage to the DUT to modify the value of an attribute with the dataversion field set to the value received earlier.",
"Verify that the DUT sends a Write Response message with the error DATA_VERSION_MISMATCH for the second Write request."),
TestStep(7, "TH sends the WriteRequestMessage to the DUT to modify the value of a specific attribute data that needs Timed Write transaction to write and this action is not part of a Timed Write transaction.",
"On the TH verify that the DUT sends a status code NEEDS_TIMED_INTERACTION."),
]
@async_test_body
async def test_TC_IDM_3_2(self):
self.step(0)
# Test Setup with robust endpoint/cluster discovery
await self.setup_class_helper(allow_pase=False)
self.step(1)
'''
Write any attribute on an unsupported endpoint to DUT
Find an unsupported endpoint
'''
supported_endpoints = set(self.endpoints.keys())
all_endpoints = set(range(max(supported_endpoints) + 2))
unsupported_endpoints = list(all_endpoints - supported_endpoints)
unsupported_endpoint = unsupported_endpoints[0]
test_attribute = Clusters.Descriptor.Attributes.FeatureMap
# Try to write to an unsupported endpoint using framework method
write_status = await self.write_single_attribute(
attribute_value=test_attribute(0),
endpoint_id=unsupported_endpoint,
expect_success=False
)
# Verify we get UNSUPPORTED_ENDPOINT error
asserts.assert_equal(write_status, Status.UnsupportedEndpoint,
f"Write to unsupported endpoint should return UNSUPPORTED_ENDPOINT, got {write_status}")
self.step(2)
'''
Write all attributes on an unsupported cluster to DUT
Find an unsupported cluster
'''
supported_cluster_ids = set()
for endpoint_clusters in self.endpoints.values():
supported_cluster_ids.update({
cluster.id for cluster in endpoint_clusters
if global_attribute_ids.cluster_id_type(cluster.id) == global_attribute_ids.ClusterIdType.kStandard
})
# Get all possible standard clusters
all_standard_cluster_ids = {
cluster_id for cluster_id in ClusterObjects.ALL_CLUSTERS
if global_attribute_ids.cluster_id_type(cluster_id) == global_attribute_ids.ClusterIdType.kStandard
}
# Find unsupported clusters
unsupported_cluster_ids = all_standard_cluster_ids - supported_cluster_ids
if not unsupported_cluster_ids:
self.skip_step("No unsupported standard clusters found to test")
# Use the first unsupported cluster
unsupported_cluster_id = next(iter(unsupported_cluster_ids))
cluster_attributes = ClusterObjects.ALL_ATTRIBUTES[unsupported_cluster_id]
test_unsupported_attribute = next(iter(cluster_attributes.values()))
write_status = await self.write_single_attribute(
attribute_value=test_unsupported_attribute,
endpoint_id=self.endpoint,
expect_success=False
)
# Verify we get UNSUPPORTED_CLUSTER error
asserts.assert_equal(write_status, Status.UnsupportedCluster,
f"Write to unsupported cluster should return UNSUPPORTED_CLUSTER, got {write_status}")
self.step(3)
# Write an unsupported attribute to DUT.
# Build a flat list of all candidate (endpoint_id, attr_class) pairs first so we can
# iterate without nested-loop break complexity.
unsupported_candidates = []
for endpoint_id, endpoint in self.endpoints.items():
for cluster_type, cluster_data in endpoint.items():
if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard:
continue
all_attrs = set(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys())
dut_attrs = set(cluster_data.get(cluster_type.Attributes.AttributeList, []))
for attr_id in all_attrs - dut_attrs:
if global_attribute_ids.attribute_id_type(attr_id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal:
unsupported_candidates.append(
(endpoint_id, ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][attr_id])
)
# Iterate candidates until we find one whose type can be serialized with a simple 0 value.
# Some attribute types are structs/lists and will raise TypeError when passed 0 during TLV
# encoding; skip those and try the next candidate.
write_status2 = None
found_unsupported_attr = False
for endpoint_id, candidate_attr in unsupported_candidates:
try:
write_status2 = await self.write_single_attribute(
attribute_value=candidate_attr(0),
endpoint_id=endpoint_id,
expect_success=False
)
log.info(f"Testing unsupported attribute: {candidate_attr}")
found_unsupported_attr = True
break
except TypeError as e:
log.info(f"Attribute {candidate_attr} not serializable with value 0, trying next candidate: {e}")
if found_unsupported_attr:
asserts.assert_equal(write_status2, Status.UnsupportedAttribute,
f"Write to unsupported attribute should return UNSUPPORTED_ATTRIBUTE, got {write_status2}")
else:
self.skip_step(3)
log.info("No unsupported attributes found to test")
self.skip_step(4)
# Currently skipping step 4 as we have removed support in the python framework for this functionality currently.
# This is now contained in the SuppressResponse test module PR referenced below for TC_IDM_3_2, once this test module merges that PR can then be merged
# and this test step will become valid after issues with SuppressResponse mentioned in issue https://github.com/project-chip/connectedhomeip/issues/41227.
# SuppressResponse PR Reference: https://github.com/project-chip/connectedhomeip/pull/41590
# TODO: Once the SuppressResponse test module PR merges, uncomment the following code and remove the skip_step line above.
"""
self.step(4)
# Check if NodeLabel attribute exists for step 4 (SuppressResponse tests)
if await self.attribute_guard(endpoint=self.endpoint, attribute=Clusters.BasicInformation.Attributes.NodeLabel):
'''
TH sends the WriteRequestMessage to the DUT to modify the value of one attribute and Set SuppressResponse to True.
NOTE: Per Issue #41227, the current spec does not strictly enforce that devices must suppress the response.
For now, we just ensure the device doesn't crash. The device MAY respond or may not - either is acceptable.
Future spec revisions will enforce no response behavior.
Reference: https://github.com/project-chip/connectedhomeip/issues/41227
'''
test_attribute = Clusters.BasicInformation.Attributes.NodeLabel
test_value = "SuppressResponse-Test"
log.info("Testing SuppressResponse functionality with NodeLabel attribute")
log.info("NOTE: Device may or may not respond - both behaviors are acceptable for now per Issue #41227")
# Send write request with suppressResponse=True
# Device may respond or not - we just ensure it doesn't crash
try:
res = await self.default_controller.WriteAttribute(
nodeid=self.dut_node_id,
attributes=[(self.endpoint, test_attribute(test_value))],
suppressResponse=True
)
log.info(f"Device responded to suppressResponse=True request: {res}")
except Exception as e:
# Device didn't respond (timeout or other error) - this is also acceptable
log.info(f"Device did not respond or encountered error: {e}")
# Verify the write operation succeeded by reading back the value
log.info("Verifying that the write operation succeeded")
actual_value = await self.read_single_attribute_check_success(
endpoint=self.endpoint,
cluster=Clusters.BasicInformation,
attribute=test_attribute
)
asserts.assert_equal(actual_value, test_value,
f"Attribute should be written. Expected {test_value}, got {actual_value}")
"""
# Check if NodeLabel attribute exists for steps 5 and 6 (DataVersion test steps)
self.step(5)
if await self.attribute_guard(endpoint=self.endpoint, attribute=Clusters.BasicInformation.Attributes.NodeLabel):
'''
TH sends a ReadRequest message to the DUT to read any attribute on any cluster.
DUT returns with a report data action with the attribute values and the dataversion of the cluster.
TH sends a WriteRequestMessage to the DUT to modify the value of one attribute with the DataVersion field set to the one received in the prior step.
'''
test_cluster = Clusters.BasicInformation
test_attribute = Clusters.BasicInformation.Attributes.NodeLabel
new_value0 = "New-Label-Step5"
# Read an attribute to get the current DataVersion
read_result = await self.default_controller.ReadAttribute(
self.dut_node_id,
[(self.endpoint, test_cluster, test_attribute)]
)
# Get the current DataVersion
current_data_version = read_result[self.endpoint][test_cluster][Clusters.Attribute.DataVersion]
log.info(f"Current DataVersion for cluster {test_cluster.id}: {current_data_version}")
write_result = await self.default_controller.WriteAttribute(
self.dut_node_id,
[(self.endpoint, test_attribute(new_value0), current_data_version)]
)
# Verify write was successful
asserts.assert_equal(write_result[0].Status, Status.Success,
f"Write with correct DataVersion should succeed, got {write_result[0].Status}")
# Verify the value was written by reading it back
actual_value = await self.read_single_attribute_check_success(endpoint=self.endpoint, cluster=test_cluster, attribute=test_attribute)
asserts.assert_equal(actual_value, new_value0,
f"Read value {actual_value} should match written value {new_value0}")
self.step(6)
if await self.attribute_guard(endpoint=self.endpoint, attribute=Clusters.BasicInformation.Attributes.NodeLabel):
'''
TH sends a ReadRequest message to the DUT to read any attribute on any cluster.
DUT returns with a report data action with the attribute values and the dataversion of the cluster.
TH sends a WriteRequestMessage to the DUT to modify the value of one attribute no DataVersion indicated.
TH sends a second WriteRequestMessage to the DUT to modify the value of an attribute with the dataversion field set to the value received earlier.
'''
# First, read to get the initial DataVersion
initial_read = await self.default_controller.ReadAttribute(
self.dut_node_id,
[(self.endpoint, test_cluster, test_attribute)]
)
initial_data_version = initial_read[self.endpoint][test_cluster][Clusters.Attribute.DataVersion]
log.info(f"Initial DataVersion for step 6: {initial_data_version}")
# Write without DataVersion (this should succeed and increment the DataVersion)
new_value1 = "New-Label-Step6"
write_status = await self.write_single_attribute(
attribute_value=test_attribute(new_value1),
endpoint_id=self.endpoint,
expect_success=True
)
# Now try to write with the old DataVersion (this should fail with DATA_VERSION_MISMATCH)
new_value2 = "New-Label-Step6-2"
write_result_old_version = await self.default_controller.WriteAttribute(
self.dut_node_id,
[(self.endpoint, test_attribute(new_value2), initial_data_version)]
)
# Verify we get DATA_VERSION_MISMATCH error
asserts.assert_equal(write_result_old_version[0].Status, Status.DataVersionMismatch,
f"Write with old DataVersion should return DATA_VERSION_MISMATCH, got {write_result_old_version[0].Status}")
else:
# NodeLabel doesn't exist - skip these steps for now
# Created following follow-up task for the event that the node label attribute does not exist
# TODO: https://github.com/project-chip/matter-test-scripts/issues/693
log.info("NodeLabel not found - this may be a non-commissionable device")
self.skip_step(5)
self.skip_step(6)
endpoint_id, timed_attr = await self.find_timed_write_attribute(self.endpoints)
if timed_attr:
self.step(7)
'''
TH sends the WriteRequestMessage to the DUT to modify the value of a specific attribute data that needs
timed write transaction to write and this action is not part of a timed write transaction.
This step tests the following 3 timed write error scenarios:
1. NEEDS_TIMED_INTERACTION: Writing timed-write-required attribute without timed transaction
2. TIMED_REQUEST_MISMATCH: Writing with TimedRequest flag but no actual timed transaction
(Timed Request ACTION = No, TimedRequest FLAG = True)
3. TIMED_REQUEST_MISMATCH: Writing with timed action performed but TimedRequest flag set to false
(Timed Request ACTION = Yes, TimedRequest FLAG = False)
Understanding the distinction:
- TIMED REQUEST ACTION: The TimedRequest protocol message sent BEFORE the WriteRequest
- TIMEDREQUEST FLAG: A boolean field IN the WriteRequest message itself
Normal timed write: Action=Yes, Flag=True (both must match)
'''
# Test with the real timed-write attribute found on the device
log.info(f"Testing timed write attribute: {timed_attr}")
# Test NEEDS_TIMED_INTERACTION - Writing timed-write-required attribute without timed transaction
# Found below logic in /home/ubuntu/connectedhomeapi/connectedhomeip/src/controller/python/tests/scripts/cluster_objects.py and TC_IDM_1_2 test logic.
log.info("Writing timed-write-required attribute without timedRequestTimeoutMs should be rejected")
try:
await self.default_controller.WriteAttribute(
self.dut_node_id,
attributes=[(endpoint_id, timed_attr(True))]
)
asserts.fail("The write request should be rejected due to InteractionModelError: NeedsTimedInteraction (0xc6).")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.NeedsTimedInteraction,
f"WriteAttribute should return NeedsTimedInteraction, got {e.status}")
# TIMED_REQUEST_MISMATCH - Writing with TimedRequest flag but no actual timed transaction
# Thanks to Cecille for the guidance on the test step logic and plumbing for this to function below.
log.info("Writing with TimedRequest flag but no timed transaction should return TIMED_REQUEST_MISMATCH")
try:
await self.default_controller.TestOnlyWriteAttributeWithMismatchedTimedRequestField(
self.dut_node_id,
timedRequestTimeoutMs=0, # No timed action
timedRequestFieldValue=True, # But field=true
attributes=[(endpoint_id, timed_attr(False))]
)
asserts.fail("The write request should be rejected due to InteractionModelError: TimedRequestMismatch (0xc9).")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.TimedRequestMismatch,
f"WriteAttribute should return TimedRequestMismatch, got {e.status}")
# TIMED_REQUEST_MISMATCH - Writing with timed action performed but TimedRequest flag set to false
log.info("Writing with timed action but TimedRequest flag=false should return TIMED_REQUEST_MISMATCH")
try:
await self.default_controller.TestOnlyWriteAttributeWithMismatchedTimedRequestField(
self.dut_node_id,
timedRequestTimeoutMs=1000, # Timed action performed
timedRequestFieldValue=False, # But field=false
attributes=[(endpoint_id, timed_attr(False))]
)
asserts.fail("The write request should be rejected due to InteractionModelError: TimedRequestMismatch (0xc9).")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.TimedRequestMismatch,
f"WriteAttribute should return TimedRequestMismatch, got {e.status}")
else:
self.skip_step(7)
if __name__ == "__main__":
default_matter_test_main()