17
17
from urllib .parse import urljoin
18
18
from warnings import warn
19
19
20
- import requests
20
+ import httpx
21
+ from httpx_retries import Retry , RetryTransport
21
22
from pydantic .v1 import (
22
23
BaseSettings ,
23
24
HttpUrl ,
26
27
constr ,
27
28
validate_arguments ,
28
29
)
29
- from requests .adapters import HTTPAdapter
30
- from urllib3 .util .retry import Retry
31
30
32
31
from pyatlan .cache .atlan_tag_cache import AtlanTagCache
33
32
from pyatlan .cache .connection_cache import ConnectionCache
40
39
from pyatlan .client .admin import AdminClient
41
40
from pyatlan .client .asset import A , AssetClient , IndexSearchResults , LineageListResults
42
41
from pyatlan .client .audit import AuditClient
43
- from pyatlan .client .common import CONNECTION_RETRY , HTTP_PREFIX , HTTPS_PREFIX
42
+ from pyatlan .client .common import CONNECTION_RETRY
44
43
from pyatlan .client .constants import EVENT_STREAM , PARSE_QUERY , UPLOAD_IMAGE
45
44
from pyatlan .client .contract import ContractClient
46
45
from pyatlan .client .credential import CredentialClient
@@ -108,7 +107,6 @@ def get_adapter() -> logging.LoggerAdapter:
108
107
backoff_factor = 1 ,
109
108
status_forcelist = [403 , 429 , 500 , 502 , 503 , 504 ],
110
109
allowed_methods = ["HEAD" , "GET" , "OPTIONS" , "POST" , "PUT" , "DELETE" ],
111
- raise_on_status = False ,
112
110
# When response.status is in `status_forcelist`
113
111
# and the "Retry-After" header is present, the retry mechanism
114
112
# will use the header's value to delay the next API call.
@@ -123,28 +121,14 @@ def log_response(response, *args, **kwargs):
123
121
LOGGER .debug ("URL: %s" , response .request .url )
124
122
125
123
126
- def get_session ():
127
- session = requests .session ()
128
- session .headers .update (
129
- {
130
- "x-atlan-agent" : "sdk" ,
131
- "x-atlan-agent-id" : "python" ,
132
- "x-atlan-client-origin" : "product_sdk" ,
133
- "User-Agent" : f"Atlan-PythonSDK/{ VERSION } " ,
134
- }
135
- )
136
- session .hooks ["response" ].append (log_response )
137
- return session
138
-
139
-
140
124
class AtlanClient (BaseSettings ):
141
125
base_url : Union [Literal ["INTERNAL" ], HttpUrl ]
142
126
api_key : str
143
127
connect_timeout : float = 30.0 # 30 secs
144
128
read_timeout : float = 900.0 # 15 mins
145
129
retry : Retry = DEFAULT_RETRY
146
130
_401_has_retried : ContextVar [bool ] = ContextVar ("_401_has_retried" , default = False )
147
- _session : requests . Session = PrivateAttr (default_factory = get_session )
131
+ _session : httpx . Client = PrivateAttr (default_factory = lambda : httpx . Client () )
148
132
_request_params : dict = PrivateAttr ()
149
133
_user_id : Optional [str ] = PrivateAttr (default = None )
150
134
_workflow_client : Optional [WorkflowClient ] = PrivateAttr (default = None )
@@ -184,10 +168,17 @@ def __init__(self, **data):
184
168
"authorization" : f"Bearer { self .api_key } " ,
185
169
}
186
170
}
187
- session = self ._session
188
- adapter = HTTPAdapter (max_retries = self .retry )
189
- session .mount (HTTPS_PREFIX , adapter )
190
- session .mount (HTTP_PREFIX , adapter )
171
+ # Configure httpx client with retry transport
172
+ self ._session = httpx .Client (
173
+ transport = RetryTransport (retry = self .retry ),
174
+ headers = {
175
+ "x-atlan-agent" : "sdk" ,
176
+ "x-atlan-agent-id" : "python" ,
177
+ "x-atlan-client-origin" : "product_sdk" ,
178
+ "User-Agent" : f"Atlan-PythonSDK/{ VERSION } " ,
179
+ },
180
+ event_hooks = {"response" : [log_response ]},
181
+ )
191
182
self ._401_has_retried .set (False )
192
183
193
184
@property
@@ -371,21 +362,24 @@ def _call_api_internal(
371
362
token = request_id_var .set (str (uuid .uuid4 ()))
372
363
try :
373
364
params ["headers" ]["X-Atlan-Request-Id" ] = request_id_var .get ()
365
+ timeout = httpx .Timeout (
366
+ None , connect = self .connect_timeout , read = self .read_timeout
367
+ )
374
368
if binary_data :
375
369
response = self ._session .request (
376
370
api .method .value ,
377
371
path ,
378
372
data = binary_data ,
379
373
** params ,
380
- timeout = ( self . connect_timeout , self . read_timeout ) ,
374
+ timeout = timeout ,
381
375
)
382
376
elif api .consumes == EVENT_STREAM and api .produces == EVENT_STREAM :
383
377
response = self ._session .request (
384
378
api .method .value ,
385
379
path ,
386
380
** params ,
387
381
stream = True ,
388
- timeout = ( self . connect_timeout , self . read_timeout ) ,
382
+ timeout = timeout ,
389
383
)
390
384
if download_file_path :
391
385
return self ._handle_file_download (response .raw , download_file_path )
@@ -394,7 +388,7 @@ def _call_api_internal(
394
388
api .method .value ,
395
389
path ,
396
390
** params ,
397
- timeout = ( self . connect_timeout , self . read_timeout ) ,
391
+ timeout = timeout ,
398
392
)
399
393
if response is not None :
400
394
LOGGER .debug ("HTTP Status: %s" , response .status_code )
@@ -460,10 +454,7 @@ def _call_api_internal(
460
454
)
461
455
LOGGER .debug ("response: %s" , response_ )
462
456
return response_
463
- except (
464
- requests .exceptions .JSONDecodeError ,
465
- json .decoder .JSONDecodeError ,
466
- ) as e :
457
+ except (json .decoder .JSONDecodeError ,) as e :
467
458
raise ErrorCode .JSON_ERROR .exception_with_parameters (
468
459
response .text , response .status_code , str (e )
469
460
) from e
@@ -1760,12 +1751,11 @@ def max_retries(
1760
1751
) -> Generator [None , None , None ]:
1761
1752
"""Creates a context manger that can used to temporarily change parameters used for retrying connnections.
1762
1753
The original Retry information will be restored when the context is exited."""
1763
- if self .base_url == "INTERNAL" :
1764
- adapter = self ._session .adapters [HTTP_PREFIX ]
1765
- else :
1766
- adapter = self ._session .adapters [HTTPS_PREFIX ]
1767
- current_max = adapter .max_retries # type: ignore[attr-defined]
1768
- adapter .max_retries = max_retries # type: ignore[attr-defined]
1754
+ # Store current transport and create new one with updated retries
1755
+ current_transport = self ._session ._transport
1756
+ new_transport = RetryTransport (retry = max_retries )
1757
+ self ._session ._transport = new_transport
1758
+
1769
1759
LOGGER .debug (
1770
1760
"max_retries set to total: %s force_list: %s" ,
1771
1761
max_retries .total ,
@@ -1775,16 +1765,13 @@ def max_retries(
1775
1765
LOGGER .debug ("Entering max_retries" )
1776
1766
yield None
1777
1767
LOGGER .debug ("Exiting max_retries" )
1778
- except requests . exceptions . RetryError as err :
1768
+ except httpx . TransportError as err :
1779
1769
LOGGER .exception ("Exception in max retries" )
1780
1770
raise ErrorCode .RETRY_OVERRUN .exception_with_parameters () from err
1781
1771
finally :
1782
- adapter .max_retries = current_max # type: ignore[attr-defined]
1783
- LOGGER .debug (
1784
- "max_retries restored to total: %s force_list: %s" ,
1785
- adapter .max_retries .total , # type: ignore[attr-defined]
1786
- adapter .max_retries .status_forcelist , # type: ignore[attr-defined]
1787
- )
1772
+ # Restore original transport
1773
+ self ._session ._transport = current_transport
1774
+ LOGGER .debug ("max_retries restored %s" , self ._session ._transport .retry )
1788
1775
1789
1776
1790
1777
@contextlib .contextmanager
0 commit comments