Skip to content

Commit c3ae55b

Browse files
committed
Smile.py: import _find and _findall and implement
1 parent 8dd654a commit c3ae55b

File tree

1 file changed

+53
-51
lines changed

1 file changed

+53
-51
lines changed

plugwise/smile.py

Lines changed: 53 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
ResponseError,
4646
UnsupportedDeviceError,
4747
)
48-
from .helper import SmileComm, SmileHelper, update_helper
48+
from .helper import SmileComm, SmileHelper, _find, _findall, update_helper
4949

5050

5151
class SmileData(SmileHelper):
@@ -129,11 +129,11 @@ def get_all_devices(self) -> None:
129129
# Start by determining the system capabilities:
130130
# Find the connected heating/cooling device (heater_central), e.g. heat-pump or gas-fired heater
131131
if self.smile_type == "thermostat":
132-
onoff_boiler: etree = self._domain_objects.find(
133-
"./module/protocols/onoff_boiler"
132+
onoff_boiler: etree = _find(
133+
self._domain_objects, "./module/protocols/onoff_boiler"
134134
)
135-
open_therm_boiler: etree = self._domain_objects.find(
136-
"./module/protocols/open_therm_boiler"
135+
open_therm_boiler: etree = _find(
136+
self._domain_objects, "./module/protocols/open_therm_boiler"
137137
)
138138
self._on_off_device = onoff_boiler is not None
139139
self._opentherm_device = open_therm_boiler is not None
@@ -143,10 +143,10 @@ def get_all_devices(self) -> None:
143143
locator_2 = "./gateway/features/elga_support"
144144
search = self._domain_objects
145145
self._cooling_present = False
146-
if search.find(locator_1) is not None:
146+
if _find(search, locator_1) is not None:
147147
self._cooling_present = True
148148
# Alternative method for the Anna with Elga
149-
elif search.find(locator_2) is not None:
149+
elif _find(search, locator_2) is not None:
150150
self._cooling_present = True
151151
self._elga = True
152152

@@ -351,12 +351,12 @@ def __init__(
351351
async def connect(self) -> bool:
352352
"""Connect to Plugwise device and determine its name, type and version."""
353353
result = await self._request(DOMAIN_OBJECTS)
354-
vendor_names: list[etree] = result.findall("./module/vendor_name")
355-
vendor_models: list[etree] = result.findall("./module/vendor_model")
354+
vendor_names: list[etree] = _findall(result, "./module/vendor_name")
355+
vendor_models: list[etree] = _findall(result, "./module/vendor_model")
356356
# Work-around for Stretch fv 2.7.18
357357
if not vendor_names:
358358
result = await self._request(MODULES)
359-
vendor_names = result.findall("./module/vendor_name")
359+
vendor_names = _findall(result, "./module/vendor_name")
360360

361361
names: list[str] = []
362362
models: list[str] = []
@@ -365,7 +365,7 @@ async def connect(self) -> bool:
365365
for model in vendor_models:
366366
models.append(model.text)
367367

368-
dsmrmain = result.find("./module/protocols/dsmrmain")
368+
dsmrmain = _find(result, "./module/protocols/dsmrmain")
369369
if "Plugwise" not in names and dsmrmain is None: # pragma: no cover
370370
LOGGER.error(
371371
"Connected but expected text not returned, we got %s. Please create \
@@ -392,40 +392,42 @@ async def connect(self) -> bool:
392392
async def _smile_detect_legacy(self, result: etree, dsmrmain: etree) -> str:
393393
"""Helper-function for _smile_detect()."""
394394
# Stretch: find the MAC of the zigbee master_controller (= Stick)
395-
if network := result.find("./module/protocols/master_controller"):
396-
self.smile_zigbee_mac_address = network.find("mac_address").text
395+
if network := _find(result, "./module/protocols/master_controller"):
396+
self.smile_zigbee_mac_address = _find(network, "mac_address").text
397397
# Find the active MAC in case there is an orphaned Stick
398-
if zb_networks := result.findall("./network"):
398+
if zb_networks := _findall(result, "./network"):
399399
for zb_network in zb_networks:
400-
if zb_network.find("./nodes/network_router"):
401-
network = zb_network.find("./master_controller")
402-
self.smile_zigbee_mac_address = network.find("mac_address").text
400+
if _find(zb_network, "./nodes/network_router"):
401+
network = _find(zb_network, "./master_controller")
402+
self.smile_zigbee_mac_address = _find(network, "mac_address").text
403403

404404
# Assume legacy
405405
self._smile_legacy = True
406406
# Try if it is a legacy Anna, assuming appliance thermostat,
407407
# fake insert version assuming Anna, couldn't find another way to identify as legacy Anna
408408
self.smile_fw_version = "1.8.0"
409409
model = "smile_thermo"
410-
if result.find('./appliance[type="thermostat"]') is None:
410+
if _find(result, './appliance[type="thermostat"]') is None:
411411
# It's a P1 legacy:
412412
if dsmrmain is not None:
413413
self._status = await self._request(STATUS)
414-
self.smile_fw_version = self._status.find("./system/version").text
415-
model = self._status.find("./system/product").text
416-
self.smile_hostname = self._status.find("./network/hostname").text
417-
self.smile_mac_address = self._status.find("./network/mac_address").text
414+
self.smile_fw_version = _find(self._status, "./system/version").text
415+
model = _find(self._status, "./system/product").text
416+
self.smile_hostname = _find(self._status, "./network/hostname").text
417+
self.smile_mac_address = _find(
418+
self._status, "./network/mac_address"
419+
).text
418420

419421
# Or a legacy Stretch:
420422
elif network is not None:
421423
self._system = await self._request(SYSTEM)
422-
self.smile_fw_version = self._system.find("./gateway/firmware").text
423-
model = self._system.find("./gateway/product").text
424-
self.smile_hostname = self._system.find("./gateway/hostname").text
424+
self.smile_fw_version = _find(self._system, "./gateway/firmware").text
425+
model = _find(self._system, "./gateway/product").text
426+
self.smile_hostname = _find(self._system, "./gateway/hostname").text
425427
# If wlan0 contains data it's active, so eth0 should be checked last
426428
for network in ("wlan0", "eth0"):
427429
locator = f"./{network}/mac"
428-
if (net_locator := self._system.find(locator)) is not None:
430+
if (net_locator := _find(self._system, locator)) is not None:
429431
self.smile_mac_address = net_locator.text
430432

431433
else: # pragma: no cover
@@ -443,12 +445,12 @@ async def _smile_detect(self, result: etree, dsmrmain: etree) -> None:
443445
Detect which type of Smile is connected.
444446
"""
445447
model: str | None = None
446-
if (gateway := result.find("./gateway")) is not None:
447-
model = gateway.find("vendor_model").text
448-
self.smile_fw_version = gateway.find("firmware_version").text
449-
self.smile_hw_version = gateway.find("hardware_version").text
450-
self.smile_hostname = gateway.find("hostname").text
451-
self.smile_mac_address = gateway.find("mac_address").text
448+
if (gateway := _find(result, "./gateway")) is not None:
449+
model = _find(gateway, "vendor_model").text
450+
self.smile_fw_version = _find(gateway, "firmware_version").text
451+
self.smile_hw_version = _find(gateway, "hardware_version").text
452+
self.smile_hostname = _find(gateway, "hostname").text
453+
self.smile_mac_address = _find(gateway, "mac_address").text
452454
else:
453455
model = await self._smile_detect_legacy(result, dsmrmain)
454456

@@ -503,11 +505,11 @@ async def _update_domain_objects(self) -> None:
503505

504506
# If Plugwise notifications present:
505507
self._notifications = {}
506-
for notification in self._domain_objects.findall("./notification"):
508+
for notification in _findall(self._domain_objects, "./notification"):
507509
try:
508510
msg_id = notification.attrib["id"]
509-
msg_type = notification.find("type").text
510-
msg = notification.find("message").text
511+
msg_type = _find(notification, "type").text
512+
msg = _find(notification, "message").text
511513
self._notifications.update({msg_id: {msg_type: msg}})
512514
LOGGER.debug("Plugwise notifications: %s", self._notifications)
513515
except AttributeError: # pragma: no cover
@@ -563,8 +565,8 @@ async def _set_schedule_state_legacy(
563565
) -> None:
564566
"""Helper-function for set_schedule_state()."""
565567
schedule_rule_id: str | None = None
566-
for rule in self._domain_objects.findall("rule"):
567-
if rule.find("name").text == name:
568+
for rule in _findall(self._domain_objects, "rule"):
569+
if _find(rule, "name").text == name:
568570
schedule_rule_id = rule.attrib["id"]
569571

570572
if schedule_rule_id is None:
@@ -578,7 +580,7 @@ async def _set_schedule_state_legacy(
578580
return
579581

580582
locator = f'.//*[@id="{schedule_rule_id}"]/template'
581-
for rule in self._domain_objects.findall(locator):
583+
for rule in _findall(self._domain_objects, locator):
582584
template_id = rule.attrib["id"]
583585

584586
uri = f"{RULES};id={schedule_rule_id}"
@@ -626,13 +628,13 @@ async def set_schedule_state(
626628
)
627629
if self.smile_name != "Adam":
628630
locator = f'.//*[@id="{schedule_rule_id}"]/template'
629-
template_id = self._domain_objects.find(locator).attrib["id"]
631+
template_id = _find(self._domain_objects, locator).attrib["id"]
630632
template = f'<template id="{template_id}" />'
631633

632634
locator = f'.//*[@id="{schedule_rule_id}"]/contexts'
633-
contexts = self._domain_objects.find(locator)
635+
contexts = _find(self._domain_objects, locator)
634636
locator = f'.//*[@id="{loc_id}"].../...'
635-
if (subject := contexts.find(locator)) is None:
637+
if (subject := _find(contexts, locator)) is None:
636638
subject = f'<context><zone><location id="{loc_id}" /></zone></context>'
637639
subject = etree.fromstring(subject)
638640

@@ -655,7 +657,7 @@ async def set_schedule_state(
655657
async def _set_preset_legacy(self, preset: str) -> None:
656658
"""Set the given Preset on the relevant Thermostat - from DOMAIN_OBJECTS."""
657659
locator = f'rule/directives/when/then[@icon="{preset}"].../.../...'
658-
rule = self._domain_objects.find(locator)
660+
rule = _find(self._domain_objects, locator)
659661
data = f'<rules><rule id="{rule.attrib["id"]}"><active>true</active></rule></rules>'
660662

661663
await self._request(RULES, method="put", data=data)
@@ -671,9 +673,9 @@ async def set_preset(self, loc_id: str, preset: str) -> None:
671673
await self._set_preset_legacy(preset)
672674
return
673675

674-
current_location = self._locations.find(f'location[@id="{loc_id}"]')
675-
location_name = current_location.find("name").text
676-
location_type = current_location.find("type").text
676+
current_location = _find(self._locations, f'location[@id="{loc_id}"]')
677+
location_name = _find(current_location, "name").text
678+
location_type = _find(current_location, "type").text
677679

678680
uri = f"{LOCATIONS};id={loc_id}"
679681
data = (
@@ -715,9 +717,9 @@ async def set_number_setpoint(self, key: str, temperature: float) -> None:
715717
temp = str(temperature)
716718
thermostat_id: str | None = None
717719
locator = f'appliance[@id="{self._heater_id}"]/actuator_functionalities/thermostat_functionality'
718-
if th_func_list := self._appliances.findall(locator):
720+
if th_func_list := _findall(self._appliances, locator):
719721
for th_func in th_func_list:
720-
if th_func.find("type").text == key:
722+
if _find(th_func, "type").text == key:
721723
thermostat_id = th_func.attrib["id"]
722724

723725
if thermostat_id is None:
@@ -735,7 +737,7 @@ async def _set_groupswitch_member_state(
735737
"""
736738
for member in members:
737739
locator = f'appliance[@id="{member}"]/{switch.actuator}/{switch.func_type}'
738-
switch_id = self._appliances.find(locator).attrib["id"]
740+
switch_id = _find(self._appliances, locator).attrib["id"]
739741
uri = f"{APPLIANCES};id={member}/{switch.device};id={switch_id}"
740742
if self._stretch_v2:
741743
uri = f"{APPLIANCES};id={member}/{switch.device}"
@@ -774,9 +776,9 @@ async def set_switch_state(
774776
return await self._set_groupswitch_member_state(members, state, switch)
775777

776778
locator = f'appliance[@id="{appl_id}"]/{switch.actuator}/{switch.func_type}'
777-
found: list[etree] = self._appliances.findall(locator)
779+
found: list[etree] = _findall(self._appliances, locator)
778780
for item in found:
779-
if (sw_type := item.find("type")) is not None:
781+
if (sw_type := _find(item, "type")) is not None:
780782
if sw_type.text == switch.act_type:
781783
switch_id = item.attrib["id"]
782784
else:
@@ -793,7 +795,7 @@ async def set_switch_state(
793795
f'appliance[@id="{appl_id}"]/{switch.actuator}/{switch.func_type}/lock'
794796
)
795797
# Don't bother switching a relay when the corresponding lock-state is true
796-
if self._appliances.find(locator).text == "true":
798+
if _find(self._appliances, locator).text == "true":
797799
raise PlugwiseError("Plugwise: the locked Relay was not switched.")
798800

799801
await self._request(uri, method="put", data=data)

0 commit comments

Comments
 (0)