|
16 | 16 |
|
17 | 17 | import logging |
18 | 18 | import re |
| 19 | +from urllib.parse import urljoin |
19 | 20 |
|
20 | 21 | import requests |
21 | 22 |
|
|
41 | 42 | ) |
42 | 43 | _RE_TITLE = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL) |
43 | 44 | _RE_FORM_ACTION = re.compile(r"<form[^>]+action=['\"]([^'\"]+)['\"]", re.IGNORECASE) |
| 45 | +_RE_FORM_BLOCK = re.compile( |
| 46 | + r"<form[^>]*action=['\"](?P<action>[^'\"]+)['\"][^>]*>(?P<body>.*?)</form>", |
| 47 | + re.IGNORECASE | re.DOTALL, |
| 48 | +) |
| 49 | +_RE_INPUT = re.compile(r"<input\b(?P<attrs>[^>]*)>", re.IGNORECASE | re.DOTALL) |
| 50 | +_RE_ATTR = re.compile(r"(?P<name>[A-Za-z_:][-A-Za-z0-9_:.]*)\s*=\s*['\"](?P<value>[^'\"]*)['\"]") |
44 | 51 |
|
45 | 52 | # Fields per channel for each section (after the leading count value). |
46 | 53 | _DS_QAM_FIELDS = 9 # num|lock|mod|chID|freq|power|snr|corrErr|uncorrErr |
@@ -73,7 +80,18 @@ def login(self) -> None: |
73 | 80 | try: |
74 | 81 | r = self._session.get(f"{self._url}{_STATUS_PATH}", timeout=30) |
75 | 82 | r.raise_for_status() |
76 | | - self._ensure_status_page(r.text) |
| 83 | + try: |
| 84 | + self._ensure_status_page(r.text) |
| 85 | + except RuntimeError: |
| 86 | + if not self._looks_like_login_page(r.text): |
| 87 | + self._log_status_page_diagnostics(r.text, "login") |
| 88 | + raise |
| 89 | + if not self._login_via_form(): |
| 90 | + self._log_status_page_diagnostics(r.text, "login") |
| 91 | + raise |
| 92 | + r = self._session.get(f"{self._url}{_STATUS_PATH}", timeout=30) |
| 93 | + r.raise_for_status() |
| 94 | + self._ensure_status_page(r.text) |
77 | 95 | self._status_html = r.text |
78 | 96 | log.info("CM3000 auth OK") |
79 | 97 | return |
@@ -178,6 +196,71 @@ def _fetch_status_page(self) -> str: |
178 | 196 | raise |
179 | 197 | return r.text |
180 | 198 |
|
| 199 | + def _login_via_form(self) -> bool: |
| 200 | + """Try the newer Netgear web login flow used before DocsisStatus access.""" |
| 201 | + login_url = urljoin(f"{self._url}/", "Login.htm") |
| 202 | + try: |
| 203 | + r = self._session.get(login_url, timeout=30) |
| 204 | + r.raise_for_status() |
| 205 | + action, payload = self._extract_login_form(r.text) |
| 206 | + if not action: |
| 207 | + action = "/goform/Login" |
| 208 | + payload = payload or {} |
| 209 | + self._apply_login_credentials(payload) |
| 210 | + if not any(v for k, v in payload.items() if "pass" in k.lower()): |
| 211 | + payload["loginPassword"] = self._password |
| 212 | + if not any(v for k, v in payload.items() if "user" in k.lower() or "name" in k.lower()): |
| 213 | + payload["loginName"] = self._user |
| 214 | + post_url = urljoin(f"{self._url}/", action.lstrip("/")) |
| 215 | + r = self._session.post(post_url, data=payload, timeout=30) |
| 216 | + r.raise_for_status() |
| 217 | + return True |
| 218 | + except requests.RequestException as exc: |
| 219 | + log.debug("CM3000 form login failed: %s", exc) |
| 220 | + return False |
| 221 | + |
| 222 | + def _apply_login_credentials(self, payload: dict) -> None: |
| 223 | + """Populate parsed login form fields with configured credentials.""" |
| 224 | + lowered = {k.lower(): k for k in payload} |
| 225 | + user_keys = [k for k in payload if any(token in k.lower() for token in ("loginname", "username", "user", "name"))] |
| 226 | + pass_keys = [k for k in payload if "pass" in k.lower()] |
| 227 | + for key in user_keys: |
| 228 | + payload[key] = self._user |
| 229 | + for key in pass_keys: |
| 230 | + payload[key] = self._password |
| 231 | + if not user_keys and "loginname" not in lowered: |
| 232 | + payload["loginName"] = self._user |
| 233 | + if not pass_keys and "loginpassword" not in lowered: |
| 234 | + payload["loginPassword"] = self._password |
| 235 | + |
| 236 | + @staticmethod |
| 237 | + def _extract_login_form(html: str) -> tuple[str | None, dict]: |
| 238 | + """Extract login form action and input values from Login.htm.""" |
| 239 | + match = _RE_FORM_BLOCK.search(html or "") |
| 240 | + if not match: |
| 241 | + return None, {} |
| 242 | + action = match.group("action").strip() |
| 243 | + body = match.group("body") |
| 244 | + payload = {} |
| 245 | + for input_match in _RE_INPUT.finditer(body): |
| 246 | + attrs = { |
| 247 | + attr_match.group("name").lower(): attr_match.group("value") |
| 248 | + for attr_match in _RE_ATTR.finditer(input_match.group("attrs")) |
| 249 | + } |
| 250 | + name = attrs.get("name") |
| 251 | + if not name: |
| 252 | + continue |
| 253 | + input_type = attrs.get("type", "").lower() |
| 254 | + if input_type in {"submit", "button", "image"}: |
| 255 | + continue |
| 256 | + payload[name] = attrs.get("value", "") |
| 257 | + return action, payload |
| 258 | + |
| 259 | + @staticmethod |
| 260 | + def _looks_like_login_page(html: str) -> bool: |
| 261 | + lower_html = (html or "").lower() |
| 262 | + return any(marker in lower_html for marker in _LOGIN_MARKERS) |
| 263 | + |
181 | 264 | @staticmethod |
182 | 265 | def _ensure_status_page(html: str) -> None: |
183 | 266 | """Reject login/placeholder pages that would otherwise parse as zero channels.""" |
|
0 commit comments