Skip to content

Commit b510273

Browse files
Add support for switchbot air purifier (#329)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent cea7ea0 commit b510273

File tree

8 files changed

+665
-0
lines changed

8 files changed

+665
-0
lines changed

switchbot/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@
1010

1111
from .adv_parser import SwitchbotSupportedType, parse_advertisement_data
1212
from .const import (
13+
AirPurifierMode,
1314
FanMode,
1415
LockStatus,
1516
SwitchbotAccountConnectionError,
1617
SwitchbotApiError,
1718
SwitchbotAuthenticationError,
1819
SwitchbotModel,
1920
)
21+
from .devices.air_purifier import SwitchbotAirPurifier
2022
from .devices.base_light import SwitchbotBaseLight
2123
from .devices.blind_tilt import SwitchbotBlindTilt
2224
from .devices.bot import Switchbot
@@ -37,6 +39,7 @@
3739
from .models import SwitchBotAdvertisement
3840

3941
__all__ = [
42+
"AirPurifierMode",
4043
"ColorMode",
4144
"FanMode",
4245
"GetSwitchbotDevices",
@@ -45,6 +48,7 @@
4548
"Switchbot",
4649
"Switchbot",
4750
"SwitchbotAccountConnectionError",
51+
"SwitchbotAirPurifier",
4852
"SwitchbotApiError",
4953
"SwitchbotAuthenticationError",
5054
"SwitchbotBaseLight",

switchbot/adv_parser.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from bleak.backends.device import BLEDevice
1111
from bleak.backends.scanner import AdvertisementData
1212

13+
from .adv_parsers.air_purifier import process_air_purifier
1314
from .adv_parsers.blind_tilt import process_woblindtilt
1415
from .adv_parsers.bot import process_wohand
1516
from .adv_parsers.bulb import process_color_bulb
@@ -268,6 +269,30 @@ class SwitchbotSupportedType(TypedDict):
268269
"func": process_vacuum_k,
269270
"manufacturer_id": 2409,
270271
},
272+
"*": {
273+
"modelName": SwitchbotModel.AIR_PURIFIER,
274+
"modelFriendlyName": "Air Purifier",
275+
"func": process_air_purifier,
276+
"manufacturer_id": 2409,
277+
},
278+
"+": {
279+
"modelName": SwitchbotModel.AIR_PURIFIER,
280+
"modelFriendlyName": "Air Purifier",
281+
"func": process_air_purifier,
282+
"manufacturer_id": 2409,
283+
},
284+
"7": {
285+
"modelName": SwitchbotModel.AIR_PURIFIER,
286+
"modelFriendlyName": "Air Purifier",
287+
"func": process_air_purifier,
288+
"manufacturer_id": 2409,
289+
},
290+
"8": {
291+
"modelName": SwitchbotModel.AIR_PURIFIER,
292+
"modelFriendlyName": "Air Purifier",
293+
"func": process_air_purifier,
294+
"manufacturer_id": 2409,
295+
},
271296
}
272297

273298
_SWITCHBOT_MODEL_TO_CHAR = {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""Air Purifier adv parser."""
2+
3+
from __future__ import annotations
4+
5+
import struct
6+
7+
from ..const.air_purifier import AirPurifierMode, AirQualityLevel
8+
9+
10+
def process_air_purifier(
11+
data: bytes | None, mfr_data: bytes | None
12+
) -> dict[str, bool | int]:
13+
"""Process air purifier services data."""
14+
if mfr_data is None:
15+
return {}
16+
device_data = mfr_data[6:]
17+
18+
_seq_num = device_data[0]
19+
_isOn = bool(device_data[1] & 0b10000000)
20+
_mode = device_data[1] & 0b00000111
21+
_is_aqi_valid = bool(device_data[2] & 0b00000100)
22+
_child_lock = bool(device_data[2] & 0b00000010)
23+
_speed = device_data[3] & 0b01111111
24+
_aqi_level = (device_data[4] & 0b00000110) >> 1
25+
_aqi_level = AirQualityLevel(_aqi_level).name.lower()
26+
_work_time = struct.unpack(">H", device_data[5:7])[0]
27+
_err_code = device_data[7]
28+
29+
return {
30+
"isOn": _isOn,
31+
"mode": get_air_purifier_mode(_mode, _speed),
32+
"isAqiValid": _is_aqi_valid,
33+
"child_lock": _child_lock,
34+
"speed": _speed,
35+
"aqi_level": _aqi_level,
36+
"filter element working time": _work_time,
37+
"err_code": _err_code,
38+
"sequence_number": _seq_num,
39+
}
40+
41+
42+
def get_air_purifier_mode(mode: int, speed: int) -> str | None:
43+
if mode == 1:
44+
if 0 <= speed <= 33:
45+
return "level_1"
46+
if 34 <= speed <= 66:
47+
return "level_2"
48+
return "level_3"
49+
if 1 < mode <= 4:
50+
mode += 2
51+
return AirPurifierMode(mode).name.lower()
52+
return None

switchbot/const/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
from ..enum import StrEnum
6+
from .air_purifier import AirPurifierMode
67
from .fan import FanMode
78

89
# Preserve old LockStatus export for backwards compatibility
@@ -72,12 +73,14 @@ class SwitchbotModel(StrEnum):
7273
K10_VACUUM = "K10+ Vacuum"
7374
K10_PRO_VACUUM = "K10+ Pro Vacuum"
7475
K10_PRO_COMBO_VACUUM = "K10+ Pro Combo Vacuum"
76+
AIR_PURIFIER = "Air Purifier"
7577

7678

7779
__all__ = [
7880
"DEFAULT_RETRY_COUNT",
7981
"DEFAULT_RETRY_TIMEOUT",
8082
"DEFAULT_SCAN_TIMEOUT",
83+
"AirPurifierMode",
8184
"FanMode",
8285
"LockStatus",
8386
"SwitchbotAccountConnectionError",

switchbot/const/air_purifier.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from __future__ import annotations
2+
3+
from enum import Enum
4+
5+
6+
class AirPurifierMode(Enum):
7+
LEVEL_1 = 1
8+
LEVEL_2 = 2
9+
LEVEL_3 = 3
10+
AUTO = 4
11+
PET = 5
12+
SLEEP = 6
13+
14+
@classmethod
15+
def get_modes(cls) -> list[str]:
16+
return [mode.name.lower() for mode in cls]
17+
18+
19+
class AirQualityLevel(Enum):
20+
EXCELLENT = 0
21+
GOOD = 1
22+
MODERATE = 2
23+
UNHEALTHY = 3

switchbot/devices/air_purifier.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
"""Library to handle connection with Switchbot."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
import struct
7+
from typing import Any
8+
9+
from bleak.backends.device import BLEDevice
10+
11+
from ..adv_parsers.air_purifier import get_air_purifier_mode
12+
from ..const import SwitchbotModel
13+
from ..const.air_purifier import AirPurifierMode, AirQualityLevel
14+
from .device import (
15+
SwitchbotEncryptedDevice,
16+
SwitchbotSequenceDevice,
17+
update_after_operation,
18+
)
19+
20+
_LOGGER = logging.getLogger(__name__)
21+
22+
23+
COMMAND_HEAD = "570f4c"
24+
COMMAND_TURN_OFF = f"{COMMAND_HEAD}010000"
25+
COMMAND_TURN_ON = f"{COMMAND_HEAD}010100"
26+
COMMAND_SET_MODE = {
27+
AirPurifierMode.LEVEL_1.name.lower(): f"{COMMAND_HEAD}01010100",
28+
AirPurifierMode.LEVEL_2.name.lower(): f"{COMMAND_HEAD}01010132",
29+
AirPurifierMode.LEVEL_3.name.lower(): f"{COMMAND_HEAD}01010164",
30+
AirPurifierMode.AUTO.name.lower(): f"{COMMAND_HEAD}01010200",
31+
AirPurifierMode.PET.name.lower(): f"{COMMAND_HEAD}01010300",
32+
AirPurifierMode.SLEEP.name.lower(): f"{COMMAND_HEAD}01010400",
33+
}
34+
DEVICE_GET_BASIC_SETTINGS_KEY = "570f4d81"
35+
36+
37+
class SwitchbotAirPurifier(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
38+
"""Representation of a Switchbot Air Purifier."""
39+
40+
def __init__(
41+
self,
42+
device: BLEDevice,
43+
key_id: str,
44+
encryption_key: str,
45+
interface: int = 0,
46+
model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
47+
**kwargs: Any,
48+
) -> None:
49+
super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
50+
51+
@classmethod
52+
async def verify_encryption_key(
53+
cls,
54+
device: BLEDevice,
55+
key_id: str,
56+
encryption_key: str,
57+
model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
58+
**kwargs: Any,
59+
) -> bool:
60+
return await super().verify_encryption_key(
61+
device, key_id, encryption_key, model, **kwargs
62+
)
63+
64+
async def get_basic_info(self) -> dict[str, Any] | None:
65+
"""Get device basic settings."""
66+
if not (_data := await self._get_basic_info()):
67+
return None
68+
69+
_LOGGER.debug("data: %s", _data)
70+
isOn = bool(_data[2] & 0b10000000)
71+
version_info = (_data[2] & 0b00110000) >> 4
72+
_mode = _data[2] & 0b00000111
73+
isAqiValid = bool(_data[3] & 0b00000100)
74+
child_lock = bool(_data[3] & 0b00000010)
75+
_aqi_level = (_data[4] & 0b00000110) >> 1
76+
aqi_level = AirQualityLevel(_aqi_level).name.lower()
77+
speed = _data[6] & 0b01111111
78+
pm25 = struct.unpack("<H", _data[12:14])[0] & 0xFFF
79+
firmware = _data[15] / 10.0
80+
mode = get_air_purifier_mode(_mode, speed)
81+
82+
return {
83+
"isOn": isOn,
84+
"version_info": version_info,
85+
"mode": mode,
86+
"isAqiValid": isAqiValid,
87+
"child_lock": child_lock,
88+
"aqi_level": aqi_level,
89+
"speed": speed,
90+
"pm25": pm25,
91+
"firmware": firmware,
92+
}
93+
94+
async def _get_basic_info(self) -> bytes | None:
95+
"""Return basic info of device."""
96+
_data = await self._send_command(
97+
key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
98+
)
99+
100+
if _data in (b"\x07", b"\x00"):
101+
_LOGGER.error("Unsuccessful, please try again")
102+
return None
103+
104+
return _data
105+
106+
@update_after_operation
107+
async def set_preset_mode(self, preset_mode: str) -> bool:
108+
"""Send command to set air purifier preset_mode."""
109+
result = await self._send_command(COMMAND_SET_MODE[preset_mode])
110+
return self._check_command_result(result, 0, {1})
111+
112+
@update_after_operation
113+
async def turn_on(self) -> bool:
114+
"""Turn on the air purifier."""
115+
result = await self._send_command(COMMAND_TURN_ON)
116+
return self._check_command_result(result, 0, {1})
117+
118+
@update_after_operation
119+
async def turn_off(self) -> bool:
120+
"""Turn off the air purifier."""
121+
result = await self._send_command(COMMAND_TURN_OFF)
122+
return self._check_command_result(result, 0, {1})
123+
124+
def get_current_percentage(self) -> Any:
125+
"""Return cached percentage."""
126+
return self._get_adv_value("speed")
127+
128+
def is_on(self) -> bool | None:
129+
"""Return air purifier state from cache."""
130+
return self._get_adv_value("isOn")
131+
132+
def get_current_aqi_level(self) -> Any:
133+
"""Return cached aqi level."""
134+
return self._get_adv_value("aqi_level")
135+
136+
def get_current_pm25(self) -> Any:
137+
"""Return cached pm25."""
138+
return self._get_adv_value("pm25")
139+
140+
def get_current_mode(self) -> Any:
141+
"""Return cached mode."""
142+
return self._get_adv_value("mode")

0 commit comments

Comments
 (0)