Skip to content

Commit c6aa1aa

Browse files
authored
Merge pull request #519 from plugwise/improve-6
Again more improvements
2 parents cb5a0ee + c45881f commit c6aa1aa

File tree

6 files changed

+197
-269
lines changed

6 files changed

+197
-269
lines changed

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Changelog
22

3-
## Ongoing
3+
## V0.37.1
44

5-
- General improving of code
5+
- Further optimization / deduplication of the refactored code.
66

77
## v0.37.0
88

plugwise/common.py

Lines changed: 119 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,23 @@
44
"""
55
from __future__ import annotations
66

7+
from typing import cast
8+
79
from plugwise.constants import (
810
ANNA,
11+
SPECIAL_PLUG_TYPES,
912
SWITCH_GROUP_TYPES,
13+
ApplianceType,
1014
DeviceData,
1115
ModelData,
1216
SensorType,
1317
)
1418
from plugwise.util import (
19+
check_alternative_location,
1520
check_heater_central,
1621
check_model,
1722
get_vendor_name,
23+
power_data_local_format,
1824
return_valid,
1925
)
2026

@@ -103,6 +109,103 @@ def _appl_thermostat_info(self, appl: Munch, xml_1: etree, xml_2: etree = None)
103109

104110
return appl
105111

112+
def _collect_power_values(self, data: DeviceData, loc: Munch, tariff: str, legacy: bool = False) -> None:
113+
"""Something."""
114+
for loc.peak_select in ("nl_peak", "nl_offpeak"):
115+
loc.locator = (
116+
f'./{loc.log_type}[type="{loc.measurement}"]/period/'
117+
f'measurement[@{tariff}="{loc.peak_select}"]'
118+
)
119+
if legacy:
120+
loc.locator = (
121+
f"./{loc.meas_list[0]}_{loc.log_type}/measurement"
122+
f'[@directionality="{loc.meas_list[1]}"][@{tariff}="{loc.peak_select}"]'
123+
)
124+
125+
loc = self._power_data_peak_value(loc, legacy)
126+
if not loc.found:
127+
continue
128+
129+
data = self._power_data_energy_diff(
130+
loc.measurement, loc.net_string, loc.f_val, data
131+
)
132+
key = cast(SensorType, loc.key_string)
133+
data["sensors"][key] = loc.f_val
134+
135+
def _power_data_peak_value(self, loc: Munch, legacy: bool) -> Munch:
136+
"""Helper-function for _power_data_from_location() and _power_data_from_modules()."""
137+
loc.found = True
138+
if loc.logs.find(loc.locator) is None:
139+
loc = check_alternative_location(loc, legacy)
140+
if not loc.found:
141+
return loc
142+
143+
if (peak := loc.peak_select.split("_")[1]) == "offpeak":
144+
peak = "off_peak"
145+
log_found = loc.log_type.split("_")[0]
146+
loc.key_string = f"{loc.measurement}_{peak}_{log_found}"
147+
if "gas" in loc.measurement or loc.log_type == "point_meter":
148+
loc.key_string = f"{loc.measurement}_{log_found}"
149+
# Only for P1 Actual -------------------#
150+
if "phase" in loc.measurement:
151+
loc.key_string = f"{loc.measurement}"
152+
# --------------------------------------#
153+
loc.net_string = f"net_electricity_{log_found}"
154+
val = loc.logs.find(loc.locator).text
155+
loc.f_val = power_data_local_format(loc.attrs, loc.key_string, val)
156+
157+
return loc
158+
159+
def _power_data_energy_diff(
160+
self,
161+
measurement: str,
162+
net_string: SensorType,
163+
f_val: float | int,
164+
direct_data: DeviceData,
165+
) -> DeviceData:
166+
"""Calculate differential energy."""
167+
if (
168+
"electricity" in measurement
169+
and "phase" not in measurement
170+
and "interval" not in net_string
171+
):
172+
diff = 1
173+
if "produced" in measurement:
174+
diff = -1
175+
if net_string not in direct_data["sensors"]:
176+
tmp_val: float | int = 0
177+
else:
178+
tmp_val = direct_data["sensors"][net_string]
179+
180+
if isinstance(f_val, int):
181+
tmp_val += f_val * diff
182+
else:
183+
tmp_val += float(f_val * diff)
184+
tmp_val = float(f"{round(tmp_val, 3):.3f}")
185+
186+
direct_data["sensors"][net_string] = tmp_val
187+
188+
return direct_data
189+
190+
def _create_gw_devices(self, appl: Munch) -> None:
191+
"""Helper-function for creating/updating gw_devices."""
192+
self.gw_devices[appl.dev_id] = {"dev_class": appl.pwclass}
193+
self._count += 1
194+
for key, value in {
195+
"firmware": appl.firmware,
196+
"hardware": appl.hardware,
197+
"location": appl.location,
198+
"mac_address": appl.mac,
199+
"model": appl.model,
200+
"name": appl.name,
201+
"zigbee_mac_address": appl.zigbee_mac,
202+
"vendor": appl.vendor_name,
203+
}.items():
204+
if value is not None or key == "location":
205+
appl_key = cast(ApplianceType, key)
206+
self.gw_devices[appl.dev_id][appl_key] = value
207+
self._count += 1
208+
106209
def _device_data_switching_group(
107210
self, device: DeviceData, data: DeviceData
108211
) -> None:
@@ -154,6 +257,22 @@ def _get_group_switches(self) -> dict[str, DeviceData]:
154257

155258
return switch_groups
156259

260+
def _get_lock_state(self, xml: etree, data: DeviceData, stretch_v2: bool = False) -> None:
261+
"""Helper-function for _get_measurement_data().
262+
263+
Adam & Stretches: obtain the relay-switch lock state.
264+
"""
265+
actuator = "actuator_functionalities"
266+
func_type = "relay_functionality"
267+
if stretch_v2:
268+
actuator = "actuators"
269+
func_type = "relay"
270+
if xml.find("type").text not in SPECIAL_PLUG_TYPES:
271+
locator = f"./{actuator}/{func_type}/lock"
272+
if (found := xml.find(locator)) is not None:
273+
data["switches"]["lock"] = found.text == "true"
274+
self._count += 1
275+
157276
def _get_module_data(
158277
self,
159278
xml_1: etree,
@@ -208,34 +327,3 @@ def _get_zigbee_data(self, module: etree, model_data: ModelData, legacy: bool) -
208327
elif (zb_node := module.find("./protocols/zig_bee_node")) is not None:
209328
model_data["zigbee_mac_address"] = zb_node.find("mac_address").text
210329
model_data["reachable"] = zb_node.find("reachable").text == "true"
211-
212-
def _power_data_energy_diff(
213-
self,
214-
measurement: str,
215-
net_string: SensorType,
216-
f_val: float | int,
217-
direct_data: DeviceData,
218-
) -> DeviceData:
219-
"""Calculate differential energy."""
220-
if (
221-
"electricity" in measurement
222-
and "phase" not in measurement
223-
and "interval" not in net_string
224-
):
225-
diff = 1
226-
if "produced" in measurement:
227-
diff = -1
228-
if net_string not in direct_data["sensors"]:
229-
tmp_val: float | int = 0
230-
else:
231-
tmp_val = direct_data["sensors"][net_string]
232-
233-
if isinstance(f_val, int):
234-
tmp_val += f_val * diff
235-
else:
236-
tmp_val += float(f_val * diff)
237-
tmp_val = float(f"{round(tmp_val, 3):.3f}")
238-
239-
direct_data["sensors"][net_string] = tmp_val
240-
241-
return direct_data

plugwise/helper.py

Lines changed: 6 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@
2727
LOCATIONS,
2828
LOGGER,
2929
NONE,
30-
OBSOLETE_MEASUREMENTS,
3130
OFF,
3231
P1_MEASUREMENTS,
3332
SENSORS,
34-
SPECIAL_PLUG_TYPES,
3533
SPECIALS,
3634
SWITCHES,
3735
TEMP_CELSIUS,
@@ -41,7 +39,6 @@
4139
ActuatorData,
4240
ActuatorDataType,
4341
ActuatorType,
44-
ApplianceType,
4542
BinarySensorType,
4643
DeviceData,
4744
GatewayData,
@@ -61,7 +58,7 @@
6158
check_model,
6259
escape_illegal_xml_characters,
6360
format_measure,
64-
power_data_local_format,
61+
skip_obsolete_measurements,
6562
)
6663

6764
# This way of importing aiohttp is because of patch/mocking in testing (aiohttp timeouts)
@@ -294,22 +291,7 @@ def _all_appliances(self) -> None:
294291
if appl.pwclass in THERMOSTAT_CLASSES and appl.location is None:
295292
continue
296293

297-
self.gw_devices[appl.dev_id] = {"dev_class": appl.pwclass}
298-
self._count += 1
299-
for key, value in {
300-
"firmware": appl.firmware,
301-
"hardware": appl.hardware,
302-
"location": appl.location,
303-
"mac_address": appl.mac,
304-
"model": appl.model,
305-
"name": appl.name,
306-
"zigbee_mac_address": appl.zigbee_mac,
307-
"vendor": appl.vendor_name,
308-
}.items():
309-
if value is not None or key == "location":
310-
appl_key = cast(ApplianceType, key)
311-
self.gw_devices[appl.dev_id][appl_key] = value
312-
self._count += 1
294+
self._create_gw_devices(appl)
313295

314296
# For P1 collect the connected SmartMeter info
315297
if self.smile_type == "power":
@@ -356,23 +338,7 @@ def _p1_smartmeter_info_finder(self, appl: Munch) -> None:
356338
location = self._domain_objects.find(f'./location[@id="{loc_id}"]')
357339
appl = self._energy_device_info_finder(appl, location)
358340

359-
self.gw_devices[appl.dev_id] = {"dev_class": appl.pwclass}
360-
self._count += 1
361-
362-
for key, value in {
363-
"firmware": appl.firmware,
364-
"hardware": appl.hardware,
365-
"location": appl.location,
366-
"mac_address": appl.mac,
367-
"model": appl.model,
368-
"name": appl.name,
369-
"zigbee_mac_address": appl.zigbee_mac,
370-
"vendor": appl.vendor_name,
371-
}.items():
372-
if value is not None or key == "location":
373-
p1_key = cast(ApplianceType, key)
374-
self.gw_devices[appl.dev_id][p1_key] = value
375-
self._count += 1
341+
self._create_gw_devices(appl)
376342

377343
def _appliance_info_finder(self, appl: Munch, appliance: etree) -> Munch:
378344
"""Collect device info (Smile/Stretch, Thermostats, OpenTherm/On-Off): firmware, model and vendor name."""
@@ -591,68 +557,17 @@ def _power_data_from_location(self, loc_id: str) -> DeviceData:
591557
direct_data: DeviceData = {"sensors": {}}
592558
loc = Munch()
593559
log_list: list[str] = ["point_log", "cumulative_log", "interval_log"]
594-
peak_list: list[str] = ["nl_peak", "nl_offpeak"]
595560
t_string = "tariff"
596561

597562
search = self._domain_objects
598563
loc.logs = search.find(f'./location[@id="{loc_id}"]/logs')
599564
for loc.measurement, loc.attrs in P1_MEASUREMENTS.items():
600565
for loc.log_type in log_list:
601-
for loc.peak_select in peak_list:
602-
loc.locator = (
603-
f'./{loc.log_type}[type="{loc.measurement}"]/period/'
604-
f'measurement[@{t_string}="{loc.peak_select}"]'
605-
)
606-
loc = self._power_data_peak_value(loc)
607-
if not loc.found:
608-
continue
609-
610-
direct_data = self._power_data_energy_diff(
611-
loc.measurement, loc.net_string, loc.f_val, direct_data
612-
)
613-
key = cast(SensorType, loc.key_string)
614-
direct_data["sensors"][key] = loc.f_val
566+
self._collect_power_values(direct_data, loc, t_string)
615567

616568
self._count += len(direct_data["sensors"])
617569
return direct_data
618570

619-
def _power_data_peak_value(self, loc: Munch) -> Munch:
620-
"""Helper-function for _power_data_from_location() and _power_data_from_modules()."""
621-
loc.found = True
622-
# If locator not found look for P1 gas_consumed or phase data (without tariff)
623-
if loc.logs.find(loc.locator) is None:
624-
if "log" in loc.log_type and (
625-
"gas" in loc.measurement or "phase" in loc.measurement
626-
):
627-
# Avoid double processing by skipping one peak-list option
628-
if loc.peak_select == "nl_offpeak":
629-
loc.found = False
630-
return loc
631-
632-
loc.locator = (
633-
f'./{loc.log_type}[type="{loc.measurement}"]/period/measurement'
634-
)
635-
if loc.logs.find(loc.locator) is None:
636-
loc.found = False
637-
return loc
638-
else:
639-
loc.found = False # pragma: no cover
640-
return loc # pragma: no cover
641-
642-
if (peak := loc.peak_select.split("_")[1]) == "offpeak":
643-
peak = "off_peak"
644-
log_found = loc.log_type.split("_")[0]
645-
loc.key_string = f"{loc.measurement}_{peak}_{log_found}"
646-
if "gas" in loc.measurement or loc.log_type == "point_meter":
647-
loc.key_string = f"{loc.measurement}_{log_found}"
648-
if "phase" in loc.measurement:
649-
loc.key_string = f"{loc.measurement}"
650-
loc.net_string = f"net_electricity_{log_found}"
651-
val = loc.logs.find(loc.locator).text
652-
loc.f_val = power_data_local_format(loc.attrs, loc.key_string, val)
653-
654-
return loc
655-
656571
def _appliance_measurements(
657572
self,
658573
appliance: etree,
@@ -663,20 +578,8 @@ def _appliance_measurements(
663578
for measurement, attrs in measurements.items():
664579
p_locator = f'.//logs/point_log[type="{measurement}"]/period/measurement'
665580
if (appl_p_loc := appliance.find(p_locator)) is not None:
666-
# Skip known obsolete measurements
667-
updated_date_locator = (
668-
f'.//logs/point_log[type="{measurement}"]/updated_date'
669-
)
670-
if (
671-
measurement in OBSOLETE_MEASUREMENTS
672-
and (updated_date_key := appliance.find(updated_date_locator))
673-
is not None
674-
):
675-
updated_date = updated_date_key.text.split("T")[0]
676-
date_1 = dt.datetime.strptime(updated_date, "%Y-%m-%d")
677-
date_2 = dt.datetime.now()
678-
if int((date_2 - date_1).days) > 7:
679-
continue
581+
if skip_obsolete_measurements(appliance, measurement):
582+
continue
680583

681584
if new_name := getattr(attrs, ATTR_NAME, None):
682585
measurement = new_name
@@ -719,19 +622,6 @@ def _appliance_measurements(
719622
# Don't count the above top-level dicts, only the remaining single items
720623
self._count += len(data) - 3
721624

722-
def _get_lock_state(self, xml: etree, data: DeviceData) -> None:
723-
"""Helper-function for _get_measurement_data().
724-
725-
Adam & Stretches: obtain the relay-switch lock state.
726-
"""
727-
actuator = "actuator_functionalities"
728-
func_type = "relay_functionality"
729-
if xml.find("type").text not in SPECIAL_PLUG_TYPES:
730-
locator = f"./{actuator}/{func_type}/lock"
731-
if (found := xml.find(locator)) is not None:
732-
data["switches"]["lock"] = found.text == "true"
733-
self._count += 1
734-
735625
def _get_toggle_state(
736626
self, xml: etree, toggle: str, name: ToggleNameType, data: DeviceData
737627
) -> None:

0 commit comments

Comments
 (0)