|
1 | 1 | """Test the driver model.""" |
2 | 2 |
|
3 | 3 | import json |
| 4 | +from typing import Any |
4 | 5 |
|
5 | 6 | import pytest |
6 | 7 |
|
7 | 8 | from zwave_js_server.const import LogLevel |
8 | 9 | from zwave_js_server.event import Event |
9 | 10 | from zwave_js_server.model import ( |
10 | | - driver as driver_pkg, |
11 | 11 | log_config as log_config_pkg, |
12 | 12 | log_message as log_message_pkg, |
13 | 13 | ) |
| 14 | +from zwave_js_server.model.driver import Driver |
14 | 15 | from zwave_js_server.model.driver.firmware import DriverFirmwareUpdateStatus |
| 16 | +from zwave_js_server.model.firmware import FirmwareUpdateData, FirmwareUpdateInfo |
15 | 17 |
|
16 | 18 | from .. import load_fixture |
| 19 | +from ..common import MockCommandProtocol |
| 20 | +from .common import FIRMWARE_UPDATE_INFO |
17 | 21 |
|
18 | 22 |
|
19 | 23 | def test_from_state(client, log_config): |
20 | 24 | """Test from_state method.""" |
21 | 25 | ws_msgs = load_fixture("basic_dump.txt").strip().split("\n") |
22 | 26 |
|
23 | | - driver = driver_pkg.Driver( |
24 | | - client, json.loads(ws_msgs[0])["result"]["state"], log_config |
25 | | - ) |
26 | | - assert driver == driver_pkg.Driver( |
| 27 | + driver = Driver(client, json.loads(ws_msgs[0])["result"]["state"], log_config) |
| 28 | + assert driver == Driver( |
27 | 29 | client, json.loads(ws_msgs[0])["result"]["state"], log_config |
28 | 30 | ) |
29 | 31 | assert driver != driver.controller.home_id |
@@ -452,3 +454,93 @@ async def test_firmware_events(driver): |
452 | 454 | assert result.status == DriverFirmwareUpdateStatus.OK |
453 | 455 | assert result.success |
454 | 456 | assert driver.firmware_update_progress is None |
| 457 | + |
| 458 | + |
| 459 | +@pytest.mark.parametrize("progress", [True, False]) |
| 460 | +async def test_is_otw_firmware_update_in_progress( |
| 461 | + driver: Driver, |
| 462 | + uuid4: str, |
| 463 | + mock_command: MockCommandProtocol, |
| 464 | + progress: bool, |
| 465 | +) -> None: |
| 466 | + """Test is_otw_firmware_update_in_progress command.""" |
| 467 | + ack_commands = mock_command( |
| 468 | + {"command": "driver.is_otw_firmware_update_in_progress"}, |
| 469 | + {"progress": progress}, |
| 470 | + ) |
| 471 | + assert await driver.async_is_otw_firmware_update_in_progress() is progress |
| 472 | + |
| 473 | + assert len(ack_commands) == 1 |
| 474 | + assert ack_commands[0] == { |
| 475 | + "command": "driver.is_otw_firmware_update_in_progress", |
| 476 | + "messageId": uuid4, |
| 477 | + } |
| 478 | + |
| 479 | + |
| 480 | +@pytest.mark.parametrize( |
| 481 | + ("firmware_update_param", "expected_params"), |
| 482 | + [ |
| 483 | + ( |
| 484 | + { |
| 485 | + "update_data": FirmwareUpdateData( |
| 486 | + filename="test-file", file=b"test", file_format=None |
| 487 | + ) |
| 488 | + }, |
| 489 | + FirmwareUpdateData( |
| 490 | + filename="test-file", file=b"test", file_format=None |
| 491 | + ).to_dict(), |
| 492 | + ), |
| 493 | + ( |
| 494 | + {"update_info": FirmwareUpdateInfo.from_dict(FIRMWARE_UPDATE_INFO)}, |
| 495 | + {"updateInfo": FIRMWARE_UPDATE_INFO}, |
| 496 | + ), |
| 497 | + ], |
| 498 | +) |
| 499 | +@pytest.mark.parametrize(("status", "success"), [(1, True), (0, False)]) |
| 500 | +async def test_firmware_update_otw( |
| 501 | + driver: Driver, |
| 502 | + uuid4: str, |
| 503 | + mock_command: MockCommandProtocol, |
| 504 | + status: int, |
| 505 | + success: bool, |
| 506 | + firmware_update_param: dict[str, Any], |
| 507 | + expected_params: dict[str, Any], |
| 508 | +) -> None: |
| 509 | + """Test firmware_update_otw command.""" |
| 510 | + ack_commands = mock_command( |
| 511 | + {"command": "driver.firmware_update_otw", **expected_params}, |
| 512 | + {"result": {"status": status, "success": success}}, |
| 513 | + ) |
| 514 | + result = await driver.async_firmware_update_otw(**firmware_update_param) |
| 515 | + |
| 516 | + assert result.status == status |
| 517 | + assert result.success is success |
| 518 | + |
| 519 | + assert len(ack_commands) == 1 |
| 520 | + assert ack_commands[0] == { |
| 521 | + "command": "driver.firmware_update_otw", |
| 522 | + "messageId": uuid4, |
| 523 | + **expected_params, |
| 524 | + } |
| 525 | + |
| 526 | + |
| 527 | +@pytest.mark.parametrize( |
| 528 | + "firmware_update_param", |
| 529 | + [ |
| 530 | + {}, # No parameters |
| 531 | + { |
| 532 | + "update_data": FirmwareUpdateData( |
| 533 | + filename="test-file", file=b"test", file_format=None |
| 534 | + ), |
| 535 | + "update_info": FirmwareUpdateInfo.from_dict(FIRMWARE_UPDATE_INFO), |
| 536 | + }, # Invalid parameters |
| 537 | + ], |
| 538 | +) |
| 539 | +async def test_firmware_update_otw_invalid_params( |
| 540 | + driver: Driver, |
| 541 | + firmware_update_param: dict[str, FirmwareUpdateData | FirmwareUpdateInfo], |
| 542 | +) -> None: |
| 543 | + """Test firmware_update_otw command invalid parameters.""" |
| 544 | + with pytest.raises(ValueError): |
| 545 | + # Should raise ValueError if no parameters are provided or both are provided |
| 546 | + await driver.async_firmware_update_otw(**firmware_update_param) |
0 commit comments