|
| 1 | +"""Tests the Danfoss quirk (all tests were written for the Popp eT093WRO).""" |
| 2 | +from unittest import mock |
| 3 | + |
| 4 | +from zigpy.quirks import CustomCluster |
| 5 | +from zigpy.zcl import foundation |
| 6 | +from zigpy.zcl.clusters.hvac import Thermostat |
| 7 | +from zigpy.zcl.foundation import WriteAttributesStatusRecord, ZCLAttributeDef |
| 8 | + |
| 9 | +import zhaquirks |
| 10 | +from zhaquirks.danfoss.thermostat import CustomizedStandardCluster |
| 11 | + |
| 12 | +zhaquirks.setup() |
| 13 | + |
| 14 | + |
| 15 | +def test_popp_signature(assert_signature_matches_quirk): |
| 16 | + """Test the signature matching the Device Class.""" |
| 17 | + signature = { |
| 18 | + "node_descriptor": "NodeDescriptor(logical_type=<LogicalType.EndDevice: 2>, complex_descriptor_available=0, user_descriptor_available=0, reserved=0, aps_flags=0, frequency_band=<FrequencyBand.Freq2400MHz: 8>, mac_capability_flags=<MACCapabilityFlags.AllocateAddress: 128>, manufacturer_code=4678, maximum_buffer_size=82, maximum_incoming_transfer_size=82, server_mask=11264, maximum_outgoing_transfer_size=82, descriptor_capability_field=<DescriptorCapability.NONE: 0>, *allocate_address=True, *is_alternate_pan_coordinator=False, *is_coordinator=False, *is_end_device=True, *is_full_function_device=False, *is_mains_powered=False, *is_receiver_on_when_idle=False, *is_router=False, *is_security_capable=False)", |
| 19 | + # SizePrefixedSimpleDescriptor(endpoint=1, profile=260, device_type=769, device_version=1, input_clusters=[0, 1, 3, 10, 32, 513, 516, 2821], output_clusters=[0, 25]) |
| 20 | + "endpoints": { |
| 21 | + "1": { |
| 22 | + "profile_id": 260, |
| 23 | + "device_type": "0x0301", |
| 24 | + "in_clusters": [ |
| 25 | + "0x0000", |
| 26 | + "0x0001", |
| 27 | + "0x0003", |
| 28 | + "0x000a", |
| 29 | + "0x0020", |
| 30 | + "0x0201", |
| 31 | + "0x0204", |
| 32 | + "0x0b05", |
| 33 | + ], |
| 34 | + "out_clusters": ["0x0000", "0x0019"], |
| 35 | + } |
| 36 | + }, |
| 37 | + "manufacturer": "D5X84YU", |
| 38 | + "model": "eT093WRO", |
| 39 | + "class": "danfoss.thermostat.DanfossThermostat", |
| 40 | + } |
| 41 | + |
| 42 | + assert_signature_matches_quirk( |
| 43 | + zhaquirks.danfoss.thermostat.DanfossThermostat, signature |
| 44 | + ) |
| 45 | + |
| 46 | + |
| 47 | +@mock.patch("zigpy.zcl.Cluster.bind", mock.AsyncMock()) |
| 48 | +async def test_danfoss_time_bind(zigpy_device_from_quirk): |
| 49 | + """Test the time being set when binding the Time cluster.""" |
| 50 | + device = zigpy_device_from_quirk(zhaquirks.danfoss.thermostat.DanfossThermostat) |
| 51 | + |
| 52 | + danfoss_time_cluster = device.endpoints[1].time |
| 53 | + danfoss_thermostat_cluster = device.endpoints[1].thermostat |
| 54 | + |
| 55 | + def mock_write(attributes, manufacturer=None): |
| 56 | + records = [ |
| 57 | + WriteAttributesStatusRecord(foundation.Status.SUCCESS) |
| 58 | + for _ in attributes |
| 59 | + ] |
| 60 | + return [records, []] |
| 61 | + |
| 62 | + patch_danfoss_trv_write = mock.patch.object( |
| 63 | + danfoss_time_cluster, |
| 64 | + "_write_attributes", |
| 65 | + mock.AsyncMock(side_effect=mock_write), |
| 66 | + ) |
| 67 | + |
| 68 | + with patch_danfoss_trv_write: |
| 69 | + await danfoss_thermostat_cluster.bind() |
| 70 | + |
| 71 | + assert 0x0000 in danfoss_time_cluster._attr_cache |
| 72 | + assert 0x0001 in danfoss_time_cluster._attr_cache |
| 73 | + assert 0x0002 in danfoss_time_cluster._attr_cache |
| 74 | + |
| 75 | + |
| 76 | +async def test_danfoss_thermostat_write_attributes(zigpy_device_from_quirk): |
| 77 | + """Test the Thermostat writes behaving correctly, in particular regarding setpoint.""" |
| 78 | + device = zigpy_device_from_quirk(zhaquirks.danfoss.thermostat.DanfossThermostat) |
| 79 | + |
| 80 | + danfoss_thermostat_cluster = device.endpoints[1].thermostat |
| 81 | + |
| 82 | + def mock_write(attributes, manufacturer=None): |
| 83 | + records = [ |
| 84 | + WriteAttributesStatusRecord(foundation.Status.SUCCESS) |
| 85 | + for _ in attributes |
| 86 | + ] |
| 87 | + return [records, []] |
| 88 | + |
| 89 | + setting = -100 |
| 90 | + operation = -0x01 |
| 91 | + |
| 92 | + def mock_setpoint(oper, sett, manufacturer=None): |
| 93 | + nonlocal operation, setting |
| 94 | + operation = oper |
| 95 | + setting = sett |
| 96 | + |
| 97 | + # data is written to trv |
| 98 | + patch_danfoss_trv_write = mock.patch.object( |
| 99 | + danfoss_thermostat_cluster, |
| 100 | + "_write_attributes", |
| 101 | + mock.AsyncMock(side_effect=mock_write), |
| 102 | + ) |
| 103 | + patch_danfoss_setpoint = mock.patch.object( |
| 104 | + danfoss_thermostat_cluster, |
| 105 | + "setpoint_command", |
| 106 | + mock.AsyncMock(side_effect=mock_setpoint), |
| 107 | + ) |
| 108 | + |
| 109 | + with patch_danfoss_trv_write: |
| 110 | + # data should be written to trv, but reach thermostat |
| 111 | + success, fail = await danfoss_thermostat_cluster.write_attributes( |
| 112 | + {"external_open_window_detected": False} |
| 113 | + ) |
| 114 | + assert success |
| 115 | + assert not fail |
| 116 | + assert not danfoss_thermostat_cluster._attr_cache[0x4003] |
| 117 | + |
| 118 | + with patch_danfoss_setpoint: |
| 119 | + # data should be received from danfoss_trv |
| 120 | + success, fail = await danfoss_thermostat_cluster.write_attributes( |
| 121 | + {"occupied_heating_setpoint": 6} |
| 122 | + ) |
| 123 | + assert success |
| 124 | + assert not fail |
| 125 | + assert danfoss_thermostat_cluster._attr_cache[0x0012] == 6 |
| 126 | + assert operation == 0x01 |
| 127 | + assert setting == 6 |
| 128 | + |
| 129 | + danfoss_thermostat_cluster._attr_cache[ |
| 130 | + 0x0015 |
| 131 | + ] = 5 # min_limit is present normally |
| 132 | + |
| 133 | + success, fail = await danfoss_thermostat_cluster.write_attributes( |
| 134 | + {"system_mode": 0x00} |
| 135 | + ) |
| 136 | + assert success |
| 137 | + assert not fail |
| 138 | + assert danfoss_thermostat_cluster._attr_cache[0x001C] == 0x04 |
| 139 | + |
| 140 | + # setpoint to min_limit, when system_mode to off |
| 141 | + assert danfoss_thermostat_cluster._attr_cache[0x0012] == 5 |
| 142 | + |
| 143 | + assert operation == 0x01 |
| 144 | + assert setting == 5 |
| 145 | + |
| 146 | + |
| 147 | +async def test_customized_standardcluster(zigpy_device_from_quirk): |
| 148 | + """Test customized standard cluster class correctly separating zigbee operations. |
| 149 | +
|
| 150 | + This is regarding manufacturer specific attributes. |
| 151 | + """ |
| 152 | + device = zigpy_device_from_quirk(zhaquirks.danfoss.thermostat.DanfossThermostat) |
| 153 | + |
| 154 | + danfoss_thermostat_cluster = device.endpoints[1].in_clusters[Thermostat.cluster_id] |
| 155 | + |
| 156 | + assert CustomizedStandardCluster.combine_results([[4545], [5433]], [[345]]) == [ |
| 157 | + [4545, 345], |
| 158 | + [5433], |
| 159 | + ] |
| 160 | + assert CustomizedStandardCluster.combine_results( |
| 161 | + [[4545], [5433]], [[345], [45355]] |
| 162 | + ) == [[4545, 345], [5433, 45355]] |
| 163 | + |
| 164 | + mock_attributes = { |
| 165 | + 656: ZCLAttributeDef(is_manufacturer_specific=True), |
| 166 | + 56454: ZCLAttributeDef(is_manufacturer_specific=False), |
| 167 | + } |
| 168 | + |
| 169 | + danfoss_thermostat_cluster.attributes = mock_attributes |
| 170 | + |
| 171 | + reports = None |
| 172 | + |
| 173 | + def mock_configure_reporting(reps, *args, **kwargs): |
| 174 | + nonlocal reports |
| 175 | + if mock_attributes[reps[0].attrid].is_manufacturer_specific: |
| 176 | + reports = reps |
| 177 | + |
| 178 | + return [[545], [4545]] |
| 179 | + |
| 180 | + # data is written to trv |
| 181 | + patch_danfoss_configure_reporting = mock.patch.object( |
| 182 | + CustomCluster, |
| 183 | + "_configure_reporting", |
| 184 | + mock.AsyncMock(side_effect=mock_configure_reporting), |
| 185 | + ) |
| 186 | + |
| 187 | + with patch_danfoss_configure_reporting: |
| 188 | + one = foundation.AttributeReportingConfig() |
| 189 | + one.direction = True |
| 190 | + one.timeout = 4 |
| 191 | + one.attrid = 56454 |
| 192 | + |
| 193 | + two = foundation.AttributeReportingConfig() |
| 194 | + two.direction = True |
| 195 | + two.timeout = 4 |
| 196 | + two.attrid = 656 |
| 197 | + await danfoss_thermostat_cluster._configure_reporting([one, two]) |
| 198 | + assert reports == [two] |
| 199 | + |
| 200 | + reports = None |
| 201 | + |
| 202 | + def mock_read_attributes(attrs, *args, **kwargs): |
| 203 | + nonlocal reports |
| 204 | + if mock_attributes[attrs[0]].is_manufacturer_specific: |
| 205 | + reports = attrs |
| 206 | + |
| 207 | + return [[545]] |
| 208 | + |
| 209 | + # data is written to trv |
| 210 | + patch_danfoss_read_attributes = mock.patch.object( |
| 211 | + CustomCluster, |
| 212 | + "_read_attributes", |
| 213 | + mock.AsyncMock(side_effect=mock_read_attributes), |
| 214 | + ) |
| 215 | + |
| 216 | + with patch_danfoss_read_attributes: |
| 217 | + result = await danfoss_thermostat_cluster._read_attributes([56454, 656]) |
| 218 | + assert result |
| 219 | + assert reports == [656] |
| 220 | + |
| 221 | + def mock_read_attributes_fail(attrs, *args, **kwargs): |
| 222 | + nonlocal reports |
| 223 | + if mock_attributes[attrs[0]].is_manufacturer_specific: |
| 224 | + reports = attrs |
| 225 | + |
| 226 | + return [[545], [4545]] |
| 227 | + |
| 228 | + # data is written to trv |
| 229 | + patch_danfoss_read_attributes_fail = mock.patch.object( |
| 230 | + CustomCluster, |
| 231 | + "_read_attributes", |
| 232 | + mock.AsyncMock(side_effect=mock_read_attributes_fail), |
| 233 | + ) |
| 234 | + |
| 235 | + with patch_danfoss_read_attributes_fail: |
| 236 | + result, fail = await danfoss_thermostat_cluster._read_attributes([56454, 656]) |
| 237 | + assert result |
| 238 | + assert fail |
| 239 | + assert reports == [656] |
0 commit comments