Skip to content

Commit f6f1450

Browse files
committed
Buffer improvements
1 parent 3bb67a5 commit f6f1450

File tree

2 files changed

+90
-44
lines changed

2 files changed

+90
-44
lines changed

tesla_fleet_api/tesla/vehicle/bluetooth.py

Lines changed: 90 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
import hashlib
44
import asyncio
5-
from typing import TYPE_CHECKING
5+
import struct
6+
from typing import TYPE_CHECKING, Callable, Any
67
from google.protobuf.message import DecodeError
78
from bleak_retry_connector import establish_connection, MAX_CONNECT_ATTEMPTS
89
from bleak import BleakClient, BleakScanner
@@ -75,6 +76,70 @@ def prependLength(message: bytes) -> bytearray:
7576
"""Prepend a 2-byte length to the payload."""
7677
return bytearray([len(message) >> 8, len(message) & 0xFF]) + message
7778

79+
class ReassemblingBuffer:
80+
"""
81+
Reassembles bytearray streams where the first two bytes indicate the length of the message.
82+
Handles potential packet corruption by discarding *entire* packets and retrying.
83+
Uses a callback to process parsed messages.
84+
"""
85+
86+
def __init__(self, callback: Callable[[RoutableMessage], None]):
87+
"""
88+
Initializes the buffer.
89+
90+
Args:
91+
message_type: The protobuf message type (e.g., RoutableMessage) to parse the assembled data.
92+
callback: A function that will be called with each parsed message.
93+
"""
94+
self.buffer = bytearray()
95+
self.expected_length = None
96+
self.packet_starts = []
97+
self.callback = callback
98+
99+
def receive_data(self, data: bytearray):
100+
"""
101+
Receives a chunk of bytearray data and attempts to assemble a complete message.
102+
103+
Args:
104+
data: The received bytearray data.
105+
"""
106+
self.packet_starts.append(len(self.buffer))
107+
self.buffer.extend(data)
108+
109+
while True:
110+
if self.expected_length is None and len(self.buffer) >= 2:
111+
self.expected_length = struct.unpack(">H", self.buffer[:2])[0] + 2
112+
113+
LOGGER.info(f"Buffer length: {len(self.buffer)}, Packet starts: {self.packet_starts}, Expected length: {self.expected_length}")
114+
115+
if self.expected_length is not None and self.expected_length > 1024:
116+
LOGGER.warning(f"Expected length too large: {self.expected_length}")
117+
self.discard_packet()
118+
119+
elif self.expected_length is not None and len(self.buffer) >= self.expected_length:
120+
try:
121+
message = RoutableMessage()
122+
message.ParseFromString(bytes(self.buffer[2:self.expected_length]))
123+
self.buffer = self.buffer[self.expected_length:]
124+
self.packet_starts = [x - self.expected_length for x in self.packet_starts if x >= self.expected_length]
125+
self.expected_length = None
126+
self.callback(message) # Call the callback with the parsed message
127+
128+
except DecodeError:
129+
self.discard_packet()
130+
else:
131+
return
132+
133+
def discard_packet(self):
134+
self.packet_starts.pop(0)
135+
if len(self.packet_starts) > 0:
136+
self.buffer = self.buffer[self.packet_starts[0]:]
137+
self.packet_starts = [x - self.packet_starts[0] for x in self.packet_starts]
138+
else:
139+
self.buffer = bytearray()
140+
self.packet_starts = []
141+
self.expected_length = None
142+
78143
class VehicleBluetooth(Commands):
79144
"""Class describing the Tesla Fleet API vehicle endpoints and commands for a specific vehicle with command signing."""
80145

@@ -83,8 +148,7 @@ class VehicleBluetooth(Commands):
83148
client: BleakClient | None = None
84149
_queues: dict[Domain, asyncio.Queue]
85150
_ekey: ec.EllipticCurvePublicKey
86-
_recv: bytearray = bytearray()
87-
_recv_len: int = 0
151+
_buffer: ReassemblingBuffer
88152
_auth_method = "aes"
89153

90154
def __init__(
@@ -98,6 +162,7 @@ def __init__(
98162
}
99163
self.device = device
100164
self._connect_lock = asyncio.Lock()
165+
self._buffer = ReassemblingBuffer(self._on_message)
101166

102167
async def find_vehicle(self, name: str | None = None, address: str | None = None, scanner: BleakScanner | None = None) -> BLEDevice:
103168
"""Find the Tesla BLE device."""
@@ -146,6 +211,7 @@ async def connect_if_needed(self, max_attempts: int = MAX_CONNECT_ATTEMPTS) -> N
146211
"""Connect to the Tesla BLE device if not already connected."""
147212
async with self._connect_lock:
148213
if not self.client or not self.client.is_connected:
214+
LOGGER.info(f"Reconnecting to {self.ble_name}")
149215
await self.connect(max_attempts=max_attempts)
150216

151217
async def __aenter__(self) -> VehicleBluetooth:
@@ -157,70 +223,44 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
157223
"""Exit the async context."""
158224
await self.disconnect()
159225

160-
async def _on_notify(self,sender: BleakGATTCharacteristic,data : bytearray) -> None:
226+
def _on_notify(self,sender: BleakGATTCharacteristic, data: bytearray) -> None:
161227
"""Receive data from the Tesla BLE device."""
162-
if self._recv_len:
163-
self._recv += data
164-
else:
165-
self._recv_len = int.from_bytes(data[:2], 'big')
166-
if self._recv_len > 1024:
167-
LOGGER.error("Parsed very large message length")
168-
self._recv = bytearray()
169-
self._recv_len = 0
170-
return
171-
self._recv = data[2:]
172-
#while len(self._recv) > self._recv_len:
173-
#
174-
# # Maybe this needs to trigger a reset
175-
# await self._on_message(bytes(self._recv[:self._recv_len]))
176-
# self._recv_len = int.from_bytes(self._recv[self._recv_len:self._recv_len+2], 'big')
177-
# self._recv = self._recv[self._recv_len+2:]
178-
# continue
179-
if len(self._recv) >= self._recv_len:
180-
if len(self._recv) > self._recv_len:
181-
LOGGER.debug(f"Received more data than expected: {len(self._recv)} > {self._recv_len}")
182-
try:
183-
msg = RoutableMessage.FromString(bytes(self._recv[:self._recv_len]))
184-
await self._on_message(msg)
185-
self._recv = bytearray()
186-
self._recv_len = 0
187-
except DecodeError:
188-
# Attempt parsing the whole payload
189-
msg = RoutableMessage.FromString(bytes(self._recv))
190-
LOGGER.warn(f"Parsed more data than length: {len(self._recv)} > {self._recv_len}")
191-
await self._on_message(msg)
192-
self._recv = bytearray()
193-
self._recv_len = 0
194-
195-
async def _on_message(self, msg: RoutableMessage) -> None:
228+
if sender.uuid != READ_UUID:
229+
LOGGER.error(f"Unexpected sender: {sender}")
230+
return
231+
self._buffer.receive_data(data)
232+
233+
def _on_message(self, msg: RoutableMessage) -> None:
196234
"""Receive messages from the Tesla BLE data."""
197235

198236
if(msg.to_destination.routing_address != self._from_destination):
199237
# Ignore ephemeral key broadcasts
200238
return
201239

202-
LOGGER.debug(f"Received response: {msg}")
203-
await self._queues[msg.from_destination.domain].put(msg)
240+
LOGGER.info(f"Received response: {msg}")
241+
self._queues[msg.from_destination.domain].put_nowait(msg)
204242

205-
async def _send(self, msg: RoutableMessage, requires: str, timeout: int = 2) -> RoutableMessage:
243+
async def _send(self, msg: RoutableMessage, requires: str, timeout: int = 5) -> RoutableMessage:
206244
"""Serialize a message and send to the vehicle and wait for a response."""
207245

208246
domain = msg.to_destination.domain
209247
async with self._sessions[domain].lock:
210-
LOGGER.debug(f"Sending message {msg}")
248+
LOGGER.info(f"Sending message {msg}")
211249

212250
payload = prependLength(msg.SerializeToString())
213251

214252
# Empty the queue before sending the message
215253
while not self._queues[domain].empty():
216-
await self._queues[domain].get()
254+
msg = await self._queues[domain].get()
255+
LOGGER.warning(f"Discarded message {msg}")
217256

218257
await self.connect_if_needed()
219258
assert self.client is not None
220259
await self.client.write_gatt_char(WRITE_UUID, payload, True)
221260

222261
# Process the response
223262
async with asyncio.timeout(timeout):
263+
LOGGER.info(f"Waiting for response with {requires}")
224264
while True:
225265
resp = await self._queues[domain].get()
226266
LOGGER.debug(f"Received message {resp}")
@@ -229,6 +269,8 @@ async def _send(self, msg: RoutableMessage, requires: str, timeout: int = 2) ->
229269

230270
if resp.HasField(requires):
231271
return resp
272+
else:
273+
LOGGER.warning(f"Ignoring message since it does not contain the required field {requires}, {resp.HasField(requires)}")
232274

233275
async def query_display_name(self, max_attempts=5) -> str | None:
234276
"""Read the device name via GATT characteristic if available"""
@@ -270,6 +312,11 @@ async def query_version(self) -> int | None:
270312
LOGGER.error(f"Failed to read device version: {e}")
271313
return None
272314

315+
async def _command(self, domain: Domain, command: bytes, attempt: int = 0) -> dict[str, Any]:
316+
"""Serialize a message and send to the signed command endpoint."""
317+
await self.connect_if_needed()
318+
return await super()._command(domain, command, attempt)
319+
273320
async def pair(self, role: Role = Role.ROLE_OWNER, form: KeyFormFactor = KeyFormFactor.KEY_FORM_FACTOR_CLOUD_KEY, timeout: int = 60):
274321
"""Pair the key."""
275322

tesla_fleet_api/tesla/vehicle/commands.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,6 @@ def validate_msg(self, msg: RoutableMessage) -> None:
269269
if msg.signedMessageStatus.signed_message_fault > 0:
270270
raise MESSAGE_FAULTS[msg.signedMessageStatus.signed_message_fault]
271271

272-
@abstractmethod
273272
async def _command(self, domain: Domain, command: bytes, attempt: int = 0) -> dict[str, Any]:
274273
"""Serialize a message and send to the signed command endpoint."""
275274
session = self._sessions[domain]

0 commit comments

Comments
 (0)