Skip to content

Commit 9c969d2

Browse files
dsypniewskibdraco
andauthored
Added lock model with lock and unlock support (#164)
Co-authored-by: J. Nick Koston <[email protected]>
1 parent a804c78 commit 9c969d2

File tree

6 files changed

+340
-9
lines changed

6 files changed

+340
-9
lines changed

scripts/get_encryption_key.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#!/usr/bin/env python3
2+
import base64
3+
import getpass
4+
import hashlib
5+
import hmac
6+
import json
7+
import sys
8+
9+
import boto3
10+
import requests
11+
12+
# Those values have been obtained from the following files in SwitchBot Android app
13+
# That's how you can verify them yourself
14+
# /assets/switchbot_config.json
15+
# /res/raw/amplifyconfiguration.json
16+
# /res/raw/awsconfiguration.json
17+
SWITCHBOT_INTERNAL_API_BASE_URL = (
18+
"https://l9ren7efdj.execute-api.us-east-1.amazonaws.com"
19+
)
20+
SWITCHBOT_COGNITO_POOL = {
21+
"PoolId": "us-east-1_x1fixo5LC",
22+
"AppClientId": "66r90hdllaj4nnlne4qna0muls",
23+
"AppClientSecret": "1v3v7vfjsiggiupkeuqvsovg084e3msbefpj9rgh611u30uug6t8",
24+
"Region": "us-east-1",
25+
}
26+
27+
28+
def main():
29+
if len(sys.argv) < 3:
30+
print(f"Usage: {sys.argv[0]} <device_mac> <username> [<password>]")
31+
exit(1)
32+
33+
device_mac = sys.argv[1].replace(":", "").replace("-", "").upper()
34+
username = sys.argv[2]
35+
if len(sys.argv) == 3:
36+
password = getpass.getpass()
37+
else:
38+
password = sys.argv[3]
39+
40+
msg = bytes(username + SWITCHBOT_COGNITO_POOL["AppClientId"], "utf-8")
41+
secret_hash = base64.b64encode(
42+
hmac.new(
43+
SWITCHBOT_COGNITO_POOL["AppClientSecret"].encode(),
44+
msg,
45+
digestmod=hashlib.sha256,
46+
).digest()
47+
).decode()
48+
49+
cognito_idp_client = boto3.client(
50+
"cognito-idp", region_name=SWITCHBOT_COGNITO_POOL["Region"]
51+
)
52+
auth_response = None
53+
try:
54+
auth_response = cognito_idp_client.initiate_auth(
55+
ClientId=SWITCHBOT_COGNITO_POOL["AppClientId"],
56+
AuthFlow="USER_PASSWORD_AUTH",
57+
AuthParameters={
58+
"USERNAME": username,
59+
"PASSWORD": password,
60+
"SECRET_HASH": secret_hash,
61+
},
62+
)
63+
except cognito_idp_client.exceptions.NotAuthorizedException as e:
64+
print(f"Error: Failed to authenticate - {e}")
65+
exit(1)
66+
except BaseException as e:
67+
print(f"Error: Unexpected error during authentication - {e}")
68+
exit(1)
69+
70+
if (
71+
auth_response is None
72+
or "AuthenticationResult" not in auth_response
73+
or "AccessToken" not in auth_response["AuthenticationResult"]
74+
):
75+
print(f"Error: unexpected authentication result")
76+
exit(1)
77+
78+
access_token = auth_response["AuthenticationResult"]["AccessToken"]
79+
key_response = requests.post(
80+
url=SWITCHBOT_INTERNAL_API_BASE_URL + "/developStage/keys/v1/communicate",
81+
headers={"authorization": access_token},
82+
json={"device_mac": device_mac, "keyType": "user"},
83+
)
84+
key_response_content = json.loads(key_response.content)
85+
if key_response_content["statusCode"] != 100:
86+
print(
87+
"Error: {} ({})".format(
88+
key_response_content["message"], key_response_content["statusCode"]
89+
)
90+
)
91+
exit(1)
92+
93+
print("Key ID: " + key_response_content["body"]["communicationKey"]["keyId"])
94+
print("Encryption key: " + key_response_content["body"]["communicationKey"]["key"])
95+
96+
97+
if __name__ == "__main__":
98+
main()

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
setup(
44
name="PySwitchbot",
55
packages=["switchbot", "switchbot.devices", "switchbot.adv_parsers"],
6-
install_requires=["async_timeout>=4.0.1", "bleak>=0.17.0", "bleak-retry-connector>=2.9.0"],
6+
install_requires=["async_timeout>=4.0.1", "bleak>=0.17.0", "bleak-retry-connector>=2.9.0", "cryptography>=38.0.3"],
77
version="0.30.1",
88
description="A library to communicate with Switchbot",
99
author="Daniel Hjelseth Hoyer",

switchbot/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .devices.device import ColorMode, SwitchbotDevice
1414
from .devices.humidifier import SwitchbotHumidifier
1515
from .devices.light_strip import SwitchbotLightStrip
16+
from .devices.lock import SwitchbotLock
1617
from .devices.plug import SwitchbotPlugMini
1718
from .discovery import GetSwitchbotDevices
1819
from .models import SwitchBotAdvertisement
@@ -35,4 +36,5 @@
3536
"SwitchbotPlugMini",
3637
"SwitchbotSupportedType",
3738
"SwitchbotModel",
39+
"SwitchbotLock",
3840
]

switchbot/adv_parsers/lock.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def process_wolock(data: bytes | None, mfr_data: bytes | None) -> dict[str, bool
1919
return {
2020
"battery": data[2] & 0b01111111 if data else None,
2121
"calibration": bool(mfr_data[7] & 0b10000000),
22-
"status": LockStatus(mfr_data[7] & 0b01110000),
22+
"status": LockStatus((mfr_data[7] & 0b01110000) >> 4),
2323
"update_from_secondary_lock": bool(mfr_data[7] & 0b00001000),
2424
"door_open": bool(mfr_data[7] & 0b00000100),
2525
"double_lock_mode": bool(mfr_data[8] & 0b10000000),

switchbot/const.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ class SwitchbotModel(StrEnum):
2626

2727

2828
class LockStatus(Enum):
29-
LOCKED = 0b0000000
30-
UNLOCKED = 0b0010000
31-
LOCKING = 0b0100000
32-
UNLOCKING = 0b0110000
33-
LOCKING_STOP = 0b1000000
34-
UNLOCKING_STOP = 0b1010000
35-
NOT_FULLY_LOCKED = 0b1100000 # Only EU lock type
29+
LOCKED = 0
30+
UNLOCKED = 1
31+
LOCKING = 2
32+
UNLOCKING = 3
33+
LOCKING_STOP = 4 # LOCKING_BLOCKED
34+
UNLOCKING_STOP = 5 # UNLOCKING_BLOCKED
35+
NOT_FULLY_LOCKED = 6 # LATCH_LOCKED - Only EU lock type

switchbot/devices/lock.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,232 @@
1+
"""Library to handle connection with Switchbot Lock."""
12
from __future__ import annotations
3+
4+
import asyncio
5+
import logging
6+
from typing import Any
7+
8+
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
9+
10+
from ..const import LockStatus
11+
from ..models import SwitchBotAdvertisement
12+
from .device import SwitchbotDevice
13+
14+
COMMAND_HEADER = "57"
15+
COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
16+
COMMAND_LOCK_INFO = f"{COMMAND_HEADER}0f4f8101"
17+
COMMAND_UNLOCK = f"{COMMAND_HEADER}0f4e01011080"
18+
COMMAND_LOCK = f"{COMMAND_HEADER}0f4e01011000"
19+
COMMAND_ENABLE_NOTIFICATIONS = f"{COMMAND_HEADER}0e01001e00008101"
20+
COMMAND_DISABLE_NOTIFICATIONS = f"{COMMAND_HEADER}0e00"
21+
22+
MOVING_STATUSES = {LockStatus.LOCKING, LockStatus.UNLOCKING}
23+
BLOCKED_STATUSES = {LockStatus.LOCKING_STOP, LockStatus.UNLOCKING_STOP}
24+
REST_STATUSES = {LockStatus.LOCKED, LockStatus.UNLOCKED, LockStatus.NOT_FULLY_LOCKED}
25+
26+
_LOGGER = logging.getLogger(__name__)
27+
28+
29+
class SwitchbotLock(SwitchbotDevice):
30+
"""Representation of a Switchbot Lock."""
31+
32+
def __init__(
33+
self,
34+
advertisement: SwitchBotAdvertisement,
35+
key_id: str,
36+
encryption_key: str,
37+
interface: int = 0,
38+
**kwargs: Any,
39+
) -> None:
40+
if len(key_id) == 0:
41+
raise ValueError("key_id is missing")
42+
elif len(key_id) != 2:
43+
raise ValueError("key_id is invalid")
44+
if len(encryption_key) == 0:
45+
raise ValueError("encryption_key is missing")
46+
elif len(encryption_key) != 32:
47+
raise ValueError("encryption_key is invalid")
48+
self._iv = None
49+
self._cipher = None
50+
self._key_id = key_id
51+
self._encryption_key = bytearray.fromhex(encryption_key)
52+
self._notifications_enabled: bool = False
53+
super().__init__(advertisement.device, None, interface, **kwargs)
54+
self.update_from_advertisement(advertisement)
55+
56+
async def lock(self) -> bool:
57+
"""Send lock command."""
58+
return await self._lock_unlock(
59+
COMMAND_LOCK, {LockStatus.LOCKED, LockStatus.LOCKING}
60+
)
61+
62+
async def unlock(self) -> bool:
63+
"""Send unlock command."""
64+
return await self._lock_unlock(
65+
COMMAND_UNLOCK, {LockStatus.UNLOCKED, LockStatus.UNLOCKING}
66+
)
67+
68+
async def _lock_unlock(
69+
self, command: str, ignore_statuses: set[LockStatus]
70+
) -> bool:
71+
status = self.get_lock_status()
72+
if status is None:
73+
await self.update()
74+
status = self.get_lock_status()
75+
if status in ignore_statuses:
76+
return True
77+
78+
await self._enable_notifications()
79+
result = await self._send_command(command)
80+
if not self._check_command_result(result, 0, {1}):
81+
return False
82+
return True
83+
84+
async def get_basic_info(self) -> dict[str, Any] | None:
85+
"""Get device basic status."""
86+
lock_raw_data = await self._get_lock_info()
87+
if not lock_raw_data:
88+
return None
89+
90+
basic_data = await self._get_basic_info()
91+
if not basic_data:
92+
return None
93+
94+
lock_data = self._parse_lock_data(lock_raw_data[1:])
95+
lock_data.update(battery=basic_data[1], firmware=basic_data[2] / 10.0)
96+
97+
return lock_data
98+
99+
def is_calibrated(self) -> Any:
100+
"""Return True if lock is calibrated."""
101+
return self._get_adv_value("calibration")
102+
103+
def get_lock_status(self) -> LockStatus:
104+
"""Return lock status."""
105+
return self._get_adv_value("status")
106+
107+
def is_door_open(self) -> bool:
108+
"""Return True if door is open."""
109+
return self._get_adv_value("door_open")
110+
111+
def is_unclosed_alarm_on(self) -> bool:
112+
"""Return True if unclosed door alarm is on."""
113+
return self._get_adv_value("unclosed_alarm")
114+
115+
def is_unlocked_alarm_on(self) -> bool:
116+
"""Return True if lock unlocked alarm is on."""
117+
return self._get_adv_value("unlocked_alarm")
118+
119+
def is_auto_lock_paused(self) -> bool:
120+
"""Return True if auto lock is paused."""
121+
return self._get_adv_value("auto_lock_paused")
122+
123+
async def _get_lock_info(self) -> bytes | None:
124+
"""Return lock info of device."""
125+
_data = await self._send_command(key=COMMAND_LOCK_INFO, retry=self._retry_count)
126+
127+
if not self._check_command_result(_data, 0, {1}):
128+
_LOGGER.error("Unsuccessful, please try again")
129+
return None
130+
131+
return _data
132+
133+
async def _enable_notifications(self) -> bool:
134+
if self._notifications_enabled:
135+
return True
136+
result = await self._send_command(COMMAND_ENABLE_NOTIFICATIONS)
137+
if self._check_command_result(result, 0, {1}):
138+
self._notifications_enabled = True
139+
return self._notifications_enabled
140+
141+
async def _disable_notifications(self) -> bool:
142+
if not self._notifications_enabled:
143+
return True
144+
result = await self._send_command(COMMAND_DISABLE_NOTIFICATIONS)
145+
if self._check_command_result(result, 0, {1}):
146+
self._notifications_enabled = False
147+
return not self._notifications_enabled
148+
149+
def _notification_handler(self, _sender: int, data: bytearray) -> None:
150+
if self._notifications_enabled and self._check_command_result(data, 0, {0xF}):
151+
self._update_lock_status(data)
152+
else:
153+
super()._notification_handler(_sender, data)
154+
155+
def _update_lock_status(self, data: bytearray) -> None:
156+
data = self._decrypt(data[4:])
157+
lock_data = self._parse_lock_data(data)
158+
current_status = self.get_lock_status()
159+
if (
160+
lock_data["status"] != current_status or current_status not in REST_STATUSES
161+
) and (
162+
lock_data["status"] in REST_STATUSES
163+
or lock_data["status"] in BLOCKED_STATUSES
164+
):
165+
asyncio.create_task(self._disable_notifications())
166+
167+
self._update_parsed_data(lock_data)
168+
169+
@staticmethod
170+
def _parse_lock_data(data: bytes) -> dict[str, Any]:
171+
return {
172+
"calibration": bool(data[0] & 0b10000000),
173+
"status": LockStatus((data[0] & 0b01110000) >> 4),
174+
"door_open": bool(data[0] & 0b00000100),
175+
"unclosed_alarm": bool(data[1] & 0b00100000),
176+
"unlocked_alarm": bool(data[1] & 0b00010000),
177+
}
178+
179+
async def _send_command(
180+
self, key: str, retry: int | None = None, encrypt: bool = True
181+
) -> bytes | None:
182+
if not encrypt:
183+
return await super()._send_command(key[:2] + "000000" + key[2:], retry)
184+
185+
result = await self._ensure_encryption_initialized()
186+
if not result:
187+
_LOGGER.error("Failed to initialize encryption")
188+
return None
189+
190+
encrypted = (
191+
key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
192+
)
193+
result = await super()._send_command(encrypted, retry)
194+
return result[:1] + self._decrypt(result[4:])
195+
196+
async def _ensure_encryption_initialized(self) -> bool:
197+
if self._iv is not None:
198+
return True
199+
200+
result = await self._send_command(
201+
COMMAND_GET_CK_IV + self._key_id, encrypt=False
202+
)
203+
ok = self._check_command_result(result, 0, {0x01})
204+
if ok:
205+
self._iv = result[4:]
206+
207+
return ok
208+
209+
async def _execute_disconnect(self) -> None:
210+
await super()._execute_disconnect()
211+
self._iv = None
212+
self._cipher = None
213+
self._notifications_enabled = False
214+
215+
def _get_cipher(self) -> Cipher:
216+
if self._cipher is None:
217+
self._cipher = Cipher(
218+
algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
219+
)
220+
return self._cipher
221+
222+
def _encrypt(self, data: str) -> str:
223+
if len(data) == 0:
224+
return ""
225+
encryptor = self._get_cipher().encryptor()
226+
return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
227+
228+
def _decrypt(self, data: bytearray) -> bytes:
229+
if len(data) == 0:
230+
return b""
231+
decryptor = self._get_cipher().decryptor()
232+
return decryptor.update(data) + decryptor.finalize()

0 commit comments

Comments
 (0)