11from collections .abc import Callable
2+ from typing import Any
23import aiohttp
34import asyncio
45import json
@@ -15,7 +16,7 @@ class TeslemetryStream:
1516 """Teslemetry Stream Client"""
1617
1718 _response : aiohttp .ClientResponse | None = None
18- _listeners : dict [Callable , Callable ]
19+ _listeners : dict [Callable , tuple [ Callable [[ dict [ str , Any ]], None ], dict | None ] ]
1920 delay : int
2021 active = None
2122 vehicle : TeslemetryStreamVehicle
@@ -198,6 +199,7 @@ async def __anext__(self) -> dict:
198199 if not self ._response :
199200 # Connect to the stream
200201 await self .connect ()
202+ assert self ._response
201203 async for line_in_bytes in self ._response .content :
202204 field , _ , value = line_in_bytes .decode ("utf8" ).partition (": " )
203205 if field == "data" :
@@ -219,6 +221,10 @@ async def __anext__(self) -> dict:
219221 LOGGER .debug ("Reconnecting in %s seconds" , self .delay )
220222 await asyncio .sleep (self .delay )
221223 self .delay += self .delay
224+ except Exception as error :
225+ LOGGER .error ("Unexpected error: %s" , error )
226+ self .close ()
227+ LOGGER .debug ("Reconnecting immediately" )
222228
223229 def async_add_listener (
224230 self , callback : Callable , filters : dict | None = None
@@ -238,6 +244,7 @@ def remove_listener() -> None:
238244 """
239245 self ._listeners .pop (remove_listener )
240246 if not self ._listeners :
247+ LOGGER .info ("Shutting down stream as there are no more listeners" )
241248 self .active = False
242249
243250 self ._listeners [remove_listener ] = (callback , filters )
0 commit comments