Skip to content

Commit 857597b

Browse files
dcmegliobdraco
andauthored
Refactored common curtain/tilt code to shared base class (#231)
Co-authored-by: J. Nick Koston <[email protected]>
1 parent 7894a85 commit 857597b

File tree

9 files changed

+810
-144
lines changed

9 files changed

+810
-144
lines changed

switchbot/const.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
from enum import Enum
55

6+
from .enum import StrEnum
7+
68
DEFAULT_RETRY_COUNT = 3
79
DEFAULT_RETRY_TIMEOUT = 1
810
DEFAULT_SCAN_TIMEOUT = 5
911

10-
from .enum import StrEnum
11-
1212

1313
class SwitchbotAuthenticationError(RuntimeError):
1414
"""Raised when authentication fails.

switchbot/devices/base_cover.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
"""Library to handle connection with Switchbot."""
2+
from __future__ import annotations
3+
4+
import logging
5+
from abc import abstractmethod
6+
from typing import Any
7+
8+
from ..models import SwitchBotAdvertisement
9+
from .device import REQ_HEADER, SwitchbotDevice, update_after_operation
10+
11+
# Cover keys
12+
COVER_COMMAND = "4501"
13+
14+
# For second element of open and close arrs we should add two bytes i.e. ff00
15+
# First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *
16+
# * Only for curtains 3. For other models use ff
17+
# Second byte [00] is a command (00 - open, 64 - close)
18+
POSITION_KEYS = [
19+
f"{REQ_HEADER}{COVER_COMMAND}0101",
20+
f"{REQ_HEADER}{COVER_COMMAND}05", # +speed
21+
] # +actual_position
22+
STOP_KEYS = [f"{REQ_HEADER}{COVER_COMMAND}0001", f"{REQ_HEADER}{COVER_COMMAND}00ff"]
23+
24+
COVER_EXT_SUM_KEY = f"{REQ_HEADER}460401"
25+
COVER_EXT_ADV_KEY = f"{REQ_HEADER}460402"
26+
27+
28+
_LOGGER = logging.getLogger(__name__)
29+
30+
31+
class SwitchbotBaseCover(SwitchbotDevice):
32+
"""Representation of a Switchbot Cover devices for both curtains and tilt blinds."""
33+
34+
def __init__(self, reverse: bool, *args: Any, **kwargs: Any) -> None:
35+
"""Switchbot Cover device constructor."""
36+
37+
super().__init__(*args, **kwargs)
38+
self._reverse = reverse
39+
self._settings: dict[str, Any] = {}
40+
self.ext_info_sum: dict[str, Any] = {}
41+
self.ext_info_adv: dict[str, Any] = {}
42+
self._is_opening: bool = False
43+
self._is_closing: bool = False
44+
45+
async def _send_multiple_commands(self, keys: list[str]) -> bool:
46+
"""Send multiple commands to device.
47+
48+
Since we current have no way to tell which command the device
49+
needs we send both.
50+
"""
51+
final_result = False
52+
for key in keys:
53+
result = await self._send_command(key)
54+
final_result |= self._check_command_result(result, 0, {1})
55+
return final_result
56+
57+
@update_after_operation
58+
async def stop(self) -> bool:
59+
"""Send stop command to device."""
60+
return await self._send_multiple_commands(STOP_KEYS)
61+
62+
@update_after_operation
63+
async def set_position(self, position: int, speed: int = 255) -> bool:
64+
"""Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
65+
position = (100 - position) if self._reverse else position
66+
return await self._send_multiple_commands(
67+
[
68+
f"{POSITION_KEYS[0]}{position:02X}",
69+
f"{POSITION_KEYS[1]}{speed:02X}{position:02X}",
70+
]
71+
)
72+
73+
@abstractmethod
74+
def get_position(self) -> Any:
75+
"""Return current device position."""
76+
77+
@abstractmethod
78+
async def get_basic_info(self) -> dict[str, Any] | None:
79+
"""Get device basic settings."""
80+
81+
@abstractmethod
82+
async def get_extended_info_summary(self) -> dict[str, Any] | None:
83+
"""Get extended info for all devices in chain."""
84+
85+
async def get_extended_info_adv(self) -> dict[str, Any] | None:
86+
"""Get advance page info for device chain."""
87+
88+
_data = await self._send_command(key=COVER_EXT_ADV_KEY)
89+
if not _data:
90+
_LOGGER.error("%s: Unsuccessful, no result from device", self.name)
91+
return None
92+
93+
if _data in (b"\x07", b"\x00"):
94+
_LOGGER.error("%s: Unsuccessful, please try again", self.name)
95+
return None
96+
97+
_state_of_charge = [
98+
"not_charging",
99+
"charging_by_adapter",
100+
"charging_by_solar",
101+
"fully_charged",
102+
"solar_not_charging",
103+
"charging_error",
104+
]
105+
106+
self.ext_info_adv["device0"] = {
107+
"battery": _data[1],
108+
"firmware": _data[2] / 10.0,
109+
"stateOfCharge": _state_of_charge[_data[3]],
110+
}
111+
112+
# If grouped curtain device present.
113+
if _data[4]:
114+
self.ext_info_adv["device1"] = {
115+
"battery": _data[4],
116+
"firmware": _data[5] / 10.0,
117+
"stateOfCharge": _state_of_charge[_data[6]],
118+
}
119+
120+
return self.ext_info_adv
121+
122+
def get_light_level(self) -> Any:
123+
"""Return cached light level."""
124+
# To get actual light level call update() first.
125+
return self._get_adv_value("lightLevel")
126+
127+
def is_reversed(self) -> bool:
128+
"""Return True if curtain position is opposite from SB data."""
129+
return self._reverse
130+
131+
def is_calibrated(self) -> Any:
132+
"""Return True curtain is calibrated."""
133+
# To get actual light level call update() first.
134+
return self._get_adv_value("calibration")
135+
136+
def is_opening(self) -> bool:
137+
"""Return True if the curtain is opening."""
138+
return self._is_opening
139+
140+
def is_closing(self) -> bool:
141+
"""Return True if the curtain is closing."""
142+
return self._is_closing

switchbot/devices/base_light.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import logging
5+
import time
46
from abc import abstractmethod
57
from typing import Any
68

9+
from ..models import SwitchBotAdvertisement
710
from .device import ColorMode, SwitchbotDevice
811

912
_LOGGER = logging.getLogger(__name__)
10-
import asyncio
11-
import time
12-
13-
from ..models import SwitchBotAdvertisement
1413

1514

1615
class SwitchbotBaseLight(SwitchbotDevice):

switchbot/devices/blind_tilt.py

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,27 @@
1010
update_after_operation,
1111
)
1212

13-
from .curtain import CURTAIN_EXT_SUM_KEY, SwitchbotCurtain
13+
from ..models import SwitchBotAdvertisement
14+
from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover
1415

1516
_LOGGER = logging.getLogger(__name__)
1617

1718

18-
BLIND_COMMAND = "4501"
1919
OPEN_KEYS = [
20-
f"{REQ_HEADER}{BLIND_COMMAND}010132",
21-
f"{REQ_HEADER}{BLIND_COMMAND}05ff32",
20+
f"{REQ_HEADER}{COVER_COMMAND}010132",
21+
f"{REQ_HEADER}{COVER_COMMAND}05ff32",
2222
]
2323
CLOSE_DOWN_KEYS = [
24-
f"{REQ_HEADER}{BLIND_COMMAND}010100",
25-
f"{REQ_HEADER}{BLIND_COMMAND}05ff00",
24+
f"{REQ_HEADER}{COVER_COMMAND}010100",
25+
f"{REQ_HEADER}{COVER_COMMAND}05ff00",
2626
]
2727
CLOSE_UP_KEYS = [
28-
f"{REQ_HEADER}{BLIND_COMMAND}010164",
29-
f"{REQ_HEADER}{BLIND_COMMAND}05ff64",
28+
f"{REQ_HEADER}{COVER_COMMAND}010164",
29+
f"{REQ_HEADER}{COVER_COMMAND}05ff64",
3030
]
3131

3232

33-
class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice):
33+
class SwitchbotBlindTilt(SwitchbotBaseCover, SwitchbotSequenceDevice):
3434
"""Representation of a Switchbot Blind Tilt."""
3535

3636
# The position of the blind is saved returned with 0 = closed down, 50 = open and 100 = closed up.
@@ -43,23 +43,52 @@ class SwitchbotBlindTilt(SwitchbotCurtain, SwitchbotSequenceDevice):
4343

4444
def __init__(self, *args: Any, **kwargs: Any) -> None:
4545
"""Switchbot Blind Tilt/woBlindTilt constructor."""
46-
super().__init__(*args, **kwargs)
47-
4846
self._reverse: bool = kwargs.pop("reverse_mode", False)
47+
super().__init__(self._reverse, *args, **kwargs)
48+
49+
def _set_parsed_data(
50+
self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
51+
) -> None:
52+
"""Set data."""
53+
in_motion = data["inMotion"]
54+
previous_tilt = self._get_adv_value("tilt")
55+
new_tilt = data["tilt"]
56+
self._update_motion_direction(in_motion, previous_tilt, new_tilt)
57+
super()._set_parsed_data(advertisement, data)
58+
59+
def _update_motion_direction(
60+
self, in_motion: bool, previous_tilt: int | None, new_tilt: int
61+
) -> None:
62+
"""Update opening/closing status based on movement."""
63+
if previous_tilt is None:
64+
return
65+
if in_motion is False:
66+
self._is_closing = self._is_opening = False
67+
return
68+
69+
if new_tilt != previous_tilt:
70+
self._is_opening = new_tilt > previous_tilt
71+
self._is_closing = new_tilt < previous_tilt
4972

5073
@update_after_operation
5174
async def open(self) -> bool:
5275
"""Send open command."""
76+
self._is_opening = True
77+
self._is_closing = False
5378
return await self._send_multiple_commands(OPEN_KEYS)
5479

5580
@update_after_operation
5681
async def close_up(self) -> bool:
5782
"""Send close up command."""
83+
self._is_opening = False
84+
self._is_closing = True
5885
return await self._send_multiple_commands(CLOSE_UP_KEYS)
5986

6087
@update_after_operation
6188
async def close_down(self) -> bool:
6289
"""Send close down command."""
90+
self._is_opening = False
91+
self._is_closing = True
6392
return await self._send_multiple_commands(CLOSE_DOWN_KEYS)
6493

6594
# The aim of this is to close to the nearest endpoint.
@@ -114,8 +143,8 @@ async def get_basic_info(self) -> dict[str, Any] | None:
114143
}
115144

116145
async def get_extended_info_summary(self) -> dict[str, Any] | None:
117-
"""Get basic info for all devices in chain."""
118-
_data = await self._send_command(key=CURTAIN_EXT_SUM_KEY)
146+
"""Get extended info for all devices in chain."""
147+
_data = await self._send_command(key=COVER_EXT_SUM_KEY)
119148

120149
if not _data:
121150
_LOGGER.error("%s: Unsuccessful, no result from device", self.name)

0 commit comments

Comments
 (0)