Skip to content

Commit dc382f7

Browse files
committed
Victron Energy Decryption
1 parent 77201b2 commit dc382f7

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

TheengsGateway/ble_gateway.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,12 @@ def handle_encrypted_advertisement(
715715
decoded_json = decodeBLE(json.dumps(data_json))
716716
if decoded_json:
717717
decoded_json = json.loads(decoded_json) # type: ignore[arg-type]
718+
# Check if the mic matches the first byte of the bindkey for bindkey verification
719+
elif mic != bindkey[:1].hex():
720+
logger.exception(
721+
"Bindkey does not seem to be correct for `%s`",
722+
get_address(decoded_json),
723+
)
718724
else:
719725
logger.exception(
720726
"Decrypted payload not supported: `%s`",

TheengsGateway/decryption.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
from Cryptodome.Cipher import AES
1212

13-
1413
class AdvertisementDecryptor(ABC):
1514
"""Abstract class that represents a decryptor for advertisements."""
1615

@@ -135,10 +134,53 @@ def replace_encrypted_data(
135134
bthome_service_data.extend(decrypted_data)
136135
data_json["servicedata"] = bthome_service_data.hex()
137136

137+
class VictronDecryptor(AdvertisementDecryptor):
138+
"""Class for decryption of Victron Energy encrypted advertisements."""
139+
140+
def compute_nonce(self, address: str, decoded_json: dict) -> bytes:
141+
"""Get the nonce from a specific address and JSON input."""
142+
# The nonce is provided in the message and needs to be padded to 8 bytes
143+
nonce = bytes.fromhex(decoded_json["ctr"])
144+
nonce = nonce.ljust(8, b"\x00") # Pad to 8 bytes with zeros
145+
return nonce
146+
147+
def decrypt(
148+
self,
149+
bindkey: bytes,
150+
address: str,
151+
decoded_json: dict,
152+
) -> bytes:
153+
"""Decrypt ciphertext from JSON input with AES CTR."""
154+
nonce = self.compute_nonce(address, decoded_json)
155+
cipher = AES.new(bindkey, AES.MODE_CTR, nonce=nonce)
156+
payload = bytes.fromhex(decoded_json["cipher"])
157+
decrypted_data = cipher.decrypt(payload)
158+
return decrypted_data
159+
160+
def replace_encrypted_data(
161+
self,
162+
decrypted_data: bytes,
163+
data_json: dict,
164+
decoded_json: dict,
165+
) -> None:
166+
"""Replace the encrypted data with decrypted payload."""
167+
# Extract the first 10 octets of the manufacturer data
168+
victron_manufacturer_data = bytearray(bytes.fromhex(decoded_json["manufacturerdata"][:20]))
169+
170+
# Replace indexes 4-5 and 14-17 with "11" and "ffff" to indicate decrypted data
171+
victron_manufacturer_data[2:3] = binascii.unhexlify("11")
172+
victron_manufacturer_data[7:9] = binascii.unhexlify("ffff")
173+
174+
# Append the decrypted payload to the manufacturer data
175+
victron_manufacturer_data.extend(decrypted_data)
176+
177+
# Update the manufacturerdata field in the JSON
178+
data_json["manufacturerdata"] = victron_manufacturer_data.hex()
138179

139180
_DECRYPTORS = {
140181
1: LYWSD03MMC_PVVXDecryptor,
141182
2: BTHomeV2Decryptor,
183+
3: VictronDecryptor,
142184
}
143185

144186

0 commit comments

Comments
 (0)