diff --git a/onvif/client.py b/onvif/client.py index cf95df4..5f4f7b3 100644 --- a/onvif/client.py +++ b/onvif/client.py @@ -8,7 +8,6 @@ import os.path from typing import Any from collections.abc import Callable - import httpx from httpx import AsyncClient, BasicAuth, DigestAuth from zeep.cache import SqliteCache @@ -27,7 +26,14 @@ from .settings import DEFAULT_SETTINGS from .transport import ASYNC_TRANSPORT from .types import FastDateTime, ForgivingTime -from .util import create_no_verify_ssl_context, normalize_url, path_isfile, utcnow +from .util import ( + create_no_verify_ssl_context, + normalize_url, + path_isfile, + utcnow, + strip_user_pass_url, + obscure_user_pass_url, +) from .wrappers import retry_connection_error # noqa: F401 from .wsa import WsAddressingIfMissingPlugin @@ -573,12 +579,16 @@ async def get_snapshot( else: auth = DigestAuth(self.user, self.passwd) - try: - response = await self._snapshot_client.get(uri, auth=auth) - except httpx.TimeoutException as error: - raise ONVIFTimeoutError(f"Timed out fetching {uri}: {error}") from error - except httpx.RequestError as error: - raise ONVIFError(f"Error fetching {uri}: {error}") from error + response = await self._try_snapshot_uri(uri, auth) + + # If the request fails with a 401, make sure to strip any + # sample user/pass from the URL and try again + if ( + response.status_code == 401 + and (stripped_uri := strip_user_pass_url(uri)) + and stripped_uri != uri + ): + response = await self._try_snapshot_uri(stripped_uri, auth) if response.status_code == 401: raise ONVIFAuthError(f"Failed to authenticate to {uri}") @@ -588,6 +598,20 @@ async def get_snapshot( return None + async def _try_snapshot_uri( + self, uri: str, auth: BasicAuth | DigestAuth | None + ) -> httpx.Response: + try: + return await self._snapshot_client.get(uri, auth=auth) + except httpx.TimeoutException as error: + raise ONVIFTimeoutError( + f"Timed out fetching {obscure_user_pass_url(uri)}: {error}" + ) from error + except httpx.RequestError as error: + raise ONVIFError( + f"Error fetching {obscure_user_pass_url(uri)}: {error}" + ) from error + def get_definition( self, name: str, port_type: str | None = None ) -> tuple[str, str, str]: diff --git a/onvif/util.py b/onvif/util.py index 794c030..c6d0b17 100644 --- a/onvif/util.py +++ b/onvif/util.py @@ -9,7 +9,8 @@ import ssl from typing import Any from urllib.parse import ParseResultBytes, urlparse, urlunparse - +from yarl import URL +from multidict import CIMultiDict from zeep.exceptions import Fault utcnow: partial[dt.datetime] = partial(dt.datetime.now, dt.timezone.utc) @@ -18,6 +19,8 @@ # to minimize the impact of the blocking I/O. path_isfile = lru_cache(maxsize=128)(os.path.isfile) +_CREDENTIAL_KEYS = ("username", "password", "user", "pass") + def normalize_url(url: bytes | str | None) -> str | None: """Normalize URL. @@ -105,3 +108,34 @@ def create_no_verify_ssl_context() -> ssl.SSLContext: # ssl.OP_LEGACY_SERVER_CONNECT is only available in Python 3.12a4+ sslcontext.options |= getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4) return sslcontext + + +def strip_user_pass_url(url: str) -> str: + """Strip password from URL.""" + parsed_url = URL(url) + query = parsed_url.query + new_query: CIMultiDict | None = None + for key in _CREDENTIAL_KEYS: + if key in query: + if new_query is None: + new_query = CIMultiDict(parsed_url.query) + new_query.popall(key) + if new_query is not None: + return str(parsed_url.with_query(new_query)) + return url + + +def obscure_user_pass_url(url: str) -> str: + """Obscure user and password from URL.""" + parsed_url = URL(url) + query = parsed_url.query + new_query: CIMultiDict | None = None + for key in _CREDENTIAL_KEYS: + if key in query: + if new_query is None: + new_query = CIMultiDict(parsed_url.query) + new_query.popall(key) + new_query[key] = "********" + if new_query is not None: + return str(parsed_url.with_query(new_query)) + return url diff --git a/requirements.txt b/requirements.txt index a93b1aa..e89dead 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ ciso8601==2.3.2 httpx==0.28.1 zeep[async]==4.3.1 +yarl>=1.10.0 diff --git a/tests/test_util.py b/tests/test_util.py index e3eaa71..c3d36aa 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -4,6 +4,7 @@ import pytest from zeep.loader import parse_xml +from onvif.util import strip_user_pass_url, obscure_user_pass_url from onvif.client import ONVIFCamera from onvif.settings import DEFAULT_SETTINGS @@ -40,3 +41,16 @@ async def test_normalize_url_with_missing_url(): ) result = operation.process_reply(envelope) assert normalize_url(result.SubscriptionReference.Address._value_1) is None + + +def test_strip_user_pass_url(): + assert strip_user_pass_url("http://1.2.3.4/?user=foo&pass=bar") == "http://1.2.3.4/" + assert strip_user_pass_url("http://1.2.3.4/") == "http://1.2.3.4/" + + +def test_obscure_user_pass_url(): + assert ( + obscure_user_pass_url("http://1.2.3.4/?user=foo&pass=bar") + == "http://1.2.3.4/?user=********&pass=********" + ) + assert obscure_user_pass_url("http://1.2.3.4/") == "http://1.2.3.4/"