Skip to content

Commit 6779c5d

Browse files
authored
Try fetching the snapshot URI without user/pass if it fails (#88)
1 parent ca3b2cc commit 6779c5d

File tree

4 files changed

+82
-9
lines changed

4 files changed

+82
-9
lines changed

onvif/client.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import os.path
99
from typing import Any
1010
from collections.abc import Callable
11-
1211
import httpx
1312
from httpx import AsyncClient, BasicAuth, DigestAuth
1413
from zeep.cache import SqliteCache
@@ -27,7 +26,14 @@
2726
from .settings import DEFAULT_SETTINGS
2827
from .transport import ASYNC_TRANSPORT
2928
from .types import FastDateTime, ForgivingTime
30-
from .util import create_no_verify_ssl_context, normalize_url, path_isfile, utcnow
29+
from .util import (
30+
create_no_verify_ssl_context,
31+
normalize_url,
32+
path_isfile,
33+
utcnow,
34+
strip_user_pass_url,
35+
obscure_user_pass_url,
36+
)
3137
from .wrappers import retry_connection_error # noqa: F401
3238
from .wsa import WsAddressingIfMissingPlugin
3339

@@ -573,12 +579,16 @@ async def get_snapshot(
573579
else:
574580
auth = DigestAuth(self.user, self.passwd)
575581

576-
try:
577-
response = await self._snapshot_client.get(uri, auth=auth)
578-
except httpx.TimeoutException as error:
579-
raise ONVIFTimeoutError(f"Timed out fetching {uri}: {error}") from error
580-
except httpx.RequestError as error:
581-
raise ONVIFError(f"Error fetching {uri}: {error}") from error
582+
response = await self._try_snapshot_uri(uri, auth)
583+
584+
# If the request fails with a 401, make sure to strip any
585+
# sample user/pass from the URL and try again
586+
if (
587+
response.status_code == 401
588+
and (stripped_uri := strip_user_pass_url(uri))
589+
and stripped_uri != uri
590+
):
591+
response = await self._try_snapshot_uri(stripped_uri, auth)
582592

583593
if response.status_code == 401:
584594
raise ONVIFAuthError(f"Failed to authenticate to {uri}")
@@ -588,6 +598,20 @@ async def get_snapshot(
588598

589599
return None
590600

601+
async def _try_snapshot_uri(
602+
self, uri: str, auth: BasicAuth | DigestAuth | None
603+
) -> httpx.Response:
604+
try:
605+
return await self._snapshot_client.get(uri, auth=auth)
606+
except httpx.TimeoutException as error:
607+
raise ONVIFTimeoutError(
608+
f"Timed out fetching {obscure_user_pass_url(uri)}: {error}"
609+
) from error
610+
except httpx.RequestError as error:
611+
raise ONVIFError(
612+
f"Error fetching {obscure_user_pass_url(uri)}: {error}"
613+
) from error
614+
591615
def get_definition(
592616
self, name: str, port_type: str | None = None
593617
) -> tuple[str, str, str]:

onvif/util.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
import ssl
1010
from typing import Any
1111
from urllib.parse import ParseResultBytes, urlparse, urlunparse
12-
12+
from yarl import URL
13+
from multidict import CIMultiDict
1314
from zeep.exceptions import Fault
1415

1516
utcnow: partial[dt.datetime] = partial(dt.datetime.now, dt.timezone.utc)
@@ -18,6 +19,8 @@
1819
# to minimize the impact of the blocking I/O.
1920
path_isfile = lru_cache(maxsize=128)(os.path.isfile)
2021

22+
_CREDENTIAL_KEYS = ("username", "password", "user", "pass")
23+
2124

2225
def normalize_url(url: bytes | str | None) -> str | None:
2326
"""Normalize URL.
@@ -105,3 +108,34 @@ def create_no_verify_ssl_context() -> ssl.SSLContext:
105108
# ssl.OP_LEGACY_SERVER_CONNECT is only available in Python 3.12a4+
106109
sslcontext.options |= getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4)
107110
return sslcontext
111+
112+
113+
def strip_user_pass_url(url: str) -> str:
114+
"""Strip password from URL."""
115+
parsed_url = URL(url)
116+
query = parsed_url.query
117+
new_query: CIMultiDict | None = None
118+
for key in _CREDENTIAL_KEYS:
119+
if key in query:
120+
if new_query is None:
121+
new_query = CIMultiDict(parsed_url.query)
122+
new_query.popall(key)
123+
if new_query is not None:
124+
return str(parsed_url.with_query(new_query))
125+
return url
126+
127+
128+
def obscure_user_pass_url(url: str) -> str:
129+
"""Obscure user and password from URL."""
130+
parsed_url = URL(url)
131+
query = parsed_url.query
132+
new_query: CIMultiDict | None = None
133+
for key in _CREDENTIAL_KEYS:
134+
if key in query:
135+
if new_query is None:
136+
new_query = CIMultiDict(parsed_url.query)
137+
new_query.popall(key)
138+
new_query[key] = "********"
139+
if new_query is not None:
140+
return str(parsed_url.with_query(new_query))
141+
return url

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
ciso8601==2.3.2
33
httpx==0.28.1
44
zeep[async]==4.3.1
5+
yarl>=1.10.0

tests/test_util.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pytest
66
from zeep.loader import parse_xml
7+
from onvif.util import strip_user_pass_url, obscure_user_pass_url
78

89
from onvif.client import ONVIFCamera
910
from onvif.settings import DEFAULT_SETTINGS
@@ -40,3 +41,16 @@ async def test_normalize_url_with_missing_url():
4041
)
4142
result = operation.process_reply(envelope)
4243
assert normalize_url(result.SubscriptionReference.Address._value_1) is None
44+
45+
46+
def test_strip_user_pass_url():
47+
assert strip_user_pass_url("http://1.2.3.4/?user=foo&pass=bar") == "http://1.2.3.4/"
48+
assert strip_user_pass_url("http://1.2.3.4/") == "http://1.2.3.4/"
49+
50+
51+
def test_obscure_user_pass_url():
52+
assert (
53+
obscure_user_pass_url("http://1.2.3.4/?user=foo&pass=bar")
54+
== "http://1.2.3.4/?user=********&pass=********"
55+
)
56+
assert obscure_user_pass_url("http://1.2.3.4/") == "http://1.2.3.4/"

0 commit comments

Comments
 (0)