44import logging
55from typing import Any
66
7+ from switchbot .models import SwitchBotAdvertisement
8+
79from .device import REQ_HEADER , SwitchbotDevice , update_after_operation
810
911# Curtain keys
@@ -54,6 +56,18 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
5456 self ._settings : dict [str , Any ] = {}
5557 self .ext_info_sum : dict [str , Any ] = {}
5658 self .ext_info_adv : dict [str , Any ] = {}
59+ self ._is_opening : bool = False
60+ self ._is_closing : bool = False
61+
62+ def _set_parsed_data (
63+ self , advertisement : SwitchBotAdvertisement , data : dict [str , Any ]
64+ ) -> None :
65+ """Set data."""
66+ in_motion = data ["inMotion" ]
67+ previous_position = self ._get_adv_value ("position" )
68+ new_position = data ["position" ]
69+ self ._update_motion_direction (in_motion , previous_position , new_position )
70+ super ()._set_parsed_data (advertisement , data )
5771
5872 async def _send_multiple_commands (self , keys : list [str ]) -> bool :
5973 """Send multiple commands to device.
@@ -70,26 +84,32 @@ async def _send_multiple_commands(self, keys: list[str]) -> bool:
7084 @update_after_operation
7185 async def open (self , speed : int = 255 ) -> bool :
7286 """Send open command. Speed 255 - normal, 1 - slow"""
87+ self ._is_opening = True
88+ self ._is_closing = False
7389 return await self ._send_multiple_commands (
7490 [OPEN_KEYS [0 ], f"{ OPEN_KEYS [1 ]} { speed :02X} 00" ]
7591 )
7692
7793 @update_after_operation
7894 async def close (self , speed : int = 255 ) -> bool :
7995 """Send close command. Speed 255 - normal, 1 - slow"""
96+ self ._is_closing = True
97+ self ._is_opening = False
8098 return await self ._send_multiple_commands (
8199 [CLOSE_KEYS [0 ], f"{ CLOSE_KEYS [1 ]} { speed :02X} 64" ]
82100 )
83101
84102 @update_after_operation
85103 async def stop (self ) -> bool :
86104 """Send stop command to device."""
105+ self ._is_opening = self ._is_closing = False
87106 return await self ._send_multiple_commands (STOP_KEYS )
88107
89108 @update_after_operation
90109 async def set_position (self , position : int , speed : int = 255 ) -> bool :
91110 """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
92111 position = (100 - position ) if self ._reverse else position
112+ self ._update_motion_direction (True , self ._get_adv_value ("position" ), position )
93113 return await self ._send_multiple_commands (
94114 [
95115 f"{ POSITION_KEYS [0 ]} { position :02X} " ,
@@ -108,6 +128,13 @@ async def get_basic_info(self) -> dict[str, Any] | None:
108128 return None
109129
110130 _position = max (min (_data [6 ], 100 ), 0 )
131+ _direction_adjusted_position = (100 - _position ) if self ._reverse else _position
132+ _previous_position = self ._get_adv_value ("position" )
133+ _in_motion = bool (_data [5 ] & 0b01000011 )
134+ self ._update_motion_direction (
135+ _in_motion , _previous_position , _direction_adjusted_position
136+ )
137+
111138 return {
112139 "battery" : _data [1 ],
113140 "firmware" : _data [2 ] / 10.0 ,
@@ -121,11 +148,25 @@ async def get_basic_info(self) -> dict[str, Any] | None:
121148 "solarPanel" : bool (_data [5 ] & 0b00001000 ),
122149 "calibration" : bool (_data [5 ] & 0b00000100 ),
123150 "calibrated" : bool (_data [5 ] & 0b00000100 ),
124- "inMotion" : bool ( _data [ 5 ] & 0b01000011 ) ,
125- "position" : ( 100 - _position ) if self . _reverse else _position ,
151+ "inMotion" : _in_motion ,
152+ "position" : _direction_adjusted_position ,
126153 "timers" : _data [7 ],
127154 }
128155
156+ def _update_motion_direction (
157+ self , in_motion : bool , previous_position : int | None , new_position : int
158+ ) -> None :
159+ """Update opening/closing status based on movement."""
160+ if previous_position is None :
161+ return
162+ if in_motion is False :
163+ self ._is_closing = self ._is_opening = False
164+ return
165+
166+ if new_position != previous_position :
167+ self ._is_opening = new_position > previous_position
168+ self ._is_closing = new_position < previous_position
169+
129170 async def get_extended_info_summary (self ) -> dict [str , Any ] | None :
130171 """Get basic info for all devices in chain."""
131172 _data = await self ._send_command (key = CURTAIN_EXT_SUM_KEY )
@@ -210,3 +251,11 @@ def is_calibrated(self) -> Any:
210251 """Return True curtain is calibrated."""
211252 # To get actual light level call update() first.
212253 return self ._get_adv_value ("calibration" )
254+
255+ def is_opening (self ) -> bool :
256+ """Return True if the curtain is opening."""
257+ return self ._is_opening
258+
259+ def is_closing (self ) -> bool :
260+ """Return True if the curtain is closing."""
261+ return self ._is_closing
0 commit comments