|
2 | 2 |
|
3 | 3 | from unittest import mock
|
4 | 4 |
|
| 5 | +import pytest |
5 | 6 | from zigpy.zcl import foundation
|
| 7 | +from zigpy.zcl.clusters.general import Basic, PowerConfiguration |
6 | 8 | from zigpy.zcl.clusters.measurement import PM25
|
7 | 9 |
|
8 | 10 | import zhaquirks
|
9 | 11 | import zhaquirks.ikea.starkvind
|
10 | 12 |
|
| 13 | +from tests.common import ClusterListener |
| 14 | + |
11 | 15 | zhaquirks.setup()
|
12 | 16 |
|
13 | 17 |
|
@@ -122,3 +126,91 @@ def mock_read(attributes, manufacturer=None):
|
122 | 126 | assert success
|
123 | 127 | assert 6 in success.values()
|
124 | 128 | assert not fail
|
| 129 | + |
| 130 | + |
| 131 | +@mock.patch("zigpy.zcl.Cluster.bind", mock.AsyncMock()) |
| 132 | +@pytest.mark.parametrize( |
| 133 | + "firmware, pct_device, pct_correct, expected_pct_updates, expect_log_warning", |
| 134 | + ( |
| 135 | + ("2.3.075", 50, 100, 1, False), |
| 136 | + ("24.4.5", 50, 50, 2, False), |
| 137 | + ("invalid_fw_string", 50, 50, 2, True), |
| 138 | + ), |
| 139 | +) |
| 140 | +async def test_double_power_config_firmware( |
| 141 | + caplog, |
| 142 | + zigpy_device_from_quirk, |
| 143 | + firmware, |
| 144 | + pct_device, |
| 145 | + pct_correct, |
| 146 | + expected_pct_updates, |
| 147 | + expect_log_warning, |
| 148 | +): |
| 149 | + """Test battery percentage remaining is doubled for old firmware.""" |
| 150 | + |
| 151 | + device = zigpy_device_from_quirk(zhaquirks.ikea.fivebtnremote.IkeaTradfriRemote1) |
| 152 | + |
| 153 | + basic_cluster = device.endpoints[1].basic |
| 154 | + ClusterListener(basic_cluster) |
| 155 | + sw_build_id = Basic.AttributeDefs.sw_build_id.id |
| 156 | + |
| 157 | + power_cluster = device.endpoints[1].power |
| 158 | + power_listener = ClusterListener(power_cluster) |
| 159 | + battery_pct_id = PowerConfiguration.AttributeDefs.battery_percentage_remaining.id |
| 160 | + |
| 161 | + # fake read response for attributes: return plug_read argument for all attributes |
| 162 | + def mock_read(attributes, manufacturer=None): |
| 163 | + records = [ |
| 164 | + foundation.ReadAttributeRecord( |
| 165 | + attr, foundation.Status.SUCCESS, foundation.TypeValue(None, firmware) |
| 166 | + ) |
| 167 | + for attr in attributes |
| 168 | + ] |
| 169 | + return (records,) |
| 170 | + |
| 171 | + p1 = mock.patch.object(power_cluster, "create_catching_task") |
| 172 | + p2 = mock.patch.object( |
| 173 | + basic_cluster, "_read_attributes", mock.AsyncMock(side_effect=mock_read) |
| 174 | + ) |
| 175 | + |
| 176 | + with p1 as mock_task, p2 as request_mock: |
| 177 | + # update battery percentage with no firmware in attr cache, check pct doubled for now |
| 178 | + power_cluster.update_attribute(battery_pct_id, pct_device) |
| 179 | + assert len(power_listener.attribute_updates) == 1 |
| 180 | + assert power_listener.attribute_updates[0] == (battery_pct_id, pct_device * 2) |
| 181 | + |
| 182 | + # but also check that sw_build_id read is requested in the background for next update |
| 183 | + assert mock_task.call_count == 1 |
| 184 | + await mock_task.call_args[0][0] # await coroutine to read attribute |
| 185 | + assert request_mock.call_count == 1 # verify request to read sw_build_id |
| 186 | + assert request_mock.mock_calls[0][1][0][0] == sw_build_id |
| 187 | + |
| 188 | + # battery pct might be updated again when the attribute read returned new firmware, check pct not doubled then |
| 189 | + # if firmware turned out to be old or still unknown, do not update battery pct again, as we doubled it already |
| 190 | + assert len(power_listener.attribute_updates) == expected_pct_updates |
| 191 | + if expected_pct_updates > 2: |
| 192 | + assert power_listener.attribute_updates[1] == (battery_pct_id, pct_correct) |
| 193 | + |
| 194 | + # reset mocks for testing when sw_build_id is known next |
| 195 | + mock_task.reset_mock() |
| 196 | + request_mock.reset_mock() |
| 197 | + power_listener = ClusterListener(power_cluster) |
| 198 | + |
| 199 | + # update battery percentage with firmware in attr cache, check pct doubled if needed |
| 200 | + basic_cluster.update_attribute(sw_build_id, firmware) |
| 201 | + power_cluster.update_attribute(battery_pct_id, pct_device) |
| 202 | + assert len(power_listener.attribute_updates) == 1 |
| 203 | + assert power_listener.attribute_updates[0] == (battery_pct_id, pct_correct) |
| 204 | + |
| 205 | + # check no attribute reads were requested when sw_build_id is known |
| 206 | + assert mock_task.call_count == 0 |
| 207 | + assert request_mock.call_count == 0 |
| 208 | + |
| 209 | + # make sure a call to bind() always reads sw_build_id (e.g. on join or to refresh when repaired/reconfigured) |
| 210 | + await power_cluster.bind() |
| 211 | + assert request_mock.call_count == 1 |
| 212 | + assert request_mock.mock_calls[0][1][0][0] == sw_build_id |
| 213 | + |
| 214 | + # check log output if we expect a warning |
| 215 | + if expect_log_warning: |
| 216 | + assert f"sw_build_id is not a number: {firmware} for device" in caplog.text |
0 commit comments