4
4
5
5
import contextlib
6
6
import datetime as dt
7
- from functools import lru_cache , partial
8
7
import os
9
8
import ssl
9
+ from functools import lru_cache , partial
10
10
from typing import Any
11
11
from urllib .parse import ParseResultBytes , urlparse , urlunparse
12
- from yarl import URL
13
- from multidict import CIMultiDict
12
+
14
13
from zeep .exceptions import Fault
15
14
16
- utcnow : partial [dt .datetime ] = partial (dt .datetime .now , dt .timezone .utc )
15
+ from multidict import CIMultiDict
16
+ from yarl import URL
17
+
18
+ utcnow : partial [dt .datetime ] = partial (dt .datetime .now , dt .UTC )
17
19
18
20
# This does blocking I/O (stat) so we cache the result
19
21
# to minimize the impact of the blocking I/O.
23
25
24
26
25
27
def normalize_url (url : bytes | str | None ) -> str | None :
26
- """Normalize URL.
28
+ """
29
+ Normalize URL.
27
30
28
31
Some cameras respond with <wsa5:Address>http://192.168.1.106:8106:8106/onvif/Subscription?Idx=43</wsa5:Address>
29
32
https://github.com/home-assistant/core/issues/92603#issuecomment-1537213126
@@ -73,7 +76,8 @@ def stringify_onvif_error(error: Exception) -> str:
73
76
74
77
75
78
def is_auth_error (error : Exception ) -> bool :
76
- """Return True if error is an authentication error.
79
+ """
80
+ Return True if error is an authentication error.
77
81
78
82
Most of the tested cameras do not return a proper error code when
79
83
authentication fails, so we need to check the error message as well.
@@ -90,7 +94,8 @@ def is_auth_error(error: Exception) -> bool:
90
94
91
95
92
96
def create_no_verify_ssl_context () -> ssl .SSLContext :
93
- """Return an SSL context that does not verify the server certificate.
97
+ """
98
+ Return an SSL context that does not verify the server certificate.
94
99
This is a copy of aiohttp's create_default_context() function, with the
95
100
ssl verify turned off and old SSL versions enabled.
96
101
@@ -113,6 +118,12 @@ def create_no_verify_ssl_context() -> ssl.SSLContext:
113
118
def strip_user_pass_url (url : str ) -> str :
114
119
"""Strip password from URL."""
115
120
parsed_url = URL (url )
121
+
122
+ # First strip userinfo (user:pass@) from URL
123
+ if parsed_url .user or parsed_url .password :
124
+ parsed_url = parsed_url .with_user (None )
125
+
126
+ # Then strip credentials from query parameters
116
127
query = parsed_url .query
117
128
new_query : CIMultiDict | None = None
118
129
for key in _CREDENTIAL_KEYS :
@@ -122,12 +133,23 @@ def strip_user_pass_url(url: str) -> str:
122
133
new_query .popall (key )
123
134
if new_query is not None :
124
135
return str (parsed_url .with_query (new_query ))
125
- return url
136
+ return str ( parsed_url )
126
137
127
138
128
139
def obscure_user_pass_url (url : str ) -> str :
129
140
"""Obscure user and password from URL."""
130
141
parsed_url = URL (url )
142
+
143
+ # First obscure userinfo if present
144
+ if parsed_url .user :
145
+ # Keep the user but obscure the password
146
+ if parsed_url .password :
147
+ parsed_url = parsed_url .with_password ("********" )
148
+ else :
149
+ # If only user is present, obscure it
150
+ parsed_url = parsed_url .with_user ("********" )
151
+
152
+ # Then obscure credentials in query parameters
131
153
query = parsed_url .query
132
154
new_query : CIMultiDict | None = None
133
155
for key in _CREDENTIAL_KEYS :
@@ -138,4 +160,4 @@ def obscure_user_pass_url(url: str) -> str:
138
160
new_query [key ] = "********"
139
161
if new_query is not None :
140
162
return str (parsed_url .with_query (new_query ))
141
- return url
163
+ return str ( parsed_url )
0 commit comments