Skip to content

Commit 90659c1

Browse files
committed
Victron Energy Decryption
1 parent 77201b2 commit 90659c1

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

TheengsGateway/ble_gateway.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,12 @@ def handle_encrypted_advertisement(
715715
decoded_json = decodeBLE(json.dumps(data_json))
716716
if decoded_json:
717717
decoded_json = json.loads(decoded_json) # type: ignore[arg-type]
718+
# Check if the mic matches the first byte of the bindkey for bindkey verification
719+
elif mic != bindkey[:1].hex():
720+
logger.exception(
721+
"Bindkey does not seem to be correct for `%s`",
722+
get_address(decoded_json),
723+
)
718724
else:
719725
logger.exception(
720726
"Decrypted payload not supported: `%s`",

TheengsGateway/decryption.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,55 @@ def replace_encrypted_data(
135135
bthome_service_data.extend(decrypted_data)
136136
data_json["servicedata"] = bthome_service_data.hex()
137137

138+
class VictronDecryptor(AdvertisementDecryptor):
139+
"""Class for decryption of Victron Energy encrypted advertisements."""
140+
141+
def compute_nonce(self, address: str, decoded_json: dict) -> bytes:
142+
"""Get the nonce from a specific address and JSON input."""
143+
# The nonce is provided in the message and needs to be padded to 8 bytes
144+
nonce = bytes.fromhex(decoded_json["ctr"])
145+
nonce = nonce.ljust(8, b"\x00") # Pad to 8 bytes with zeros
146+
return nonce
147+
148+
def decrypt(
149+
self,
150+
bindkey: bytes,
151+
address: str,
152+
decoded_json: dict,
153+
) -> bytes:
154+
"""Decrypt ciphertext from JSON input with AES CTR."""
155+
nonce = self.compute_nonce(address, decoded_json)
156+
cipher = AES.new(bindkey, AES.MODE_CTR, nonce=nonce)
157+
payload = bytes.fromhex(decoded_json["cipher"])
158+
decrypted_data = cipher.decrypt(payload)
159+
return decrypted_data
160+
161+
def replace_encrypted_data(
162+
self,
163+
decrypted_data: bytes,
164+
data_json: dict,
165+
decoded_json: dict,
166+
) -> None:
167+
"""Replace the encrypted data with decrypted payload."""
168+
# Extract the first 10 octets of the manufacturer data
169+
victron_manufacturer_data = bytearray(bytes.fromhex(decoded_json["manufacturerdata"][:20]))
170+
171+
# Replace indexes 4-5 and 14-17 with "11" and "ffff" to indicate decrypted data
172+
victron_manufacturer_data[2:3] = binascii.unhexlify("11")
173+
victron_manufacturer_data[7:9] = binascii.unhexlify("ffff")
174+
175+
# Append the decrypted payload to the manufacturer data
176+
victron_manufacturer_data.extend(decrypted_data)
177+
178+
# Update the manufacturerdata field in the JSON
179+
data_json["manufacturerdata"] = victron_manufacturer_data.hex()
138180

139181
_DECRYPTORS = {
140182
1: LYWSD03MMC_PVVXDecryptor,
141183
2: BTHomeV2Decryptor,
184+
3: VictronDecryptor,
142185
}
143186

144-
145187
class UnsupportedEncryptionError(Exception):
146188
"""Exception raised when trying to decrypt an unsupported device."""
147189

0 commit comments

Comments
 (0)