diff --git a/tests/test_tuya_pj1203.py b/tests/test_tuya_pj1203.py new file mode 100644 index 0000000000..4273e02e44 --- /dev/null +++ b/tests/test_tuya_pj1203.py @@ -0,0 +1,947 @@ +"""Tests for Tuya PJ-1203 Single Channel Clamp Energy Meter quirk.""" + +import pytest +from zigpy.profiles import zha +from zigpy.zcl import foundation +from zigpy.zcl.clusters.homeautomation import ElectricalMeasurement +from zigpy.zcl.clusters.smartenergy import Metering + +from tests.common import ClusterListener +from zhaquirks.const import MODELS_INFO +from zhaquirks.tuya.ts0601_pj1203 import ( + TuyaElectricalMeasurementPJ1203, + TuyaMeteringPJ1203, + TuyaPJ1203ManufCluster, + TuyaPJ1203PowerMeter, +) + + +@pytest.fixture +def pj1203_device(zigpy_device_from_quirk): + """Create a PJ-1203 device from quirk.""" + return zigpy_device_from_quirk(TuyaPJ1203PowerMeter) + + +def test_pj1203_signature_matches(): + """Test that quirk matches both manufacturer IDs.""" + assert ("_TZE284_cjbofhxw", "TS0601") in TuyaPJ1203PowerMeter.signature[MODELS_INFO] + assert ("_TZE204_cjbofhxw", "TS0601") in TuyaPJ1203PowerMeter.signature[MODELS_INFO] + + +def test_pj1203_device_type(): + """Test that replacement device type is METER_INTERFACE.""" + from zhaquirks.const import DEVICE_TYPE, ENDPOINTS + + replacement_device_type = TuyaPJ1203PowerMeter.replacement[ENDPOINTS][1][ + DEVICE_TYPE + ] + assert replacement_device_type == zha.DeviceType.METER_INTERFACE + + +def test_pj1203_clusters_present(pj1203_device): + """Test that the device has required clusters.""" + assert hasattr(pj1203_device.endpoints[1], "electrical_measurement") + assert hasattr(pj1203_device.endpoints[1], "tuya_manufacturer") + assert hasattr(pj1203_device.endpoints[1], "smartenergy_metering") + + +def test_pj1203_cluster_constants(): + """Test that cluster constants are correctly defined.""" + constants = TuyaElectricalMeasurementPJ1203._CONSTANT_ATTRIBUTES + + # Voltage divisor: 10 (device reports in dV) + assert constants[ElectricalMeasurement.AttributeDefs.ac_voltage_divisor.id] == 10 + assert constants[ElectricalMeasurement.AttributeDefs.ac_voltage_multiplier.id] == 1 + + # Current divisor: 1000 (device reports in mA) + assert constants[ElectricalMeasurement.AttributeDefs.ac_current_divisor.id] == 1000 + assert constants[ElectricalMeasurement.AttributeDefs.ac_current_multiplier.id] == 1 + + # Power divisor: 1 (already converted by DP converter) + assert constants[ElectricalMeasurement.AttributeDefs.ac_power_divisor.id] == 1 + assert constants[ElectricalMeasurement.AttributeDefs.ac_power_multiplier.id] == 1 + + +def test_pj1203_dp_mappings(): + """Test that DP to attribute mappings are correct.""" + dp_mappings = TuyaPJ1203ManufCluster.dp_to_attribute + + # DP 18 should map to current + assert 18 in dp_mappings + assert dp_mappings[18].attribute_name == "rms_current" + + # DP 19 should map to power (with converter) + assert 19 in dp_mappings + assert dp_mappings[19].attribute_name == "active_power" + # Test the converter divides by 10 + assert dp_mappings[19].converter(1000) == 100 + + # DP 20 should map to voltage + assert 20 in dp_mappings + assert dp_mappings[20].attribute_name == "rms_voltage" + + # DP 101 should map to total energy + assert 101 in dp_mappings + assert dp_mappings[101].attribute_name == "current_summ_delivered" + + +def test_pj1203_data_point_handlers(): + """Test that all DPs have handlers defined.""" + handlers = TuyaPJ1203ManufCluster.data_point_handlers + + assert 18 in handlers + assert 19 in handlers + assert 20 in handlers + assert 101 in handlers + + assert handlers[18] == "_dp_2_attr_update" + assert handlers[19] == "_dp_2_attr_update" + assert handlers[20] == "_dp_2_attr_update" + assert handlers[101] == "_dp_2_attr_update" + + +def test_pj1203_time_offset(): + """Test that time offset is set to 1970.""" + import datetime + + expected_offset = datetime.datetime(1970, 1, 1, tzinfo=datetime.UTC) + assert TuyaPJ1203ManufCluster.set_time_offset == expected_offset + + +async def test_pj1203_voltage_attribute_update(pj1203_device): + """Test voltage attribute update.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + em_listener = ClusterListener(em_cluster) + + # Simulate voltage update (2398 = 239.8V) + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_voltage.id, 2398 + ) + + assert len(em_listener.attribute_updates) >= 1 + voltage_update = next( + ( + u + for u in em_listener.attribute_updates + if u[0] == ElectricalMeasurement.AttributeDefs.rms_voltage.id + ), + None, + ) + assert voltage_update is not None + assert voltage_update[1] == 2398 + + +async def test_pj1203_current_attribute_update(pj1203_device): + """Test current attribute update.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + em_listener = ClusterListener(em_cluster) + + # Simulate current update (305 = 0.305A) + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_current.id, 305 + ) + + assert len(em_listener.attribute_updates) >= 1 + current_update = next( + ( + u + for u in em_listener.attribute_updates + if u[0] == ElectricalMeasurement.AttributeDefs.rms_current.id + ), + None, + ) + assert current_update is not None + assert current_update[1] == 305 + + +async def test_pj1203_power_attribute_update(pj1203_device): + """Test power attribute update.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + em_listener = ClusterListener(em_cluster) + + # Simulate power update (41 = 41W, already divided by 10 by converter) + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 41 + ) + + assert len(em_listener.attribute_updates) >= 1 + power_update = next( + ( + u + for u in em_listener.attribute_updates + if u[0] == ElectricalMeasurement.AttributeDefs.active_power.id + ), + None, + ) + assert power_update is not None + assert power_update[1] == 41 + + +async def test_pj1203_calculated_apparent_power(pj1203_device): + """Test that apparent power is calculated from voltage and current.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + em_listener = ClusterListener(em_cluster) + + # Update voltage: 2398 dV = 239.8V + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_voltage.id, 2398 + ) + + # Update current: 305 mA = 0.305A + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_current.id, 305 + ) + + # Find apparent power update + # Calculation: round((2398 * 305) / 10000) = 73 VA + apparent_power_update = next( + ( + u + for u in em_listener.attribute_updates + if u[0] == ElectricalMeasurement.AttributeDefs.apparent_power.id + ), + None, + ) + assert apparent_power_update is not None + expected_apparent_power = round((2398 * 305) / 10000) + assert apparent_power_update[1] == expected_apparent_power + + +async def test_pj1203_calculated_power_factor(pj1203_device): + """Test that power factor is calculated from active and apparent power.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + em_listener = ClusterListener(em_cluster) + + # Update voltage: 2398 dV + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_voltage.id, 2398 + ) + + # Update current: 305 mA + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_current.id, 305 + ) + + # Update active power: 41W + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 41 + ) + + # Find power factor update + # Apparent power = 73 VA + # Power factor = round((41 * 100) / 73) = 56% + power_factor_update = next( + ( + u + for u in em_listener.attribute_updates + if u[0] == ElectricalMeasurement.AttributeDefs.power_factor.id + ), + None, + ) + assert power_factor_update is not None + apparent_power = round((2398 * 305) / 10000) + expected_pf = min(round((abs(41) * 100) / apparent_power), 100) + assert power_factor_update[1] == expected_pf + + +async def test_pj1203_power_factor_capped_at_100(pj1203_device): + """Test that power factor is capped at 100%.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + em_listener = ClusterListener(em_cluster) + + # Set up values where calculated PF would exceed 100 + # Low voltage and current, high power + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_voltage.id, 1000 + ) + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_current.id, 100 + ) + # Apparent power = (1000 * 100) // 10000 = 10 VA + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 50 + ) + # Uncapped PF would be (50 * 100) // 10 = 500% + + # Find power factor update (should be capped at 100) + power_factor_updates = [ + u + for u in em_listener.attribute_updates + if u[0] == ElectricalMeasurement.AttributeDefs.power_factor.id + ] + assert len(power_factor_updates) > 0 + # Last update should be capped at 100 + assert power_factor_updates[-1][1] == 100 + + +async def test_pj1203_no_division_by_zero(pj1203_device): + """Test that power factor calculation handles zero apparent power.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # Set voltage and current to values that result in zero apparent power + em_cluster._update_attribute(ElectricalMeasurement.AttributeDefs.rms_voltage.id, 0) + em_cluster._update_attribute(ElectricalMeasurement.AttributeDefs.rms_current.id, 0) + em_cluster._update_attribute(ElectricalMeasurement.AttributeDefs.active_power.id, 0) + + # Should not raise division by zero error + # Apparent power = 0, so power factor calculation is skipped + + +async def test_pj1203_read_attributes_constant(pj1203_device): + """Test reading constant attributes.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # Read voltage divisor (constant attribute) + result = await em_cluster.read_attributes( + [ElectricalMeasurement.AttributeDefs.ac_voltage_divisor.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 10 + + +async def test_pj1203_read_attributes_cached(pj1203_device): + """Test reading cached attributes.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # First update an attribute + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_voltage.id, 2400 + ) + + # Then read it back + result = await em_cluster.read_attributes( + [ElectricalMeasurement.AttributeDefs.rms_voltage.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 2400 + + +async def test_pj1203_read_attributes_unsupported(pj1203_device): + """Test reading unsupported attributes returns UNSUPPORTED_ATTRIBUTE.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # Read an attribute that's not in cache or constants + result = await em_cluster.read_attributes( + [ElectricalMeasurement.AttributeDefs.ac_frequency.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.UNSUPPORTED_ATTRIBUTE + + +async def test_pj1203_read_attributes_by_name(pj1203_device): + """Test reading attributes by name.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # Update attribute first + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.rms_current.id, 500 + ) + + # Read by name + result = await em_cluster.read_attributes(["rms_current"], allow_cache=False) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 500 + + +async def test_pj1203_read_active_power_type(pj1203_device): + """Test that active_power is read with correct type (int16s).""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # Update active power + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, -50 + ) + + # Read it back + result = await em_cluster.read_attributes( + [ElectricalMeasurement.AttributeDefs.active_power.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + # Should preserve negative value (signed int) + assert records[0].value.value == -50 + + +def test_pj1203_metering_cluster_constants(): + """Test that metering cluster constants are correctly defined.""" + constants = TuyaMeteringPJ1203._CONSTANT_ATTRIBUTES + + # Unit of measure: kWh (0x00) + assert constants[Metering.AttributeDefs.unit_of_measure.id] == 0x0000 + + # Multiplier and divisor for kWh conversion (raw value is in Wh) + assert constants[Metering.AttributeDefs.multiplier.id] == 1 + assert constants[Metering.AttributeDefs.divisor.id] == 1000 + + # Summation formatting + assert constants[Metering.AttributeDefs.summation_formatting.id] == 0b0_0100_011 + + +async def test_pj1203_energy_attribute_update(pj1203_device): + """Test energy attribute update on metering cluster.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + metering_listener = ClusterListener(metering_cluster) + + # Simulate energy update (12345 Wh = 12.345 kWh) + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 12345 + ) + + assert len(metering_listener.attribute_updates) >= 1 + energy_update = next( + ( + u + for u in metering_listener.attribute_updates + if u[0] == Metering.AttributeDefs.current_summ_delivered.id + ), + None, + ) + assert energy_update is not None + assert energy_update[1] == 12345 + + +async def test_pj1203_read_metering_attributes_constant(pj1203_device): + """Test reading constant metering attributes.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Read divisor (constant attribute) + result = await metering_cluster.read_attributes( + [Metering.AttributeDefs.divisor.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 1000 + + +async def test_pj1203_read_metering_attributes_cached(pj1203_device): + """Test reading cached metering attributes.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # First update an attribute + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 54321 + ) + + # Then read it back + result = await metering_cluster.read_attributes( + [Metering.AttributeDefs.current_summ_delivered.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 54321 + + +# Tests for energy integration feature + + +async def test_pj1203_energy_counter_reset_compensation(pj1203_device): + """Test that metering cluster compensates for device energy counter resets.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Initial energy reading + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 1000 + ) + assert metering_cluster.get_compensated_energy_wh() == 1000 + + # Energy increases normally + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 1500 + ) + assert metering_cluster.get_compensated_energy_wh() == 1500 + + # Device resets - value drops (simulating reconnect) + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 100 + ) + # Should compensate: 100 + 1500 (previous max) = 1600 + assert metering_cluster.get_compensated_energy_wh() == 1600 + + # Continue accumulating after reset + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 300 + ) + assert metering_cluster.get_compensated_energy_wh() == 1800 + + +async def test_pj1203_energy_counter_multiple_resets(pj1203_device): + """Test that multiple device resets are handled correctly.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # First session: accumulate to 500 + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 500 + ) + assert metering_cluster.get_compensated_energy_wh() == 500 + + # First reset + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 50 + ) + assert metering_cluster.get_compensated_energy_wh() == 550 + + # Accumulate more + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 200 + ) + assert metering_cluster.get_compensated_energy_wh() == 700 + + # Second reset + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 25 + ) + # Offset should now be 500 + 200 = 700, plus new value 25 = 725 + assert metering_cluster.get_compensated_energy_wh() == 725 + + +async def test_pj1203_power_integration_basic(pj1203_device): + """Test basic power-to-energy integration.""" + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # Verify initial state + assert em_cluster._integrated_energy_wh == 0.0 + assert em_cluster._last_power_time is None + assert em_cluster._last_power_value is None + + # First power reading - just sets the baseline, no integration yet + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 100 + ) + assert em_cluster._last_power_value == 100 + assert em_cluster._integrated_energy_wh == 0.0 # No integration on first reading + assert em_cluster._last_power_time is not None + + +async def test_pj1203_power_integration_accumulation(pj1203_device, monkeypatch): + """Test that power readings are integrated over time.""" + from zhaquirks.tuya import ts0601_pj1203 + + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + # Mock time.monotonic to control time progression + mock_time = [0.0] + + def mock_monotonic(): + return mock_time[0] + + monkeypatch.setattr(ts0601_pj1203.time, "monotonic", mock_monotonic) + + # First reading at t=0 + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 100 + ) + + # Second reading at t=60 (1 minute later) with same power + mock_time[0] = 60.0 + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 100 + ) + + # Energy = avg_power * time = 100W * 60s / 3600 = 1.6667 Wh + expected_energy = (100 * 60) / 3600 + assert abs(em_cluster._integrated_energy_wh - expected_energy) < 0.01 + + +async def test_pj1203_power_integration_trapezoidal(pj1203_device, monkeypatch): + """Test trapezoidal integration with changing power levels.""" + from zhaquirks.tuya import ts0601_pj1203 + + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + mock_time = [0.0] + + def mock_monotonic(): + return mock_time[0] + + monkeypatch.setattr(ts0601_pj1203.time, "monotonic", mock_monotonic) + + # Start at 0W + em_cluster._update_attribute(ElectricalMeasurement.AttributeDefs.active_power.id, 0) + + # 60 seconds later, power jumps to 200W + mock_time[0] = 60.0 + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 200 + ) + + # Trapezoidal: avg(0, 200) * 60s / 3600 = 100 * 60 / 3600 = 1.6667 Wh + expected_energy = (100 * 60) / 3600 + assert abs(em_cluster._integrated_energy_wh - expected_energy) < 0.01 + + +async def test_pj1203_power_integration_gap_handling(pj1203_device, monkeypatch): + """Test that large time gaps don't cause spurious energy accumulation.""" + from zhaquirks.tuya import ts0601_pj1203 + + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + mock_time = [0.0] + + def mock_monotonic(): + return mock_time[0] + + monkeypatch.setattr(ts0601_pj1203.time, "monotonic", mock_monotonic) + + # Initial reading + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 1000 + ) + + # Gap larger than MAX_INTEGRATION_GAP (5 minutes = 300 seconds) + mock_time[0] = 600.0 # 10 minutes + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 1000 + ) + + # Should NOT integrate due to large gap + assert em_cluster._integrated_energy_wh == 0.0 + + +async def test_pj1203_power_integration_updates_metering(pj1203_device, monkeypatch): + """Test that integrated energy is passed to metering cluster.""" + from zhaquirks.tuya import ts0601_pj1203 + + em_cluster = pj1203_device.endpoints[1].electrical_measurement + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + mock_time = [0.0] + + def mock_monotonic(): + return mock_time[0] + + monkeypatch.setattr(ts0601_pj1203.time, "monotonic", mock_monotonic) + + # First reading + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 100 + ) + + # Second reading 60 seconds later + mock_time[0] = 60.0 + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 100 + ) + + # Check metering cluster received the integrated value (rounded) + # 100W * 60s / 3600 = 1.6667 Wh -> rounded to 2 + assert metering_cluster.get_integrated_energy_wh() == 2 + + +async def test_pj1203_reset_integrated_energy(pj1203_device, monkeypatch): + """Test resetting the integrated energy counter.""" + from zhaquirks.tuya import ts0601_pj1203 + + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + mock_time = [0.0] + + def mock_monotonic(): + return mock_time[0] + + monkeypatch.setattr(ts0601_pj1203.time, "monotonic", mock_monotonic) + + # Accumulate some energy + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 100 + ) + mock_time[0] = 60.0 # 60 seconds + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 100 + ) + + # 100W * 60s / 3600 = 1.6667 Wh + expected_energy = (100 * 60) / 3600 + assert abs(em_cluster._integrated_energy_wh - expected_energy) < 0.01 + + # Reset + em_cluster.reset_integrated_energy() + + assert em_cluster._integrated_energy_wh == 0.0 + assert em_cluster._last_power_time is None + assert em_cluster._last_power_value is None + + +async def test_pj1203_get_integrated_energy(pj1203_device, monkeypatch): + """Test get_integrated_energy_wh method.""" + from zhaquirks.tuya import ts0601_pj1203 + + em_cluster = pj1203_device.endpoints[1].electrical_measurement + + mock_time = [0.0] + + def mock_monotonic(): + return mock_time[0] + + monkeypatch.setattr(ts0601_pj1203.time, "monotonic", mock_monotonic) + + # Initial state + assert em_cluster.get_integrated_energy_wh() == 0.0 + + # Accumulate energy + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 50 + ) + mock_time[0] = 120.0 # 2 minutes + em_cluster._update_attribute( + ElectricalMeasurement.AttributeDefs.active_power.id, 50 + ) + + # 50W * 120s / 3600 = 1.6667 Wh + expected_energy = (50 * 120) / 3600 + assert abs(em_cluster.get_integrated_energy_wh() - expected_energy) < 0.01 + + +# Tests for cache restoration on quirk reload + + +async def test_pj1203_cache_restoration_on_reload(pj1203_device): + """Test that energy offset is restored from cache after quirk reload.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Simulate previous session: device accumulated to 500 Wh + # This would be in the cache after ZHA persisted it + metering_cluster._attr_cache[Metering.AttributeDefs.current_summ_delivered.id] = 500 + + # Simulate quirk reload - reset the internal state but keep the cache + metering_cluster._last_device_energy = None + metering_cluster._energy_offset = 0 + metering_cluster._initialized_from_cache = False + + # Device reports new value lower than cached (device reset to 100 Wh) + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 100 + ) + + # Should restore offset from cache (500) and add device value (100) + # Compensated = 100 + 500 = 600 Wh + assert metering_cluster.get_compensated_energy_wh() == 600 + assert metering_cluster._energy_offset == 500 + + +async def test_pj1203_cache_restoration_device_higher_than_cache(pj1203_device): + """Test that no offset is restored when device value >= cached value.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Cache has 500 Wh from previous session + metering_cluster._attr_cache[Metering.AttributeDefs.current_summ_delivered.id] = 500 + + # Reset internal state (simulating reload) + metering_cluster._last_device_energy = None + metering_cluster._energy_offset = 0 + metering_cluster._initialized_from_cache = False + + # Device reports value higher than cache (no reset occurred) + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 600 + ) + + # No offset should be added since device value >= cached + assert metering_cluster.get_compensated_energy_wh() == 600 + assert metering_cluster._energy_offset == 0 + + +async def test_pj1203_cache_restoration_no_cache(pj1203_device): + """Test behavior when no cached value exists (first boot or cache cleared).""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # No cached value (empty cache) + # _attr_cache should be empty or not contain the attribute + + # Reset internal state + metering_cluster._last_device_energy = None + metering_cluster._energy_offset = 0 + metering_cluster._initialized_from_cache = False + + # Device reports initial value + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 100 + ) + + # Should just use device value as-is (no restoration possible) + assert metering_cluster.get_compensated_energy_wh() == 100 + assert metering_cluster._energy_offset == 0 + + +async def test_pj1203_cache_restoration_then_device_reset(pj1203_device): + """Test cache restoration followed by a device reset during operation.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Cache has 500 Wh + metering_cluster._attr_cache[Metering.AttributeDefs.current_summ_delivered.id] = 500 + + # Reset internal state (simulating reload) + metering_cluster._last_device_energy = None + metering_cluster._energy_offset = 0 + metering_cluster._initialized_from_cache = False + + # First update: device at 100 Wh (lower than cache) + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 100 + ) + assert metering_cluster._energy_offset == 500 + assert metering_cluster.get_compensated_energy_wh() == 600 + + # Device accumulates to 300 Wh + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 300 + ) + assert metering_cluster.get_compensated_energy_wh() == 800 + + # Device resets again to 50 Wh + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 50 + ) + # Offset should be 500 (from cache) + 300 (from device reset) = 800 + assert metering_cluster._energy_offset == 800 + assert metering_cluster.get_compensated_energy_wh() == 850 + + +async def test_pj1203_cache_restoration_only_happens_once(pj1203_device): + """Test that cache restoration only happens on first update.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Cache has 1000 Wh + metering_cluster._attr_cache[Metering.AttributeDefs.current_summ_delivered.id] = ( + 1000 + ) + + # Reset internal state + metering_cluster._last_device_energy = None + metering_cluster._energy_offset = 0 + metering_cluster._initialized_from_cache = False + + # First update: device at 200 Wh (lower than cache) + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 200 + ) + assert metering_cluster._initialized_from_cache is True + assert metering_cluster._energy_offset == 1000 + + # Update cache to simulate ZHA updating it + metering_cluster._attr_cache[Metering.AttributeDefs.current_summ_delivered.id] = ( + 1200 + ) + + # Second update with lower value - should NOT re-check cache + # This is a normal device reset, not a cache restoration + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 100 + ) + + # Offset should be 1000 (initial) + 200 (from device reset) = 1200 + # NOT 1200 (from re-reading cache) + assert metering_cluster._energy_offset == 1200 + assert metering_cluster.get_compensated_energy_wh() == 1300 + + +async def test_pj1203_metering_update_non_energy_attribute(pj1203_device): + """Test that non-energy attributes are passed through unchanged on metering cluster.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + metering_listener = ClusterListener(metering_cluster) + + # Update a non-energy attribute (not current_summ_delivered) + # Use status attribute as an example + metering_cluster._update_attribute(Metering.AttributeDefs.status.id, 0x01) + + # Should be passed through to parent class + status_update = next( + ( + u + for u in metering_listener.attribute_updates + if u[0] == Metering.AttributeDefs.status.id + ), + None, + ) + assert status_update is not None + assert status_update[1] == 0x01 + + +async def test_pj1203_get_compensated_energy_before_any_updates(pj1203_device): + """Test get_compensated_energy_wh when no device energy has been received.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Reset state - no device energy received yet + metering_cluster._last_device_energy = None + metering_cluster._energy_offset = 100 # Some offset from cache restoration + + # Should return just the offset when no device energy received + assert metering_cluster.get_compensated_energy_wh() == 100 + + +async def test_pj1203_read_metering_attributes_by_name(pj1203_device): + """Test reading metering attributes by name.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Update attribute first + metering_cluster._update_attribute( + Metering.AttributeDefs.current_summ_delivered.id, 12345 + ) + + # Read by name + result = await metering_cluster.read_attributes( + ["current_summ_delivered"], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 12345 + + +async def test_pj1203_read_metering_summation_formatting(pj1203_device): + """Test reading summation_formatting constant attribute (bitmap8 type).""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + result = await metering_cluster.read_attributes( + [Metering.AttributeDefs.summation_formatting.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 0b0_0100_011 + + +async def test_pj1203_read_metering_unit_of_measure(pj1203_device): + """Test reading unit_of_measure constant attribute (enum8 type).""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + result = await metering_cluster.read_attributes( + [Metering.AttributeDefs.unit_of_measure.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.SUCCESS + assert records[0].value.value == 0x0000 # POWER_WATT + + +async def test_pj1203_read_metering_unsupported_attribute(pj1203_device): + """Test reading unsupported metering attributes returns UNSUPPORTED_ATTRIBUTE.""" + metering_cluster = pj1203_device.endpoints[1].smartenergy_metering + + # Read an attribute that's not in cache or constants + result = await metering_cluster.read_attributes( + [Metering.AttributeDefs.current_tier1_summ_delivered.id], allow_cache=False + ) + + records = result[0] + assert len(records) == 1 + assert records[0].status == foundation.Status.UNSUPPORTED_ATTRIBUTE diff --git a/zhaquirks/tuya/ts0601_pj1203.py b/zhaquirks/tuya/ts0601_pj1203.py new file mode 100644 index 0000000000..90ed3bf2f6 --- /dev/null +++ b/zhaquirks/tuya/ts0601_pj1203.py @@ -0,0 +1,442 @@ +"""Tuya PJ-1203 Single Channel Clamp Energy Meter. + +This quirk supports the Tuya PJ-1203 single channel clamp power meter. +The device reports voltage, current, power, and total energy. Apparent power +and power factor are calculated from these values. + +Energy tracking: The device's native energy counter (DP 101) resets on reconnects. +This quirk provides an additional calculated energy value that integrates power +over time, providing a more reliable cumulative energy measurement. + +Reset handling: The metering cluster tracks device counter resets and maintains +a cumulative offset. On quirk reload/restart, it attempts to restore the offset +from ZHA's attribute cache to maintain continuity. + +Manufacturer IDs: _TZE204_cjbofhxw, _TZE284_cjbofhxw +Model: TS0601 + +Datapoints: +- DP 18: Current (mA) +- DP 19: Power (W * 10) +- DP 20: Voltage (V * 10) +- DP 101: Total Energy (Wh) +""" + +import datetime +import logging +import time + +from zigpy.profiles import zha +from zigpy.quirks import CustomDevice +import zigpy.types as t +from zigpy.zcl import foundation +from zigpy.zcl.clusters.general import Basic, Groups, Ota, Scenes, Time +from zigpy.zcl.clusters.homeautomation import ElectricalMeasurement +from zigpy.zcl.clusters.smartenergy import Metering + +from zhaquirks.const import ( + DEVICE_TYPE, + ENDPOINTS, + INPUT_CLUSTERS, + MODELS_INFO, + OUTPUT_CLUSTERS, + PROFILE_ID, +) +from zhaquirks.tuya import NoManufacturerCluster, TuyaLocalCluster +from zhaquirks.tuya.mcu import DPToAttributeMapping, TuyaMCUCluster + +_LOGGER = logging.getLogger(__name__) + + +class TuyaElectricalMeasurementPJ1203(TuyaLocalCluster, ElectricalMeasurement): + """ElectricalMeasurement cluster for PJ-1203 with calculated apparent power and power factor. + + Also integrates power over time to calculate cumulative energy consumption. + """ + + cluster_id = ElectricalMeasurement.cluster_id + + _CONSTANT_ATTRIBUTES = { + ElectricalMeasurement.AttributeDefs.ac_current_divisor.id: 1000, + ElectricalMeasurement.AttributeDefs.ac_current_multiplier.id: 1, + ElectricalMeasurement.AttributeDefs.ac_power_divisor.id: 1, + ElectricalMeasurement.AttributeDefs.ac_power_multiplier.id: 1, + ElectricalMeasurement.AttributeDefs.ac_voltage_divisor.id: 10, + ElectricalMeasurement.AttributeDefs.ac_voltage_multiplier.id: 1, + } + + # Maximum gap between power readings to consider for integration (seconds) + # If gap is larger, we skip integration to avoid counting offline time + MAX_INTEGRATION_GAP = 300 # 5 minutes + + def __init__(self, *args, **kwargs): + """Initialize the cluster with energy integration state.""" + super().__init__(*args, **kwargs) + self._last_power_time: float | None = None + self._last_power_value: int | None = None + self._integrated_energy_wh: float = 0.0 + + def _update_attribute(self, attrid, value): + """Update attribute and calculate derived values.""" + super()._update_attribute(attrid, value) + + # Get current cached values + rms_voltage = self._attr_cache.get(self.AttributeDefs.rms_voltage.id) + rms_current = self._attr_cache.get(self.AttributeDefs.rms_current.id) + active_power = self._attr_cache.get(self.AttributeDefs.active_power.id) + + # Calculate apparent power and power factor when we have the necessary values + if rms_voltage is not None and rms_current is not None: + # Calculate apparent power: V * A + # voltage is in dV (tenths), current is in mA + # Result in VA (same scale as active_power in W) + apparent_power = round((rms_voltage * rms_current) / 10000) + super()._update_attribute( + self.AttributeDefs.apparent_power.id, apparent_power + ) + + # Calculate power factor if we have active power and apparent power + if active_power is not None and apparent_power > 0: + # Power factor as percentage (0-100) + power_factor = round((abs(active_power) * 100) / apparent_power) + power_factor = min(power_factor, 100) + super()._update_attribute( + self.AttributeDefs.power_factor.id, power_factor + ) + elif apparent_power == 0: + # No apparent power means power factor is undefined, set to 0 + super()._update_attribute(self.AttributeDefs.power_factor.id, 0) + + # Integrate power over time when active_power is updated + if attrid == self.AttributeDefs.active_power.id and value is not None: + self._integrate_power(value) + + def _integrate_power(self, power_watts: int) -> None: + """Integrate power over time to calculate energy consumption. + + Uses trapezoidal integration for better accuracy: takes average of + previous and current power readings multiplied by time delta. + + Args: + power_watts: Current power reading in watts + + """ + current_time = time.monotonic() + + if self._last_power_time is not None and self._last_power_value is not None: + time_delta = current_time - self._last_power_time + + # Only integrate if the gap is reasonable + if 0 < time_delta <= self.MAX_INTEGRATION_GAP: + # Trapezoidal integration: average power * time + avg_power = (self._last_power_value + power_watts) / 2 + # Convert: W * seconds -> Wh (divide by 3600) + energy_wh = (avg_power * time_delta) / 3600 + self._integrated_energy_wh += energy_wh + + # Update the metering cluster with integrated energy (in Wh) + metering = self.endpoint.smartenergy_metering + if hasattr(metering, "update_integrated_energy"): + metering.update_integrated_energy(round(self._integrated_energy_wh)) + + self._last_power_time = current_time + self._last_power_value = power_watts + + def get_integrated_energy_wh(self) -> float: + """Return the current integrated energy value in Wh.""" + return self._integrated_energy_wh + + def reset_integrated_energy(self) -> None: + """Reset the integrated energy counter.""" + self._integrated_energy_wh = 0.0 + self._last_power_time = None + self._last_power_value = None + + async def read_attributes( + self, attributes, allow_cache=False, only_cache=False, manufacturer=None + ): + """Read attributes ZCL foundation command.""" + records = [] + for attr in attributes: + if isinstance(attr, str): + attr_id = self.attributes_by_name[attr].id + else: + attr_id = attr + + # Check constant attributes first + if attr_id in self._CONSTANT_ATTRIBUTES: + records.append( + foundation.ReadAttributeRecord( + attr_id, + foundation.Status.SUCCESS, + foundation.TypeValue( + type=t.uint16_t, value=self._CONSTANT_ATTRIBUTES[attr_id] + ), + ) + ) + elif attr_id in self._attr_cache: + # Determine the correct type for the attribute + if attr_id == self.AttributeDefs.active_power.id: + attr_type = t.int16s + else: + attr_type = t.uint16_t + + records.append( + foundation.ReadAttributeRecord( + attr_id, + foundation.Status.SUCCESS, + foundation.TypeValue( + type=attr_type, value=self._attr_cache[attr_id] + ), + ) + ) + else: + records.append( + foundation.ReadAttributeRecord( + attr_id, + foundation.Status.UNSUPPORTED_ATTRIBUTE, + foundation.TypeValue(), + ) + ) + + return (records,) + + +class TuyaMeteringPJ1203(TuyaLocalCluster, Metering): + """Metering cluster for PJ-1203 to report total energy consumption. + + This cluster tracks energy in two ways: + 1. Device's native counter (DP 101) - resets on reconnects + 2. Integrated energy from power readings - more reliable cumulative value + + When the device's counter resets (value decreases), we compensate by + tracking the offset to maintain a continuous cumulative reading. + """ + + cluster_id = Metering.cluster_id + + POWER_WATT = 0x0000 + + # Mark instantaneous_demand as unsupported since this device only reports total energy + set_unsupported_attributes = {Metering.AttributeDefs.instantaneous_demand.id} + + _CONSTANT_ATTRIBUTES = { + Metering.AttributeDefs.unit_of_measure.id: POWER_WATT, + Metering.AttributeDefs.multiplier.id: 1, + Metering.AttributeDefs.divisor.id: 1000, # Device reports in Wh, convert to kWh + Metering.AttributeDefs.summation_formatting.id: 0b0_0100_011, # 4 digits after decimal + } + + def __init__(self, *args, **kwargs): + """Initialize the cluster with energy tracking state.""" + super().__init__(*args, **kwargs) + self._last_device_energy: int | None = None + self._energy_offset: int = 0 # Offset to add when device counter resets + self._integrated_energy_wh: int = 0 + self._initialized_from_cache: bool = False # Track if we've restored from cache + + def _update_attribute(self, attrid, value): + """Update attribute and handle device energy counter resets. + + On first update after initialization, attempts to restore the energy + offset from ZHA's attribute cache to maintain continuity across + quirk reloads/restarts. + """ + if attrid == Metering.AttributeDefs.current_summ_delivered.id: + # On first update, try to restore offset from cached value + if not self._initialized_from_cache: + self._initialized_from_cache = True + cached_value = self._attr_cache.get(attrid) + if cached_value is not None and value < cached_value: + # Device value is lower than cached - restore offset to maintain continuity + self._energy_offset = cached_value + _LOGGER.debug( + "PJ1203: Restored energy offset from cache. " + "Cached: %s Wh, Device: %s Wh, Offset set to: %s Wh", + cached_value, + value, + self._energy_offset, + ) + elif cached_value is not None: + _LOGGER.debug( + "PJ1203: Cache value (%s Wh) <= device value (%s Wh), " + "no offset restoration needed", + cached_value, + value, + ) + else: + _LOGGER.debug( + "PJ1203: No cached value available for offset restoration" + ) + + # Track device energy and detect resets during normal operation + if ( + self._last_device_energy is not None + and value < self._last_device_energy + ): + # Device counter reset detected - add previous value to offset + self._energy_offset += self._last_device_energy + _LOGGER.debug( + "PJ1203: Device counter reset detected. " + "Previous: %s Wh, New: %s Wh, Offset now: %s Wh", + self._last_device_energy, + value, + self._energy_offset, + ) + + self._last_device_energy = value + + # Report the compensated value (device value + offset) + compensated_value = value + self._energy_offset + super()._update_attribute(attrid, compensated_value) + else: + super()._update_attribute(attrid, value) + + def update_integrated_energy(self, energy_wh: int) -> None: + """Update the integrated energy value from power readings. + + This provides an alternative energy measurement that doesn't rely + on the device's resetting counter. + + Args: + energy_wh: Integrated energy in Wh + + """ + self._integrated_energy_wh = energy_wh + + def get_integrated_energy_wh(self) -> int: + """Return the integrated energy value in Wh.""" + return self._integrated_energy_wh + + def get_compensated_energy_wh(self) -> int: + """Return the compensated device energy (with reset handling) in Wh.""" + if self._last_device_energy is not None: + return self._last_device_energy + self._energy_offset + return self._energy_offset + + async def read_attributes( + self, attributes, allow_cache=False, only_cache=False, manufacturer=None + ): + """Read attributes ZCL foundation command.""" + records = [] + for attr in attributes: + if isinstance(attr, str): + attr_id = self.attributes_by_name[attr].id + else: + attr_id = attr + + # Check constant attributes first + if attr_id in self._CONSTANT_ATTRIBUTES: + # Determine the correct type for each constant attribute + if attr_id == Metering.AttributeDefs.summation_formatting.id: + attr_type = t.bitmap8 + elif attr_id == Metering.AttributeDefs.unit_of_measure.id: + attr_type = t.enum8 + else: + attr_type = t.uint24_t + + records.append( + foundation.ReadAttributeRecord( + attr_id, + foundation.Status.SUCCESS, + foundation.TypeValue( + type=attr_type, value=self._CONSTANT_ATTRIBUTES[attr_id] + ), + ) + ) + elif attr_id in self._attr_cache: + records.append( + foundation.ReadAttributeRecord( + attr_id, + foundation.Status.SUCCESS, + foundation.TypeValue( + type=t.uint48_t, value=self._attr_cache[attr_id] + ), + ) + ) + else: + records.append( + foundation.ReadAttributeRecord( + attr_id, + foundation.Status.UNSUPPORTED_ATTRIBUTE, + foundation.TypeValue(), + ) + ) + + return (records,) + + +class TuyaPJ1203ManufCluster(NoManufacturerCluster, TuyaMCUCluster): + """Manufacturer cluster for PJ-1203 single channel energy meter.""" + + set_time_offset = datetime.datetime(1970, 1, 1, tzinfo=datetime.UTC) + + dp_to_attribute: dict[int, DPToAttributeMapping] = { + 18: DPToAttributeMapping( + TuyaElectricalMeasurementPJ1203.ep_attribute, + "rms_current", + ), + 19: DPToAttributeMapping( + TuyaElectricalMeasurementPJ1203.ep_attribute, + "active_power", + converter=lambda x: x // 10, + ), + 20: DPToAttributeMapping( + TuyaElectricalMeasurementPJ1203.ep_attribute, + "rms_voltage", + ), + 101: DPToAttributeMapping( + TuyaMeteringPJ1203.ep_attribute, + "current_summ_delivered", + ), + } + + data_point_handlers = { + 18: "_dp_2_attr_update", + 19: "_dp_2_attr_update", + 20: "_dp_2_attr_update", + 101: "_dp_2_attr_update", + } + + +class TuyaPJ1203PowerMeter(CustomDevice): + """Tuya PJ-1203 single channel clamp energy meter.""" + + signature = { + MODELS_INFO: [ + ("_TZE204_cjbofhxw", "TS0601"), + ("_TZE284_cjbofhxw", "TS0601"), + ], + ENDPOINTS: { + 1: { + PROFILE_ID: zha.PROFILE_ID, + DEVICE_TYPE: zha.DeviceType.SMART_PLUG, + INPUT_CLUSTERS: [ + Basic.cluster_id, # 0x0000 + Groups.cluster_id, # 0x0004 + Scenes.cluster_id, # 0x0005 + TuyaMCUCluster.cluster_id, # 0xef00 + 0xED00, # 60672 + ], + OUTPUT_CLUSTERS: [Time.cluster_id, Ota.cluster_id], + } + }, + } + + replacement = { + ENDPOINTS: { + 1: { + PROFILE_ID: zha.PROFILE_ID, + DEVICE_TYPE: zha.DeviceType.METER_INTERFACE, + INPUT_CLUSTERS: [ + Basic.cluster_id, + Groups.cluster_id, + Scenes.cluster_id, + TuyaPJ1203ManufCluster, + TuyaElectricalMeasurementPJ1203, + TuyaMeteringPJ1203, + ], + OUTPUT_CLUSTERS: [Time.cluster_id, Ota.cluster_id], + } + } + }