Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 120 additions & 114 deletions airos/airos8.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

from __future__ import annotations

import asyncio
import json
import logging
import ssl

import aiohttp

Expand All @@ -17,12 +15,22 @@
class AirOS8:
"""Set up connection to AirOS."""

def __init__(self, host: str, username: str, password: str):
def __init__(
self,
host: str,
username: str,
password: str,
session: aiohttp.ClientSession,
verify_ssl: bool = True,
):
"""Initialize AirOS8 class."""
self.username = username
self.password = password
self.base_url = f"https://{host}"

self.session = session
self.verify_ssl = verify_ssl

self._login_url = f"{self.base_url}/api/auth" # AirOS 8
self._status_cgi_url = f"{self.base_url}/status.cgi" # AirOS 8
self.current_csrf_token = None
Expand All @@ -45,120 +53,116 @@ def __init__(self, host: str, username: str, password: str):

async def login(self) -> bool:
"""Log in to the device assuring cookies and tokens set correctly."""
loop = asyncio.get_running_loop()
ssl_context = await loop.run_in_executor(None, ssl.create_default_context)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_context)

async with aiohttp.ClientSession(connector=connector) as self.session:
# --- Step 0: Pre-inject the 'ok=1' cookie before login POST (mimics curl) ---
self.session.cookie_jar.update_cookies({"ok": "1"})

# --- Step 1: Attempt Login to /api/auth (This now sets all session cookies and the CSRF token) ---
login_payload = {
"username": self.username,
"password": self.password,
}

login_request_headers = {**self._common_headers}

post_data = None
if self._use_json_for_login_post:
login_request_headers["Content-Type"] = "application/json"
post_data = json.dumps(login_payload)
else:
login_request_headers["Content-Type"] = (
"application/x-www-form-urlencoded; charset=UTF-8"
)
post_data = login_payload

try:
async with self.session.post(
self._login_url, data=post_data, headers=login_request_headers
) as response:
if not response.cookies:
logger.exception("Empty cookies after login, bailing out.")
raise DataMissingError from None
else:
for _, morsel in response.cookies.items():
# If the AIROS_ cookie was parsed but isn't automatically added to the jar, add it manually
if (
morsel.key.startswith("AIROS_")
and morsel.key not in self.session.cookie_jar
):
# `SimpleCookie`'s Morsel objects are designed to be compatible with cookie jars.
# We need to set the domain if it's missing, otherwise the cookie might not be sent.
# For IP addresses, the domain is typically blank.
# aiohttp's jar should handle it, but for explicit control:
if not morsel.get("domain"):
morsel["domain"] = (
response.url.host
) # Set to the host that issued it
self.session.cookie_jar.update_cookies(
{
morsel.key: morsel.output(header="")[
len(morsel.key) + 1 :
]
.split(";")[0]
.strip()
},
response.url,
)
# The update_cookies method can take a SimpleCookie morsel directly or a dict.
# The morsel.output method gives 'NAME=VALUE; Path=...; HttpOnly'
# We just need 'NAME=VALUE' or the morsel object itself.
# Let's use the morsel directly which is more robust.
# Alternatively: self.session.cookie_jar.update_cookies({morsel.key: morsel.value}) might work if it's simpler.
# Aiohttp's update_cookies takes a dict mapping name to value.
# To pass the full morsel with its attributes, we need to add it to the jar's internal structure.
# Simpler: just ensure the key-value pair is there for simple jar.

# Let's try the direct update of the key-value
self.session.cookie_jar.update_cookies(
{morsel.key: morsel.value}
)

new_csrf_token = response.headers.get("X-CSRF-ID")
if new_csrf_token:
self.current_csrf_token = new_csrf_token
else:
return

# Re-check cookies in self.session.cookie_jar AFTER potential manual injection
airos_cookie_found = False
ok_cookie_found = False
if not self.session.cookie_jar:
logger.exception(
"COOKIE JAR IS EMPTY after login POST. This is a major issue."
)
raise DataMissingError from None
for cookie in self.session.cookie_jar:
if cookie.key.startswith("AIROS_"):
airos_cookie_found = True
if cookie.key == "ok":
ok_cookie_found = True
# --- Step 0: Pre-inject the 'ok=1' cookie before login POST (mimics curl) ---
self.session.cookie_jar.update_cookies({"ok": "1"})

if not airos_cookie_found and not ok_cookie_found:
raise DataMissingError from None
# --- Step 1: Attempt Login to /api/auth (This now sets all session cookies and the CSRF token) ---
login_payload = {
"username": self.username,
"password": self.password,
}

response_text = await response.text()
login_request_headers = {**self._common_headers}

if response.status == 200:
try:
json.loads(response_text)
return True
except json.JSONDecodeError as err:
logger.exception("JSON Decode Error")
raise DataMissingError from err
post_data = None
if self._use_json_for_login_post:
login_request_headers["Content-Type"] = "application/json"
post_data = json.dumps(login_payload)
else:
login_request_headers["Content-Type"] = (
"application/x-www-form-urlencoded; charset=UTF-8"
)
post_data = login_payload

else:
log = f"Login failed with status {response.status}. Full Response: {response.text}"
logger.error(log)
raise ConnectionFailedError from None
except aiohttp.ClientError as err:
logger.exception("Error during login")
raise ConnectionFailedError from err
try:
async with self.session.post(
self._login_url,
data=post_data,
headers=login_request_headers,
ssl=self.verify_ssl,
) as response:
if not response.cookies:
logger.exception("Empty cookies after login, bailing out.")
raise DataMissingError from None
else:
for _, morsel in response.cookies.items():
# If the AIROS_ cookie was parsed but isn't automatically added to the jar, add it manually
if (
morsel.key.startswith("AIROS_")
and morsel.key not in self.session.cookie_jar
):
# `SimpleCookie`'s Morsel objects are designed to be compatible with cookie jars.
# We need to set the domain if it's missing, otherwise the cookie might not be sent.
# For IP addresses, the domain is typically blank.
# aiohttp's jar should handle it, but for explicit control:
if not morsel.get("domain"):
morsel["domain"] = (
response.url.host
) # Set to the host that issued it
self.session.cookie_jar.update_cookies(
{
morsel.key: morsel.output(header="")[
len(morsel.key) + 1 :
]
.split(";")[0]
.strip()
},
response.url,
)
# The update_cookies method can take a SimpleCookie morsel directly or a dict.
# The morsel.output method gives 'NAME=VALUE; Path=...; HttpOnly'
# We just need 'NAME=VALUE' or the morsel object itself.
# Let's use the morsel directly which is more robust.
# Alternatively: self.session.cookie_jar.update_cookies({morsel.key: morsel.value}) might work if it's simpler.
# Aiohttp's update_cookies takes a dict mapping name to value.
# To pass the full morsel with its attributes, we need to add it to the jar's internal structure.
# Simpler: just ensure the key-value pair is there for simple jar.

# Let's try the direct update of the key-value
self.session.cookie_jar.update_cookies(
{morsel.key: morsel.value}
)

new_csrf_token = response.headers.get("X-CSRF-ID")
if new_csrf_token:
self.current_csrf_token = new_csrf_token
else:
return

# Re-check cookies in self.session.cookie_jar AFTER potential manual injection
airos_cookie_found = False
ok_cookie_found = False
if not self.session.cookie_jar:
logger.exception(
"COOKIE JAR IS EMPTY after login POST. This is a major issue."
)
raise DataMissingError from None
for cookie in self.session.cookie_jar:
if cookie.key.startswith("AIROS_"):
airos_cookie_found = True
if cookie.key == "ok":
ok_cookie_found = True

if not airos_cookie_found and not ok_cookie_found:
raise DataMissingError from None

response_text = await response.text()

if response.status == 200:
try:
json.loads(response_text)
return True
except json.JSONDecodeError as err:
logger.exception("JSON Decode Error")
raise DataMissingError from err

else:
log = f"Login failed with status {response.status}. Full Response: {response.text}"
logger.error(log)
raise ConnectionFailedError from None
except aiohttp.ClientError as err:
logger.exception("Error during login")
raise ConnectionFailedError from err

async def status(self) -> dict:
"""Retrieve status from the device."""
Expand All @@ -169,7 +173,9 @@ async def status(self) -> dict:

try:
async with self.session.get(
self._status_cgi_url, headers=authenticated_get_headers
self._status_cgi_url,
headers=authenticated_get_headers,
ssl=self.verify_ssl,
) as response:
status_response_text = await response.text()

Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "airos"
version = "0.0.3a2"
version = "0.0.4a0"
license = "MIT"
description = "Ubiquity airOS module(s) for Python 3."
readme = "README.md"
Expand All @@ -22,7 +22,6 @@ maintainers = [
requires-python = ">=3.13"
dependencies = [
"aiohttp",
"munch",
]

[project.urls]
Expand Down