Skip to content

Commit 12ca350

Browse files
committed
Implement Bluetooth receive queue
1 parent f1f5971 commit 12ca350

File tree

3 files changed

+65
-63
lines changed

3 files changed

+65
-63
lines changed

tesla_fleet_api/tesla/vehicle/bluetooth.py

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import hashlib
4-
from asyncio import Future, get_running_loop
4+
import asyncio
55
from typing import TYPE_CHECKING
66
from google.protobuf.message import DecodeError
77

@@ -85,7 +85,7 @@ class VehicleBluetooth(Commands):
8585

8686
ble_name: str
8787
client: BleakClient
88-
_futures: dict[Domain, Future]
88+
_queues: dict[Domain, asyncio.Queue]
8989
_ekey: ec.EllipticCurvePublicKey
9090
_recv: bytearray = bytearray()
9191
_recv_len: int = 0
@@ -96,7 +96,10 @@ def __init__(
9696
):
9797
super().__init__(parent, vin, key)
9898
self.ble_name = "S" + hashlib.sha1(vin.encode('utf-8')).hexdigest()[:16] + "C"
99-
self._futures = {}
99+
self._queues = {
100+
Domain.DOMAIN_VEHICLE_SECURITY: asyncio.Queue(),
101+
Domain.DOMAIN_INFOTAINMENT: asyncio.Queue(),
102+
}
100103
if device is not None:
101104
self.client = BleakClient(device, services=[SERVICE_UUID])
102105

@@ -135,7 +138,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
135138
"""Exit the async context."""
136139
await self.disconnect()
137140

138-
def _on_notify(self,sender: BleakGATTCharacteristic,data : bytearray) -> None:
141+
async def _on_notify(self,sender: BleakGATTCharacteristic,data : bytearray) -> None:
139142
"""Receive data from the Tesla BLE device."""
140143
if self._recv_len:
141144
self._recv += data
@@ -145,16 +148,16 @@ def _on_notify(self,sender: BleakGATTCharacteristic,data : bytearray) -> None:
145148
LOGGER.debug(f"Received {len(self._recv)} of {self._recv_len} bytes")
146149
while len(self._recv) > self._recv_len:
147150
LOGGER.warn(f"Received more data than expected: {len(self._recv)} > {self._recv_len}")
148-
self._on_message(bytes(self._recv[:self._recv_len]))
151+
await self._on_message(bytes(self._recv[:self._recv_len]))
149152
self._recv_len = int.from_bytes(self._recv[self._recv_len:self._recv_len+2], 'big')
150153
self._recv = self._recv[self._recv_len+2:]
151154
continue
152155
if len(self._recv) == self._recv_len:
153-
self._on_message(bytes(self._recv))
156+
await self._on_message(bytes(self._recv))
154157
self._recv = bytearray()
155158
self._recv_len = 0
156159

157-
def _on_message(self, data:bytes) -> None:
160+
async def _on_message(self, data:bytes) -> None:
158161
"""Receive messages from the Tesla BLE data."""
159162
try:
160163
msg = RoutableMessage.FromString(data)
@@ -164,57 +167,36 @@ def _on_message(self, data:bytes) -> None:
164167
self._recv_len = 0
165168
return
166169

167-
# Update Session
168-
if(msg.session_info):
169-
info = SessionInfo.FromString(msg.session_info)
170-
# maybe dont?
171-
if(info.status == Session_Info_Status.SESSION_INFO_STATUS_KEY_NOT_ON_WHITELIST):
172-
self._futures[msg.from_destination.domain].set_exception(NotOnWhitelistFault())
173-
return
174-
self._sessions[msg.from_destination.domain].update(info)
175-
176170
if(msg.to_destination.routing_address != self._from_destination):
177-
# Get the ephemeral key here and save to self._ekey
178-
return
179-
180-
if(msg.from_destination.domain in self._futures):
181-
LOGGER.debug(f"Received response for request {msg.request_uuid}")
182-
self._futures[msg.from_destination.domain].set_result(msg)
183-
del self._futures[msg.from_destination.domain]
171+
# Ignore ephemeral key broadcasts
184172
return
185173

186-
if msg.from_destination.domain == Domain.DOMAIN_VEHICLE_SECURITY:
187-
submsg = FromVCSECMessage.FromString(msg.protobuf_message_as_bytes)
188-
LOGGER.warning(f"Received orphaned VCSEC response: {submsg}")
189-
elif msg.from_destination.domain == Domain.DOMAIN_INFOTAINMENT:
190-
submsg = Response.FromString(msg.protobuf_message_as_bytes)
191-
LOGGER.warning(f"Received orphaned INFOTAINMENT response: {submsg}")
192-
else:
193-
LOGGER.warning(f"Received orphaned response: {msg}")
194-
195-
async def _create_future(self, domain: Domain) -> Future:
196-
if(not self._sessions[domain].lock.locked):
197-
raise ValueError("Session is not locked")
198-
self._futures[domain] = get_running_loop().create_future()
199-
return self._futures[domain]
174+
LOGGER.info(f"Received response: {msg}")
175+
await self._queues[msg.from_destination.domain].put(msg)
200176

201-
async def _send(self, msg: RoutableMessage) -> RoutableMessage:
177+
async def _send(self, msg: RoutableMessage, requires: str) -> RoutableMessage:
202178
"""Serialize a message and send to the vehicle and wait for a response."""
203179
domain = msg.to_destination.domain
204180
async with self._sessions[domain].lock:
205181
LOGGER.debug(f"Sending message {msg}")
206-
future = await self._create_future(domain)
182+
207183
payload = prependLength(msg.SerializeToString())
208184

185+
# Empty the queue before sending the message
186+
while not self._queues[domain].empty():
187+
await self._queues[domain].get()
209188
await self.client.write_gatt_char(WRITE_UUID, payload, True)
210189

211-
resp = await future
212-
LOGGER.debug(f"Received message {resp}")
190+
# Process the response
191+
async with asyncio.timeout(10):
192+
while True:
193+
resp = await self._queues[domain].get()
194+
LOGGER.debug(f"Received message {resp}")
213195

214-
if resp.signedMessageStatus.signed_message_fault > 0:
215-
raise MESSAGE_FAULTS[resp.signedMessageStatus.signed_message_fault]
196+
self.validate_msg(resp)
216197

217-
return resp
198+
if resp.HasField(requires):
199+
return resp
218200

219201
async def pair(self, role: Role = Role.ROLE_OWNER, form: KeyFormFactor = KeyFormFactor.KEY_FORM_FACTOR_CLOUD_KEY):
220202
"""Pair the key."""

tesla_fleet_api/tesla/vehicle/commands.py

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
from asyncio import Lock, sleep
1515

1616
from tesla_fleet_api.exceptions import (
17+
MESSAGE_FAULTS,
1718
SIGNED_MESSAGE_INFORMATION_FAULTS,
19+
NotOnWhitelistFault,
1820
#TeslaFleetMessageFaultInvalidSignature,
1921
TeslaFleetMessageFaultIncorrectEpoch,
2022
TeslaFleetMessageFaultInvalidTokenOrCounter,
@@ -38,6 +40,7 @@
3840
Response,
3941
)
4042
from tesla_fleet_api.tesla.vehicle.proto.signatures_pb2 import (
43+
Session_Info_Status,
4144
SignatureType,
4245
Tag,
4346
AES_GCM_Personalized_Signature_Data,
@@ -251,10 +254,21 @@ def shared_key(self, vehicleKey: bytes) -> bytes:
251254

252255

253256
@abstractmethod
254-
async def _send(self, msg: RoutableMessage) -> RoutableMessage:
257+
async def _send(self, msg: RoutableMessage, requires: str) -> RoutableMessage:
255258
"""Transmit the message to the vehicle."""
256259
raise NotImplementedError
257260

261+
def validate_msg(self, msg: RoutableMessage) -> None:
262+
"""Validate the message."""
263+
if(msg.session_info):
264+
info = SessionInfo.FromString(msg.session_info)
265+
if(info.status == Session_Info_Status.SESSION_INFO_STATUS_KEY_NOT_ON_WHITELIST):
266+
raise NotOnWhitelistFault
267+
self._sessions[msg.from_destination.domain].update(info)
268+
269+
if msg.signedMessageStatus.signed_message_fault > 0:
270+
raise MESSAGE_FAULTS[msg.signedMessageStatus.signed_message_fault]
271+
258272
@abstractmethod
259273
async def _command(self, domain: Domain, command: bytes, attempt: int = 0) -> dict[str, Any]:
260274
"""Serialize a message and send to the signed command endpoint."""
@@ -270,7 +284,7 @@ async def _command(self, domain: Domain, command: bytes, attempt: int = 0) -> di
270284
raise ValueError(f"Unknown auth method: {self._auth_method}")
271285

272286
try:
273-
resp = await self._send(msg)
287+
resp = await self._send(msg, "protobuf_message_as_bytes")
274288
except (
275289
#TeslaFleetMessageFaultInvalidSignature,
276290
TeslaFleetMessageFaultIncorrectEpoch,
@@ -334,7 +348,11 @@ async def _command(self, domain: Domain, command: bytes, attempt: int = 0) -> di
334348
resp.protobuf_message_as_bytes = aesgcm.decrypt(resp.signature_data.AES_GCM_Response_data.nonce, resp.protobuf_message_as_bytes + resp.signature_data.AES_GCM_Response_data.tag, aad.finalize())
335349

336350
if(resp.from_destination.domain == Domain.DOMAIN_VEHICLE_SECURITY):
337-
vcsec = FromVCSECMessage.FromString(resp.protobuf_message_as_bytes)
351+
try:
352+
vcsec = FromVCSECMessage.FromString(resp.protobuf_message_as_bytes)
353+
except Exception as e:
354+
LOGGER.error("Failed to parse VCSEC message: %s %s", e, resp)
355+
raise e
338356
LOGGER.debug("VCSEC Response: %s", vcsec)
339357
if vcsec.HasField("nominalError"):
340358
LOGGER.error("Command failed with reason: %s", vcsec.nominalError.genericError)
@@ -344,6 +362,10 @@ async def _command(self, domain: Domain, command: bytes, attempt: int = 0) -> di
344362
"reason": GenericError_E.Name(vcsec.nominalError.genericError)
345363
}
346364
}
365+
elif vcsec.HasField("vehicleStatus"):
366+
return {
367+
"response": vcsec.vehicleStatus
368+
}
347369
elif vcsec.commandStatus.operationStatus == OperationStatus_E.OPERATIONSTATUS_OK:
348370
return {"response": {"result": True, "reason": ""}}
349371
elif vcsec.commandStatus.operationStatus == OperationStatus_E.OPERATIONSTATUS_WAIT:
@@ -359,7 +381,11 @@ async def _command(self, domain: Domain, command: bytes, attempt: int = 0) -> di
359381
raise SIGNED_MESSAGE_INFORMATION_FAULTS[vcsec.commandStatus.signedMessageStatus.signedMessageInformation]
360382

361383
elif(resp.from_destination.domain == Domain.DOMAIN_INFOTAINMENT):
362-
response = Response.FromString(resp.protobuf_message_as_bytes)
384+
try:
385+
response = Response.FromString(resp.protobuf_message_as_bytes)
386+
except Exception as e:
387+
LOGGER.error("Failed to parse Infotainment Response: %s %s", e, resp)
388+
raise e
363389
LOGGER.debug("Infotainment Response: %s", response)
364390
if (response.HasField("ping")):
365391
return {
@@ -534,7 +560,7 @@ async def _handshake(self, domain: Domain) -> bool:
534560
uuid=randbytes(16)
535561
)
536562

537-
await self._send(msg)
563+
await self._send(msg, "session_info")
538564
return self._sessions[domain].ready
539565

540566
async def ping(self) -> dict[str, Any]:

tesla_fleet_api/tesla/vehicle/signed.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
from tesla_fleet_api.tesla.vehicle.commands import Commands
88
from tesla_fleet_api.exceptions import (
99
MESSAGE_FAULTS,
10+
NotOnWhitelistFault,
1011
)
1112
from tesla_fleet_api.tesla.vehicle.proto.signatures_pb2 import (
13+
Session_Info_Status,
1214
SessionInfo,
1315
)
1416
from tesla_fleet_api.tesla.vehicle.proto.universal_message_pb2 import (
@@ -31,25 +33,17 @@ def __init__(self, parent: TeslaFleetApi, vin: str):
3133
super(Commands, self).__init__(parent, vin)
3234

3335

34-
async def _send(self, msg: RoutableMessage) -> RoutableMessage:
36+
async def _send(self, msg: RoutableMessage, requires: str) -> RoutableMessage:
3537
"""Serialize a message and send to the signed command endpoint."""
38+
# requires isnt used because Fleet API messages are singular
3639

3740
async with self._sessions[msg.to_destination.domain].lock:
38-
resp = await self.signed_command(
41+
json = await self.signed_command(
3942
base64.b64encode(msg.SerializeToString()).decode()
4043
)
4144

42-
resp_msg = RoutableMessage.FromString(base64.b64decode(resp["response"]))
45+
resp = RoutableMessage.FromString(base64.b64decode(json["response"]))
4346

44-
# Check UUID?
45-
# Check RoutingAdress?
47+
self.validate_msg(resp)
4648

47-
if resp_msg.session_info:
48-
self._sessions[resp_msg.from_destination.domain].update(
49-
SessionInfo.FromString(resp_msg.session_info), self.private_key
50-
)
51-
52-
if resp_msg.signedMessageStatus.signed_message_fault:
53-
raise MESSAGE_FAULTS[resp_msg.signedMessageStatus.signed_message_fault]
54-
55-
return resp_msg
49+
return resp

0 commit comments

Comments
 (0)