diff --git a/pyproject.toml b/pyproject.toml index 8b1c6a530..5b0e1a031 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,9 +9,11 @@ dependencies = ["homeassistant>=2025.12.2", "numpy>=2.3.2", "highspy>=1.12.0"] [dependency-groups] dev = [ + "beautifulsoup4>=4.14.3", "freezegun>=1.5.2", "matplotlib>=3.10.8", "networkx>=3.4.2", + "playwright>=1.57.0", "mdformat-beautysh>=1.0.0", "mdformat-config>=0.2.1", "mdformat-footnote>=0.1.1", @@ -134,6 +136,9 @@ addopts = ["--strict-markers", "--strict-config"] filterwarnings = ["error"] markers = [ "scenario: marks tests as scenario tests (select with '-m \"scenario\"')", + "browser: marks tests as browser automation tests (select with '-m \"browser\"')", + "guide: marks tests as guide screenshot tests (select with '-m \"guide\"')", + "enable_socket: marks tests that need real socket access", ] [tool.coverage.run] diff --git a/tests/conftest.py b/tests/conftest.py index 838864ee9..68833d3a2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -119,6 +119,17 @@ def configure_logging() -> None: @pytest.fixture(autouse=True) -def auto_enable_custom_integrations(enable_custom_integrations: None) -> bool: - """Enable loading custom integrations in all tests.""" +def auto_enable_custom_integrations(request: pytest.FixtureRequest) -> bool: + """Enable loading custom integrations in all tests. + + Skip for guide tests which use their own HA instance without + the pytest-homeassistant-custom-component fixtures. + """ + # Skip for guide tests - they don't use the hass fixture + if "guide" in (mark.name for mark in request.node.iter_markers()): + return True + + # For non-guide tests, request the enable_custom_integrations fixture + # which depends on the hass fixture + enable_custom_integrations = request.getfixturevalue("enable_custom_integrations") return enable_custom_integrations is None diff --git a/tests/guides/.gitignore b/tests/guides/.gitignore new file mode 100644 index 000000000..11e1340c9 --- /dev/null +++ b/tests/guides/.gitignore @@ -0,0 +1,5 @@ +# Screenshot outputs (generated by guide tests) +*/screenshots* + +# Log files +*.log diff --git a/tests/guides/__init__.py b/tests/guides/__init__.py new file mode 100644 index 000000000..846010e68 --- /dev/null +++ b/tests/guides/__init__.py @@ -0,0 +1 @@ +"""Guide tests with browser automation and live Home Assistant.""" diff --git a/tests/guides/conftest.py b/tests/guides/conftest.py new file mode 100644 index 000000000..3d674093c --- /dev/null +++ b/tests/guides/conftest.py @@ -0,0 +1,22 @@ +"""Configuration for guide tests. + +Guide tests are designed to run Home Assistant in-process with browser +automation. They do NOT use pytest-homeassistant-custom-component fixtures +since they need a full HTTP server for Playwright. + +These tests should be run with: + uv run python tests/guides/.py + +Or via pytest with explicit socket enabling: + uv run pytest tests/guides/ --force-enable-socket -m guide +""" + +import pytest + + +def pytest_configure(config: pytest.Config) -> None: + """Register guide marker.""" + config.addinivalue_line( + "markers", + "guide: mark test as a guide test (runs full HA with HTTP, needs network)", + ) diff --git a/tests/guides/ha_runner.py b/tests/guides/ha_runner.py new file mode 100644 index 000000000..9a1331765 --- /dev/null +++ b/tests/guides/ha_runner.py @@ -0,0 +1,548 @@ +"""In-process Home Assistant runner for guide tests. + +This module provides a way to run Home Assistant entirely in-process with +an HTTP server on an ephemeral port, allowing browser automation via Playwright. + +The key insight is that we can: +1. Create a HomeAssistant instance with a minimal temp config directory +2. Set up the HTTP, frontend, and auth components programmatically +3. Load entity states directly via hass.states.async_set() +4. Run the event loop in a background thread +5. Access the HA instance from the main thread for Playwright automation +6. Pre-create an owner user and auth token to bypass onboarding UI + +This avoids needing config files, YAML, or packages - just load states from JSON. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Generator +from contextlib import closing, contextmanager +from dataclasses import dataclass +import json +from pathlib import Path +import shutil +import socket +import tempfile +import threading +from typing import TYPE_CHECKING, Any +import warnings + +from homeassistant import loader +from homeassistant.auth import auth_manager_from_config +from homeassistant.auth.models import Credentials +from homeassistant.config_entries import ConfigEntries +from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.core import CoreState, HomeAssistant +from homeassistant.helpers import area_registry as ar +from homeassistant.helpers import category_registry as cr +from homeassistant.helpers import device_registry as dr +from homeassistant.helpers import entity, translation +from homeassistant.helpers import entity_registry as er +from homeassistant.helpers import floor_registry as fr +from homeassistant.helpers import issue_registry as ir +from homeassistant.helpers import label_registry as lr +from homeassistant.helpers import restore_state as rs +from homeassistant.setup import async_setup_component + +if TYPE_CHECKING: + from playwright.sync_api import BrowserContext + +# Path to project root +PROJECT_ROOT = Path(__file__).parent.parent.parent + +# Client ID for refresh tokens (matches HA frontend) +CLIENT_ID = "http://127.0.0.1/" + + +def _find_free_port() -> int: + """Find a free port on localhost.""" + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("127.0.0.1", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + +@dataclass +class LiveHomeAssistant: + """A running Home Assistant instance with HTTP server. + + Provides methods to interact with the HA instance from outside + the event loop thread. + """ + + hass: HomeAssistant + url: str + port: int + loop: asyncio.AbstractEventLoop + access_token: str + refresh_token_id: str + _stop_event: asyncio.Event + + def set_state( + self, + entity_id: str, + state: str, + attributes: dict[str, Any] | None = None, + ) -> None: + """Set an entity state. + + Args: + entity_id: Entity ID like "sensor.power" + state: State value + attributes: Optional attributes dict + + """ + + async def _set() -> None: + self.hass.states.async_set(entity_id, state, attributes or {}) + + future = asyncio.run_coroutine_threadsafe(_set(), self.loop) + future.result(timeout=5) + + def set_states(self, states: list[dict[str, Any]]) -> None: + """Set multiple entity states. + + Args: + states: List of dicts with entity_id, state, and optional attributes + + """ + + async def _set_all() -> None: + for state_data in states: + self.hass.states.async_set( + state_data["entity_id"], + state_data["state"], + state_data.get("attributes", {}), + ) + await self.hass.async_block_till_done() + + future = asyncio.run_coroutine_threadsafe(_set_all(), self.loop) + future.result(timeout=30) + + def load_states_from_file(self, states_file: Path) -> None: + """Load entity states from a JSON file. + + Args: + states_file: Path to JSON file with state definitions + + """ + with states_file.open() as f: + states = json.load(f) + self.set_states(states) + + def run_coro(self, coro: Any, timeout: float = 30) -> Any: + """Run a coroutine on the HA event loop. + + Args: + coro: Coroutine to run + timeout: Maximum seconds to wait + + Returns: + Result of the coroutine + + """ + future = asyncio.run_coroutine_threadsafe(coro, self.loop) + return future.result(timeout=timeout) + + def inject_auth(self, context: BrowserContext, *, dark_mode: bool = False) -> None: + """Inject authentication into a Playwright browser context. + + Sets up the Authorization header for all requests so HA frontend + is pre-authenticated. Must be called before navigating to HA. + + Args: + context: Playwright BrowserContext to inject auth into + dark_mode: Whether to set dark mode theme preference + + """ + # Import here to avoid requiring playwright for non-browser usage + from playwright.sync_api import Request, Route # noqa: PLC0415 + + # Add Authorization header to all API requests + # HA frontend uses websocket for most communication but REST for some + def add_auth_header(route: Route, request: Request) -> None: + headers = { + **request.headers, + "Authorization": f"Bearer {self.access_token}", + } + route.continue_(headers=headers) + + context.route("**/*", add_auth_header) + + # Also set up localStorage token storage for frontend JS + # HA stores auth data in localStorage under 'hassTokens' + # Format matches what home-assistant-js-websocket expects + token_data = { + "hassUrl": self.url, + "clientId": CLIENT_ID, + "access_token": self.access_token, + "refresh_token": self.refresh_token_id, + "token_type": "Bearer", + "expires_in": 1800, # 30 minutes + } + + # Build localStorage init script + theme_js = "" + if dark_mode: + # HA stores theme preference in localStorage as { theme: string, dark: boolean } + theme_data = {"theme": "default", "dark": True} + theme_js = f""" + localStorage.setItem('selectedTheme', JSON.stringify({json.dumps(theme_data)})); + """ + + init_script = f""" + localStorage.setItem('hassTokens', JSON.stringify({json.dumps(token_data)})); + {theme_js} + """ + context.add_init_script(init_script) + + def call_service( + self, + domain: str, + service: str, + service_data: dict[str, Any] | None = None, + *, + blocking: bool = True, + ) -> None: + """Call a Home Assistant service. + + Args: + domain: Service domain (e.g., "frontend") + service: Service name (e.g., "set_theme") + service_data: Optional service data dict + blocking: Whether to wait for service completion + + """ + + async def _call() -> None: + await self.hass.services.async_call( + domain, + service, + service_data or {}, + blocking=blocking, + ) + + future = asyncio.run_coroutine_threadsafe(_call(), self.loop) + future.result(timeout=10) + + def stop(self) -> None: + """Signal the HA instance to stop via thread-safe call.""" + self.loop.call_soon_threadsafe(self._stop_event.set) + + +async def _setup_home_assistant_async( + port: int, + config_dir: str, +) -> tuple[HomeAssistant, str, str]: + """Set up a Home Assistant instance with HTTP server and pre-authenticated user. + + This creates a minimal HA instance with just the components needed + for browser automation: http, frontend, auth, websocket_api. + Onboarding is bypassed by creating an owner user programmatically. + + Returns: + Tuple of (HomeAssistant instance, access_token, refresh_token_id) + + """ + # Pre-populate onboarding storage to mark all steps complete + # This MUST be done BEFORE the HomeAssistant instance is created, + # because the StoreManager scans the storage directory during initialization + # and caches which files exist. If we write the file after that scan, + # the onboarding component won't see it. + storage_dir = Path(config_dir) / ".storage" + storage_dir.mkdir(exist_ok=True) + onboarding_storage = storage_dir / "onboarding" + onboarding_data = { + "version": 4, + "minor_version": 1, + "key": "onboarding", + "data": {"done": ["user", "core_config", "analytics", "integration"]}, + } + onboarding_storage.write_text(json.dumps(onboarding_data)) + + hass = HomeAssistant(config_dir) + + # Basic configuration + hass.config.location_name = "Test Home" + hass.config.latitude = 32.87336 + hass.config.longitude = -117.22743 + hass.config.elevation = 0 + await hass.config.async_set_time_zone("UTC") + hass.config.skip_pip = True + hass.config.skip_pip_packages = [] + + # Set up loader - don't set DATA_CUSTOM_COMPONENTS, let loader discover them + loader.async_setup(hass) + + # Set up config entries + hass.config_entries = ConfigEntries(hass, {"_": "placeholder"}) + hass.bus.async_listen_once( + EVENT_HOMEASSISTANT_STOP, + hass.config_entries._async_shutdown, + ) + + # Set up essential helpers + entity.async_setup(hass) + + # Translation cache + hass.data[translation.TRANSLATION_FLATTEN_CACHE] = translation._TranslationCache(hass) + + # Load registries + await ar.async_load(hass) + await cr.async_load(hass) + await dr.async_load(hass) + await er.async_load(hass) + await fr.async_load(hass) + await ir.async_load(hass) + await lr.async_load(hass) + await rs.async_load(hass) + + # Set up auth with homeassistant provider + hass.auth = await auth_manager_from_config( + hass, + provider_configs=[{"type": "homeassistant"}], + module_configs=[], + ) + + # Get the homeassistant auth provider to add a user with password. + # We configure the provider as "homeassistant" type above, so index 0 is always HassAuthProvider. + # Pyright can't narrow AuthProvider to HassAuthProvider due to incomplete type stubs + # in Home Assistant - async_add_auth exists on HassAuthProvider but not AuthProvider. + provider = hass.auth.auth_providers[0] + await provider.async_add_auth("testuser", "testpass") # pyright: ignore[reportAttributeAccessIssue] + + # Create owner user to bypass onboarding + # First non-system user automatically becomes owner + owner = await hass.auth.async_create_user( + name="Test User", + group_ids=["system-admin"], + ) + + # Create credential and link to user + credential = Credentials( + id="test-credential", + auth_provider_type="homeassistant", + auth_provider_id=None, + data={"username": "testuser"}, + is_new=False, + ) + await hass.auth.async_link_user(owner, credential) + + # Create refresh token and access token + refresh_token = await hass.auth.async_create_refresh_token( + owner, + CLIENT_ID, + credential=credential, + ) + access_token = hass.auth.async_create_access_token(refresh_token) + # Store refresh token ID for frontend auth + refresh_token_id = refresh_token.id + + # Set up HTTP on ephemeral port + http_config = { + "server_port": port, + } + + # Suppress aiohttp.web_exceptions.NotAppKeyWarning which is raised as an error + # in newer versions of aiohttp when HA sets app["hass"] = hass + # This is a compatibility issue between HA and newer aiohttp versions + warnings.filterwarnings("ignore", category=DeprecationWarning, module="aiohttp") + try: + # NotAppKeyWarning only exists in newer aiohttp versions + from aiohttp.web_exceptions import NotAppKeyWarning # noqa: PLC0415 + + warnings.filterwarnings("ignore", category=NotAppKeyWarning) + except ImportError: + pass # Older aiohttp doesn't have this + + # Set up components in order (onboarding will see all steps done and skip + # because we pre-populated the storage file) + assert await async_setup_component(hass, "http", {"http": http_config}) + assert await async_setup_component(hass, "websocket_api", {}) + assert await async_setup_component(hass, "auth", {}) + assert await async_setup_component(hass, "onboarding", {}) + + # Verify onboarding is bypassed + # Import here because component must be set up first + from homeassistant.components.onboarding import async_is_onboarded # noqa: PLC0415 + + if not async_is_onboarded(hass): + msg = "Onboarding bypass failed - check storage file format and timing" + raise RuntimeError(msg) + + assert await async_setup_component(hass, "frontend", {}) + assert await async_setup_component(hass, "config", {}) + + # Mark as running + hass.set_state(CoreState.running) + + # Start the HTTP server + await hass.http.start() + + return hass, access_token, refresh_token_id + + +def _run_hass_thread( + port: int, + config_dir: str, + hass_holder: list[HomeAssistant], + token_holder: list[tuple[str, str]], + loop_holder: list[asyncio.AbstractEventLoop], + ready_event: threading.Event, + async_stop_event_holder: list[asyncio.Event], + error_holder: list[Exception], +) -> None: + """Run Home Assistant in a thread with its own event loop.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop_holder.append(loop) + + async def _run() -> None: + # Create asyncio.Event for clean shutdown signaling + async_stop_event = asyncio.Event() + async_stop_event_holder.append(async_stop_event) + + try: + hass, access_token, refresh_token_id = await _setup_home_assistant_async(port, config_dir) + hass_holder.append(hass) + token_holder.append((access_token, refresh_token_id)) + ready_event.set() + + # Wait for stop signal from main thread + await async_stop_event.wait() + + # Shutdown - async_stop will handle HTTP server via event handler + await hass.async_stop(force=True) + + except Exception as e: + error_holder.append(e) + ready_event.set() # Unblock the main thread + + try: + loop.run_until_complete(_run()) + finally: + loop.close() + + +@contextmanager +def live_home_assistant( + timeout: float = 60.0, +) -> Generator[LiveHomeAssistant]: + """Context manager for a live Home Assistant instance. + + Starts HA on an ephemeral port in a background thread and yields + a LiveHomeAssistant instance for interaction. The instance includes + a pre-authenticated access_token for browser automation. + + Args: + timeout: Maximum seconds to wait for HA to start + + Yields: + LiveHomeAssistant instance with access_token for auth + + Example: + with live_home_assistant() as hass: + hass.set_states([ + {"entity_id": "sensor.power", "state": "1500", "attributes": {...}} + ]) + # Inject auth into Playwright context + hass.inject_auth(browser_context) + # Use Playwright to interact with hass.url + + """ + port = _find_free_port() + + # Create a temporary config directory (HA requires one even if minimal) + with tempfile.TemporaryDirectory(prefix="haeo_guide_") as temp_dir: + config_dir = temp_dir + + # Create custom_components symlink for HAEO + custom_components = Path(temp_dir) / "custom_components" + custom_components.mkdir() + haeo_source = PROJECT_ROOT / "custom_components" / "haeo" + haeo_target = custom_components / "haeo" + haeo_target.symlink_to(haeo_source) + + # Copy SingleFile bundle to www directory for HTML captures + www_dir = Path(temp_dir) / "www" + www_dir.mkdir() + singlefile_bundle = PROJECT_ROOT / "node_modules" / "single-file-cli" / "lib" / "single-file-bundle.js" + if singlefile_bundle.exists(): + shutil.copy2(singlefile_bundle, www_dir / "single-file-bundle.js") + + hass_holder: list[HomeAssistant] = [] + token_holder: list[tuple[str, str]] = [] + loop_holder: list[asyncio.AbstractEventLoop] = [] + error_holder: list[Exception] = [] + async_stop_event_holder: list[asyncio.Event] = [] + ready_event = threading.Event() + + thread = threading.Thread( + target=_run_hass_thread, + args=( + port, + config_dir, + hass_holder, + token_holder, + loop_holder, + ready_event, + async_stop_event_holder, + error_holder, + ), + daemon=True, + ) + thread.start() + + # Wait for HA to be ready + if not ready_event.wait(timeout=timeout): + # Signal stop via thread-safe call if loop exists + if loop_holder and async_stop_event_holder: + loop_holder[0].call_soon_threadsafe(async_stop_event_holder[0].set) + thread.join(timeout=5) + msg = f"Home Assistant did not start within {timeout}s" + raise TimeoutError(msg) + + # Check for errors during startup + if error_holder: + if loop_holder and async_stop_event_holder: + loop_holder[0].call_soon_threadsafe(async_stop_event_holder[0].set) + thread.join(timeout=5) + raise error_holder[0] + + hass = hass_holder[0] + access_token, refresh_token_id = token_holder[0] + loop = loop_holder[0] + async_stop_event = async_stop_event_holder[0] + + instance = LiveHomeAssistant( + hass=hass, + url=f"http://127.0.0.1:{port}", + port=port, + loop=loop, + access_token=access_token, + refresh_token_id=refresh_token_id, + _stop_event=async_stop_event, + ) + + try: + yield instance + finally: + # Signal stop via thread-safe call to the async event loop + loop.call_soon_threadsafe(async_stop_event.set) + thread.join(timeout=10) + + +def load_states_from_json(path: Path) -> list[dict[str, Any]]: + """Load entity states from a JSON file. + + Args: + path: Path to JSON file + + Returns: + List of state dictionaries + + """ + with path.open() as f: + return json.load(f) diff --git a/tests/guides/primitives/__init__.py b/tests/guides/primitives/__init__.py new file mode 100644 index 000000000..91f2d883a --- /dev/null +++ b/tests/guides/primitives/__init__.py @@ -0,0 +1,58 @@ +"""Guide primitives for browser automation. + +This package provides two layers of primitives: + +1. HAPage - Low-level Home Assistant UI interactions (may change between HA versions) +2. HAEO element functions - High-level HAEO element configuration + +Screenshots are automatically collected using ScreenshotContext with hierarchical naming. + +Example usage: + from tests.guides.primitives import HAPage, screenshot_context + from tests.guides.primitives.haeo import add_integration, add_battery + + with screenshot_context(output_dir) as ctx: + add_integration(page, network_name="My System") + add_battery(page, name="Battery", connection="Inverter", ...) + # ctx.screenshots contains OrderedDict of all captured images +""" + +from tests.guides.primitives.ha_page import HAPage +from tests.guides.primitives.haeo import ( + Entity, + add_battery, + add_grid, + add_integration, + add_inverter, + add_load, + add_node, + add_solar, + login, + verify_setup, +) +from tests.guides.primitives.capture import ( + ScreenshotContext, + guide_step, + screenshot_context, +) + +__all__ = [ + # Screenshot context + "ScreenshotContext", + "guide_step", + "screenshot_context", + # Low-level primitives + "HAPage", + # Entity type + "Entity", + # HAEO element primitives + "add_battery", + "add_grid", + "add_integration", + "add_inverter", + "add_load", + "add_node", + "add_solar", + "login", + "verify_setup", +] diff --git a/tests/guides/primitives/capture.py b/tests/guides/primitives/capture.py new file mode 100644 index 000000000..05e003f3b --- /dev/null +++ b/tests/guides/primitives/capture.py @@ -0,0 +1,163 @@ +"""Screenshot collection context for guide automation. + +Provides hierarchical screenshot naming through a context stack. +Screenshots are named based on the function call hierarchy, e.g.: + add_grid.select_entity.Import_Price.search +""" + +from __future__ import annotations + +from collections import OrderedDict +from contextlib import contextmanager +from dataclasses import dataclass, field +from functools import wraps +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, TypeVar + +if TYPE_CHECKING: + from collections.abc import Iterator + +# Thread-local storage for the current context +_current_context: ScreenshotContext | None = None + +F = TypeVar("F", bound=Callable[..., Any]) + + +@dataclass +class ScreenshotContext: + """Context for collecting screenshots with hierarchical naming. + + Screenshots are stored in an OrderedDict with keys like: + "add_grid.fill_textbox.Grid_Name" + "add_grid.select_entity.Import_Price.search" + + The naming hierarchy is built from: + 1. The decorated function name (e.g., "add_grid") + 2. The HAPage method name (e.g., "select_entity") + 3. The element/field name (e.g., "Import_Price") + 4. Optional step suffix (e.g., "search", "result") + """ + + output_dir: Path + screenshots: OrderedDict[str, Path] = field(default_factory=OrderedDict) + _stack: list[str] = field(default_factory=list) + _step_number: int = 0 + + @staticmethod + def current() -> ScreenshotContext | None: + """Get the current active context.""" + return _current_context + + @staticmethod + def require() -> ScreenshotContext: + """Get the current context, raising if none is active.""" + ctx = _current_context + if ctx is None: + msg = "No ScreenshotContext is active" + raise RuntimeError(msg) + return ctx + + def push(self, name: str) -> None: + """Push a name onto the context stack.""" + self._stack.append(name) + + def pop(self) -> None: + """Pop a name from the context stack.""" + if self._stack: + self._stack.pop() + + def capture(self, page: Any, step: str) -> Path: + """Capture a screenshot with hierarchical naming. + + Args: + page: Playwright page object + step: Step name within current context (e.g., "click", "result") + + Returns: + Path to the saved screenshot + + """ + self._step_number += 1 + + # Build hierarchical name from stack + step + # Sanitize names: replace spaces with underscores, remove special chars + parts = [self._sanitize(p) for p in self._stack] + parts.append(self._sanitize(step)) + name = ".".join(parts) + + # Include step number for ordering + filename = f"{self._step_number:03d}_{name}.png" + path = self.output_dir / filename + + page.screenshot(path=str(path), animations="disabled") + self.screenshots[name] = path + + return path + + @staticmethod + def _sanitize(name: str) -> str: + """Sanitize a name for use in filenames.""" + # Replace spaces and special chars with underscores + result = name.replace(" ", "_").replace("-", "_") + # Remove any remaining problematic characters + return "".join(c for c in result if c.isalnum() or c == "_") + + @contextmanager + def scope(self, name: str) -> Iterator[None]: + """Context manager to add a scope level.""" + self.push(name) + try: + yield + finally: + self.pop() + + +@contextmanager +def screenshot_context(output_dir: Path) -> Iterator[ScreenshotContext]: + """Create a screenshot collection context. + + Usage: + with screenshot_context(output_dir) as ctx: + add_integration(page, "My System") + add_battery(page, ...) + # ctx.screenshots contains all captured screenshots + + """ + global _current_context + + output_dir.mkdir(parents=True, exist_ok=True) + ctx = ScreenshotContext(output_dir=output_dir) + + previous = _current_context + _current_context = ctx + try: + yield ctx + finally: + _current_context = previous + + +def guide_step(func: F) -> F: + """Decorator for HAEO primitive functions. + + Wraps the function to push its name onto the screenshot context stack, + so all screenshots taken within the function get hierarchical names. + + Usage: + @guide_step + def add_battery(page: HAPage, name: str, ...): + page.fill_textbox("Battery Name", name) # → "add_battery.fill_textbox.Battery_Name" + ... + + """ + + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + ctx = ScreenshotContext.current() + if ctx is None: + # No context active, just call the function + return func(*args, **kwargs) + + with ctx.scope(func.__name__): + return func(*args, **kwargs) + + return wrapper # type: ignore[return-value] diff --git a/tests/guides/primitives/ha_page.py b/tests/guides/primitives/ha_page.py new file mode 100644 index 000000000..ca90e574c --- /dev/null +++ b/tests/guides/primitives/ha_page.py @@ -0,0 +1,372 @@ +"""Low-level Home Assistant UI primitives. + +This module contains primitives for interacting with the Home Assistant UI. +These may need updates when Home Assistant changes its frontend. + +The HAPage class wraps a Playwright Page with HA-specific interactions +like entity pickers, dialogs, and screenshot capture with indicators. + +Screenshots are automatically collected using the ScreenshotContext. +""" + +from __future__ import annotations + +from dataclasses import dataclass +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from .capture import ScreenshotContext + +if TYPE_CHECKING: + from playwright.sync_api import Page + +_LOGGER = logging.getLogger(__name__) + +# Timeouts +DEFAULT_TIMEOUT = 5000 # 5 seconds max +SEARCH_TIMEOUT = 10000 # 10 seconds for search results + +# Load JavaScript from external file +_JS_DIR = Path(__file__).parent / "js" +_CLICK_INDICATOR_JS = (_JS_DIR / "click_indicator.js").read_text() + + +@dataclass +class HAPage: + """Low-level Home Assistant page interactions. + + All methods automatically capture screenshots using the active ScreenshotContext. + Screenshot names are built hierarchically from the context stack. + """ + + page: Page + url: str + + # region: Screenshot Capture + + def _capture(self, step: str) -> None: + """Capture screenshot with current context naming.""" + ctx = ScreenshotContext.current() + if ctx: + ctx.capture(self.page, step) + + def _capture_with_indicator(self, step: str, locator: Any) -> None: + """Capture screenshot with click indicator on target element.""" + self._show_click_indicator(locator) + self._capture(step) + self._remove_click_indicator() + + def _show_click_indicator(self, locator: Any) -> None: + """Show click indicator overlay at target element.""" + self._remove_click_indicator() + + element = locator.element_handle(timeout=1000) + if not element: + return + + clickable_selector = ( + "button, [role='button'], [role='option'], [role='listitem'], a, " + "ha-list-item, ha-combo-box-item, mwc-list-item, md-item, " + "ha-button, ha-icon-button, .mdc-text-field, ha-textfield, " + "input, select, ha-select, ha-integration-list-item" + ) + + element.evaluate(_CLICK_INDICATOR_JS, clickable_selector) + + def _remove_click_indicator(self) -> None: + """Remove click indicator overlay.""" + self.page.evaluate(""" + const overlay = document.getElementById('click-indicator-overlay'); + if (overlay) { + try { overlay.hidePopover(); } catch (e) {} + overlay.remove(); + } + """) + + def _scroll_into_view(self, locator: Any) -> None: + """Scroll element into view.""" + locator.scroll_into_view_if_needed(timeout=DEFAULT_TIMEOUT) + + # endregion + + # region: Navigation + + def goto(self, path: str) -> None: + """Navigate to a path within Home Assistant.""" + full_url = f"{self.url}{path}" if path.startswith("/") else path + self.page.goto(full_url) + self.page.wait_for_load_state("networkidle") + + def wait_for_load(self) -> None: + """Wait for page to finish loading.""" + self.page.wait_for_load_state("networkidle") + + # endregion + + # region: Form Interactions + + def click_button(self, name: str) -> None: + """Click a button by accessible name.""" + ctx = ScreenshotContext.current() + + button = self.page.get_by_role("button", name=name) + button.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + if ctx: + with ctx.scope(f"click_{name}"): + self._scroll_into_view(button) + self._capture_with_indicator("target", button) + button.click(timeout=DEFAULT_TIMEOUT) + self.page.wait_for_load_state("domcontentloaded") + self._capture("result") + else: + button.click(timeout=DEFAULT_TIMEOUT) + + def fill_textbox(self, name: str, value: str) -> None: + """Fill a textbox by accessible name.""" + textbox = self.page.get_by_role("textbox", name=name) + + current_value = textbox.input_value(timeout=DEFAULT_TIMEOUT) + if current_value == value: + return + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope(f"fill_{name}"): + self._scroll_into_view(textbox) + self._capture_with_indicator("field", textbox) + textbox.fill(value) + self._capture("filled") + else: + textbox.fill(value) + + def fill_spinbutton(self, name: str, value: str) -> None: + """Fill a spinbutton by accessible name.""" + spinbutton = self.page.get_by_role("spinbutton", name=name) + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope(f"fill_{name}"): + self._scroll_into_view(spinbutton) + self._capture_with_indicator("field", spinbutton) + spinbutton.clear() + spinbutton.fill(value) + self._capture("filled") + else: + spinbutton.clear() + spinbutton.fill(value) + + def select_combobox(self, combobox_name: str, option_text: str) -> None: + """Select option from combobox dropdown.""" + combobox = self.page.get_by_role("combobox", name=combobox_name) + combobox.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope(f"select_{combobox_name}"): + self._scroll_into_view(combobox) + self._capture_with_indicator("dropdown", combobox) + combobox.click() + + option = self.page.get_by_role("option", name=option_text) + option.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + self._scroll_into_view(option) + self._capture_with_indicator("option", option) + + option.click() + option.wait_for(state="hidden", timeout=DEFAULT_TIMEOUT) + self._capture("selected") + else: + combobox.click() + option = self.page.get_by_role("option", name=option_text) + option.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + option.click() + option.wait_for(state="hidden", timeout=DEFAULT_TIMEOUT) + + # endregion + + # region: Entity Picker + + def select_entity(self, field_label: str, search_term: str, entity_name: str) -> None: + """Select entity from HA entity picker dialog.""" + selector = self.page.locator(f"ha-selector:has-text('{field_label}')") + picker = selector.locator("ha-combo-box-item").first + picker.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope(f"entity_{field_label}"): + self._scroll_into_view(picker) + self._capture_with_indicator("picker", picker) + + picker.click() + + dialog = self.page.get_by_role("dialog", name="Select option") + dialog.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + search_input = dialog.get_by_role("textbox", name="Search") + search_input.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + self._capture_with_indicator("search_box", search_input) + + search_input.fill(search_term) + + result_item = dialog.locator(f":text('{entity_name}')").first + result_item.wait_for(state="visible", timeout=SEARCH_TIMEOUT) + self._capture("search_results") + self._scroll_into_view(result_item) + self._capture_with_indicator("select", result_item) + + result_item.click(timeout=DEFAULT_TIMEOUT) + dialog.wait_for(state="hidden", timeout=DEFAULT_TIMEOUT) + self._capture("selected") + else: + self._select_entity_no_capture(picker, search_term, entity_name) + + def add_another_entity(self, field_label: str, search_term: str, entity_name: str) -> None: + """Add another entity to multi-select field.""" + selector = self.page.locator(f"ha-selector:has-text('{field_label}')") + add_btn = selector.get_by_role("button", name="Add entity") + add_btn.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope(f"add_entity_{field_label}"): + self._scroll_into_view(add_btn) + self._capture_with_indicator("add_button", add_btn) + + add_btn.click(timeout=DEFAULT_TIMEOUT) + + dialog = self.page.get_by_role("dialog", name="Select option") + dialog.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + search_input = dialog.get_by_role("textbox", name="Search") + search_input.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + self._capture_with_indicator("search_box", search_input) + + search_input.fill(search_term) + + result_item = dialog.locator(f":text('{entity_name}')").first + result_item.wait_for(state="visible", timeout=SEARCH_TIMEOUT) + self._capture("search_results") + self._scroll_into_view(result_item) + self._capture_with_indicator("select", result_item) + + result_item.click(timeout=DEFAULT_TIMEOUT) + dialog.wait_for(state="hidden", timeout=DEFAULT_TIMEOUT) + self._capture("selected") + else: + add_btn.click(timeout=DEFAULT_TIMEOUT) + dialog = self.page.get_by_role("dialog", name="Select option") + dialog.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + self._select_entity_no_capture( + dialog.get_by_role("textbox", name="Search"), + search_term, + entity_name, + already_in_dialog=True, + ) + + def _select_entity_no_capture( + self, + picker_or_search: Any, + search_term: str, + entity_name: str, + *, + already_in_dialog: bool = False, + ) -> None: + """Entity selection without screenshots.""" + if not already_in_dialog: + picker_or_search.click() + dialog = self.page.get_by_role("dialog", name="Select option") + dialog.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + search_input = dialog.get_by_role("textbox", name="Search") + else: + search_input = picker_or_search + dialog = self.page.get_by_role("dialog", name="Select option") + + search_input.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + search_input.fill(search_term) + + result_item = dialog.locator(f":text('{entity_name}')").first + result_item.wait_for(state="visible", timeout=SEARCH_TIMEOUT) + result_item.click(timeout=DEFAULT_TIMEOUT) + dialog.wait_for(state="hidden", timeout=DEFAULT_TIMEOUT) + + # endregion + + # region: Dialogs + + def close_element_dialog(self) -> None: + """Close element creation success dialog.""" + button = self.page.get_by_role("button", name="Finish") + button.wait_for(state="visible", timeout=SEARCH_TIMEOUT) + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope("finish_dialog"): + self._scroll_into_view(button) + self._capture_with_indicator("button", button) + button.click(timeout=DEFAULT_TIMEOUT) + button.wait_for(state="hidden", timeout=DEFAULT_TIMEOUT) + else: + button.click(timeout=DEFAULT_TIMEOUT) + button.wait_for(state="hidden", timeout=DEFAULT_TIMEOUT) + + _LOGGER.info("Dialog closed successfully") + + def wait_for_dialog(self, title: str) -> None: + """Wait for dialog with given title to appear.""" + dialog = self.page.get_by_title(title) + dialog.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + self._capture("dialog_opened") + + def submit(self) -> None: + """Click Submit button.""" + self.click_button("Submit") + + # endregion + + # region: Integration Search + + def search_integration(self, integration_name: str) -> None: + """Search for and select integration from add dialog.""" + search_box = self.page.get_by_role("textbox", name="Search for a brand name") + search_box.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope("search_integration"): + self._capture("dialog") + self._capture_with_indicator("search_box", search_box) + + search_box.click() + search_box.fill(integration_name) + + item = self.page.locator("ha-integration-list-item", has_text=integration_name) + item.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + self._capture("results") + self._capture_with_indicator("select", item) + + item.click(timeout=DEFAULT_TIMEOUT) + else: + search_box.click() + search_box.fill(integration_name) + item = self.page.locator("ha-integration-list-item", has_text=integration_name) + item.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + item.click(timeout=DEFAULT_TIMEOUT) + + def click_add_integration(self) -> None: + """Click the Add integration button.""" + add_btn = self.page.locator("ha-button").get_by_role("button", name="Add integration") + add_btn.wait_for(state="visible", timeout=DEFAULT_TIMEOUT) + + ctx = ScreenshotContext.current() + if ctx: + with ctx.scope("add_integration"): + self._capture("page") + self._capture_with_indicator("button", add_btn) + add_btn.click() + else: + add_btn.click() + + # endregion diff --git a/tests/guides/primitives/haeo.py b/tests/guides/primitives/haeo.py new file mode 100644 index 000000000..93daf04a9 --- /dev/null +++ b/tests/guides/primitives/haeo.py @@ -0,0 +1,313 @@ +"""HAEO element primitives for guide automation. + +High-level functions that add elements to an HAEO network. +Each function is decorated with @guide_step for automatic screenshot naming. + +All entity parameters are tuples of (search_term, display_name). +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from .capture import ScreenshotContext, guide_step + +if TYPE_CHECKING: + from .ha_page import HAPage + +_LOGGER = logging.getLogger(__name__) + +# Type alias for entity selection: (search_term, display_name) +Entity = tuple[str, str] + + +@guide_step +def login(page: HAPage) -> None: + """Log in to Home Assistant.""" + _LOGGER.info("Logging in...") + page.goto("/") + + if "/auth/authorize" in page.page.url: + page.fill_textbox("Username", "testuser") + page.fill_textbox("Password", "testpass") + page.click_button("Log in") + page.page.wait_for_url("**/lovelace/**", timeout=10000) + + _LOGGER.info("Logged in") + + +@guide_step +def add_integration(page: HAPage, *, network_name: str) -> None: + """Add HAEO integration to Home Assistant.""" + _LOGGER.info("Adding HAEO integration: %s", network_name) + + page.goto("/config/integrations") + page.click_add_integration() + page.search_integration("HAEO") + + page.wait_for_dialog("HAEO Setup") + page.fill_textbox("System Name", network_name) + page.submit() + page.wait_for_load() + + page.goto("/config/integrations/integration/haeo") + page._capture("integration_page") + + _LOGGER.info("HAEO integration added") + + +@guide_step +def add_inverter( + page: HAPage, + *, + name: str, + connection: str, + max_power_dc_to_ac: Entity, + max_power_ac_to_dc: Entity, +) -> None: + """Add inverter element to HAEO network.""" + _LOGGER.info("Adding Inverter: %s", name) + + page.goto("/config/integrations/integration/haeo") + page.click_button("Inverter") + page.wait_for_dialog("Inverter Configuration") + + page.fill_textbox("Inverter Name", name) + page.select_combobox("AC Connection", connection) + + page.select_entity("Max DC to AC Power", max_power_dc_to_ac[0], max_power_dc_to_ac[1]) + page.select_entity("Max AC to DC Power", max_power_ac_to_dc[0], max_power_ac_to_dc[1]) + + page.submit() + page.close_element_dialog() + + _LOGGER.info("Inverter added: %s", name) + + +@guide_step +def add_battery( + page: HAPage, + *, + name: str, + connection: str, + capacity: Entity, + initial_soc: Entity, + max_charge_power: Entity | None = None, + max_discharge_power: Entity | None = None, + min_charge_level: int | None = None, + max_charge_level: int | None = None, +) -> None: + """Add battery element to HAEO network.""" + _LOGGER.info("Adding Battery: %s", name) + + page.goto("/config/integrations/integration/haeo") + page.click_button("Battery") + page.wait_for_dialog("Battery Configuration") + + page.fill_textbox("Battery Name", name) + page.select_combobox("Connection", connection) + + page.select_entity("Capacity", capacity[0], capacity[1]) + page.select_entity("State of Charge", initial_soc[0], initial_soc[1]) + + if max_charge_power: + page.select_entity("Max Charging Power", max_charge_power[0], max_charge_power[1]) + + if max_discharge_power: + page.select_entity("Max Discharging Power", max_discharge_power[0], max_discharge_power[1]) + + page.submit() + + # Step 2: min/max charge levels if present + _handle_step2(page, min_charge_level=min_charge_level, max_charge_level=max_charge_level) + + page.close_element_dialog() + + _LOGGER.info("Battery added: %s", name) + + +@guide_step +def add_solar( + page: HAPage, + *, + name: str, + connection: str, + forecasts: list[Entity], +) -> None: + """Add solar element to HAEO network.""" + _LOGGER.info("Adding Solar: %s", name) + + page.goto("/config/integrations/integration/haeo") + page.click_button("Solar") + page.wait_for_dialog("Solar Configuration") + + page.fill_textbox("Solar Name", name) + page.select_combobox("Connection", connection) + + # First forecast + if forecasts: + first = forecasts[0] + page.select_entity("Forecast", first[0], first[1]) + + # Additional forecasts + for forecast in forecasts[1:]: + page.add_another_entity("Forecast", forecast[0], forecast[1]) + + page.submit() + page.close_element_dialog() + + _LOGGER.info("Solar added: %s", name) + + +@guide_step +def add_grid( + page: HAPage, + *, + name: str, + connection: str, + import_prices: list[Entity], + export_prices: list[Entity], + import_limit: float | None = None, + export_limit: float | None = None, +) -> None: + """Add grid element to HAEO network.""" + _LOGGER.info("Adding Grid: %s", name) + + page.goto("/config/integrations/integration/haeo") + page.click_button("Grid") + page.wait_for_dialog("Grid Configuration") + + page.fill_textbox("Grid Name", name) + page.select_combobox("Connection", connection) + + # Import prices + if import_prices: + first = import_prices[0] + page.select_entity("Import Price", first[0], first[1]) + for price in import_prices[1:]: + page.add_another_entity("Import Price", price[0], price[1]) + + # Export prices + if export_prices: + first = export_prices[0] + page.select_entity("Export Price", first[0], first[1]) + for price in export_prices[1:]: + page.add_another_entity("Export Price", price[0], price[1]) + + page.submit() + + # Step 2: import/export limits + _handle_step2(page, import_limit=import_limit, export_limit=export_limit) + + page.close_element_dialog() + + _LOGGER.info("Grid added: %s", name) + + +@guide_step +def add_load( + page: HAPage, + *, + name: str, + connection: str, + forecast: Entity | None = None, + constant_value: float | None = None, +) -> None: + """Add load element to HAEO network.""" + _LOGGER.info("Adding Load: %s", name) + + page.goto("/config/integrations/integration/haeo") + page.click_button("Load") + page.wait_for_dialog("Load Configuration") + + page.fill_textbox("Load Name", name) + page.select_combobox("Connection", connection) + + if forecast: + page.select_entity("Forecast", forecast[0], forecast[1]) + + page.submit() + + # Step 2: constant value if configurable entity was selected + _handle_step2(page, constant_value=constant_value) + + page.close_element_dialog() + + _LOGGER.info("Load added: %s", name) + + +@guide_step +def add_node(page: HAPage, *, name: str) -> None: + """Add node element to HAEO network.""" + _LOGGER.info("Adding Node: %s", name) + + page.goto("/config/integrations/integration/haeo") + page.click_button("Node") + page.wait_for_dialog("Node Configuration") + + page.fill_textbox("Node Name", name) + + page.submit() + page.close_element_dialog() + + _LOGGER.info("Node added: %s", name) + + +@guide_step +def verify_setup(page: HAPage) -> None: + """Verify the HAEO setup is complete.""" + _LOGGER.info("Verifying setup...") + + page.goto("/config/integrations/integration/haeo") + page.page.get_by_role("button", name="Inverter").first.wait_for(state="visible", timeout=5000) + page._capture("final_overview") + + _LOGGER.info("Setup verified") + + +# Helper functions + + +def _handle_step2(page: HAPage, **fields: Any) -> None: + """Handle step 2 spinbuttons if present.""" + submit = page.page.get_by_role("button", name="Submit") + if submit.count() == 0: + return + + try: + if not submit.is_visible(timeout=1000): + return + except Exception: + return + + # Map field names to form labels + field_mappings = { + "min_charge_level": "Min Charge Level", + "max_charge_level": "Max Charge Level", + "import_limit": "Import Limit", + "export_limit": "Export Limit", + "constant_value": "Forecast", + } + + ctx = ScreenshotContext.current() + if ctx: + ctx.push("step2") + + try: + for field_name, value in fields.items(): + if value is not None: + label = field_mappings.get(field_name) + if label: + spinbutton = page.page.get_by_role("spinbutton", name=label) + if spinbutton.count() > 0: + try: + if spinbutton.is_visible(timeout=1000): + page.fill_spinbutton(label, str(value)) + except Exception: + pass + + page.submit() + finally: + if ctx: + ctx.pop() diff --git a/tests/guides/primitives/js/click_indicator.js b/tests/guides/primitives/js/click_indicator.js new file mode 100644 index 000000000..c66e7e361 --- /dev/null +++ b/tests/guides/primitives/js/click_indicator.js @@ -0,0 +1,91 @@ +/** + * Click indicator overlay script for screenshot capture. + * + * Creates a visual overlay positioned at the target element's bounding box + * using the popover API for top-layer placement. This avoids clipping issues + * from parent overflow:hidden. + */ +(el, clickableSelector) => { + if (el.focus) { + try { + el.focus(); + } catch (e) {} + } + + let target = el; + const minSize = 20; + const rect = el.getBoundingClientRect(); + + if (rect.width < minSize || rect.height < minSize) { + const clickableParent = el.closest(clickableSelector); + if (clickableParent) target = clickableParent; + } + + const mdcTextField = el.closest("label.mdc-text-field"); + if (mdcTextField) target = mdcTextField; + + const comboBoxRow = el.closest(".combo-box-row"); + if (comboBoxRow) target = comboBoxRow; + + const comboBoxItem = el.closest("ha-combo-box-item"); + if (comboBoxItem) { + const row = comboBoxItem.closest(".combo-box-row"); + target = row || comboBoxItem; + } + + const entityListItem = el.closest( + "ha-list-item, mwc-list-item, md-list-item, " + + '[role="listitem"], [role="option"]' + ); + if (entityListItem) { + const listItemParent = entityListItem.closest( + "ha-list-item, mwc-list-item, md-list-item, md-item" + ); + target = listItemParent || entityListItem; + } + + const haListItem = el.closest("ha-list-item"); + if (haListItem) target = haListItem; + + const mdItem = el.closest("md-item"); + if (mdItem) target = mdItem; + + const integrationItem = el.closest("ha-integration-list-item"); + if (integrationItem) target = integrationItem; + + const roleItem = el.closest('[role="listitem"], [role="option"]'); + if (roleItem) { + const haWrapper = roleItem.closest( + "ha-list-item, md-item, mwc-list-item, .combo-box-row" + ); + target = haWrapper || roleItem; + } + + const targetRect = target.getBoundingClientRect(); + const computedStyle = getComputedStyle(target); + const borderRadius = computedStyle.borderRadius || "0px"; + + const overlay = document.createElement("div"); + overlay.id = "click-indicator-overlay"; + overlay.setAttribute("popover", "manual"); + overlay.style.cssText = ` + position: fixed; + left: ${targetRect.left - 3}px; + top: ${targetRect.top - 3}px; + width: ${targetRect.width + 6}px; + height: ${targetRect.height + 6}px; + border: 3px solid rgba(255, 0, 0, 0.9); + border-radius: ${borderRadius}; + box-shadow: 0 0 15px 5px rgba(255, 0, 0, 0.4); + pointer-events: none; + z-index: 2147483647; + margin: 0; + padding: 0; + background: transparent; + box-sizing: border-box; + `; + document.body.appendChild(overlay); + try { + overlay.showPopover(); + } catch (e) {} +}; diff --git a/tests/guides/sigenergy/__init__.py b/tests/guides/sigenergy/__init__.py new file mode 100644 index 000000000..f11237165 --- /dev/null +++ b/tests/guides/sigenergy/__init__.py @@ -0,0 +1 @@ +"""Sigenergy system setup guide tests.""" diff --git a/tests/guides/sigenergy/guide.py b/tests/guides/sigenergy/guide.py new file mode 100644 index 000000000..aa9a72b9f --- /dev/null +++ b/tests/guides/sigenergy/guide.py @@ -0,0 +1,189 @@ +"""Sigenergy system setup guide. + +This guide walks through setting up a complete Sigenergy home battery system. +All configuration is inline - you can follow along step by step. + +Run with: + uv run python tests/guides/sigenergy/guide.py + +Screenshots are automatically captured and named hierarchically based on +the function call stack, e.g. "add_battery.entity_Capacity.search_results" +""" + +from __future__ import annotations + +import logging +from collections import OrderedDict +from pathlib import Path +import shutil +import sys +from typing import Any + +from playwright.sync_api import sync_playwright + +from tests.guides.ha_runner import LiveHomeAssistant, live_home_assistant +from tests.guides.primitives import ( + HAPage, + add_battery, + add_grid, + add_integration, + add_inverter, + add_load, + add_solar, + login, + screenshot_context, + verify_setup, +) + +_LOGGER = logging.getLogger(__name__) + +# Paths +GUIDE_DIR = Path(__file__).parent +PROJECT_ROOT = GUIDE_DIR.parent.parent.parent +INPUTS_FILE = PROJECT_ROOT / "tests" / "scenarios" / "scenario1" / "inputs.json" +SCREENSHOTS_DIR = GUIDE_DIR / "screenshots" + + +def run_guide( + hass: LiveHomeAssistant, + output_dir: Path, + *, + headless: bool = True, + dark_mode: bool = False, +) -> OrderedDict[str, Path]: + """Run the Sigenergy setup guide. + + Returns an OrderedDict of screenshot names to paths. + """ + with sync_playwright() as p: + browser = p.chromium.launch( + headless=headless, + args=["--remote-debugging-port=9222"], + ) + context = browser.new_context(viewport={"width": 1280, "height": 800}) + hass.inject_auth(context, dark_mode=dark_mode) + page_obj = context.new_page() + page_obj.set_default_timeout(5000) + + try: + page = HAPage(page=page_obj, url=hass.url) + + with screenshot_context(output_dir) as ctx: + # Step 1: Login + login(page) + + # Step 2: Add HAEO integration with network name + add_integration( + page, + network_name="Sigenergy System", + ) + + # Step 3: Add Inverter + # The inverter connects the DC side (battery, solar) to AC (switchboard) + add_inverter( + page, + name="Inverter", + connection="Switchboard", + max_power_dc_to_ac=("max active power", "Sigen Plant Max Active Power"), + max_power_ac_to_dc=("max active power", "Sigen Plant Max Active Power"), + ) + + # Step 4: Add Battery + # A Sigenergy SigenStor battery with typical home storage capacity + add_battery( + page, + name="Battery", + connection="Inverter", + capacity=("rated energy", "Rated Energy Capacity"), + initial_soc=("state of charge", "Battery State of Charge"), + max_charge_power=("rated charging", "Rated Charging Power"), + max_discharge_power=("rated discharging", "Rated Discharging Power"), + min_charge_level=10, + max_charge_level=100, + ) + + # Step 5: Add Solar + # Multiple solar arrays facing different directions + add_solar( + page, + name="Solar", + connection="Inverter", + forecasts=[ + ("east solar today", "East solar production forecast"), + ("north solar today", "North solar production forecast"), + ("south solar today", "South solar prediction forecast"), + ("west solar today", "West solar production forecast"), + ], + ) + + # Step 6: Add Grid + # Grid connection with Amber Electric pricing + add_grid( + page, + name="Grid", + connection="Switchboard", + import_prices=[ + ("general price", "Home - General Price"), + ("general forecast", "Home - General Forecast"), + ], + export_prices=[ + ("feed in price", "Home - Feed In Price"), + ("feed in forecast", "Home - Feed In Forecast"), + ], + import_limit=55, + export_limit=30, + ) + + # Step 7: Add Load + # Constant base load (things that are always on) + add_load( + page, + name="Constant Load", + connection="Switchboard", + forecast=("configurable", "Configurable Entity"), + constant_value=1, + ) + + # Step 8: Verify setup + verify_setup(page) + + return ctx.screenshots + + except Exception: + _LOGGER.exception("Error running guide") + error_path = output_dir / "error_state.png" + page_obj.screenshot(path=str(error_path)) + raise + + finally: + browser.close() + + +def main() -> None: + """Run the guide as a standalone script.""" + pause_mode = "--pause" in sys.argv + headless = not pause_mode + + logging.basicConfig(level=logging.INFO, format="%(message)s") + + _LOGGER.info("Sigenergy System Setup Guide") + _LOGGER.info("=" * 50) + + if SCREENSHOTS_DIR.exists(): + shutil.rmtree(SCREENSHOTS_DIR) + SCREENSHOTS_DIR.mkdir(parents=True) + + with live_home_assistant(timeout=120) as hass: + _LOGGER.info("Home Assistant running at %s", hass.url) + hass.load_states_from_file(INPUTS_FILE) + + screenshots = run_guide(hass, SCREENSHOTS_DIR, headless=headless) + + _LOGGER.info("=" * 50) + _LOGGER.info("Guide complete! %d screenshots captured:", len(screenshots)) + for name in screenshots: + _LOGGER.info(" %s", name) + + +if __name__ == "__main__": + main() diff --git a/tests/guides/sigenergy/test_sigenergy_guide.py b/tests/guides/sigenergy/test_sigenergy_guide.py new file mode 100644 index 000000000..e5cef01f6 --- /dev/null +++ b/tests/guides/sigenergy/test_sigenergy_guide.py @@ -0,0 +1,106 @@ +"""Pytest test for Sigenergy guide. + +This test runs the Sigenergy setup guide through Playwright browser automation, +validating that all configuration steps complete successfully and capturing +screenshots for documentation. + +Run with: + uv run pytest tests/guides/sigenergy/test_sigenergy_guide.py -m guide -v +""" + +from __future__ import annotations + +from collections.abc import Generator +import datetime +import logging +import shutil + +from homeassistant.config_entries import ConfigEntry +from homeassistant.util import dt as dt_util +import pytest + +from tests.guides.ha_runner import live_home_assistant +from tests.guides.sigenergy.guide import INPUTS_FILE, SCREENSHOTS_DIR, run_guide + +_LOGGER = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True) +def _restore_timezone() -> Generator[None]: # pyright: ignore[reportUnusedFunction] + """Restore dt_util.DEFAULT_TIME_ZONE after test. + + The live HA instance sets the timezone to ZoneInfo('UTC') via + async_set_time_zone(), but pytest-homeassistant-custom-component + expects datetime.timezone.utc at teardown. + """ + yield + # Reset to datetime.UTC which is what the pytest plugin expects + dt_util.set_default_time_zone(datetime.UTC) + + +@pytest.mark.guide +@pytest.mark.enable_socket +@pytest.mark.timeout(300) # 5 minutes for full guide run +@pytest.mark.parametrize("dark_mode", [False, True], ids=["light", "dark"]) +def test_sigenergy_guide(dark_mode: bool) -> None: + """Test the complete Sigenergy setup guide. + + This test: + 1. Starts a fresh Home Assistant instance with pre-authenticated user + 2. Loads entity states from scenario1 inputs.json + 3. Runs through all guide steps using Playwright + 4. Captures screenshots at each step + 5. Validates that all elements were created successfully + """ + # Use separate directories for light and dark mode screenshots + mode_suffix = "dark" if dark_mode else "light" + output_dir = SCREENSHOTS_DIR.parent / f"screenshots_{mode_suffix}" + + # Clean and create output directory + if output_dir.exists(): + shutil.rmtree(output_dir) + output_dir.mkdir(parents=True) + + with live_home_assistant(timeout=120) as hass: + # Load entity states from scenario1 + hass.load_states_from_file(INPUTS_FILE) + + # Run the guide + results = run_guide(hass, output_dir, headless=True, dark_mode=dark_mode) + + # Validate results + assert len(results) > 0, "No screenshots captured" + + # Verify expected elements were created by checking config entries + # Note: async_entries is synchronous despite its name (HA convention for callback methods) + + async def get_entries() -> list[ConfigEntry]: + return hass.hass.config_entries.async_entries("haeo") + + config_entries = hass.run_coro(get_entries()) + assert len(config_entries) > 0, "No HAEO config entries created" + + # Check that we have the hub entry + hub_entry = config_entries[0] + assert hub_entry.title == "Sigenergy System" + + # Check subentries for elements + subentries = list(hub_entry.subentries.values()) + element_names = {se.title for se in subentries} + + expected_elements = { + "Switchboard", + "Inverter", + "Battery", + "Solar", + "Grid", + "Constant Load", + } + assert expected_elements <= element_names, f"Missing elements: {expected_elements - element_names}" + + _LOGGER.info( + "Guide test passed (%s mode): %d screenshots saved to %s", + mode_suffix, + len(results), + output_dir, + ) diff --git a/uv.lock b/uv.lock index 7bd5b1e54..82cd12682 100644 --- a/uv.lock +++ b/uv.lock @@ -353,6 +353,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/27/44/d2ef5e87509158ad2187f4dd0852df80695bb1ee0cfe0a684727b01a69e0/bcrypt-5.0.0-cp39-abi3-win_arm64.whl", hash = "sha256:f2347d3534e76bf50bca5500989d6c1d05ed64b440408057a37673282c654927", size = 144953, upload-time = "2025-09-25T19:50:37.32Z" }, ] +[[package]] +name = "beautifulsoup4" +version = "4.14.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "soupsieve" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c3/b0/1c6a16426d389813b48d95e26898aff79abbde42ad353958ad95cc8c9b21/beautifulsoup4-4.14.3.tar.gz", hash = "sha256:6292b1c5186d356bba669ef9f7f051757099565ad9ada5dd630bd9de5fa7fb86", size = 627737, upload-time = "2025-11-30T15:08:26.084Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1a/39/47f9197bdd44df24d67ac8893641e16f386c984a0619ef2ee4c51fbbc019/beautifulsoup4-4.14.3-py3-none-any.whl", hash = "sha256:0918bfe44902e6ad8d57732ba310582e98da931428d231a5ecb9e7c703a735bb", size = 107721, upload-time = "2025-11-30T15:08:24.087Z" }, +] + [[package]] name = "beautysh" version = "6.4.2" @@ -1018,6 +1031,7 @@ dependencies = [ [package.dev-dependencies] dev = [ + { name = "beautifulsoup4" }, { name = "freezegun" }, { name = "matplotlib" }, { name = "mdformat" }, @@ -1032,6 +1046,7 @@ dev = [ { name = "mdformat-simple-breaks" }, { name = "mdformat-tables" }, { name = "networkx" }, + { name = "playwright" }, { name = "pyright" }, { name = "pytest" }, { name = "pytest-cov" }, @@ -1051,6 +1066,7 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ + { name = "beautifulsoup4", specifier = ">=4.14.3" }, { name = "freezegun", specifier = ">=1.5.2" }, { name = "matplotlib", specifier = ">=3.10.8" }, { name = "mdformat", specifier = ">=0.7.22" }, @@ -1065,6 +1081,7 @@ dev = [ { name = "mdformat-simple-breaks", specifier = ">=0.0.1" }, { name = "mdformat-tables", specifier = ">=1.0.0" }, { name = "networkx", specifier = ">=3.4.2" }, + { name = "playwright", specifier = ">=1.57.0" }, { name = "pyright", specifier = ">=1.1.407" }, { name = "pytest", specifier = ">=9.0.0" }, { name = "pytest-cov", specifier = ">=7.0.0" }, @@ -1969,6 +1986,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b9/a5/f9f143b420e53a296869636d1c3bdc144be498ca3136a113f52b53ea2b02/pipdeptree-2.26.1-py3-none-any.whl", hash = "sha256:3849d62a2ed641256afac3058c4f9b85ac4a47e9d8c991ee17a8f3d230c5cffb", size = 32802, upload-time = "2025-04-20T03:27:40.413Z" }, ] +[[package]] +name = "playwright" +version = "1.57.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "greenlet" }, + { name = "pyee" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/ed/b6/e17543cea8290ae4dced10be21d5a43c360096aa2cce0aa7039e60c50df3/playwright-1.57.0-py3-none-macosx_10_13_x86_64.whl", hash = "sha256:9351c1ac3dfd9b3820fe7fc4340d96c0d3736bb68097b9b7a69bd45d25e9370c", size = 41985039, upload-time = "2025-12-09T08:06:18.408Z" }, + { url = "https://files.pythonhosted.org/packages/8b/04/ef95b67e1ff59c080b2effd1a9a96984d6953f667c91dfe9d77c838fc956/playwright-1.57.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:a4a9d65027bce48eeba842408bcc1421502dfd7e41e28d207e94260fa93ca67e", size = 40775575, upload-time = "2025-12-09T08:06:22.105Z" }, + { url = "https://files.pythonhosted.org/packages/60/bd/5563850322a663956c927eefcf1457d12917e8f118c214410e815f2147d1/playwright-1.57.0-py3-none-macosx_11_0_universal2.whl", hash = "sha256:99104771abc4eafee48f47dac2369e0015516dc1ce8c409807d2dd440828b9a4", size = 41985042, upload-time = "2025-12-09T08:06:25.357Z" }, + { url = "https://files.pythonhosted.org/packages/56/61/3a803cb5ae0321715bfd5247ea871d25b32c8f372aeb70550a90c5f586df/playwright-1.57.0-py3-none-manylinux1_x86_64.whl", hash = "sha256:284ed5a706b7c389a06caa431b2f0ba9ac4130113c3a779767dda758c2497bb1", size = 45975252, upload-time = "2025-12-09T08:06:29.186Z" }, + { url = "https://files.pythonhosted.org/packages/83/d7/b72eb59dfbea0013a7f9731878df8c670f5f35318cedb010c8a30292c118/playwright-1.57.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38a1bae6c0a07839cdeaddbc0756b3b2b85e476c07945f64ece08f1f956a86f1", size = 45706917, upload-time = "2025-12-09T08:06:32.549Z" }, + { url = "https://files.pythonhosted.org/packages/e4/09/3fc9ebd7c95ee54ba6a68d5c0bc23e449f7235f4603fc60534a364934c16/playwright-1.57.0-py3-none-win32.whl", hash = "sha256:1dd93b265688da46e91ecb0606d36f777f8eadcf7fbef12f6426b20bf0c9137c", size = 36553860, upload-time = "2025-12-09T08:06:35.864Z" }, + { url = "https://files.pythonhosted.org/packages/58/d4/dcdfd2a33096aeda6ca0d15584800443dd2be64becca8f315634044b135b/playwright-1.57.0-py3-none-win_amd64.whl", hash = "sha256:6caefb08ed2c6f29d33b8088d05d09376946e49a73be19271c8cd5384b82b14c", size = 36553864, upload-time = "2025-12-09T08:06:38.915Z" }, + { url = "https://files.pythonhosted.org/packages/6a/60/fe31d7e6b8907789dcb0584f88be741ba388413e4fbce35f1eba4e3073de/playwright-1.57.0-py3-none-win_arm64.whl", hash = "sha256:5f065f5a133dbc15e6e7c71e7bc04f258195755b1c32a432b792e28338c8335e", size = 32837940, upload-time = "2025-12-09T08:06:42.268Z" }, +] + [[package]] name = "pluggy" version = "1.6.0" @@ -2142,6 +2178,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fc/de/b20f4ab954d6d399499c33ec4fafc46d9551e11dc1858fb7f5dca0748ceb/pydantic_core-2.41.4-cp313-cp313t-win_arm64.whl", hash = "sha256:19f3684868309db5263a11bace3c45d93f6f24afa2ffe75a647583df22a2ff89", size = 1970034, upload-time = "2025-10-14T10:21:30.869Z" }, ] +[[package]] +name = "pyee" +version = "13.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/03/1fd98d5841cd7964a27d729ccf2199602fe05eb7a405c1462eb7277945ed/pyee-13.0.0.tar.gz", hash = "sha256:b391e3c5a434d1f5118a25615001dbc8f669cf410ab67d04c4d4e07c55481c37", size = 31250, upload-time = "2025-03-17T18:53:15.955Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9b/4d/b9add7c84060d4c1906abe9a7e5359f2a60f7a9a4f67268b2766673427d8/pyee-13.0.0-py3-none-any.whl", hash = "sha256:48195a3cddb3b1515ce0695ed76036b5ccc2ef3a9f963ff9f77aec0139845498", size = 15730, upload-time = "2025-03-17T18:53:14.532Z" }, +] + [[package]] name = "pygments" version = "2.19.2" @@ -2808,6 +2856,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/98/1b/83ff83003994bc8b56483c75a710de588896c167c7c42d66d059a2eb48dc/snitun-0.45.1-py3-none-any.whl", hash = "sha256:c1fa4536320ec3126926ade775c429e20664db1bc61d8fec0e181dc393d36ab4", size = 51236, upload-time = "2025-09-25T05:24:06.412Z" }, ] +[[package]] +name = "soupsieve" +version = "2.8.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/89/23/adf3796d740536d63a6fbda113d07e60c734b6ed5d3058d1e47fc0495e47/soupsieve-2.8.1.tar.gz", hash = "sha256:4cf733bc50fa805f5df4b8ef4740fc0e0fa6218cf3006269afd3f9d6d80fd350", size = 117856, upload-time = "2025-12-18T13:50:34.655Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/48/f3/b67d6ea49ca9154453b6d70b34ea22f3996b9fa55da105a79d8732227adc/soupsieve-2.8.1-py3-none-any.whl", hash = "sha256:a11fe2a6f3d76ab3cf2de04eb339c1be5b506a8a47f2ceb6d139803177f85434", size = 36710, upload-time = "2025-12-18T13:50:33.267Z" }, +] + [[package]] name = "sqlalchemy" version = "2.0.41"