From f0495efaf1c8ce6b40b9543fd6dd275b2a6f00e0 Mon Sep 17 00:00:00 2001 From: Tom Scholten Date: Sun, 13 Jul 2025 19:47:05 +0200 Subject: [PATCH] Improve session handling and ssl, bump version --- airos/airos8.py | 234 +++++++++++++++++++++++++----------------------- pyproject.toml | 3 +- 2 files changed, 121 insertions(+), 116 deletions(-) diff --git a/airos/airos8.py b/airos/airos8.py index 271696d..e5309b0 100644 --- a/airos/airos8.py +++ b/airos/airos8.py @@ -2,10 +2,8 @@ from __future__ import annotations -import asyncio import json import logging -import ssl import aiohttp @@ -17,12 +15,22 @@ class AirOS8: """Set up connection to AirOS.""" - def __init__(self, host: str, username: str, password: str): + def __init__( + self, + host: str, + username: str, + password: str, + session: aiohttp.ClientSession, + verify_ssl: bool = True, + ): """Initialize AirOS8 class.""" self.username = username self.password = password self.base_url = f"https://{host}" + self.session = session + self.verify_ssl = verify_ssl + self._login_url = f"{self.base_url}/api/auth" # AirOS 8 self._status_cgi_url = f"{self.base_url}/status.cgi" # AirOS 8 self.current_csrf_token = None @@ -45,120 +53,116 @@ def __init__(self, host: str, username: str, password: str): async def login(self) -> bool: """Log in to the device assuring cookies and tokens set correctly.""" - loop = asyncio.get_running_loop() - ssl_context = await loop.run_in_executor(None, ssl.create_default_context) - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - connector = aiohttp.TCPConnector(ssl=ssl_context) - - async with aiohttp.ClientSession(connector=connector) as self.session: - # --- Step 0: Pre-inject the 'ok=1' cookie before login POST (mimics curl) --- - self.session.cookie_jar.update_cookies({"ok": "1"}) - - # --- Step 1: Attempt Login to /api/auth (This now sets all session cookies and the CSRF token) --- - login_payload = { - "username": self.username, - "password": self.password, - } - - login_request_headers = {**self._common_headers} - - post_data = None - if self._use_json_for_login_post: - login_request_headers["Content-Type"] = "application/json" - post_data = json.dumps(login_payload) - else: - login_request_headers["Content-Type"] = ( - "application/x-www-form-urlencoded; charset=UTF-8" - ) - post_data = login_payload - - try: - async with self.session.post( - self._login_url, data=post_data, headers=login_request_headers - ) as response: - if not response.cookies: - logger.exception("Empty cookies after login, bailing out.") - raise DataMissingError from None - else: - for _, morsel in response.cookies.items(): - # If the AIROS_ cookie was parsed but isn't automatically added to the jar, add it manually - if ( - morsel.key.startswith("AIROS_") - and morsel.key not in self.session.cookie_jar - ): - # `SimpleCookie`'s Morsel objects are designed to be compatible with cookie jars. - # We need to set the domain if it's missing, otherwise the cookie might not be sent. - # For IP addresses, the domain is typically blank. - # aiohttp's jar should handle it, but for explicit control: - if not morsel.get("domain"): - morsel["domain"] = ( - response.url.host - ) # Set to the host that issued it - self.session.cookie_jar.update_cookies( - { - morsel.key: morsel.output(header="")[ - len(morsel.key) + 1 : - ] - .split(";")[0] - .strip() - }, - response.url, - ) - # The update_cookies method can take a SimpleCookie morsel directly or a dict. - # The morsel.output method gives 'NAME=VALUE; Path=...; HttpOnly' - # We just need 'NAME=VALUE' or the morsel object itself. - # Let's use the morsel directly which is more robust. - # Alternatively: self.session.cookie_jar.update_cookies({morsel.key: morsel.value}) might work if it's simpler. - # Aiohttp's update_cookies takes a dict mapping name to value. - # To pass the full morsel with its attributes, we need to add it to the jar's internal structure. - # Simpler: just ensure the key-value pair is there for simple jar. - - # Let's try the direct update of the key-value - self.session.cookie_jar.update_cookies( - {morsel.key: morsel.value} - ) - - new_csrf_token = response.headers.get("X-CSRF-ID") - if new_csrf_token: - self.current_csrf_token = new_csrf_token - else: - return - - # Re-check cookies in self.session.cookie_jar AFTER potential manual injection - airos_cookie_found = False - ok_cookie_found = False - if not self.session.cookie_jar: - logger.exception( - "COOKIE JAR IS EMPTY after login POST. This is a major issue." - ) - raise DataMissingError from None - for cookie in self.session.cookie_jar: - if cookie.key.startswith("AIROS_"): - airos_cookie_found = True - if cookie.key == "ok": - ok_cookie_found = True + # --- Step 0: Pre-inject the 'ok=1' cookie before login POST (mimics curl) --- + self.session.cookie_jar.update_cookies({"ok": "1"}) - if not airos_cookie_found and not ok_cookie_found: - raise DataMissingError from None + # --- Step 1: Attempt Login to /api/auth (This now sets all session cookies and the CSRF token) --- + login_payload = { + "username": self.username, + "password": self.password, + } - response_text = await response.text() + login_request_headers = {**self._common_headers} - if response.status == 200: - try: - json.loads(response_text) - return True - except json.JSONDecodeError as err: - logger.exception("JSON Decode Error") - raise DataMissingError from err + post_data = None + if self._use_json_for_login_post: + login_request_headers["Content-Type"] = "application/json" + post_data = json.dumps(login_payload) + else: + login_request_headers["Content-Type"] = ( + "application/x-www-form-urlencoded; charset=UTF-8" + ) + post_data = login_payload - else: - log = f"Login failed with status {response.status}. Full Response: {response.text}" - logger.error(log) - raise ConnectionFailedError from None - except aiohttp.ClientError as err: - logger.exception("Error during login") - raise ConnectionFailedError from err + try: + async with self.session.post( + self._login_url, + data=post_data, + headers=login_request_headers, + ssl=self.verify_ssl, + ) as response: + if not response.cookies: + logger.exception("Empty cookies after login, bailing out.") + raise DataMissingError from None + else: + for _, morsel in response.cookies.items(): + # If the AIROS_ cookie was parsed but isn't automatically added to the jar, add it manually + if ( + morsel.key.startswith("AIROS_") + and morsel.key not in self.session.cookie_jar + ): + # `SimpleCookie`'s Morsel objects are designed to be compatible with cookie jars. + # We need to set the domain if it's missing, otherwise the cookie might not be sent. + # For IP addresses, the domain is typically blank. + # aiohttp's jar should handle it, but for explicit control: + if not morsel.get("domain"): + morsel["domain"] = ( + response.url.host + ) # Set to the host that issued it + self.session.cookie_jar.update_cookies( + { + morsel.key: morsel.output(header="")[ + len(morsel.key) + 1 : + ] + .split(";")[0] + .strip() + }, + response.url, + ) + # The update_cookies method can take a SimpleCookie morsel directly or a dict. + # The morsel.output method gives 'NAME=VALUE; Path=...; HttpOnly' + # We just need 'NAME=VALUE' or the morsel object itself. + # Let's use the morsel directly which is more robust. + # Alternatively: self.session.cookie_jar.update_cookies({morsel.key: morsel.value}) might work if it's simpler. + # Aiohttp's update_cookies takes a dict mapping name to value. + # To pass the full morsel with its attributes, we need to add it to the jar's internal structure. + # Simpler: just ensure the key-value pair is there for simple jar. + + # Let's try the direct update of the key-value + self.session.cookie_jar.update_cookies( + {morsel.key: morsel.value} + ) + + new_csrf_token = response.headers.get("X-CSRF-ID") + if new_csrf_token: + self.current_csrf_token = new_csrf_token + else: + return + + # Re-check cookies in self.session.cookie_jar AFTER potential manual injection + airos_cookie_found = False + ok_cookie_found = False + if not self.session.cookie_jar: + logger.exception( + "COOKIE JAR IS EMPTY after login POST. This is a major issue." + ) + raise DataMissingError from None + for cookie in self.session.cookie_jar: + if cookie.key.startswith("AIROS_"): + airos_cookie_found = True + if cookie.key == "ok": + ok_cookie_found = True + + if not airos_cookie_found and not ok_cookie_found: + raise DataMissingError from None + + response_text = await response.text() + + if response.status == 200: + try: + json.loads(response_text) + return True + except json.JSONDecodeError as err: + logger.exception("JSON Decode Error") + raise DataMissingError from err + + else: + log = f"Login failed with status {response.status}. Full Response: {response.text}" + logger.error(log) + raise ConnectionFailedError from None + except aiohttp.ClientError as err: + logger.exception("Error during login") + raise ConnectionFailedError from err async def status(self) -> dict: """Retrieve status from the device.""" @@ -169,7 +173,9 @@ async def status(self) -> dict: try: async with self.session.get( - self._status_cgi_url, headers=authenticated_get_headers + self._status_cgi_url, + headers=authenticated_get_headers, + ssl=self.verify_ssl, ) as response: status_response_text = await response.text() diff --git a/pyproject.toml b/pyproject.toml index 57980ec..ca203f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "airos" -version = "0.0.3a2" +version = "0.0.4a0" license = "MIT" description = "Ubiquity airOS module(s) for Python 3." readme = "README.md" @@ -22,7 +22,6 @@ maintainers = [ requires-python = ">=3.13" dependencies = [ "aiohttp", - "munch", ] [project.urls]