Skip to content

Commit 1c8d3a1

Browse files
authored
refactor: move low level layer into BTHomeData (#332)
1 parent f5341c8 commit 1c8d3a1

File tree

1 file changed

+71
-78
lines changed

1 file changed

+71
-78
lines changed

src/bthome_ble/parser.py

Lines changed: 71 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -58,46 +58,6 @@ def to_mac(addr: bytes) -> str:
5858
return ":".join(f"{i:02X}" for i in addr)
5959

6060

61-
def get_adv_info(service_data: bytes) -> int:
62-
"""Extracts the advertisement info."""
63-
return service_data[0]
64-
65-
66-
def is_encrypted(service_data: bytes) -> bool:
67-
"""Checks if the encryption flag is set."""
68-
return bool(get_adv_info(service_data) & (1 << 0)) # bit 0
69-
70-
71-
def is_mac_included(service_data: bytes) -> bool:
72-
"""Checks if the MAC is included flag is set."""
73-
# If True, the first 6 bytes contain the mac address
74-
return bool(get_adv_info(service_data) & (1 << 1)) # bit 1
75-
76-
77-
def is_sleepy_device(service_data: bytes) -> bool:
78-
"""Checks if device has sleepy flag set."""
79-
# If True, the device is only updating when trigger
80-
return bool(get_adv_info(service_data) & (1 << 2)) # bit 2
81-
82-
83-
def get_version(service_data: bytes) -> int:
84-
"""Extracts the version from the advertisement info."""
85-
return (get_adv_info(service_data) >> 5) & 7 # 3 bits (5-7)
86-
87-
88-
def get_mac(service_data: bytes) -> str:
89-
"""Extracts the MAC."""
90-
bthome_mac_reversed = service_data[1:7]
91-
return to_mac(bthome_mac_reversed[::-1])
92-
93-
94-
def get_payload(service_data: bytes) -> bytes:
95-
"""Extracts the payload (removes MAC and advertisement info)."""
96-
if is_mac_included(service_data):
97-
return service_data[7:]
98-
return service_data[1:]
99-
100-
10161
def find_bthome_uuid(service_info: BluetoothServiceInfoBleak) -> UuidType | None:
10262
"""Searches for the first bthome UUID."""
10363
# Iterates over a dictionary, and one device should use only
@@ -205,10 +165,59 @@ def __init__(
205165
self.uuid_type = uuid_type
206166
self.service_info = service_info
207167

168+
def get_address(self) -> str:
169+
"""Returns the mac address."""
170+
return self.service_info.address
171+
172+
def get_time(self) -> float:
173+
"""Returns the time stamp."""
174+
return self.service_info.time
175+
208176
def _get_service_data(self) -> bytes:
209177
"""Returns the bthome service data."""
210178
return self.service_info.service_data.get(self.uuid_type.value)
211179

180+
def _get_adv_info(self) -> int:
181+
"""Extracts the advertisement info."""
182+
return self._get_service_data()[0]
183+
184+
def _is_encrypted(self) -> bool:
185+
"""Checks if the encryption flag is set."""
186+
return bool(self._get_adv_info() & (1 << 0)) # bit 0
187+
188+
def _is_mac_included(self) -> bool:
189+
"""Checks if the MAC is included flag is set."""
190+
# If True, the first 6 bytes contain the mac address
191+
return bool(self._get_adv_info() & (1 << 1)) # bit 1
192+
193+
def _is_sleepy_device(self) -> bool:
194+
"""Checks if device has sleepy flag set."""
195+
# If True, the device is only updating when trigger
196+
return bool(self._get_adv_info() & (1 << 2)) # bit 2
197+
198+
def _get_version(self) -> int:
199+
"""Extracts the version from the advertisement info."""
200+
return (self._get_adv_info() >> 5) & 7 # 3 bits (5-7)
201+
202+
def _get_mac(self) -> str:
203+
"""Extracts the MAC."""
204+
bthome_mac_reversed = self._get_service_data()[1:7]
205+
return to_mac(bthome_mac_reversed[::-1])
206+
207+
def _get_payload(self) -> bytes:
208+
"""Extracts the payload (removes MAC and advertisement info)."""
209+
if self._is_mac_included():
210+
return self._get_service_data()[7:]
211+
return self._get_service_data()[1:]
212+
213+
def get_counter(self) -> bytes:
214+
"""Extracts the encryption counter bytes."""
215+
return self._get_service_data()[-8:-4]
216+
217+
def get_mic(self) -> bytes:
218+
"""Extracts the encryption mic."""
219+
return self._get_service_data()[-4:]
220+
212221
def get_encryption_scheme(self) -> EncryptionScheme:
213222
"""
214223
Returns the encryption schema using the UUID for V1 and the
@@ -220,18 +229,10 @@ def get_encryption_scheme(self) -> EncryptionScheme:
220229
case UuidType.V1_ENCRYPTED:
221230
return EncryptionScheme.BTHOME_BINDKEY
222231
case UuidType.V2:
223-
if is_encrypted(self._get_service_data()):
232+
if self._is_encrypted():
224233
return EncryptionScheme.BTHOME_BINDKEY
225234
return EncryptionScheme.NONE
226235

227-
def get_address(self) -> str:
228-
"""Returns the mac address."""
229-
return self.service_info.address
230-
231-
def get_time(self) -> float:
232-
"""Returns the time stamp."""
233-
return self.service_info.time
234-
235236
def get_bthome_version(self) -> BTHomeVersion:
236237
"""
237238
Returns the bhtome version based on UUID for V1 and version bits from
@@ -241,7 +242,7 @@ def get_bthome_version(self) -> BTHomeVersion:
241242
case UuidType.V1_NON_ENCRYPTED | UuidType.V1_ENCRYPTED:
242243
return BTHomeVersion.V1
243244
case UuidType.V2:
244-
sw_version = get_version(self._get_service_data())
245+
sw_version = self._get_version()
245246
if sw_version != 2:
246247
identifier = short_address(self.service_info.address)
247248
_LOGGER.error(
@@ -253,14 +254,6 @@ def get_bthome_version(self) -> BTHomeVersion:
253254
return BTHomeVersion.INVALID
254255
return BTHomeVersion.V2
255256

256-
def get_counter(self) -> bytes:
257-
"""Extracts the encryption counter bytes."""
258-
return self._get_service_data()[-8:-4]
259-
260-
def get_mic(self) -> bytes:
261-
"""Extracts the encryption mic."""
262-
return self._get_service_data()[-4:]
263-
264257
def get_name(self) -> str:
265258
"""Returns the name of the device."""
266259
name = self.service_info.name
@@ -282,7 +275,7 @@ def is_sleepy_device(self) -> bool:
282275
case BTHomeVersion.V1:
283276
return False
284277
case BTHomeVersion.V2:
285-
return is_sleepy_device(self._get_service_data())
278+
return self._is_sleepy_device()
286279
case _:
287280
raise ValueError
288281

@@ -292,7 +285,7 @@ def get_nounce_uuid(self) -> bytes:
292285
case BTHomeVersion.V1:
293286
return b"\x1e\x18"
294287
case BTHomeVersion.V2:
295-
adv_info = get_adv_info(self._get_service_data())
288+
adv_info = self._get_adv_info()
296289
return b"\xd2\xfc" + bytes([adv_info])
297290
case _:
298291
raise ValueError
@@ -303,8 +296,8 @@ def get_mac_readable(self) -> str:
303296
case BTHomeVersion.V1:
304297
return self.get_address()
305298
case BTHomeVersion.V2:
306-
if is_mac_included(self._get_service_data()):
307-
return get_mac(self._get_service_data())
299+
if self._is_mac_included():
300+
return self._get_mac()
308301
return self.get_address()
309302
case _:
310303
raise ValueError
@@ -325,10 +318,23 @@ def get_payload(self) -> bytes:
325318
case BTHomeVersion.V1:
326319
return self._get_service_data()
327320
case BTHomeVersion.V2:
328-
return get_payload(self._get_service_data())
321+
return self._get_payload()
329322
case _:
330323
raise ValueError
331324

325+
def get_encrypted_payload(self) -> bytes:
326+
"""Removes the last 8 bytes (mic and encryption counter) from payload."""
327+
return self.get_payload()[:-8]
328+
329+
def get_nonce(self) -> bytes:
330+
"""Creates the nounce for decryption."""
331+
counter = self.get_counter()
332+
mac_readable = self.get_mac_readable()
333+
bthome_mac = bytes.fromhex(mac_readable.replace(":", ""))
334+
uuid = self.get_nounce_uuid()
335+
# nonce: mac [6], uuid16 [2 (v1) or 3 (v2)], counter [4]
336+
return b"".join([bthome_mac, uuid, counter])
337+
332338
def get_minimum_payload_length(self) -> int:
333339
"""Returns the minimum payload length."""
334340
match self.get_bthome_version():
@@ -846,19 +852,6 @@ def _check_minimum_length(self, bthome_data: BTHomeData, payload: bytes) -> None
846852
)
847853
raise ValueError
848854

849-
def _get_encrypted_payload(self, bthome_data: BTHomeData) -> bytes:
850-
"""Removes the last 8 bytes (mic and encryption counter) from payload."""
851-
return bthome_data.get_payload()[:-8]
852-
853-
def _get_nonce(self, bthome_data: BTHomeData) -> bytes:
854-
"""Creates the nounce for decryption."""
855-
counter = bthome_data.get_counter()
856-
mac_readable = bthome_data.get_address()
857-
bthome_mac = bytes.fromhex(mac_readable.replace(":", ""))
858-
uuid = bthome_data.get_nounce_uuid()
859-
# nonce: mac [6], uuid16 [2 (v1) or 3 (v2)], counter [4]
860-
return b"".join([bthome_mac, uuid, counter])
861-
862855
def _handle_decryption_error(
863856
self,
864857
error: InvalidTag,
@@ -903,11 +896,11 @@ def _decrypt_bthome(
903896
new_encryption_counter = parse_uint(bthome_data.get_counter())
904897
self._check_encryption_counter(new_encryption_counter)
905898

906-
encrypted_payload = self._get_encrypted_payload(bthome_data)
899+
encrypted_payload = bthome_data.get_encrypted_payload()
907900
self._check_minimum_length(bthome_data, encrypted_payload)
908901

909902
mic = bthome_data.get_mic()
910-
nonce = self._get_nonce(bthome_data)
903+
nonce = bthome_data.get_nonce()
911904
associated_data = bthome_data.get_associated_data()
912905

913906
# decrypt the data

0 commit comments

Comments
 (0)