@@ -55,6 +55,8 @@ def __init__(
5555 self .username = username
5656 self .password = password
5757
58+ self .api_version : int = 8
59+
5860 parsed_host = urlparse (host )
5961 scheme = (
6062 parsed_host .scheme
@@ -74,11 +76,13 @@ def __init__(
7476 self .current_csrf_token : str | None = None
7577
7678 # Mostly 8.x API endpoints, login/status are the same in 6.x
77- self ._login_urls = {
78- "default" : f"{ self .base_url } /api/auth" ,
79- "v6_alternative" : f"{ self .base_url } /login.cgi" ,
80- }
79+ self ._login_url = f"{ self .base_url } /api/auth"
8180 self ._status_cgi_url = f"{ self .base_url } /status.cgi"
81+
82+ # Presumed 6.x XM only endpoint
83+ self ._v6_xm_login_url = f"{ self .base_url } /login.cgi"
84+ self ._v6_form_url = "/index.cgi"
85+
8286 # Presumed 8.x only endpoints
8387 self ._stakick_cgi_url = f"{ self .base_url } /stakick.cgi"
8488 self ._provmode_url = f"{ self .base_url } /api/provmode"
@@ -88,8 +92,10 @@ def __init__(
8892 self ._download_progress_url = f"{ self .base_url } /api/fw/download-progress"
8993 self ._install_url = f"{ self .base_url } /fwflash.cgi"
9094
95+ self ._login_urls = [self ._login_url , self ._v6_xm_login_url ]
96+
9197 @staticmethod
92- def derived_wireless_data (
98+ def _derived_wireless_data (
9399 derived : dict [str , Any ], response : dict [str , Any ]
94100 ) -> dict [str , Any ]:
95101 """Add derived wireless data to the device response."""
@@ -129,7 +135,7 @@ def _derived_data_helper(
129135 sku = UispAirOSProductMapper ().get_sku_by_devmodel (devmodel )
130136 except KeyError :
131137 _LOGGER .warning (
132- "Unknown SKU/Model ID for %s . Please report at "
138+ "Unknown SKU/Model ID for '%s' . Please report at "
133139 "https://github.com/CoMPaTech/python-airos/issues so we can add support." ,
134140 devmodel ,
135141 )
@@ -152,41 +158,71 @@ def _derived_data_helper(
152158 "mode" : DerivedWirelessMode .PTP ,
153159 "sku" : sku ,
154160 }
161+
155162 # WIRELESS
156163 derived = derived_wireless_data_func (derived , response )
157164
158- # INTERFACES
159- addresses = {}
160- interface_order = ["br0" , "eth0" , "ath0" ]
161-
165+ # Interfaces / MAC (for unique id)
162166 interfaces = response .get ("interfaces" , [])
163-
164167 # No interfaces, no mac, no usability
165168 if not interfaces :
166169 _LOGGER .error ("Failed to determine interfaces from AirOS data" )
167170 raise AirOSKeyDataMissingError from None
168171
169- for interface in interfaces :
170- if interface ["enabled" ]: # Only consider if enabled
171- addresses [interface ["ifname" ]] = interface ["hwaddr" ]
172-
173- # Fallback take fist alternate interface found
174- derived ["mac" ] = interfaces [0 ]["hwaddr" ]
175- derived ["mac_interface" ] = interfaces [0 ]["ifname" ]
172+ derived ["mac" ] = AirOS .get_mac (interfaces )["mac" ]
173+ derived ["mac_interface" ] = AirOS .get_mac (interfaces )["mac_interface" ]
176174
177- for interface in interface_order :
178- if interface in addresses :
179- derived ["mac" ] = addresses [interface ]
180- derived ["mac_interface" ] = interface
181- break
175+ # Firmware Major Version
176+ fwversion = (response .get ("host" ) or {}).get ("fwversion" , "invalid" )
177+ derived ["fw_major" ] = AirOS .get_fw_major (fwversion )
182178
183179 response ["derived" ] = derived
184180
185181 return response
186182
187- def derived_data (self , response : dict [str , Any ]) -> dict [str , Any ]:
183+ @staticmethod
184+ def get_fw_major (fwversion : str ) -> int :
185+ """Extract major firmware version from fwversion string."""
186+ try :
187+ return int (fwversion .lstrip ("v" ).split ("." , 1 )[0 ])
188+ except (ValueError , AttributeError ) as err :
189+ _LOGGER .error ("Invalid firmware version '%s'" , fwversion )
190+ raise AirOSKeyDataMissingError ("invalid fwversion" ) from err
191+
192+ @staticmethod
193+ def get_mac (interfaces : list [dict [str , Any ]]) -> dict [str , str ]:
194+ """Extract MAC address from interfaces."""
195+ result : dict [str , str ] = {"mac" : "" , "mac_interface" : "" }
196+
197+ if not interfaces :
198+ return result
199+
200+ addresses : dict [str , str ] = {}
201+ interface_order = ["br0" , "eth0" , "ath0" ]
202+
203+ for interface in interfaces :
204+ if (
205+ interface .get ("enabled" )
206+ and interface .get ("hwaddr" )
207+ and interface .get ("ifname" )
208+ ):
209+ addresses [interface ["ifname" ]] = interface ["hwaddr" ]
210+
211+ for preferred in interface_order :
212+ if preferred in addresses :
213+ result ["mac" ] = addresses [preferred ]
214+ result ["mac_interface" ] = preferred
215+ break
216+ else :
217+ result ["mac" ] = interfaces [0 ].get ("hwaddr" , "" )
218+ result ["mac_interface" ] = interfaces [0 ].get ("ifname" , "" )
219+
220+ return result
221+
222+ @classmethod
223+ def derived_data (cls , response : dict [str , Any ]) -> dict [str , Any ]:
188224 """Add derived data to the device response (instance method for polymorphism)."""
189- return self ._derived_data_helper (response , self . derived_wireless_data )
225+ return cls ._derived_data_helper (response , cls . _derived_wireless_data )
190226
191227 def _get_authenticated_headers (
192228 self ,
@@ -204,7 +240,8 @@ def _get_authenticated_headers(
204240 headers ["X-CSRF-ID" ] = self ._csrf_id
205241
206242 if self ._auth_cookie : # pragma: no cover
207- headers ["Cookie" ] = f"AIROS_{ self ._auth_cookie } "
243+ # headers["Cookie"] = f"AIROS_{self._auth_cookie}"
244+ headers ["Cookie" ] = self ._auth_cookie
208245
209246 return headers
210247
@@ -218,7 +255,8 @@ def _store_auth_data(self, response: aiohttp.ClientResponse) -> None:
218255 cookie .load (set_cookie )
219256 for key , morsel in cookie .items ():
220257 if key .startswith ("AIROS_" ):
221- self ._auth_cookie = morsel .key [6 :] + "=" + morsel .value
258+ # self._auth_cookie = morsel.key[6:] + "=" + morsel.value
259+ self ._auth_cookie = f"{ morsel .key } ={ morsel .value } "
222260 break
223261
224262 async def _request_json (
@@ -243,7 +281,7 @@ async def _request_json(
243281 request_headers .update (headers )
244282
245283 try :
246- if url not in self ._login_urls . values () and not self .connected :
284+ if url not in self ._login_urls and not self .connected :
247285 _LOGGER .error ("Not connected, login first" )
248286 raise AirOSDeviceConnectionError from None
249287
@@ -259,7 +297,7 @@ async def _request_json(
259297 _LOGGER .debug ("Successfully fetched JSON from %s" , url )
260298
261299 # If this is the login request, we need to store the new auth data
262- if url in self ._login_urls . values () :
300+ if url in self ._login_urls :
263301 self ._store_auth_data (response )
264302 self .connected = True
265303
@@ -283,31 +321,71 @@ async def _request_json(
283321 _LOGGER .warning ("Request to %s was cancelled" , url )
284322 raise
285323
324+ async def _login_v6 (self ) -> None :
325+ """Login to airOS v6 (XM) devices."""
326+ # Handle session cookie from login url
327+ async with self .session .request (
328+ "GET" ,
329+ self ._v6_xm_login_url ,
330+ allow_redirects = False ,
331+ ) as response :
332+ session_cookie = next (
333+ (c for n , c in response .cookies .items () if n .startswith ("AIROS" )), None
334+ )
335+ if not session_cookie :
336+ raise AirOSDeviceConnectionError ("No session cookie received." )
337+ self ._auth_cookie = f"{ session_cookie .key } ={ session_cookie .value } "
338+
339+ # Handle login expecting 302 redirect
340+ payload = {
341+ "username" : self .username ,
342+ "password" : self .password ,
343+ "uri" : self ._v6_form_url ,
344+ }
345+ headers = {
346+ "Content-Type" : "application/x-www-form-urlencoded" ,
347+ "Origin" : self .base_url ,
348+ "Referer" : self ._v6_xm_login_url ,
349+ "Cookie" : self ._auth_cookie ,
350+ }
351+ async with self .session .request (
352+ "POST" ,
353+ self ._v6_xm_login_url ,
354+ data = payload ,
355+ headers = headers ,
356+ allow_redirects = False ,
357+ ) as response :
358+ if response .status != 302 :
359+ raise AirOSConnectionAuthenticationError ("Login failed." )
360+
361+ # Activate session by accessing the form URL
362+ headers = {"Referer" : self ._v6_xm_login_url , "Cookie" : self ._auth_cookie }
363+ async with self .session .request (
364+ "GET" ,
365+ f"{ self .base_url } { self ._v6_form_url } " ,
366+ headers = headers ,
367+ allow_redirects = True ,
368+ ) as response :
369+ if "login.cgi" in str (response .url ):
370+ raise AirOSConnectionAuthenticationError ("Session activation failed." )
371+ self .connected = True
372+ self .api_version = 6
373+
286374 async def login (self ) -> None :
287375 """Login to AirOS device."""
288376 payload = {"username" : self .username , "password" : self .password }
289377 try :
290- await self ._request_json (
291- "POST" , self ._login_urls ["default" ], json_data = payload
292- )
378+ await self ._request_json ("POST" , self ._login_url , json_data = payload )
293379 except AirOSUrlNotFoundError :
294- pass # Try next URL
380+ await self . _login_v6 ()
295381 except AirOSConnectionSetupError as err :
296382 raise AirOSConnectionSetupError ("Failed to login to AirOS device" ) from err
297383 else :
298384 return
299385
300- try : # Alternative URL
301- await self ._request_json (
302- "POST" ,
303- self ._login_urls ["v6_alternative" ],
304- form_data = payload ,
305- ct_form = True ,
306- )
307- except AirOSConnectionSetupError as err :
308- raise AirOSConnectionSetupError (
309- "Failed to login to default and alternate AirOS device urls"
310- ) from err
386+ async def raw_status (self ) -> dict [str , Any ]:
387+ """Retrieve raw status from the device."""
388+ return await self ._request_json ("GET" , self ._status_cgi_url , authenticated = True )
311389
312390 async def status (self ) -> AirOSDataModel :
313391 """Retrieve status from the device."""
0 commit comments