Skip to content

Commit 16664f6

Browse files
greyeeebdraco
andauthored
Add support Relay Switch 1 and Relay Switch 1PM (#263)
Co-authored-by: J. Nick Koston <[email protected]>
1 parent 97c7ebc commit 16664f6

File tree

5 files changed

+206
-0
lines changed

5 files changed

+206
-0
lines changed

switchbot/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from .devices.light_strip import SwitchbotLightStrip
2727
from .devices.lock import SwitchbotLock
2828
from .devices.plug import SwitchbotPlugMini
29+
from .devices.relay_switch import SwitchbotRelaySwitch
2930
from .discovery import GetSwitchbotDevices
3031
from .models import SwitchBotAdvertisement
3132

@@ -54,4 +55,5 @@
5455
"SwitchbotModel",
5556
"SwitchbotLock",
5657
"SwitchbotBlindTilt",
58+
"SwitchbotRelaySwitch",
5759
]

switchbot/adv_parser.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
from .adv_parsers.meter import process_wosensorth, process_wosensorth_c
2424
from .adv_parsers.motion import process_wopresence
2525
from .adv_parsers.plug import process_woplugmini
26+
from .adv_parsers.relay_switch import (
27+
process_worelay_switch_1plus,
28+
process_worelay_switch_1pm,
29+
)
2630
from .const import SwitchbotModel
2731
from .models import SwitchBotAdvertisement
2832

@@ -173,6 +177,18 @@ class SwitchbotSupportedType(TypedDict):
173177
"func": process_woblindtilt,
174178
"manufacturer_id": 2409,
175179
},
180+
"<": {
181+
"modelName": SwitchbotModel.RelaySwitch1PM,
182+
"modelFriendlyName": "Relay Switch 1PM",
183+
"func": process_worelay_switch_1pm,
184+
"manufacturer_id": 2409,
185+
},
186+
";": {
187+
"modelName": SwitchbotModel.RelaySwitch1Plus,
188+
"modelFriendlyName": "Relay Switch 1",
189+
"func": process_worelay_switch_1plus,
190+
"manufacturer_id": 2409,
191+
},
176192
}
177193

178194
_SWITCHBOT_MODEL_TO_CHAR = {
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""Relay Switch adv parser."""
2+
from __future__ import annotations
3+
4+
5+
def process_worelay_switch_1pm(
6+
data: bytes | None, mfr_data: bytes | None
7+
) -> dict[str, bool | int]:
8+
"""Process WoStrip services data."""
9+
if mfr_data is None:
10+
return {}
11+
return {
12+
"switchMode": True, # for compatibility, useless
13+
"sequence_number": mfr_data[6],
14+
"isOn": bool(mfr_data[7] & 0b10000000),
15+
"power": ((mfr_data[10] << 8) + mfr_data[11]) / 10,
16+
"voltage": 0,
17+
"current": 0,
18+
}
19+
20+
21+
def process_worelay_switch_1plus(
22+
data: bytes | None, mfr_data: bytes | None
23+
) -> dict[str, bool | int]:
24+
"""Process WoStrip services data."""
25+
if mfr_data is None:
26+
return {}
27+
return {
28+
"switchMode": True, # for compatibility, useless
29+
"sequence_number": mfr_data[6],
30+
"isOn": bool(mfr_data[7] & 0b10000000),
31+
}

switchbot/const.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class SwitchbotModel(StrEnum):
5252
LOCK_PRO = "WoLockPro"
5353
BLIND_TILT = "WoBlindTilt"
5454
HUB2 = "WoHub2"
55+
RelaySwitch1PM = "Relay Switch 1PM"
56+
RelaySwitch1Plus = "Relay Switch 1"
5557

5658

5759
class LockStatus(Enum):

switchbot/devices/relay_switch.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import time
2+
from typing import Any
3+
4+
from bleak.backends.device import BLEDevice
5+
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
6+
7+
from ..const import SwitchbotModel
8+
from .device import SwitchbotSequenceDevice
9+
10+
COMMAND_HEADER = "57"
11+
COMMAND_GET_CK_IV = f"{COMMAND_HEADER}0f2103"
12+
COMMAND_TURN_OFF = f"{COMMAND_HEADER}0f70010000"
13+
COMMAND_TURN_ON = f"{COMMAND_HEADER}0f70010100"
14+
COMMAND_TOGGLE = f"{COMMAND_HEADER}0f70010200"
15+
COMMAND_GET_VOLTAGE_AND_CURRENT = f"{COMMAND_HEADER}0f7106000000"
16+
PASSIVE_POLL_INTERVAL = 1 * 60
17+
18+
19+
class SwitchbotRelaySwitch(SwitchbotSequenceDevice):
20+
"""Representation of a Switchbot relay switch 1pm."""
21+
22+
def __init__(
23+
self,
24+
device: BLEDevice,
25+
key_id: str,
26+
encryption_key: str,
27+
interface: int = 0,
28+
model: SwitchbotModel = SwitchbotModel.RelaySwitch1PM,
29+
**kwargs: Any,
30+
) -> None:
31+
if len(key_id) == 0:
32+
raise ValueError("key_id is missing")
33+
elif len(key_id) != 2:
34+
raise ValueError("key_id is invalid")
35+
if len(encryption_key) == 0:
36+
raise ValueError("encryption_key is missing")
37+
elif len(encryption_key) != 32:
38+
raise ValueError("encryption_key is invalid")
39+
self._iv = None
40+
self._cipher = None
41+
self._key_id = key_id
42+
self._encryption_key = bytearray.fromhex(encryption_key)
43+
self._model: SwitchbotModel = model
44+
super().__init__(device, None, interface, **kwargs)
45+
46+
async def update(self, interface: int | None = None) -> None:
47+
"""Update state of device."""
48+
if info := await self.get_voltage_and_current():
49+
self._last_full_update = time.monotonic()
50+
self._update_parsed_data(info)
51+
self._fire_callbacks()
52+
53+
async def get_voltage_and_current(self) -> dict[str, Any] | None:
54+
"""Get voltage and current because advtisement don't have these"""
55+
result = await self._send_command(COMMAND_GET_VOLTAGE_AND_CURRENT)
56+
ok = self._check_command_result(result, 0, {1})
57+
if ok:
58+
return {
59+
"voltage": (result[9] << 8) + result[10],
60+
"current": (result[11] << 8) + result[12],
61+
}
62+
return None
63+
64+
def poll_needed(self, seconds_since_last_poll: float | None) -> bool:
65+
"""Return if device needs polling."""
66+
if (
67+
seconds_since_last_poll is not None
68+
and seconds_since_last_poll < PASSIVE_POLL_INTERVAL
69+
):
70+
return False
71+
time_since_last_full_update = time.monotonic() - self._last_full_update
72+
if time_since_last_full_update < PASSIVE_POLL_INTERVAL:
73+
return False
74+
return True
75+
76+
async def turn_on(self) -> bool:
77+
"""Turn device on."""
78+
result = await self._send_command(COMMAND_TURN_ON)
79+
ok = self._check_command_result(result, 0, {1})
80+
if ok:
81+
self._override_state({"isOn": True})
82+
self._fire_callbacks()
83+
return ok
84+
85+
async def turn_off(self) -> bool:
86+
"""Turn device off."""
87+
result = await self._send_command(COMMAND_TURN_OFF)
88+
ok = self._check_command_result(result, 0, {1})
89+
if ok:
90+
self._override_state({"isOn": False})
91+
self._fire_callbacks()
92+
return ok
93+
94+
async def async_toggle(self, **kwargs) -> bool:
95+
"""Toggle device."""
96+
result = await self._send_command(COMMAND_TOGGLE)
97+
status = self._check_command_result(result, 0, {1})
98+
return status
99+
100+
def is_on(self) -> bool | None:
101+
"""Return switch state from cache."""
102+
return self._get_adv_value("isOn")
103+
104+
async def _send_command(
105+
self, key: str, retry: int | None = None, encrypt: bool = True
106+
) -> bytes | None:
107+
if not encrypt:
108+
return await super()._send_command(key[:2] + "000000" + key[2:], retry)
109+
110+
result = await self._ensure_encryption_initialized()
111+
if not result:
112+
return None
113+
114+
encrypted = (
115+
key[:2] + self._key_id + self._iv[0:2].hex() + self._encrypt(key[2:])
116+
)
117+
result = await super()._send_command(encrypted, retry)
118+
return result[:1] + self._decrypt(result[4:])
119+
120+
async def _ensure_encryption_initialized(self) -> bool:
121+
if self._iv is not None:
122+
return True
123+
124+
result = await self._send_command(
125+
COMMAND_GET_CK_IV + self._key_id, encrypt=False
126+
)
127+
ok = self._check_command_result(result, 0, {1})
128+
if ok:
129+
self._iv = result[4:]
130+
131+
return ok
132+
133+
async def _execute_disconnect(self) -> None:
134+
await super()._execute_disconnect()
135+
self._iv = None
136+
self._cipher = None
137+
138+
def _get_cipher(self) -> Cipher:
139+
if self._cipher is None:
140+
self._cipher = Cipher(
141+
algorithms.AES128(self._encryption_key), modes.CTR(self._iv)
142+
)
143+
return self._cipher
144+
145+
def _encrypt(self, data: str) -> str:
146+
if len(data) == 0:
147+
return ""
148+
encryptor = self._get_cipher().encryptor()
149+
return (encryptor.update(bytearray.fromhex(data)) + encryptor.finalize()).hex()
150+
151+
def _decrypt(self, data: bytearray) -> bytes:
152+
if len(data) == 0:
153+
return b""
154+
decryptor = self._get_cipher().decryptor()
155+
return decryptor.update(data) + decryptor.finalize()

0 commit comments

Comments
 (0)