Skip to content

Commit 7305f20

Browse files
authored
Merge pull request #3 from DiUS/feature/zeroconf-discovery-background-task
All the hard work to resolve the issues around device discovery and disappearance.
2 parents 13c9cd8 + 7c40e17 commit 7305f20

12 files changed

+851
-310
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import asyncio
2+
3+
class AsyncSet:
4+
"""Thread/async-safe set."""
5+
6+
def __init__(self):
7+
self._items = set()
8+
self._lock = asyncio.Lock()
9+
10+
async def add(self, item):
11+
"""Add item to set."""
12+
async with self._lock:
13+
self._items.add(item)
14+
15+
async def discard(self, item):
16+
"""Remove item if present."""
17+
async with self._lock:
18+
self._items.discard(item)
19+
20+
async def remove(self, item):
21+
"""Remove item, raise KeyError if not present."""
22+
async with self._lock:
23+
self._items.remove(item)
24+
25+
async def pop(self):
26+
"""Remove and return arbitrary item."""
27+
async with self._lock:
28+
return self._items.pop()
29+
30+
async def clear(self):
31+
"""Remove all items."""
32+
async with self._lock:
33+
self._items.clear()
34+
35+
async def copy(self):
36+
"""Return a copy of the set."""
37+
async with self._lock:
38+
return self._items.copy()
39+
40+
def __contains__(self, item):
41+
"""Check if item in set."""
42+
return item in self._items
43+
44+
def __len__(self):
45+
"""Get size of set."""
46+
return len(self._items)
47+
48+
def __bool__(self):
49+
"""Check if set is empty in pythonic way"""
50+
return bool(self._items)
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from homeassistant.core import HomeAssistant
5+
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
6+
from homeassistant.helpers.dispatcher import async_dispatcher_send
7+
import homeassistant.components.zeroconf
8+
9+
from .const import DOMAIN
10+
11+
import logging
12+
_LOGGER = logging.getLogger(__name__)
13+
14+
15+
class PowersensorServiceListener(ServiceListener):
16+
def __init__(self, hass: HomeAssistant, debounce_timeout: float = 60):
17+
self._hass = hass
18+
self._plugs = {}
19+
self._discoveries = {}
20+
self._pending_removals = {}
21+
self._debounce_seconds = debounce_timeout
22+
23+
def add_service(self, zc, type_, name):
24+
self.cancel_any_pending_removal(name, "request to add")
25+
info = self.__add_plug(zc, type_, name)
26+
if info:
27+
asyncio.run_coroutine_threadsafe(
28+
self._async_service_add(self._plugs[name]),
29+
self._hass.loop
30+
)
31+
32+
async def _async_service_add(self, *args):
33+
async_dispatcher_send(self._hass, f"{DOMAIN}_zeroconf_add_plug", *args)
34+
35+
36+
async def _async_delayed_remove(self, name):
37+
"""Actually process the removal after delay."""
38+
try:
39+
await asyncio.sleep(self._debounce_seconds)
40+
_LOGGER.info(f"Request to remove service {name} still pending after timeout. Processing remove request...")
41+
if name in self._plugs:
42+
data = self._plugs[name].copy()
43+
del self._plugs[name]
44+
else:
45+
data = None
46+
asyncio.run_coroutine_threadsafe(
47+
self._async_service_remove(name, data),
48+
self._hass.loop
49+
)
50+
except asyncio.CancelledError:
51+
# Task was cancelled because service came back
52+
_LOGGER.info(f"Request to remove service {name} was canceled by request to update or add plug.")
53+
raise
54+
finally:
55+
# Either way were done with this task
56+
self._pending_removals.pop(name, None)
57+
58+
59+
def remove_service(self, zc, type_, name):
60+
if name in self._pending_removals:
61+
# removal for this service is already pending
62+
return
63+
64+
_LOGGER.info(f"Scheduling removal for {name}")
65+
self._pending_removals[name] = asyncio.run_coroutine_threadsafe(
66+
self._async_delayed_remove(name),
67+
self._hass.loop
68+
)
69+
70+
async def _async_service_remove(self, *args):
71+
async_dispatcher_send(self._hass, f"{DOMAIN}_zeroconf_remove_plug", *args)
72+
73+
def update_service(self, zc, type_, name):
74+
self.cancel_any_pending_removal(name, "request to update")
75+
info = self.__add_plug(zc, type_, name)
76+
if info:
77+
asyncio.run_coroutine_threadsafe(
78+
self._async_service_update( self._plugs[name]),
79+
self._hass.loop
80+
)
81+
82+
async def _async_service_update(self, *args):
83+
# remove from pending tasks if update received
84+
async_dispatcher_send(self._hass, f"{DOMAIN}_zeroconf_update_plug", *args)
85+
86+
async def _async_get_service_info(self, zc, type_, name):
87+
try:
88+
info = await zc.async_get_service_info(type_, name, timeout=3000)
89+
self._discoveries[name] = info
90+
except Exception as e:
91+
_LOGGER.error(f"Error retrieving info for {name}")
92+
93+
94+
def __add_plug(self, zc, type_, name):
95+
info = zc.get_service_info(type_, name)
96+
97+
if info:
98+
self._plugs[name] = {'type': type_,
99+
'name': name,
100+
'addresses': ['.'.join(str(b) for b in addr) for addr in info.addresses],
101+
'port': info.port,
102+
'server': info.server,
103+
'properties': info.properties
104+
}
105+
return info
106+
107+
def cancel_any_pending_removal(self, name, source):
108+
task = self._pending_removals.pop(name, None)
109+
if task:
110+
task.cancel()
111+
_LOGGER.info(f"Cancelled pending removal for {name} by {source}.")
112+
113+
class PowersensorDiscoveryService:
114+
def __init__(self, hass: HomeAssistant, service_type: str = "_powersensor._tcp.local."):
115+
self._hass = hass
116+
self.service_type = service_type
117+
118+
self.zc: Optional[Zeroconf] = None
119+
self.listener: Optional[PowersensorServiceListener] = None
120+
self.browser: Optional[ServiceBrowser] = None
121+
self.running = False
122+
self._task: Optional[asyncio.Task] = None
123+
124+
async def start(self):
125+
"""Start the mDNS discovery service"""
126+
if self.running:
127+
return
128+
129+
self.running = True
130+
self.zc = await homeassistant.components.zeroconf.async_get_instance(self._hass)
131+
self.listener = PowersensorServiceListener(self._hass)
132+
133+
# Create browser
134+
self.browser = ServiceBrowser(self.zc, self.service_type, self.listener)
135+
136+
# Start the background task
137+
self._task = asyncio.create_task(self._run())
138+
139+
async def _run(self):
140+
"""Background task that keeps the service alive"""
141+
try:
142+
while self.running:
143+
await asyncio.sleep(1)
144+
except asyncio.CancelledError:
145+
pass
146+
147+
async def stop(self):
148+
"""Stop the mDNS discovery service"""
149+
self.running = False
150+
151+
if self._task:
152+
self._task.cancel()
153+
try:
154+
await self._task
155+
except asyncio.CancelledError:
156+
pass
157+
158+
if self.zc:
159+
# self.zc.close()
160+
self.zc = None
161+
162+
self.browser = None
163+
self.listener = None
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
from datetime import timedelta
2+
from typing import Callable, Union
3+
4+
from homeassistant.components.sensor import SensorEntity
5+
from homeassistant.core import HomeAssistant, callback
6+
from homeassistant.helpers.device_registry import DeviceInfo
7+
from homeassistant.helpers import device_registry as dr
8+
from homeassistant.helpers import entity_registry as er
9+
from homeassistant.helpers.dispatcher import async_dispatcher_connect
10+
from homeassistant.helpers.event import async_track_point_in_utc_time
11+
from homeassistant.util.dt import utcnow
12+
from .PlugMeasurements import PlugMeasurements
13+
from .SensorMeasurements import SensorMeasurements
14+
from .const import DOMAIN, POWER_SENSOR_UPDATE_SIGNAL
15+
16+
import logging
17+
_LOGGER = logging.getLogger(__name__)
18+
19+
class PowersensorEntity(SensorEntity):
20+
"""Powersensor Plug Class--designed to handle all measurements of the plug--perhaps less expressive"""
21+
def __init__(self, hass: HomeAssistant, mac : str,
22+
input_config: dict[Union[SensorMeasurements|PlugMeasurements], dict],
23+
measurement_type: SensorMeasurements|PlugMeasurements, timeout_seconds: int = 60):
24+
"""Initialize the sensor."""
25+
self.role = None
26+
self._has_recently_received_update_message = False
27+
self._attr_native_value = 0.0
28+
self._hass = hass
29+
self._mac = mac
30+
self._model = f"PowersensorDevice"
31+
self._device_name = f'Powersensor Device (ID: {self._mac})'
32+
self._measurement_name= None
33+
self._remove_unavailability_tracker = None
34+
self._timeout = timedelta(seconds=timeout_seconds) # Adjust as needed
35+
36+
self.measurement_type = measurement_type
37+
config = input_config[measurement_type]
38+
self._attr_unique_id = f"powersensor_{mac}_{measurement_type}"
39+
self._attr_device_class = config["device_class"]
40+
self._attr_native_unit_of_measurement = config["unit"]
41+
self._attr_device_info = self.device_info
42+
self._attr_suggested_display_precision = config["precision"]
43+
self._signal = f"{POWER_SENSOR_UPDATE_SIGNAL}_{self._mac}_{config['event']}"
44+
if 'state_class' in config.keys():
45+
self._attr_state_class = config['state_class']
46+
self._message_key = config.get('message_key', None)
47+
self._message_callback = config.get('callback', None)
48+
49+
@property
50+
def device_info(self) -> DeviceInfo:
51+
raise NotImplementedError
52+
53+
@property
54+
def available(self) -> bool:
55+
"""Does data exist for this sensor type"""
56+
return self._has_recently_received_update_message
57+
58+
def _schedule_unavailable(self):
59+
"""Schedule entity to become unavailable."""
60+
if self._remove_unavailability_tracker:
61+
self._remove_unavailability_tracker()
62+
63+
self._remove_unavailability_tracker = async_track_point_in_utc_time(
64+
self.hass,
65+
self._async_make_unavailable,
66+
utcnow() + self._timeout
67+
)
68+
69+
async def _async_make_unavailable(self, _now):
70+
"""Mark entity as unavailable."""
71+
self._has_recently_received_update_message = False
72+
self.async_write_ha_state()
73+
74+
async def async_added_to_hass(self) -> None:
75+
"""Subscribe to messages when added to home assistant"""
76+
self._has_recently_received_update_message = False
77+
self.async_on_remove(async_dispatcher_connect(
78+
self._hass,
79+
self._signal,
80+
self._handle_update
81+
))
82+
83+
async def async_will_remove_from_hass(self):
84+
"""Clean up."""
85+
if self._remove_unavailability_tracker:
86+
self._remove_unavailability_tracker()
87+
88+
def _rename_based_on_role(self):
89+
return False
90+
91+
@callback
92+
def _handle_update(self, event, message):
93+
"""handle pushed data."""
94+
95+
# event is not presently used, but is passed to maintain flexibility for future development
96+
97+
name_updated = False
98+
self._has_recently_received_update_message = True
99+
if not self.role:
100+
if 'role' in message.keys():
101+
self.role = message['role']
102+
name_updated = self._rename_based_on_role()
103+
104+
105+
if self._message_key in message.keys():
106+
if self._message_callback:
107+
self._attr_native_value = self._message_callback( message[self._message_key])
108+
else:
109+
self._attr_native_value = message[self._message_key]
110+
self._schedule_unavailable()
111+
112+
if name_updated:
113+
device_registry = dr.async_get(self.hass)
114+
device = device_registry.async_get_device(
115+
identifiers={(DOMAIN, self._mac)}
116+
)
117+
118+
if device and device.name != self._device_name:
119+
# Update the device name
120+
device_registry.async_update_device(
121+
device.id,
122+
name=self._device_name
123+
)
124+
125+
entity_registry = er.async_get(self.hass)
126+
entity_registry.async_update_entity(
127+
self.entity_id,
128+
name = self._attr_name
129+
)
130+
self.async_write_ha_state()
131+

custom_components/powersensor/PowersensorHouseholdEntity.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,19 @@ class HouseholdMeasurements(Enum):
2222
ENERGY_TO_GRID = 7
2323
ENERGY_SOLAR_GENERATION = 8
2424

25+
ConsumptionMeasurements = [
26+
HouseholdMeasurements.POWER_HOME_USE,
27+
HouseholdMeasurements.POWER_FROM_GRID,
28+
HouseholdMeasurements.ENERGY_HOME_USE,
29+
HouseholdMeasurements.ENERGY_FROM_GRID
30+
]
31+
ProductionMeasurements = [
32+
HouseholdMeasurements.POWER_TO_GRID,
33+
HouseholdMeasurements.POWER_SOLAR_GENERATION,
34+
HouseholdMeasurements.ENERGY_TO_GRID,
35+
HouseholdMeasurements.ENERGY_SOLAR_GENERATION
36+
]
37+
2538
@dataclass
2639
class EntityConfig:
2740
name : str

0 commit comments

Comments
 (0)