1+ import asyncio
12import logging
23
34from homeassistant .core import HomeAssistant , callback
45from homeassistant .helpers .dispatcher import async_dispatcher_send , async_dispatcher_connect
56
67from powersensor_local import PlugApi , VirtualHousehold
78
8- from custom_components .powersensor .const import POWER_SENSOR_UPDATE_SIGNAL , DOMAIN
9+ from custom_components .powersensor .AsyncSet import AsyncSet
10+ from custom_components .powersensor .const import POWER_SENSOR_UPDATE_SIGNAL , DOMAIN , DEFAULT_PORT
911
1012_LOGGER = logging .getLogger (__name__ )
1113class PowersensorMessageDispatcher :
1214 def __init__ (self , hass : HomeAssistant , vhh : VirtualHousehold ):
1315 self ._hass = hass
1416 self ._vhh = vhh
1517 self .plugs = dict ()
18+ self ._known_plugs = set ()
1619 self .sensors = dict ()
1720 self .on_start_sensor_queue = dict ()
1821 self ._unsubscribe_from_signals = [
@@ -27,15 +30,75 @@ def __init__(self, hass: HomeAssistant, vhh: VirtualHousehold):
2730 self ._plug_updated ),
2831 async_dispatcher_connect (self ._hass ,
2932 f"{ DOMAIN } _zeroconf_remove_plug" ,
30- self ._plug_remove )
33+ self ._plug_remove ),
34+ async_dispatcher_connect (self ._hass ,
35+ f"{ DOMAIN } _plug_added_to_homeassistant" ,
36+ self ._acknowledge_plug_added_to_homeassistant ),
3137 ]
3238
33-
34- def add_api (self , mac , network_info ):
35-
36- _LOGGER .info (f"Adding API for mac={ network_info ['mac' ]} , ip={ network_info ['host' ]} , port={ network_info ['port' ]} " )
37- api = PlugApi (mac = network_info ['mac' ], ip = network_info ['host' ], port = network_info ['port' ])
38- self .plugs [mac ] = api
39+ self ._monitor_add_plug_queue = None
40+ self ._stop_task = False
41+ self ._plug_added_queue = AsyncSet ()
42+ self ._safe_to_process_plug_queue = False
43+
44+ async def enqueue_plug_for_adding (self , network_info : dict ):
45+ _LOGGER .debug (f"Adding to plug processing queue: { network_info } " )
46+ await self ._plug_added_queue .add ((network_info ['mac' ], network_info ['host' ], network_info ['port' ]))
47+
48+ async def process_plug_queue (self ):
49+ """Start the background task if not already running."""
50+ self ._safe_to_process_plug_queue = True
51+ if self ._monitor_add_plug_queue is None or self ._monitor_add_plug_queue .done ():
52+ self ._stop_task = False
53+ self ._monitor_add_plug_queue = self ._hass .async_create_background_task (self ._monitor_plug_queue (), name = "plug_queue_monitor" )
54+ _LOGGER .debug ("Background task started" )
55+
56+ def _plug_has_been_seen (self , mac_address )-> bool :
57+ return mac_address in self .plugs or mac_address in self ._known_plugs
58+
59+ async def _monitor_plug_queue (self ):
60+ """The actual background task loop."""
61+ try :
62+ while not self ._stop_task and self ._plug_added_queue :
63+ queue_snapshot = await self ._plug_added_queue .copy ()
64+ for mac_address , host , port in queue_snapshot :
65+ if not self ._plug_has_been_seen (mac_address ):
66+ async_dispatcher_send (self ._hass , f"{ DOMAIN } _create_plug" ,
67+ mac_address , host , port )
68+ else :
69+ _LOGGER .debug (f"Plug: { mac_address } has already been created as an entity in Home Assistant."
70+ f" Skipping and flushing from queue." )
71+ await self ._plug_added_queue .remove ((mac_address , host , port ))
72+
73+
74+ await asyncio .sleep (5 )
75+ _LOGGER .debug ("Plug queue has been processed!" )
76+
77+ except asyncio .CancelledError :
78+ _LOGGER .debug ("Plug queue processing cancelled" )
79+ raise
80+ except Exception as e :
81+ _LOGGER .error (f"Error in Plug queue processing task: { e } " )
82+ finally :
83+ self ._monitor_add_plug_queue = None
84+
85+ async def stop_processing_plug_queue (self ):
86+ """Stop the background task."""
87+ self ._stop_task = True
88+ if self ._monitor_add_plug_queue and not self ._monitor_add_plug_queue .done ():
89+ self ._monitor_add_plug_queue .cancel ()
90+ try :
91+ await self ._monitor_add_plug_queue
92+ except asyncio .CancelledError :
93+ pass
94+ _LOGGER .debug ("Background task stopped" )
95+ self ._monitor_add_plug_queue = None
96+
97+ def _create_api (self , mac_address , ip , port ):
98+ _LOGGER .info (f"Adding API for mac={ mac_address } , ip={ ip } , port={ port } " )
99+ api = PlugApi (mac = mac_address , ip = ip , port = port )
100+ self .plugs [mac_address ] = api
101+ self ._known_plugs .add (mac_address )
39102 known_evs = [
40103 'exception' ,
41104 'average_flow' ,
@@ -53,6 +116,10 @@ def add_api(self, mac, network_info):
53116 api .subscribe (ev , lambda event , message : self .handle_message ( event , message ))
54117 api .connect ()
55118
119+ def add_api (self , network_info ):
120+ self ._create_api (mac_address = network_info ['mac' ], ip = network_info ['host' ], port = network_info ['port' ])
121+
122+
56123 async def handle_message (self , event : str , message : dict ):
57124 mac = message ['mac' ]
58125 if mac not in self .plugs .keys ():
@@ -79,18 +146,55 @@ async def disconnect(self):
79146 if unsubscribe is not None :
80147 unsubscribe ()
81148
149+ await self .stop_processing_plug_queue ()
150+
82151 @callback
83152 def _acknowledge_sensor_added_to_homeassistant (self ,mac , role ):
84153 self .sensors [mac ] = role
85154
86155 @callback
87- def _plug_added (self , info ):
88- _LOGGER .error (f" Request to add plug received: { info } " )
156+ async def _acknowledge_plug_added_to_homeassistant (self , mac_address , host , port ):
157+ self ._create_api (mac_address , host , port )
158+ await self ._plug_added_queue .remove ((mac_address , host , port ))
159+
160+ @callback
161+ async def _plug_added (self , info ):
162+ _LOGGER .debug (f" Request to add plug received: { info } " )
163+ network_info = dict ()
164+ network_info ['mac' ] = info ['properties' ][b'id' ].decode ('utf-8' )
165+ network_info ['host' ] = info ['addresses' ][0 ]
166+ network_info ['port' ] = info ['port' ]
167+
168+ if self ._safe_to_process_plug_queue :
169+ await self .enqueue_plug_for_adding (network_info )
170+ await self .process_plug_queue ()
171+ else :
172+ await self .enqueue_plug_for_adding (network_info )
89173
90174 @callback
91- def _plug_updated (self , info ):
92- _LOGGER .error (f" Request to update plug received: { info } " )
175+ async def _plug_updated (self , info ):
176+ _LOGGER .debug (f" Request to update plug received: { info } " )
177+ mac = info ['properties' ][b'id' ].decode ('utf-8' )
178+ host = info ['addresses' ][0 ]
179+ port = info ['port' ]
180+ if mac in self .plugs :
181+ self .plugs [mac ].disconnect ()
182+
183+ if mac in self ._known_plugs :
184+ self ._create_api (mac , host , port )
185+ else :
186+ network_info = dict ()
187+ network_info ['mac' ] = mac
188+ network_info ['host' ] = host
189+ network_info ['port' ] = port
190+ await self .enqueue_plug_for_adding (network_info )
191+ await self .process_plug_queue ()
93192
94193 @callback
95194 def _plug_remove (self ,name , info ):
96- _LOGGER .error (f" Request to delete plug received: { info } " )
195+ _LOGGER .debug (f" Request to delete plug received: { info } " )
196+ mac = info ['properties' ][b'id' ].decode ('utf-8' )
197+ if mac in self .plugs :
198+ self .plugs [mac ].disconnect ()
199+
200+ del self .plugs [mac ]
0 commit comments