From efd8a190215963afd79de3f0e25b070922c0083e Mon Sep 17 00:00:00 2001 From: Petar Petrov Date: Wed, 20 Aug 2025 11:50:38 +0300 Subject: [PATCH 1/2] Add Aqara S1 air quality monitor --- tests/test_xiaomi_airm_fhac01.py | 215 ++++++++++++++++++++++++++ zhaquirks/xiaomi/aqara/airm_fhac01.py | 44 ++++++ 2 files changed, 259 insertions(+) create mode 100644 tests/test_xiaomi_airm_fhac01.py create mode 100644 zhaquirks/xiaomi/aqara/airm_fhac01.py diff --git a/tests/test_xiaomi_airm_fhac01.py b/tests/test_xiaomi_airm_fhac01.py new file mode 100644 index 0000000000..37482a53fb --- /dev/null +++ b/tests/test_xiaomi_airm_fhac01.py @@ -0,0 +1,215 @@ +"""Test Aqara air quality monitor (lumi.airm.fhac01) quirk.""" + +import pytest +from zigpy.zcl.clusters.general import DeviceTemperature +from zigpy.zcl.clusters.measurement import ( + CarbonDioxideConcentration, + RelativeHumidity, + TemperatureMeasurement, +) + +from tests.common import ClusterListener +import zhaquirks +from zhaquirks.xiaomi import LUMI +import zhaquirks.xiaomi.aqara.airm_fhac01 # Import the quirk to register it + +zhaquirks.setup() + + +@pytest.fixture +def air_quality_device(zigpy_device_from_v2_quirk): + """Aqara air quality monitor device.""" + return zigpy_device_from_v2_quirk(LUMI, "lumi.airm.fhac01") + + +def test_device_creation(air_quality_device): + """Test that the device is created correctly with all expected clusters.""" + device = air_quality_device + + # Check that the device was created + assert device is not None + assert device.manufacturer == LUMI + assert device.model == "lumi.airm.fhac01" + + # Check that endpoint 1 exists + assert 1 in device.endpoints + endpoint = device.endpoints[1] + + # Check that all expected clusters are present + expected_clusters = { + CarbonDioxideConcentration.cluster_id, + DeviceTemperature.cluster_id, + TemperatureMeasurement.cluster_id, + RelativeHumidity.cluster_id, + } + + for cluster_id in expected_clusters: + assert cluster_id in endpoint.in_clusters, f"Missing cluster {cluster_id}" + + +def test_co2_concentration_scaling(air_quality_device): + """Test CO2 concentration cluster scaling (divide by 1e6).""" + device = air_quality_device + co2_cluster = device.endpoints[1].carbon_dioxide_concentration + co2_listener = ClusterListener(co2_cluster) + + # Test normal CO2 value with 6 extra zeros + test_value = 400_000_000 # Should represent 400 ppm + expected_value = 400.0 # After scaling + + co2_cluster._update_attribute( + CarbonDioxideConcentration.AttributeDefs.measured_value.id, test_value + ) + + assert len(co2_listener.attribute_updates) == 1 + assert ( + co2_listener.attribute_updates[0][0] + == CarbonDioxideConcentration.AttributeDefs.measured_value.id + ) + assert co2_listener.attribute_updates[0][1] == expected_value + + +def test_co2_concentration_scaling_edge_cases(air_quality_device): + """Test CO2 concentration scaling with edge cases.""" + device = air_quality_device + co2_cluster = device.endpoints[1].carbon_dioxide_concentration + co2_listener = ClusterListener(co2_cluster) + + test_cases = [ + (0, 0.0), # Zero value + (1_000_000, 1.0), # 1 ppm + (2000_000_000, 2000.0), # 2000 ppm (high but reasonable) + ] + + for i, (test_value, expected_value) in enumerate(test_cases): + co2_cluster._update_attribute( + CarbonDioxideConcentration.AttributeDefs.measured_value.id, test_value + ) + + assert len(co2_listener.attribute_updates) == i + 1 + assert co2_listener.attribute_updates[i][1] == expected_value + + +def test_device_temperature_scaling(air_quality_device): + """Test device temperature cluster scaling (multiply by 100).""" + device = air_quality_device + temp_cluster = device.endpoints[1].device_temperature + temp_listener = ClusterListener(temp_cluster) + + # Test normal temperature value divided by 100 + test_value = 25 # Should represent 25°C + expected_value = 2500 # After scaling (25°C * 100) + + temp_cluster._update_attribute( + DeviceTemperature.AttributeDefs.current_temperature.id, test_value + ) + + assert len(temp_listener.attribute_updates) == 1 + assert ( + temp_listener.attribute_updates[0][0] + == DeviceTemperature.AttributeDefs.current_temperature.id + ) + assert temp_listener.attribute_updates[0][1] == expected_value + + +def test_device_temperature_scaling_edge_cases(air_quality_device): + """Test device temperature scaling with edge cases.""" + device = air_quality_device + temp_cluster = device.endpoints[1].device_temperature + temp_listener = ClusterListener(temp_cluster) + + test_cases = [ + (0, 0), # 0°C + (-10, -1000), # -10°C (negative temperature) + (50, 5000), # 50°C (high temperature) + (1, 100), # 1°C + ] + + for i, (test_value, expected_value) in enumerate(test_cases): + temp_cluster._update_attribute( + DeviceTemperature.AttributeDefs.current_temperature.id, test_value + ) + + assert len(temp_listener.attribute_updates) == i + 1 + assert temp_listener.attribute_updates[i][1] == expected_value + + +def test_other_clusters_unchanged(air_quality_device): + """Test that other clusters work normally without scaling.""" + device = air_quality_device + + # Test TemperatureMeasurement cluster (should work normally) + temp_measurement_cluster = device.endpoints[1].temperature + temp_measurement_listener = ClusterListener(temp_measurement_cluster) + + test_value = 2500 # 25.0°C in centidegrees + temp_measurement_cluster._update_attribute( + TemperatureMeasurement.AttributeDefs.measured_value.id, test_value + ) + + assert len(temp_measurement_listener.attribute_updates) == 1 + assert temp_measurement_listener.attribute_updates[0][1] == test_value # No scaling + + # Test RelativeHumidity cluster (should work normally) + humidity_cluster = device.endpoints[1].humidity + humidity_listener = ClusterListener(humidity_cluster) + + test_value = 5000 # 50.0% humidity + humidity_cluster._update_attribute( + RelativeHumidity.AttributeDefs.measured_value.id, test_value + ) + + assert len(humidity_listener.attribute_updates) == 1 + assert humidity_listener.attribute_updates[0][1] == test_value # No scaling + + +def test_co2_other_attributes_unchanged(air_quality_device): + """Test that other CO2 cluster attributes are not affected by scaling.""" + device = air_quality_device + co2_cluster = device.endpoints[1].carbon_dioxide_concentration + co2_listener = ClusterListener(co2_cluster) + + # Test min_measured_value attribute (should not be scaled) + test_value = 1000 + co2_cluster._update_attribute( + CarbonDioxideConcentration.AttributeDefs.min_measured_value.id, test_value + ) + + assert len(co2_listener.attribute_updates) == 1 + assert co2_listener.attribute_updates[0][1] == test_value # No scaling + + +def test_device_temperature_other_attributes_unchanged(air_quality_device): + """Test that other device temperature cluster attributes are not affected by scaling.""" + device = air_quality_device + temp_cluster = device.endpoints[1].device_temperature + temp_listener = ClusterListener(temp_cluster) + + # Test min_temp_experienced attribute (should not be scaled) + test_value = 20 + temp_cluster._update_attribute( + DeviceTemperature.AttributeDefs.min_temp_experienced.id, test_value + ) + + assert len(temp_listener.attribute_updates) == 1 + assert temp_listener.attribute_updates[0][1] == test_value # No scaling + + +def test_quirk_registration(): + """Test that the quirk is properly registered.""" + import zigpy.quirks + + # Check that the quirk is registered for the correct manufacturer and model + registry = zigpy.quirks.DEVICE_REGISTRY + + # For v2 quirks, check the registry_v2 + found_quirk = False + for quirk_entries in registry.registry_v2.values(): + for entry in quirk_entries: + if entry.manufacturer == LUMI and entry.model == "lumi.airm.fhac01": + found_quirk = True + break + if found_quirk: + break + + assert found_quirk, "Quirk not found in registry" diff --git a/zhaquirks/xiaomi/aqara/airm_fhac01.py b/zhaquirks/xiaomi/aqara/airm_fhac01.py new file mode 100644 index 0000000000..b4c8aaa95d --- /dev/null +++ b/zhaquirks/xiaomi/aqara/airm_fhac01.py @@ -0,0 +1,44 @@ +"""Quirk for LUMI lumi.airm.fhac01 air quality monitor.""" + +from zigpy.quirks import CustomCluster +from zigpy.quirks.v2 import QuirkBuilder +from zigpy.zcl.clusters.general import DeviceTemperature +from zigpy.zcl.clusters.measurement import ( + CarbonDioxideConcentration, + RelativeHumidity, + TemperatureMeasurement, +) + +from zhaquirks.xiaomi import LUMI + + +class CarbonDioxideConcentrationCluster(CustomCluster, CarbonDioxideConcentration): + """Carbon Dioxide concentration cluster that fixes the scaling issue.""" + + def _update_attribute(self, attrid, value): + """Fix CO2 concentration scaling by dividing by 1e6.""" + if attrid == CarbonDioxideConcentration.AttributeDefs.measured_value.id: + # The device reports values with 6 extra zeros, so divide by 1e6 + value = value / 1_000_000 + super()._update_attribute(attrid, value) + + +class CustomDeviceTemperature(CustomCluster, DeviceTemperature): + """Temperature measurement cluster that fixes the scaling issue.""" + + def _update_attribute(self, attrid, value): + """Fix temperature scaling by multiplying by 100.""" + if attrid == DeviceTemperature.AttributeDefs.current_temperature.id: + # The device reports temperature divided by 100, so multiply by 100 + value = value * 100 + super()._update_attribute(attrid, value) + + +( + QuirkBuilder(LUMI, "lumi.airm.fhac01") + .adds(TemperatureMeasurement) + .adds(RelativeHumidity) + .replaces(CarbonDioxideConcentrationCluster) + .replaces(CustomDeviceTemperature) + .add_to_registry() +) From 77b5bac7b22db474745271dc26a214d3bfe7575d Mon Sep 17 00:00:00 2001 From: Petar Petrov Date: Wed, 20 Aug 2025 12:00:58 +0300 Subject: [PATCH 2/2] fix test --- tests/test_xiaomi_airm_fhac01.py | 219 +++++++++---------------------- 1 file changed, 63 insertions(+), 156 deletions(-) diff --git a/tests/test_xiaomi_airm_fhac01.py b/tests/test_xiaomi_airm_fhac01.py index 37482a53fb..155202535f 100644 --- a/tests/test_xiaomi_airm_fhac01.py +++ b/tests/test_xiaomi_airm_fhac01.py @@ -1,79 +1,34 @@ """Test Aqara air quality monitor (lumi.airm.fhac01) quirk.""" -import pytest from zigpy.zcl.clusters.general import DeviceTemperature -from zigpy.zcl.clusters.measurement import ( - CarbonDioxideConcentration, - RelativeHumidity, - TemperatureMeasurement, -) - -from tests.common import ClusterListener -import zhaquirks -from zhaquirks.xiaomi import LUMI -import zhaquirks.xiaomi.aqara.airm_fhac01 # Import the quirk to register it - -zhaquirks.setup() - - -@pytest.fixture -def air_quality_device(zigpy_device_from_v2_quirk): - """Aqara air quality monitor device.""" - return zigpy_device_from_v2_quirk(LUMI, "lumi.airm.fhac01") - +from zigpy.zcl.clusters.measurement import CarbonDioxideConcentration -def test_device_creation(air_quality_device): - """Test that the device is created correctly with all expected clusters.""" - device = air_quality_device - - # Check that the device was created - assert device is not None - assert device.manufacturer == LUMI - assert device.model == "lumi.airm.fhac01" - - # Check that endpoint 1 exists - assert 1 in device.endpoints - endpoint = device.endpoints[1] - - # Check that all expected clusters are present - expected_clusters = { - CarbonDioxideConcentration.cluster_id, - DeviceTemperature.cluster_id, - TemperatureMeasurement.cluster_id, - RelativeHumidity.cluster_id, - } - - for cluster_id in expected_clusters: - assert cluster_id in endpoint.in_clusters, f"Missing cluster {cluster_id}" +from zhaquirks.xiaomi.aqara.airm_fhac01 import ( + CarbonDioxideConcentrationCluster, + CustomDeviceTemperature, +) -def test_co2_concentration_scaling(air_quality_device): - """Test CO2 concentration cluster scaling (divide by 1e6).""" - device = air_quality_device - co2_cluster = device.endpoints[1].carbon_dioxide_concentration - co2_listener = ClusterListener(co2_cluster) +def test_co2_concentration_cluster_scaling(): + """Test CO2 concentration cluster scaling functionality.""" + cluster = CarbonDioxideConcentrationCluster(None, None) # Test normal CO2 value with 6 extra zeros test_value = 400_000_000 # Should represent 400 ppm expected_value = 400.0 # After scaling - co2_cluster._update_attribute( + cluster._update_attribute( CarbonDioxideConcentration.AttributeDefs.measured_value.id, test_value ) - assert len(co2_listener.attribute_updates) == 1 - assert ( - co2_listener.attribute_updates[0][0] - == CarbonDioxideConcentration.AttributeDefs.measured_value.id - ) - assert co2_listener.attribute_updates[0][1] == expected_value + # Check that the value was correctly scaled + actual_value = cluster.get("measured_value") + assert actual_value == expected_value -def test_co2_concentration_scaling_edge_cases(air_quality_device): - """Test CO2 concentration scaling with edge cases.""" - device = air_quality_device - co2_cluster = device.endpoints[1].carbon_dioxide_concentration - co2_listener = ClusterListener(co2_cluster) +def test_co2_concentration_cluster_edge_cases(): + """Test CO2 concentration cluster with edge cases.""" + cluster = CarbonDioxideConcentrationCluster(None, None) test_cases = [ (0, 0.0), # Zero value @@ -81,135 +36,87 @@ def test_co2_concentration_scaling_edge_cases(air_quality_device): (2000_000_000, 2000.0), # 2000 ppm (high but reasonable) ] - for i, (test_value, expected_value) in enumerate(test_cases): - co2_cluster._update_attribute( + for test_value, expected_value in test_cases: + cluster._update_attribute( CarbonDioxideConcentration.AttributeDefs.measured_value.id, test_value ) + actual_value = cluster.get("measured_value") + assert actual_value == expected_value + + +def test_co2_concentration_other_attributes_unchanged(): + """Test that other CO2 cluster attributes are not affected by scaling.""" + cluster = CarbonDioxideConcentrationCluster(None, None) + + # Test min_measured_value attribute (should not be scaled) + test_value = 1000 + cluster._update_attribute( + CarbonDioxideConcentration.AttributeDefs.min_measured_value.id, test_value + ) - assert len(co2_listener.attribute_updates) == i + 1 - assert co2_listener.attribute_updates[i][1] == expected_value + actual_value = cluster.get("min_measured_value") + assert actual_value == test_value # No scaling -def test_device_temperature_scaling(air_quality_device): - """Test device temperature cluster scaling (multiply by 100).""" - device = air_quality_device - temp_cluster = device.endpoints[1].device_temperature - temp_listener = ClusterListener(temp_cluster) +def test_device_temperature_cluster_scaling(): + """Test device temperature cluster scaling functionality.""" + cluster = CustomDeviceTemperature(None, None) # Test normal temperature value divided by 100 test_value = 25 # Should represent 25°C expected_value = 2500 # After scaling (25°C * 100) - temp_cluster._update_attribute( + cluster._update_attribute( DeviceTemperature.AttributeDefs.current_temperature.id, test_value ) - assert len(temp_listener.attribute_updates) == 1 - assert ( - temp_listener.attribute_updates[0][0] - == DeviceTemperature.AttributeDefs.current_temperature.id - ) - assert temp_listener.attribute_updates[0][1] == expected_value + # Check that the value was correctly scaled + actual_value = cluster.get("current_temperature") + assert actual_value == expected_value -def test_device_temperature_scaling_edge_cases(air_quality_device): - """Test device temperature scaling with edge cases.""" - device = air_quality_device - temp_cluster = device.endpoints[1].device_temperature - temp_listener = ClusterListener(temp_cluster) +def test_device_temperature_cluster_edge_cases(): + """Test device temperature cluster with edge cases.""" + cluster = CustomDeviceTemperature(None, None) test_cases = [ (0, 0), # 0°C - (-10, -1000), # -10°C (negative temperature) - (50, 5000), # 50°C (high temperature) + (-10, -1000), # -10°C + (100, 10000), # 100°C (1, 100), # 1°C ] - for i, (test_value, expected_value) in enumerate(test_cases): - temp_cluster._update_attribute( + for test_value, expected_value in test_cases: + cluster._update_attribute( DeviceTemperature.AttributeDefs.current_temperature.id, test_value ) + actual_value = cluster.get("current_temperature") + assert actual_value == expected_value - assert len(temp_listener.attribute_updates) == i + 1 - assert temp_listener.attribute_updates[i][1] == expected_value - -def test_other_clusters_unchanged(air_quality_device): - """Test that other clusters work normally without scaling.""" - device = air_quality_device - - # Test TemperatureMeasurement cluster (should work normally) - temp_measurement_cluster = device.endpoints[1].temperature - temp_measurement_listener = ClusterListener(temp_measurement_cluster) - - test_value = 2500 # 25.0°C in centidegrees - temp_measurement_cluster._update_attribute( - TemperatureMeasurement.AttributeDefs.measured_value.id, test_value - ) - - assert len(temp_measurement_listener.attribute_updates) == 1 - assert temp_measurement_listener.attribute_updates[0][1] == test_value # No scaling - - # Test RelativeHumidity cluster (should work normally) - humidity_cluster = device.endpoints[1].humidity - humidity_listener = ClusterListener(humidity_cluster) - - test_value = 5000 # 50.0% humidity - humidity_cluster._update_attribute( - RelativeHumidity.AttributeDefs.measured_value.id, test_value - ) - - assert len(humidity_listener.attribute_updates) == 1 - assert humidity_listener.attribute_updates[0][1] == test_value # No scaling - - -def test_co2_other_attributes_unchanged(air_quality_device): - """Test that other CO2 cluster attributes are not affected by scaling.""" - device = air_quality_device - co2_cluster = device.endpoints[1].carbon_dioxide_concentration - co2_listener = ClusterListener(co2_cluster) - - # Test min_measured_value attribute (should not be scaled) - test_value = 1000 - co2_cluster._update_attribute( - CarbonDioxideConcentration.AttributeDefs.min_measured_value.id, test_value - ) - - assert len(co2_listener.attribute_updates) == 1 - assert co2_listener.attribute_updates[0][1] == test_value # No scaling - - -def test_device_temperature_other_attributes_unchanged(air_quality_device): +def test_device_temperature_other_attributes_unchanged(): """Test that other device temperature cluster attributes are not affected by scaling.""" - device = air_quality_device - temp_cluster = device.endpoints[1].device_temperature - temp_listener = ClusterListener(temp_cluster) + cluster = CustomDeviceTemperature(None, None) # Test min_temp_experienced attribute (should not be scaled) test_value = 20 - temp_cluster._update_attribute( + cluster._update_attribute( DeviceTemperature.AttributeDefs.min_temp_experienced.id, test_value ) - assert len(temp_listener.attribute_updates) == 1 - assert temp_listener.attribute_updates[0][1] == test_value # No scaling - + actual_value = cluster.get("min_temp_experienced") + assert actual_value == test_value # No scaling -def test_quirk_registration(): - """Test that the quirk is properly registered.""" - import zigpy.quirks - # Check that the quirk is registered for the correct manufacturer and model - registry = zigpy.quirks.DEVICE_REGISTRY +def test_cluster_inheritance(): + """Test that clusters properly inherit from their base classes.""" + co2_cluster = CarbonDioxideConcentrationCluster(None, None) + temp_cluster = CustomDeviceTemperature(None, None) - # For v2 quirks, check the registry_v2 - found_quirk = False - for quirk_entries in registry.registry_v2.values(): - for entry in quirk_entries: - if entry.manufacturer == LUMI and entry.model == "lumi.airm.fhac01": - found_quirk = True - break - if found_quirk: - break + # Check that they are instances of the correct base classes + assert isinstance(co2_cluster, CarbonDioxideConcentration) + assert isinstance(temp_cluster, DeviceTemperature) - assert found_quirk, "Quirk not found in registry" + # Check that they have the expected cluster IDs + assert co2_cluster.cluster_id == CarbonDioxideConcentration.cluster_id + assert temp_cluster.cluster_id == DeviceTemperature.cluster_id