|
1 | | -from collections.abc import Callable |
2 | | -import aiohttp |
3 | | -import asyncio |
4 | | -import json |
5 | | -import logging |
6 | | -from datetime import datetime, timezone |
| 1 | +from .stream import TeslemetryStream |
| 2 | +from .vehicle import TeslemetryStreamVehicle |
| 3 | +from .exception import ( |
| 4 | + TeslemetryStreamError, |
| 5 | + TeslemetryStreamConnectionError, |
| 6 | + TeslemetryStreamVehicleNotConfigured, |
| 7 | + TeslemetryStreamEnded |
| 8 | +) |
7 | 9 | from .const import TelemetryFields, TelemetryAlerts |
8 | 10 |
|
9 | | - |
10 | | -LOGGER = logging.getLogger(__package__) |
11 | | -DELAY = 1 |
12 | | - |
13 | | - |
14 | | -class TeslemetryStreamError(Exception): |
15 | | - """Teslemetry Stream Error""" |
16 | | - |
17 | | - message = "An error occurred with the Teslemetry Stream." |
18 | | - |
19 | | - def __init__(self) -> None: |
20 | | - super().__init__(self.message) |
21 | | - |
22 | | - |
23 | | -class TeslemetryStreamConnectionError(TeslemetryStreamError): |
24 | | - """Teslemetry Stream Connection Error""" |
25 | | - |
26 | | - message = "An error occurred with the Teslemetry Stream connection." |
27 | | - |
28 | | - |
29 | | -class TeslemetryStreamVehicleNotConfigured(TeslemetryStreamError): |
30 | | - """Teslemetry Stream Not Active Error""" |
31 | | - |
32 | | - message = "This vehicle is not configured to connect to Teslemetry." |
33 | | - |
34 | | - |
35 | | -class TeslemetryStreamEnded(TeslemetryStreamError): |
36 | | - """Teslemetry Stream Connection Error""" |
37 | | - |
38 | | - message = "The stream was ended by the server." |
39 | | - |
40 | | - |
41 | | -class TeslemetryStream: |
42 | | - """Teslemetry Stream Client""" |
43 | | - |
44 | | - fields: dict[TelemetryFields, dict[str, int]] | None = None |
45 | | - alerts: list[TelemetryAlerts] | None = None |
46 | | - preferTyped: bool |
47 | | - _response: aiohttp.ClientResponse | None = None |
48 | | - _listeners: dict[Callable, Callable] |
49 | | - delay: int |
50 | | - active = None |
51 | | - |
52 | | - def __init__( |
53 | | - self, |
54 | | - session: aiohttp.ClientSession, |
55 | | - access_token: str, |
56 | | - server: str | None = None, |
57 | | - vin: str | None = None, |
58 | | - parse_timestamp: bool = False, |
59 | | - ): |
60 | | - if not server and not vin: |
61 | | - raise ValueError("Either server or VIN is required") |
62 | | - |
63 | | - if server and not server.endswith(".teslemetry.com"): |
64 | | - raise ValueError("Server must be a teslemetry.com domain") |
65 | | - |
66 | | - self.vin = vin |
67 | | - self.server = server |
68 | | - self._listeners = {} |
69 | | - self._session = session |
70 | | - self._headers = {"Authorization": f"Bearer {access_token}", "X-Library": "python teslemetry-stream"} |
71 | | - self.parse_timestamp = parse_timestamp |
72 | | - self.delay = DELAY |
73 | | - |
74 | | - @property |
75 | | - def connected(self) -> bool: |
76 | | - """Return if connected.""" |
77 | | - return self._response is not None |
78 | | - |
79 | | - async def get_config(self, vin: str | None = None) -> None: |
80 | | - """Get the current stream config.""" |
81 | | - |
82 | | - vin = vin or self.vin |
83 | | - |
84 | | - if not vin: |
85 | | - raise ValueError("VIN is required") |
86 | | - |
87 | | - LOGGER.debug("Getting fleet telemetry config from %s", vin) |
88 | | - req = await self._session.get( |
89 | | - f"https://api.teslemetry.com/api/1/vehicles/{vin}/fleet_telemetry_config", |
90 | | - headers=self._headers, |
91 | | - raise_for_status=True, |
92 | | - ) |
93 | | - response = (await req.json()).get("response") |
94 | | - |
95 | | - if ( |
96 | | - response |
97 | | - and (config := response.get("config")) |
98 | | - and config["hostname"].endswith(".teslemetry.com") |
99 | | - ): |
100 | | - self.server = config["hostname"] |
101 | | - self.fields = config["fields"] |
102 | | - self.alerts = config["alert_types"] |
103 | | - self.preferTyped = config["prefer_typed"] |
104 | | - else: |
105 | | - raise TeslemetryStreamVehicleNotConfigured() |
106 | | - if not response.get("synced"): |
107 | | - LOGGER.warning("Vehicle configuration not active") |
108 | | - |
109 | | - async def prefer_typed(self, prefer_typed: bool = True, vin: str | None = None) -> dict: |
110 | | - """Set prefer typed.""" |
111 | | - assert (vin or self.vin) |
112 | | - resp = await self._session.patch( |
113 | | - f"https://api.teslemetry.com/api/config/{vin or self.vin}", |
114 | | - headers=self._headers, |
115 | | - json={"prefer_typed": prefer_typed}, |
116 | | - raise_for_status=False, |
117 | | - ) |
118 | | - if resp.ok: |
119 | | - self.preferTyped = prefer_typed |
120 | | - return await resp.json() |
121 | | - |
122 | | - async def update_fields(self, fields: dict, vin: str | None = None) -> dict: |
123 | | - """Update Fleet Telemetry configuration""" |
124 | | - assert (vin or self.vin) |
125 | | - resp = await self._session.patch( |
126 | | - f"https://api.teslemetry.com/api/config/{vin or self.vin}", |
127 | | - headers=self._headers, |
128 | | - json={"fields": fields}, |
129 | | - raise_for_status=False, |
130 | | - ) |
131 | | - if resp.ok: |
132 | | - self.fields = {**self.fields, **fields} |
133 | | - return await resp.json() |
134 | | - |
135 | | - async def replace_fields(self, fields: dict, vin: str | None = None) -> dict: |
136 | | - """Replace Fleet Telemetry configuration""" |
137 | | - resp = await self._session.post( |
138 | | - f"https://api.teslemetry.com/api/config/{vin or self.vin}", |
139 | | - headers=self._headers, |
140 | | - json={"fields": fields}, |
141 | | - raise_for_status=False, |
142 | | - ) |
143 | | - if resp.ok: |
144 | | - self.fields = fields |
145 | | - return await resp.json() |
146 | | - |
147 | | - @property |
148 | | - def config(self) -> dict: |
149 | | - """Return current configuration.""" |
150 | | - return { |
151 | | - "hostname": self.server, |
152 | | - "fields": self.fields, |
153 | | - "alerts": self.alerts, |
154 | | - } |
155 | | - |
156 | | - async def connect(self) -> None: |
157 | | - """Connect to the telemetry stream.""" |
158 | | - self.active = True |
159 | | - if not self.server: |
160 | | - await self.get_config() |
161 | | - |
162 | | - LOGGER.debug("Connecting to %s", self.server) |
163 | | - self._response = await self._session.get( |
164 | | - f"https://{self.server}/sse/{self.vin or ''}", |
165 | | - headers=self._headers, |
166 | | - raise_for_status=True, |
167 | | - timeout=aiohttp.ClientTimeout( |
168 | | - connect=5, sock_connect=5, sock_read=30, total=None |
169 | | - ), |
170 | | - ) |
171 | | - LOGGER.debug( |
172 | | - "Connected to %s with status %s", self._response.url, self._response.status |
173 | | - ) |
174 | | - |
175 | | - def close(self) -> None: |
176 | | - """Close connection.""" |
177 | | - if self._response is not None: |
178 | | - LOGGER.debug("Disconnecting from %s", self.server) |
179 | | - self._response.close() |
180 | | - self._response = None |
181 | | - |
182 | | - def __aiter__(self): |
183 | | - """Return""" |
184 | | - return self |
185 | | - |
186 | | - async def __anext__(self) -> dict: |
187 | | - """Return next event.""" |
188 | | - try: |
189 | | - if self.active is False: |
190 | | - # Stop the stream and loop |
191 | | - self.close() |
192 | | - raise StopAsyncIteration |
193 | | - if not self._response: |
194 | | - # Connect to the stream |
195 | | - await self.connect() |
196 | | - async for line_in_bytes in self._response.content: |
197 | | - field, _, value = line_in_bytes.decode("utf8").partition(": ") |
198 | | - if field == "data": |
199 | | - data = json.loads(value) |
200 | | - if self.parse_timestamp: |
201 | | - main, _, ns = data["createdAt"].partition(".") |
202 | | - data["timestamp"] = int( |
203 | | - datetime.strptime(main, "%Y-%m-%dT%H:%M:%S") |
204 | | - .replace(tzinfo=timezone.utc) |
205 | | - .timestamp() |
206 | | - ) * 1000 + int(ns[:3]) |
207 | | - # LOGGER.debug("event %s", json.dumps(data)) |
208 | | - self.delay = DELAY |
209 | | - return data |
210 | | - raise TeslemetryStreamEnded() |
211 | | - except (TeslemetryStreamEnded, aiohttp.ClientError) as error: |
212 | | - LOGGER.warning("Connection error: %s", error) |
213 | | - self.close() |
214 | | - LOGGER.debug("Reconnecting in %s seconds", self.delay) |
215 | | - await asyncio.sleep(self.delay) |
216 | | - self.delay += self.delay |
217 | | - |
218 | | - def async_add_listener( |
219 | | - self, callback: Callable, filters: dict | None = None |
220 | | - ) -> Callable[[], None]: |
221 | | - """Listen for data updates.""" |
222 | | - schedule_refresh = not self._listeners |
223 | | - |
224 | | - def remove_listener() -> None: |
225 | | - """Remove update listener.""" |
226 | | - self._listeners.pop(remove_listener) |
227 | | - if not self._listeners: |
228 | | - self.active = False |
229 | | - |
230 | | - self._listeners[remove_listener] = (callback, filters) |
231 | | - |
232 | | - # This is the first listener, set up task. |
233 | | - if schedule_refresh: |
234 | | - asyncio.create_task(self.listen()) |
235 | | - |
236 | | - return remove_listener |
237 | | - |
238 | | - async def listen(self): |
239 | | - """Listen to the telemetry stream.""" |
240 | | - |
241 | | - async for event in self: |
242 | | - if event: |
243 | | - for listener, filters in self._listeners.values(): |
244 | | - if recursive_match(filters, event): |
245 | | - listener(event) |
246 | | - LOGGER.debug("Listen has finished") |
247 | | - |
248 | | - |
249 | | -def recursive_match(dict1, dict2): |
250 | | - """Recursively match dict1 with dict2.""" |
251 | | - if dict1 is not None: |
252 | | - for key, value1 in dict1.items(): |
253 | | - if key not in dict2: |
254 | | - # A required key isn't present |
255 | | - return False |
256 | | - value2 = dict2[key] |
257 | | - if isinstance(value1, dict): |
258 | | - # Check the next level of the dict |
259 | | - if not recursive_match(value1, value2): |
260 | | - return False |
261 | | - elif isinstance(value1, list): |
262 | | - # Check each dict in the list |
263 | | - if not all( |
264 | | - any(recursive_match(item1, item2) for item2 in value2) |
265 | | - for item1 in value1 |
266 | | - ): |
267 | | - return False |
268 | | - elif value1 is not None: |
269 | | - # Check the value matches |
270 | | - if value1 != value2: |
271 | | - return False |
272 | | - # No differences found |
273 | | - return True |
| 11 | +__all__ = [ |
| 12 | + "TeslemetryStream", |
| 13 | + "TeslemetryStreamVehicle", |
| 14 | + "TeslemetryStreamError", |
| 15 | + "TeslemetryStreamConnectionError", |
| 16 | + "TeslemetryStreamVehicleNotConfigured", |
| 17 | + "TeslemetryStreamEnded", |
| 18 | + "TelemetryFields", |
| 19 | + "TelemetryAlerts" |
| 20 | +] |
0 commit comments