Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 62 additions & 8 deletions custom_components/ocpp/chargepoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,14 +810,58 @@ def _sum_l123(phase_info: dict) -> float:
metric_unit = phase_info.get(om.unit.value)

if metric_unit == DEFAULT_POWER_UNIT:
self._metrics[(target_cid, metric)].value = metric_value / 1000
self._metrics[(target_cid, metric)].unit = HA_POWER_UNIT
final_value = metric_value / 1000
final_unit = HA_POWER_UNIT
elif metric_unit == DEFAULT_ENERGY_UNIT:
self._metrics[(target_cid, metric)].value = metric_value / 1000
self._metrics[(target_cid, metric)].unit = HA_ENERGY_UNIT
final_value = metric_value / 1000
final_unit = HA_ENERGY_UNIT
else:
self._metrics[(target_cid, metric)].value = metric_value
self._metrics[(target_cid, metric)].unit = metric_unit
final_value = metric_value
final_unit = metric_unit

self._metrics[(target_cid, metric)].value = final_value
self._metrics[(target_cid, metric)].unit = final_unit

# Amend session energy based on incoming Energy.Active.Import.Register values if the charger does not report session energy directly.
if metric == DEFAULT_MEASURAND and not getattr(
self, "_charger_reports_session_energy", False
):
# Verify we are in an active transaction
tx_metric = self._metrics.get(
(
target_cid,
csess.transaction_id.value,
)
)

if tx_metric and tx_metric.value:
# Get meter start and session energy metrics
ms_metric = self._metrics.get(
(
target_cid,
csess.meter_start.value,
)
)
se_metric = self._metrics.get(
(
target_cid,
csess.session_energy.value,
)
)

if ms_metric and se_metric:
# Initialize baseline if missing
if ms_metric.value is None:
ms_metric.value = final_value
ms_metric.unit = final_unit
se_metric.value = 0.0
se_metric.unit = final_unit
# Session Energy Math: Current Total - Start Total
elif ms_metric.unit == final_unit:
se_metric.value = (
round(1000 * (final_value - ms_metric.value)) / 1000
)
se_metric.unit = final_unit

@staticmethod
def get_energy_kwh(measurand_value: MeasurandValue) -> float:
Expand Down Expand Up @@ -982,7 +1026,9 @@ def process_measurands(
].value
else:
# Initialize baseline on first tx-bound EAIR; then derive Session = EAIR - meter_start.
ms_metric = self._metrics[(target_cid, csess.meter_start)]
ms_metric = self._metrics[
(target_cid, csess.meter_start.value)
]
if ms_metric.value is None:
ms_metric.value = value
ms_metric.unit = unit
Expand All @@ -1000,7 +1046,15 @@ def process_measurands(
(target_cid, csess.session_energy.value)
].unit = unit
else:
unprocessed.append(sampled_value)
normalized_value = MeasurandValue(
measurand=measurand,
value=value,
phase=phase,
unit=unit,
context=context,
location=location,
)
unprocessed.append(normalized_value)

try:
self.process_phases(unprocessed, connector_id)
Expand Down
91 changes: 91 additions & 0 deletions tests/test_charge_point_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,31 @@ def test_process_phases_voltage_and_current_branches(hass):
p_kw = cp._metrics[(2, "Power.Active.Import")].value
assert p_kw == (1000 + 2000 + 3000) / 1000 # -> 6 kW
assert cp._metrics[(2, "Power.Active.Import")].unit == HA_POWER_UNIT
# Energy.Active.Import.Register in Wh should become kWh when aggregated
bucket3 = [
_mv(
"Energy.Active.Import.Register",
1000.0,
phase="L1",
unit=DEFAULT_ENERGY_UNIT,
),
_mv(
"Energy.Active.Import.Register",
2000.0,
phase="L2",
unit=DEFAULT_ENERGY_UNIT,
),
_mv(
"Energy.Active.Import.Register",
3000.0,
phase="L3",
unit=DEFAULT_ENERGY_UNIT,
),
]
cp.process_phases(bucket3, connector_id=3)
e_kwh = cp._metrics[(3, "Energy.Active.Import.Register")].value
assert e_kwh == (1000 + 2000 + 3000) / 1000 # -> 6.0 kWh
assert cp._metrics[(3, "Energy.Active.Import.Register")].unit == HA_ENERGY_UNIT


def test_get_energy_kwh_and_session_derive(hass):
Expand Down Expand Up @@ -328,3 +353,69 @@ def create_call_error(self, exc):
await cp._handle_call(DummyMsg())

assert sent.get("payload") == "ERR_JSON"


def test_process_phases_calculates_session_energy(hass):
"""Test that process_phases derives real-time session energy for phase-tagged registers."""
cp = _mk_cp(hass)
target_cid = 1

# 1. Simulate an active charging session
# The car plugged in when the meter was at 100.0 kWh
cp._metrics[(target_cid, csess.meter_start.value)].value = 100.0
cp._metrics[(target_cid, csess.meter_start.value)].unit = HA_ENERGY_UNIT
# The transaction is currently active
cp._metrics[(target_cid, csess.transaction_id.value)].value = 999

# Pre-populate the session energy metric so it exists
cp._metrics[(target_cid, csess.session_energy.value)].value = 0.0
cp._metrics[(target_cid, csess.session_energy.value)].unit = HA_ENERGY_UNIT

# 2. Create the "unprocessed" payload from the charger (105.0 kWh on L1)
bucket = [
_mv("Energy.Active.Import.Register", 105.0, phase="L1", unit=HA_ENERGY_UNIT)
]

# 3. Run the modified function
cp.process_phases(bucket, connector_id=target_cid)

# 4. Assert that the math worked perfectly
main_register = cp._metrics[(target_cid, "Energy.Active.Import.Register")].value
session_energy = cp._metrics[(target_cid, csess.session_energy.value)].value

assert main_register == 105.0, "Main register should update to the new L1 value."
assert (
session_energy == 5.0
), "Session energy should be exactly 5.0 (105.0 - 100.0)."


def test_process_phases_initializes_session_energy_baseline(hass):
"""Test that process_phases initializes the session energy baseline if meter_start is missing."""
cp = _mk_cp(hass)
target_cid = 1

# 1. Simulate an active charging session BUT meter_start is None
cp._metrics[(target_cid, csess.meter_start.value)].value = None
cp._metrics[(target_cid, csess.transaction_id.value)].value = 999

# Pre-populate session energy
cp._metrics[(target_cid, csess.session_energy.value)].value = None

# 2. Create the "unprocessed" payload from the charger (105.0 kWh on L1)
bucket = [
_mv("Energy.Active.Import.Register", 105.0, phase="L1", unit=HA_ENERGY_UNIT)
]

# 3. Run the modified function
cp.process_phases(bucket, connector_id=target_cid)

# 4. Assert the baseline was initialized perfectly
meter_start = cp._metrics[(target_cid, csess.meter_start.value)].value
session_energy = cp._metrics[(target_cid, csess.session_energy.value)].value

assert (
meter_start == 105.0
), "Meter start should be initialized to the first L1 reading."
assert (
session_energy == 0.0
), "Session energy should be exactly 0.0 at initialization."
Loading