|
1 | 1 | """Test api module.""" |
2 | 2 |
|
3 | 3 | import asyncio |
| 4 | +import binascii |
4 | 5 | import logging |
5 | 6 |
|
6 | 7 | import pytest |
|
18 | 19 |
|
19 | 20 |
|
20 | 21 | @pytest.fixture |
21 | | -def api(event_loop): |
| 22 | +def uart_gw(): |
| 23 | + gw = MagicMock(auto_spec=uart.Gateway(MagicMock())) |
| 24 | + return gw |
| 25 | + |
| 26 | + |
| 27 | +@pytest.fixture |
| 28 | +def api(event_loop, uart_gw): |
22 | 29 | controller = MagicMock( |
23 | 30 | spec_set=zigpy_deconz.zigbee.application.ControllerApplication |
24 | 31 | ) |
25 | 32 | api = deconz_api.Deconz(controller, {zigpy.config.CONF_DEVICE_PATH: "/dev/null"}) |
26 | | - api._uart = MagicMock() |
| 33 | + api._uart = uart_gw |
27 | 34 | return api |
28 | 35 |
|
29 | 36 |
|
@@ -192,18 +199,18 @@ def test_data_received_unk_status(api, monkeypatch): |
192 | 199 | my_handler = MagicMock() |
193 | 200 |
|
194 | 201 | for cmd, cmd_opts in deconz_api.RX_COMMANDS.items(): |
195 | | - _, unsolicited = cmd_opts |
| 202 | + _, solicited = cmd_opts |
196 | 203 | payload = b"\x01\x02\x03\x04" |
197 | 204 | status = t.uint8_t(0xFE).serialize() |
198 | 205 | data = cmd.serialize() + b"\x00" + status + b"\x00\x00" + payload |
199 | 206 | setattr(api, "_handle_{}".format(cmd.name), my_handler) |
200 | 207 | api._awaiting[0] = MagicMock() |
201 | 208 | api.data_received(data) |
202 | | - assert t.deserialize.call_count == 1 |
203 | | - assert t.deserialize.call_args[0][0] == payload |
204 | | - if unsolicited: |
| 209 | + if solicited: |
205 | 210 | assert my_handler.call_count == 0 |
| 211 | + assert t.deserialize.call_count == 0 |
206 | 212 | else: |
| 213 | + assert t.deserialize.call_count == 1 |
207 | 214 | assert my_handler.call_count == 1 |
208 | 215 | t.deserialize.reset_mock() |
209 | 216 | my_handler.reset_mock() |
@@ -566,3 +573,30 @@ def test_tx_status(value, name): |
566 | 573 | def test_handle_add_neighbour(api): |
567 | 574 | """Test handle_add_neighbour.""" |
568 | 575 | api._handle_add_neighbour((12, 1, 0x1234, sentinel.ieee, 0x80)) |
| 576 | + |
| 577 | + |
| 578 | +@pytest.mark.parametrize("status", (0x00, 0x05)) |
| 579 | +async def test_aps_data_req_deserialize_error(api, uart_gw, status, caplog): |
| 580 | + """Test deserialization error.""" |
| 581 | + |
| 582 | + device_state = ( |
| 583 | + deconz_api.DeviceState.APSDE_DATA_INDICATION |
| 584 | + | deconz_api.DeviceState.APSDE_DATA_REQUEST_SLOTS_AVAILABLE |
| 585 | + | deconz_api.NetworkState.CONNECTED |
| 586 | + ) |
| 587 | + api._handle_device_state_value(device_state) |
| 588 | + await asyncio.sleep(0) |
| 589 | + await asyncio.sleep(0) |
| 590 | + await asyncio.sleep(0) |
| 591 | + assert uart_gw.send.call_count == 1 |
| 592 | + assert api._data_indication is True |
| 593 | + |
| 594 | + api.data_received( |
| 595 | + uart_gw.send.call_args[0][0][0:2] |
| 596 | + + bytes([status]) |
| 597 | + + binascii.unhexlify("0800010022") |
| 598 | + ) |
| 599 | + await asyncio.sleep(0) |
| 600 | + await asyncio.sleep(0) |
| 601 | + await asyncio.sleep(0) |
| 602 | + assert api._data_indication is False |
0 commit comments