from urllib.parse import urljoin
from warnings import warn

- import requests
+ import httpx
+ from httpx_retries import Retry, RetryTransport
from pydantic.v1 import (
    BaseSettings,
    HttpUrl,
    constr,
    validate_arguments,
)
- from requests.adapters import HTTPAdapter
- from urllib3.util.retry import Retry

from pyatlan.cache.atlan_tag_cache import AtlanTagCache
from pyatlan.cache.connection_cache import ConnectionCache
from pyatlan.client.admin import AdminClient
from pyatlan.client.asset import A, AssetClient, IndexSearchResults, LineageListResults
from pyatlan.client.audit import AuditClient
- from pyatlan.client.common import CONNECTION_RETRY, HTTP_PREFIX, HTTPS_PREFIX
- from pyatlan.client.constants import EVENT_STREAM, GET_TOKEN, PARSE_QUERY, UPLOAD_IMAGE
+ from pyatlan.client.common import CONNECTION_RETRY
+ from pyatlan.client.constants import EVENT_STREAM, PARSE_QUERY, UPLOAD_IMAGE
from pyatlan.client.contract import ContractClient
from pyatlan.client.credential import CredentialClient
from pyatlan.client.file import FileClient
@@ -109,7 +108,6 @@ def get_adapter() -> logging.LoggerAdapter:
    backoff_factor=1,
    status_forcelist=[403, 429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
-     raise_on_status=False,
    # When response.status is in `status_forcelist`
    # and the "Retry-After" header is present, the retry mechanism
    # will use the header's value to delay the next API call.
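Reviewer's aside: after this change the policy above is consumed by httpx-retries rather than urllib3. A minimal, self-contained sketch of that wiring is below; the `total` count and the target URL are illustrative assumptions, not values taken from this PR.

```python
# Sketch only: parameter values and the URL are placeholders, not pyatlan's defaults.
import httpx
from httpx_retries import Retry, RetryTransport

retry = Retry(
    total=5,  # assumed overall retry cap
    backoff_factor=1,
    status_forcelist=[403, 429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
)

# RetryTransport wraps the client's transport, so every request sent through this
# client is retried per the policy above, honouring Retry-After where present.
client = httpx.Client(transport=RetryTransport(retry=retry))
response = client.get("https://example.com/api/health")
print(response.status_code)
```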
@@ -124,28 +122,14 @@ def log_response(response, *args, **kwargs):
    LOGGER.debug("URL: %s", response.request.url)


- def get_session():
-     session = requests.session()
-     session.headers.update(
-         {
-             "x-atlan-agent": "sdk",
-             "x-atlan-agent-id": "python",
-             "x-atlan-client-origin": "product_sdk",
-             "User-Agent": f"Atlan-PythonSDK/{VERSION}",
-         }
-     )
-     session.hooks["response"].append(log_response)
-     return session
-
-
class AtlanClient(BaseSettings):
    base_url: Union[Literal["INTERNAL"], HttpUrl]
    api_key: str
    connect_timeout: float = 30.0  # 30 secs
    read_timeout: float = 900.0  # 15 mins
    retry: Retry = DEFAULT_RETRY
    _401_has_retried: ContextVar[bool] = ContextVar("_401_has_retried", default=False)
-     _session: requests.Session = PrivateAttr(default_factory=get_session)
+     _session: httpx.Client = PrivateAttr(default_factory=lambda: httpx.Client())
    _request_params: dict = PrivateAttr()
    _user_id: Optional[str] = PrivateAttr(default=None)
    _workflow_client: Optional[WorkflowClient] = PrivateAttr(default=None)
@@ -185,10 +169,17 @@ def __init__(self, **data):
                "authorization": f"Bearer {self.api_key}",
            }
        }
-         session = self._session
-         adapter = HTTPAdapter(max_retries=self.retry)
-         session.mount(HTTPS_PREFIX, adapter)
-         session.mount(HTTP_PREFIX, adapter)
+         # Configure httpx client with retry transport
+         self._session = httpx.Client(
+             transport=RetryTransport(retry=self.retry),
+             headers={
+                 "x-atlan-agent": "sdk",
+                 "x-atlan-agent-id": "python",
+                 "x-atlan-client-origin": "product_sdk",
+                 "User-Agent": f"Atlan-PythonSDK/{VERSION}",
+             },
+             event_hooks={"response": [log_response]},
+         )
        self._401_has_retried.set(False)

    @property
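Side note on the hook registration above: with requests the callback was attached via `session.hooks["response"]`, whereas httpx takes sync callables through `event_hooks` at construction time. A tiny standalone sketch, with a placeholder header value and URL:

```python
import httpx

def log_response(response: httpx.Response) -> None:
    # Sync clients need sync hooks; each hook receives the Response before its body is read.
    print(response.request.url, response.status_code)

client = httpx.Client(
    headers={"User-Agent": "Atlan-PythonSDK/example"},  # sent with every request
    event_hooks={"response": [log_response]},
)
client.get("https://example.com")
```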
@@ -438,21 +429,24 @@ def _call_api_internal(
        token = request_id_var.set(str(uuid.uuid4()))
        try:
            params["headers"]["X-Atlan-Request-Id"] = request_id_var.get()
+             timeout = httpx.Timeout(
+                 None, connect=self.connect_timeout, read=self.read_timeout
+             )
            if binary_data:
                response = self._session.request(
                    api.method.value,
                    path,
                    data=binary_data,
                    **params,
-                     timeout=(self.connect_timeout, self.read_timeout),
+                     timeout=timeout,
                )
            elif api.consumes == EVENT_STREAM and api.produces == EVENT_STREAM:
                response = self._session.request(
                    api.method.value,
                    path,
                    **params,
                    stream=True,
-                     timeout=(self.connect_timeout, self.read_timeout),
+                     timeout=timeout,
                )
                if download_file_path:
                    return self._handle_file_download(response.raw, download_file_path)
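Worth spelling out for reviewers: requests accepted a `(connect, read)` tuple, while httpx expects an `httpx.Timeout` whose first argument is the fallback for any phase not named explicitly. A small sketch using the class defaults above (30 s connect, 15 min read); the URL is a placeholder:

```python
import httpx

# write and pool timeouts fall back to the first argument (None = no limit),
# while connect and read get explicit values.
timeout = httpx.Timeout(None, connect=30.0, read=900.0)

with httpx.Client(timeout=timeout) as client:
    response = client.get("https://example.com")
    print(response.status_code)
```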
@@ -461,7 +455,7 @@ def _call_api_internal(
                    api.method.value,
                    path,
                    **params,
-                     timeout=(self.connect_timeout, self.read_timeout),
+                     timeout=timeout,
                )
            if response is not None:
                LOGGER.debug("HTTP Status: %s", response.status_code)
@@ -527,10 +521,7 @@ def _call_api_internal(
                )
                LOGGER.debug("response: %s", response_)
                return response_
-             except (
-                 requests.exceptions.JSONDecodeError,
-                 json.decoder.JSONDecodeError,
-             ) as e:
+             except (json.decoder.JSONDecodeError,) as e:
                raise ErrorCode.JSON_ERROR.exception_with_parameters(
                    response.text, response.status_code, str(e)
                ) from e
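The narrowed except clause works because httpx, unlike requests, does not define its own JSON exception: `Response.json()` delegates to the stdlib `json` module, so a bad payload surfaces as `json.decoder.JSONDecodeError`. A quick illustration, with the response constructed by hand rather than fetched:

```python
import json

import httpx

response = httpx.Response(200, text="not valid json")  # stand-in for a real API reply

try:
    response.json()
except json.decoder.JSONDecodeError as exc:
    # the stdlib exception is the only one that needs catching with httpx
    print(f"bad payload: {exc}")
```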
@@ -1827,12 +1818,11 @@ def max_retries(
    ) -> Generator[None, None, None]:
        """Creates a context manager that can be used to temporarily change the parameters used for retrying connections.
        The original Retry information will be restored when the context is exited."""
-         if self.base_url == "INTERNAL":
-             adapter = self._session.adapters[HTTP_PREFIX]
-         else:
-             adapter = self._session.adapters[HTTPS_PREFIX]
-         current_max = adapter.max_retries  # type: ignore[attr-defined]
-         adapter.max_retries = max_retries  # type: ignore[attr-defined]
+         # Store current transport and create new one with updated retries
+         current_transport = self._session._transport
+         new_transport = RetryTransport(retry=max_retries)
+         self._session._transport = new_transport
+
        LOGGER.debug(
            "max_retries set to total: %s force_list: %s",
            max_retries.total,
@@ -1842,16 +1832,13 @@ def max_retries(
            LOGGER.debug("Entering max_retries")
            yield None
            LOGGER.debug("Exiting max_retries")
-         except requests.exceptions.RetryError as err:
+         except httpx.TransportError as err:
            LOGGER.exception("Exception in max retries")
            raise ErrorCode.RETRY_OVERRUN.exception_with_parameters() from err
        finally:
-             adapter.max_retries = current_max  # type: ignore[attr-defined]
-             LOGGER.debug(
-                 "max_retries restored to total: %s force_list: %s",
-                 adapter.max_retries.total,  # type: ignore[attr-defined]
-                 adapter.max_retries.status_forcelist,  # type: ignore[attr-defined]
-             )
+             # Restore original transport
+             self._session._transport = current_transport
+             LOGGER.debug("max_retries restored %s", self._session._transport.retry)

    @contextlib.contextmanager
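To close the loop on the context-manager change: a hypothetical usage sketch of `max_retries` after this PR. The import path is the SDK's usual one, but the base URL, API key, and retry values are placeholders, and the `...` stands for whatever SDK calls run under the temporary policy.

```python
from httpx_retries import Retry

from pyatlan.client.atlan import AtlanClient

client = AtlanClient(base_url="https://tenant.atlan.com", api_key="...")  # placeholder credentials

# The client's RetryTransport is swapped for one built from this stricter policy,
# then restored in the finally block when the with-statement exits.
with client.max_retries(Retry(total=1, backoff_factor=0)):  # total is an assumed kwarg
    ...  # API calls made here use the temporary retry policy
```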