diff --git a/examples/home_assistant/config.py b/examples/home_assistant/config.py new file mode 100644 index 0000000..d5a5c27 --- /dev/null +++ b/examples/home_assistant/config.py @@ -0,0 +1,53 @@ +# config.py +# +# ip address and port of your home assistant instance +HA_HOST = "http://192.168.1.24:8123" +# +# tokens are created in the home assistant interfacea interface - when logged in: +# Click bottom left on name +# -> Security tab +# -> scroll down +# -> Create token +HA_TOKEN = "" +# +# entities such as light.back_door must exist in your home assistant instance +# Settings +# -> Devices and services +# -> Entities tab +# -> Entity ID + +# Light entities +LIGHT_ENTITIES = { + "back_door": { + "entity_id": "light.back_door", + "display_name": "Back Door Light 1" + }, + "back_door_2": { + "entity_id": "light.back_door_2", + "display_name": "Back Door Light 2" + } +} + +# Sensor entities +SENSORS = { + "motion": { + "entity_id": "binary_sensor.back_door_02_motion", + "display_name": "Back Door Motion" + }, + "dark": { + "entity_id": "binary_sensor.back_door_02_is_dark", + "display_name": "Back Door Darkness" + }, + "voltage": { + "entity_id": "sensor.back_door_02_voltage", + "display_name": "Back Door Voltage" + } +} + +# Camera entities +CAMERAS = { + "back_door": { + "entity_id": "camera.back_door_02_high_resolution_channel", + "display_name": "Back Door Camera" + } +} \ No newline at end of file diff --git a/examples/home_assistant/hampws.py b/examples/home_assistant/hampws.py new file mode 100644 index 0000000..1986cb6 --- /dev/null +++ b/examples/home_assistant/hampws.py @@ -0,0 +1,457 @@ +import usocket as socket +import json +import urandom as random +import ubinascii as base64 +import ustruct as struct +import uhashlib as hashlib +import uselect + +class HAMPWS: + GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' + + def __init__(self, host, token): + """Initialize Home Assistant MicroPython Web Socket""" + print("Initializing HAMPWS...") + self.host = host.replace('http://', '').split(':')[0] + print(f"Parsed host: {self.host}") + self.token = token + self.socket = None + self.connected = False + self.authenticated = False + self.message_id = 1 + self.poller = None + + def _setup_poller(self): + """Setup the socket poller""" + self.poller = uselect.poll() + self.poller.register(self.socket, uselect.POLLIN) + + def _generate_key(self): + """Generate a random 16-byte key for the WebSocket handshake""" + random_bytes = bytes(random.getrandbits(8) for _ in range(16)) + return base64.b2a_base64(random_bytes).decode().strip() + + def _create_accept_key(self, key): + """Create the accept key for WebSocket handshake verification""" + accept_key = key + self.GUID + sha1 = hashlib.sha1(accept_key.encode()) + return base64.b2a_base64(sha1.digest()).decode().strip() + + def _send_frame(self, data, opcode=0x1): + """Send a WebSocket frame""" + if isinstance(data, str): + data = data.encode('utf-8') + + length = len(data) + frame = bytearray() + + # Fin bit and opcode + frame.append(0x80 | opcode) + + # Payload length and masking bit + if length < 126: + frame.append(0x80 | length) + elif length < 65536: + frame.append(0x80 | 126) + frame.extend(struct.pack('>H', length)) + else: + frame.append(0x80 | 127) + frame.extend(struct.pack('>Q', length)) + + # Masking key + mask = bytes(random.getrandbits(8) for _ in range(4)) + frame.extend(mask) + + # Masked payload + masked_data = bytearray(length) + for i in range(length): + masked_data[i] = data[i] ^ mask[i % 4] + + frame.extend(masked_data) + total_sent = 0 + while total_sent < len(frame): + sent = self.socket.send(frame[total_sent:]) + if sent == 0: + raise RuntimeError("Socket connection broken") + total_sent += sent + + def _recv_frame(self, blocking=False): + """Receive a WebSocket frame + Args: + blocking (bool): If True, wait for data. If False, return None if no data available + """ + try: + # Check if data is available (only in non-blocking mode) + if not blocking and not self.poller.poll(0): + return None + + # Read header + header = self.socket.recv(2) + if not header or len(header) < 2: + return None + + # Parse header + fin = (header[0] & 0x80) == 0x80 + opcode = header[0] & 0x0F + masked = (header[1] & 0x80) == 0x80 + length = header[1] & 0x7F + + # Extended payload length + if length == 126: + length_bytes = self.socket.recv(2) + if len(length_bytes) < 2: + return None + length = struct.unpack('>H', length_bytes)[0] + elif length == 127: + length_bytes = self.socket.recv(8) + if len(length_bytes) < 8: + return None + length = struct.unpack('>Q', length_bytes)[0] + + # Masking key if present + if masked: + mask = self.socket.recv(4) + if len(mask) < 4: + return None + + # Read payload + payload = bytearray() + remaining = length + while remaining > 0: + chunk = self.socket.recv(min(remaining, 4096)) + if not chunk: + return None + payload.extend(chunk) + remaining -= len(chunk) + + # Unmask if needed + if masked: + unmasked = bytearray(length) + for i in range(length): + unmasked[i] = payload[i] ^ mask[i % 4] + payload = unmasked + + # Handle control frames + if opcode == 0x8: # Close + self.connected = False + return None + elif opcode == 0x9: # Ping + self._send_frame(payload, 0xA) # Send Pong + return self._recv_frame(blocking) # Get next message + elif opcode == 0xA: # Pong + return self._recv_frame(blocking) # Get next message + + return payload.decode('utf-8') + + except Exception as e: + print(f"Error receiving frame: {e}") + self.connected = False + return None + + def connect(self): + """Establish WebSocket connection with Home Assistant""" + try: + print(f"Starting connection to {self.host}...") + print("Creating socket...") + self.socket = socket.socket() + + print("Getting address info...") + try: + addr_info = socket.getaddrinfo(self.host, 8123) + print(f"Address info: {addr_info}") + addr = addr_info[0][-1] + print(f"Using address: {addr}") + except Exception as e: + print(f"Error getting address info: {e}") + return False + + print("Attempting socket connection...") + try: + self.socket.connect(addr) + print("Socket connected successfully") + except Exception as e: + print(f"Connection error: {e}") + return False + + # Set up the poller after connection + self._setup_poller() + + # Generate WebSocket key + key = self._generate_key() + + # Send WebSocket upgrade request + request = ( + f"GET /api/websocket HTTP/1.1\r\n" + f"Host: {self.host}:8123\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + f"Sec-WebSocket-Key: {key}\r\n" + "Sec-WebSocket-Version: 13\r\n" + "\r\n" + ) + + print("Sending upgrade request...") + total_sent = 0 + request_bytes = request.encode() + while total_sent < len(request_bytes): + sent = self.socket.send(request_bytes[total_sent:]) + if sent == 0: + raise RuntimeError("Socket connection broken") + total_sent += sent + + # Read response headers + print("Reading response...") + response = "" + while True: + chunk = self.socket.recv(4096) + if not chunk: + print("No response received") + return False + response += chunk.decode() + if "\r\n\r\n" in response: + break + + # Verify upgrade response + if "HTTP/1.1 101" not in response: + print("Invalid upgrade response:", response) + return False + + self.connected = True + print("WebSocket connection established") + + # Use blocking reads during authentication + # Wait for auth required message + print("Waiting for auth request...") + auth_req = self._recv_frame(blocking=True) + if not auth_req: + print("No auth request received") + return False + print("Auth request received:", auth_req) + + # Authenticate + print("Sending auth message...") + auth_msg = { + "type": "auth", + "access_token": self.token + } + self._send_frame(json.dumps(auth_msg)) + + # Wait for auth response + print("Waiting for auth response...") + response = self._recv_frame(blocking=True) # Use blocking read for auth + if response: + resp_data = json.loads(response) + if resp_data.get('type') == 'auth_ok': + self.authenticated = True + print("Authentication successful") + return True + print("Authentication failed:", resp_data.get('message', 'Unknown error')) + + return False + + except Exception as e: + print(f"Connection error: {e}") + if self.socket: + try: + self.socket.close() + except: + pass + self.socket = None + self.connected = False + return False + + def read_message(self): + """Read next message from WebSocket, non-blocking""" + try: + if not self.connected: + return None + + message = self._recv_frame(blocking=False) + if message: + return json.loads(message) + return None + + except Exception as e: + print(f"Read error: {e}") + self.connected = False + return None + + def call_service(self, domain, service, service_data=None, target=None): + """Call a Home Assistant service + + Args: + domain: Service domain (e.g. 'light') + service: Service name (e.g. 'turn_on') + service_data: Optional service data dictionary + target: Optional target dictionary (e.g. {"entity_id": "light.kitchen"}) + + Returns: + bool: True if successful + """ + if not self.authenticated: + print("Not authenticated") + return False + + try: + msg = { + "id": self.message_id, + "type": "call_service", + "domain": domain, + "service": service + } + if service_data: + msg["service_data"] = service_data + if target: + msg["target"] = target + + self._send_frame(json.dumps(msg)) + self.message_id += 1 + + # Wait for result + response = self._recv_frame() + if response: + resp_data = json.loads(response) + return resp_data.get('success', False) + + return False + + except Exception as e: + print(f"Service call error: {e}") + return False + + def subscribe_events(self, event_type=None): + """Subscribe to Home Assistant events + + Args: + event_type: Optional event type to filter (e.g. 'state_changed') + + Returns: + int: Subscription ID if successful, None if failed + """ + if not self.authenticated: + print("Not authenticated") + return None + + try: + msg = { + "id": self.message_id, + "type": "subscribe_events", + } + if event_type: + msg["event_type"] = event_type + + self._send_frame(json.dumps(msg)) + + # Wait for result - use blocking read for subscription response + response = self._recv_frame(blocking=True) + if response: + resp_data = json.loads(response) + if resp_data.get('success', False): + sub_id = self.message_id + self.message_id += 1 + print(f"Successfully subscribed to events: {event_type}") + return sub_id + else: + print(f"Failed to subscribe: {resp_data.get('error', 'Unknown error')}") + + return None + + except Exception as e: + print(f"Subscribe error: {e}") + return None + + def subscribe_trigger(self, trigger_config): + """Subscribe to a Home Assistant trigger + + Args: + trigger_config: Trigger configuration dictionary + + Returns: + int: Subscription ID if successful, None if failed + """ + if not self.authenticated: + print("Not authenticated") + return None + + try: + msg = { + "id": self.message_id, + "type": "subscribe_trigger", + "trigger": trigger_config + } + self._send_frame(json.dumps(msg)) + + # Wait for result + response = self._recv_frame() + if response: + resp_data = json.loads(response) + if resp_data.get('success'): + sub_id = self.message_id + self.message_id += 1 + return sub_id + + return None + + except Exception as e: + print(f"Subscribe error: {e}") + return None + + def get_state(self, entity_id): + """Get the state of a Home Assistant entity + + Args: + entity_id: The entity ID to get state for (e.g. 'light.kitchen') + + Returns: + dict: Entity state data if successful, None if failed + """ + if not self.authenticated: + print("Not authenticated") + return None + + try: + # Send get_states command for specific entity + msg_id = self.message_id + msg = { + "id": msg_id, + "type": "get_states" # Get all states, we'll filter for our entity + } + + print(f"Sending state request for {entity_id}") + self._send_frame(json.dumps(msg)) + self.message_id += 1 + + # Wait for result with blocking read + response = self._recv_frame(blocking=True) + if response: + resp_data = json.loads(response) + print(f"Got response: {resp_data}") # Debug output + + # Check that this is the response to our query + if resp_data.get('id') == msg_id: + if resp_data.get('success') and resp_data.get('result'): + # Find our entity in the results + for state in resp_data['result']: + if state.get('entity_id') == entity_id: + print(f"Found state for {entity_id}: {state}") # Debug output + return state + else: + print(f"Get state error: {resp_data.get('error', 'Unknown error')}") + + return None + + except Exception as e: + print(f"Get state error: {e}") + return None + + def close(self): + """Close the WebSocket connection""" + if self.socket: + try: + self._send_frame("", 0x8) # Send close frame + self.socket.close() + except: + pass + self.socket = None + self.connected = False + self.authenticated = False diff --git a/examples/home_assistant/hampws_light.py b/examples/home_assistant/hampws_light.py new file mode 100644 index 0000000..8a88083 --- /dev/null +++ b/examples/home_assistant/hampws_light.py @@ -0,0 +1,109 @@ +from config import HA_HOST, HA_TOKEN, LIGHT_ENTITIES +from hampws import HAMPWS +from presto import Presto +from touch import Button + +# Initialize Presto +presto = Presto() +display = presto.display +touch = presto.touch +WIDTH, HEIGHT = display.get_bounds() + +# Define colors +WHITE = display.create_pen(255, 255, 255) +BLACK = display.create_pen(0, 0, 0) +YELLOW = display.create_pen(255, 255, 0) + +# Create button +button = Button(10, 35, 100, 50) + +# Initialize Home Assistant connection +ws = HAMPWS(HA_HOST, HA_TOKEN) +if not ws.connect(): + raise Exception("Failed to connect to Home Assistant") + +# Get the first light entity +first_light = list(LIGHT_ENTITIES.values())[0]['entity_id'] + +def get_light_state(): + """Get the current state of the light""" + state = ws.get_state(first_light) + print(f"Raw state data: {state}") # Debug output + if state: + state_value = state.get('state', '').lower() + print(f"State value: {state_value}") # Debug output + return state_value == 'on' + return False + +def toggle_light(current_state): + """Toggle the light state""" + service = "turn_off" if current_state else "turn_on" + ws.call_service("light", service, + target={"entity_id": first_light}) + +try: + # Subscribe to state changes + print("Subscribing to state changes...") + sub_id = ws.subscribe_events("state_changed") + if not sub_id: + print("Warning: Failed to subscribe to state changes. Continuing without state updates.") + + # Initial light state + print(f"\nGetting initial light state...") # Debug output + light_state = get_light_state() + print(f"Initial light state parsed as: {'ON' if light_state else 'OFF'}") + + # Touch tracking + was_pressed = False + + while True: + # Handle touch first + touch.poll() + + # Check if button is currently pressed + is_pressed = touch.state and button.is_pressed() + + # Only toggle on new presses + if is_pressed and not was_pressed: + toggle_light(light_state) + light_state = not light_state # Immediately update display state + print(f"Light toggled to: {'ON' if light_state else 'OFF'}") + + # Update previous state + was_pressed = is_pressed + + # Clear display + display.set_pen(WHITE) + display.clear() + + # Draw button with current state color + display.set_pen(YELLOW if light_state else BLACK) + display.rectangle(*button.bounds) + + # Draw text + display.set_pen(BLACK) + display.text("Light Control", 10, 10, scale=2) + state_text = "ON" if light_state else "OFF" + display.text(state_text, button.x + 35, button.y + 15, scale=2) + + # Update display + presto.update() + + # Check for state changes + if sub_id: + message = ws.read_message() + if message and message.get('type') == 'event' and \ + message.get('event', {}).get('event_type') == 'state_changed': + data = message['event']['data'] + entity_id = data.get('entity_id', '') + if entity_id == first_light: + new_state = data.get('new_state', {}).get('state', '') + light_state = (new_state.lower() == 'on') + print(f"Light state changed to: {'ON' if light_state else 'OFF'}") + +except KeyboardInterrupt: + print("\nProgram stopped by user") +except Exception as e: + print(f"Error occurred: {e}") +finally: + ws.close() diff --git a/examples/home_assistant/hampws_test.py b/examples/home_assistant/hampws_test.py new file mode 100644 index 0000000..87cee49 --- /dev/null +++ b/examples/home_assistant/hampws_test.py @@ -0,0 +1,298 @@ +from config import ( + HA_HOST, HA_TOKEN, + LIGHT_ENTITIES, SENSORS, CAMERAS +) +from hampws import HAMPWS +import time +import json + +def test_paired_lights(): + """Test controlling paired back door lights""" + print("\n=== Testing Paired Lights Control ===") + ws = HAMPWS(HA_HOST, HA_TOKEN) + + if not ws.connect(): + print("Failed to connect") + return + + try: + # Get both light entities + light_entities = [light['entity_id'] for light in LIGHT_ENTITIES.values()] + + msg_id = ws.message_id + + # Turn both lights on + print(f"\nTurning on both back door lights...") + ws._send_frame(json.dumps({ + "id": msg_id, + "type": "call_service", + "domain": "light", + "service": "turn_on", + "target": {"entity_id": light_entities}, + "service_data": {"brightness": 255} + })) + ws.message_id += 1 + + # Wait for response and process state updates + start_time = time.time() + success = False + while time.time() - start_time < 2: # Wait up to 2 seconds + message = ws.read_message() + if message: + print(f"Received message: {message}") + if message.get('id') == msg_id: + success = message.get('success', False) + print("Turn on command Success!" if success else "Turn on command Failed!") + time.sleep(0.1) + + msg_id = ws.message_id + + # Dim both lights + print("\nDimming both lights to 50%...") + ws._send_frame(json.dumps({ + "id": msg_id, + "type": "call_service", + "domain": "light", + "service": "turn_on", + "target": {"entity_id": light_entities}, + "service_data": {"brightness": 127} + })) + ws.message_id += 1 + + # Wait for response and process state updates + start_time = time.time() + success = False + while time.time() - start_time < 2: + message = ws.read_message() + if message: + print(f"Received message: {message}") + if message.get('id') == msg_id: + success = message.get('success', False) + print("Dim command Success!" if success else "Dim command Failed!") + time.sleep(0.1) + + msg_id = ws.message_id + + # Turn both lights off + print("\nTurning off both lights...") + ws._send_frame(json.dumps({ + "id": msg_id, + "type": "call_service", + "domain": "light", + "service": "turn_off", + "target": {"entity_id": light_entities} + })) + ws.message_id += 1 + + # Wait for response and process state updates + start_time = time.time() + success = False + while time.time() - start_time < 2: + message = ws.read_message() + if message: + print(f"Received message: {message}") + if message.get('id') == msg_id: + success = message.get('success', False) + print("Turn off command Success!" if success else "Turn off command Failed!") + time.sleep(0.1) + + # Final message processing + start_time = time.time() + while time.time() - start_time < 2: + message = ws.read_message() + if message: + print(f"Received message: {message}") + time.sleep(0.1) + + finally: + ws.close() + +def test_state_subscription(): + """Test subscribing to state changes with entity filtering""" + print("\n=== Testing State Change Subscription ===") + ws = HAMPWS(HA_HOST, HA_TOKEN) + + if not ws.connect(): + print("Failed to connect") + return + + try: + # Subscribe to state changes + sub_id = ws.subscribe_events("state_changed") + if sub_id: + print("Subscribed to state changes!") + print("Listening for state changes (Ctrl+C to stop)...") + + # Create set of interesting entities + monitored_entities = set() + for entities in [LIGHT_ENTITIES, SENSORS, CAMERAS]: + for entity in entities.values(): + monitored_entities.add(entity['entity_id']) + + # Set a timeout for the test + start_time = time.time() + max_duration = 30 # Run for 30 seconds + + while time.time() - start_time < max_duration: + message = ws.read_message() + if message: + if message.get('type') == 'event' and \ + message.get('event', {}).get('event_type') == 'state_changed': + data = message['event']['data'] + entity_id = data.get('entity_id', '') + + # Only show changes for our monitored entities + if entity_id in monitored_entities: + new_state = data.get('new_state', {}).get('state', '') + attributes = data.get('new_state', {}).get('attributes', {}) + + # Format output based on entity type + if entity_id.startswith('light.'): + brightness = attributes.get('brightness', 0) + print(f"Light change: {entity_id} -> {new_state} (brightness: {brightness})") + elif entity_id.startswith('binary_sensor.'): + print(f"Sensor change: {entity_id} -> {new_state}") + elif entity_id.startswith('camera.'): + print(f"Camera change: {entity_id} -> {new_state}") + else: + print(f"State change: {entity_id} -> {new_state}") + + time.sleep(0.1) + + except KeyboardInterrupt: + print("\nStopping state subscription test...") + finally: + ws.close() + +def test_smart_motion(): + """Test advanced motion detection with camera integration""" + print("\n=== Testing Smart Motion Detection ===") + ws = HAMPWS(HA_HOST, HA_TOKEN) + + if not ws.connect(): + print("Failed to connect") + return + + try: + # Set up trigger for motion sensor + trigger_config = { + "platform": "state", + "entity_id": SENSORS['motion']['entity_id'], + "from": "off", + "to": "on" + } + + trigger_msg_id = ws.message_id # Store message ID for trigger subscription + ws._send_frame(json.dumps({ + "id": trigger_msg_id, + "type": "subscribe_trigger", + "trigger": trigger_config + })) + ws.message_id += 1 + + # Also subscribe to state changes + state_msg_id = ws.message_id # Store message ID for state subscription + ws._send_frame(json.dumps({ + "id": state_msg_id, + "type": "subscribe_events", + "event_type": "state_changed" + })) + ws.message_id += 1 + + print("Waiting for subscription confirmations...") + + # Track subscription states + trigger_sub_confirmed = False + state_sub_confirmed = False + + # Track entity states + is_dark = None + camera_state = None + + # Set timeouts + start_time = time.time() + max_duration = 30 # Run for 30 seconds + subscription_timeout = time.time() + 5 # 5 seconds to confirm subscriptions + + print("Monitoring for smart motion events...") + print("Will track motion, darkness state, and camera recording state") + + while time.time() - start_time < max_duration: + message = ws.read_message() + + if message: + print(f"Received message: {message}") # Debug output + + # Handle subscription confirmations + if message.get('type') == 'result': + msg_id = message.get('id') + if msg_id == trigger_msg_id and message.get('success'): + trigger_sub_confirmed = True + print("Motion trigger subscription confirmed!") + elif msg_id == state_msg_id and message.get('success'): + state_sub_confirmed = True + print("State change subscription confirmed!") + + # Once subscribed, handle events + elif message.get('type') == 'event': + if message.get('event', {}).get('event_type') == 'state_changed': + # Track darkness and camera states + data = message['event']['data'] + entity_id = data.get('entity_id', '') + new_state = data.get('new_state', {}).get('state', '') + + if entity_id == SENSORS['dark']['entity_id']: + is_dark = (new_state == 'on') + print(f"Darkness state: {'Dark' if is_dark else 'Light'}") + + elif entity_id == CAMERAS['back_door']['entity_id']: + camera_state = new_state + print(f"Camera state: {camera_state}") + + # Handle motion trigger events + elif trigger_sub_confirmed: + print("\nMotion detected!") + print(f"Current conditions:") + print(f"- Dark: {is_dark}") + print(f"- Camera: {camera_state}") + + # Automatically turn on lights if dark + if is_dark: + print("Dark condition detected - turning on lights...") + light_entities = [light['entity_id'] for light in LIGHT_ENTITIES.values()] + ws.call_service("light", "turn_on", + target={"entity_id": light_entities}, + service_data={"brightness": 255}) + + # Check if subscriptions are confirmed within timeout + if not (trigger_sub_confirmed and state_sub_confirmed): + if time.time() > subscription_timeout: + print("Timeout waiting for subscription confirmations") + return + + time.sleep(0.1) + + except KeyboardInterrupt: + print("\nStopping smart motion test...") + finally: + ws.close() + +def main(): + """Run all tests""" + try: + # Test paired light control + test_paired_lights() + + # Test state change subscription + test_state_subscription() + + # Test smart motion detection + test_smart_motion() + + except KeyboardInterrupt: + print("\nTests stopped by user") + except Exception as e: + print(f"Error during tests: {e}") + +if __name__ == "__main__": + main()