Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tests/__pycache__
tmp
todo
.DS_Store
test.py
89 changes: 66 additions & 23 deletions airos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(
self.username = username
self.password = password

self.api_version: int = 8

parsed_host = urlparse(host)
scheme = (
parsed_host.scheme
Expand All @@ -74,11 +76,13 @@ def __init__(
self.current_csrf_token: str | None = None

# Mostly 8.x API endpoints, login/status are the same in 6.x
self._login_urls = {
"default": f"{self.base_url}/api/auth",
"v6_alternative": f"{self.base_url}/login.cgi",
}
self._login_url = f"{self.base_url}/api/auth"
self._status_cgi_url = f"{self.base_url}/status.cgi"

# Presumed 6.x XM only endpoint
self._v6_xm_login_url = f"{self.base_url}/login.cgi"
self._v6_form_url = "/index.cgi"

# Presumed 8.x only endpoints
self._stakick_cgi_url = f"{self.base_url}/stakick.cgi"
self._provmode_url = f"{self.base_url}/api/provmode"
Expand All @@ -88,6 +92,8 @@ def __init__(
self._download_progress_url = f"{self.base_url}/api/fw/download-progress"
self._install_url = f"{self.base_url}/fwflash.cgi"

self._login_urls = [self._login_url, self._v6_xm_login_url]

@staticmethod
def derived_wireless_data(
derived: dict[str, Any], response: dict[str, Any]
Expand Down Expand Up @@ -204,7 +210,8 @@ def _get_authenticated_headers(
headers["X-CSRF-ID"] = self._csrf_id

if self._auth_cookie: # pragma: no cover
headers["Cookie"] = f"AIROS_{self._auth_cookie}"
# headers["Cookie"] = f"AIROS_{self._auth_cookie}"
headers["Cookie"] = self._auth_cookie

return headers

Expand Down Expand Up @@ -243,7 +250,7 @@ async def _request_json(
request_headers.update(headers)

try:
if url not in self._login_urls.values() and not self.connected:
if url not in self._login_urls and not self.connected:
_LOGGER.error("Not connected, login first")
raise AirOSDeviceConnectionError from None

Expand All @@ -259,7 +266,7 @@ async def _request_json(
_LOGGER.debug("Successfully fetched JSON from %s", url)

# If this is the login request, we need to store the new auth data
if url in self._login_urls.values():
if url in self._login_urls:
self._store_auth_data(response)
self.connected = True

Expand All @@ -283,32 +290,68 @@ async def _request_json(
_LOGGER.warning("Request to %s was cancelled", url)
raise

async def _login_v6(self) -> None:
"""Login to airOS v6 (XM) devices."""
# Handle session cookie from login url
async with self.session.request(
"GET",
self._v6_xm_login_url,
allow_redirects=False,
) as response:
session_cookie = next(
(c for n, c in response.cookies.items() if n.startswith("AIROS")), None
)
if not session_cookie:
raise AirOSDeviceConnectionError("No session cookie received.")
self._auth_cookie = f"{session_cookie.key}={session_cookie.value}"

# Handle login expecting 302 redirect
payload = {
"username": self.username,
"password": self.password,
"uri": self._v6_form_url,
}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Origin": self.base_url,
"Referer": self._v6_xm_login_url,
"Cookie": self._auth_cookie,
}
async with self.session.request(
"POST",
self._v6_xm_login_url,
data=payload,
headers=headers,
allow_redirects=False,
) as response:
if response.status != 302:
raise AirOSConnectionAuthenticationError("Login failed.")

# Activate session by accessing the form URL
headers = {"Referer": self._v6_xm_login_url, "Cookie": self._auth_cookie}
async with self.session.request(
"GET",
f"{self.base_url}{self._v6_form_url}",
headers=headers,
allow_redirects=True,
) as response:
if "login.cgi" in str(response.url):
raise AirOSConnectionAuthenticationError("Session activation failed.")
self.connected = True
self.api_version = 6

async def login(self) -> None:
"""Login to AirOS device."""
payload = {"username": self.username, "password": self.password}
try:
await self._request_json(
"POST", self._login_urls["default"], json_data=payload
)
await self._request_json("POST", self._login_url, json_data=payload)
except AirOSUrlNotFoundError:
pass # Try next URL
await self._login_v6()
except AirOSConnectionSetupError as err:
raise AirOSConnectionSetupError("Failed to login to AirOS device") from err
else:
return

try: # Alternative URL
await self._request_json(
"POST",
self._login_urls["v6_alternative"],
form_data=payload,
ct_form=True,
)
except AirOSConnectionSetupError as err:
raise AirOSConnectionSetupError(
"Failed to login to default and alternate AirOS device urls"
) from err

async def status(self) -> AirOSDataModel:
"""Retrieve status from the device."""
response = await self._request_json(
Expand Down