Skip to content

Commit 25596e7

Browse files
simplify HTTP client using requests.session
Signed-off-by: varun-edachali-dbx <[email protected]>
1 parent 7fb59e4 commit 25596e7

File tree

1 file changed: +52 / -130 lines changed

src/databricks/sql/backend/sea/utils/http_client.py

Lines changed: 52 additions & 130 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,8 @@
1-
import json
21
import logging
3-
import ssl
4-
import urllib.parse
5-
import urllib.request
62
from typing import Dict, Any, Optional, List, Tuple, Union
7-
from urllib.parse import urljoin
83

9-
from urllib3 import HTTPConnectionPool, HTTPSConnectionPool, ProxyManager
10-
from urllib3.util import make_headers
4+
import requests
5+
from requests.adapters import HTTPAdapter
116
from urllib3.exceptions import MaxRetryError
127

138
from databricks.sql.auth.authenticators import AuthProvider
@@ -27,16 +22,12 @@ class SeaHttpClient:
2722
"""
2823
HTTP client for Statement Execution API (SEA).
2924
30-
This client uses urllib3 for robust HTTP communication with retry policies
31-
and connection pooling, similar to the Thrift HTTP client but simplified.
25+
This client uses requests.Session for HTTP communication with retry policies
26+
and connection pooling.
3227
"""
3328

3429
retry_policy: Union[DatabricksRetryPolicy, int]
35-
_pool: Optional[Union[HTTPConnectionPool, HTTPSConnectionPool]]
36-
proxy_uri: Optional[str]
37-
realhost: Optional[str]
38-
realport: Optional[int]
39-
proxy_auth: Optional[Dict[str, str]]
30+
session: requests.Session
4031

4132
def __init__(
4233
self,
@@ -64,22 +55,13 @@ def __init__(
6455
self.server_hostname = server_hostname
6556
self.port = port or 443
6657
self.http_path = http_path
58+
self.http_headers = http_headers
6759
self.auth_provider = auth_provider
6860
self.ssl_options = ssl_options
6961

7062
# Build base URL
7163
self.base_url = f"https://{server_hostname}:{self.port}"
7264

73-
# Parse URL for proxy handling
74-
parsed_url = urllib.parse.urlparse(self.base_url)
75-
self.scheme = parsed_url.scheme
76-
self.host = parsed_url.hostname
77-
self.port = parsed_url.port or (443 if self.scheme == "https" else 80)
78-
79-
# Setup headers
80-
self.headers: Dict[str, str] = dict(http_headers)
81-
self.headers.update({"Content-Type": "application/json"})
82-
8365
# Extract retry policy settings
8466
self._retry_delay_min = kwargs.get("_retry_delay_min", 1.0)
8567
self._retry_delay_max = kwargs.get("_retry_delay_max", 60.0)
@@ -121,83 +103,44 @@ def __init__(
121103
# Legacy behavior - no automatic retries
122104
self.retry_policy = 0
123105

124-
# Handle proxy settings
125-
try:
126-
proxy = urllib.request.getproxies().get(self.scheme)
127-
except (KeyError, AttributeError):
128-
proxy = None
129-
else:
130-
if self.host and urllib.request.proxy_bypass(self.host):
131-
proxy = None
132-
133-
if proxy:
134-
parsed_proxy = urllib.parse.urlparse(proxy)
135-
self.realhost = self.host
136-
self.realport = self.port
137-
self.proxy_uri = proxy
138-
self.host = parsed_proxy.hostname
139-
self.port = parsed_proxy.port or (443 if self.scheme == "https" else 80)
140-
self.proxy_auth = self._basic_proxy_auth_headers(parsed_proxy)
141-
else:
142-
self.realhost = None
143-
self.realport = None
144-
self.proxy_auth = None
145-
self.proxy_uri = None
146-
147-
# Initialize connection pool
148-
self._pool = None
149-
self._open()
150-
151-
def _basic_proxy_auth_headers(self, proxy_parsed) -> Optional[Dict[str, str]]:
152-
"""Create basic auth headers for proxy if credentials are provided."""
153-
if proxy_parsed is None or not proxy_parsed.username:
154-
return None
155-
ap = f"{urllib.parse.unquote(proxy_parsed.username)}:{urllib.parse.unquote(proxy_parsed.password)}"
156-
return make_headers(proxy_basic_auth=ap)
157-
158-
def _open(self):
159-
"""Initialize the connection pool."""
160-
pool_kwargs = {"maxsize": self.max_connections}
161-
162-
if self.scheme == "http":
163-
pool_class = HTTPConnectionPool
164-
else: # https
165-
pool_class = HTTPSConnectionPool
166-
pool_kwargs.update(
167-
{
168-
"cert_reqs": ssl.CERT_REQUIRED
169-
if self.ssl_options.tls_verify
170-
else ssl.CERT_NONE,
171-
"ca_certs": self.ssl_options.tls_trusted_ca_file,
172-
"cert_file": self.ssl_options.tls_client_cert_file,
173-
"key_file": self.ssl_options.tls_client_cert_key_file,
174-
"key_password": self.ssl_options.tls_client_cert_key_password,
175-
}
176-
)
106+
# Create session and configure it
107+
self.session = requests.Session()
108+
self._configure_session()
177109

178-
if self.using_proxy():
179-
proxy_manager = ProxyManager(
180-
self.proxy_uri,
181-
num_pools=1,
182-
proxy_headers=self.proxy_auth,
183-
)
184-
self._pool = proxy_manager.connection_from_host(
185-
host=self.realhost,
186-
port=self.realport,
187-
scheme=self.scheme,
188-
pool_kwargs=pool_kwargs,
189-
)
190-
else:
191-
self._pool = pool_class(self.host, self.port, **pool_kwargs)
110+
def _configure_session(self):
111+
"""Configure the requests session with headers, SSL, and retry policy."""
112+
# Setup headers
113+
self.session.headers.update(dict(self.http_headers))
114+
self.session.headers.update({"Content-Type": "application/json"})
115+
116+
# Configure SSL
117+
if not self.ssl_options.tls_verify:
118+
self.session.verify = False
119+
elif self.ssl_options.tls_trusted_ca_file:
120+
self.session.verify = self.ssl_options.tls_trusted_ca_file
121+
122+
if self.ssl_options.tls_client_cert_file:
123+
if self.ssl_options.tls_client_cert_key_file:
124+
self.session.cert = (
125+
self.ssl_options.tls_client_cert_file,
126+
self.ssl_options.tls_client_cert_key_file,
127+
)
128+
else:
129+
self.session.cert = self.ssl_options.tls_client_cert_file
130+
131+
# Configure retry adapter
132+
adapter = HTTPAdapter(
133+
pool_connections=self.max_connections,
134+
pool_maxsize=self.max_connections,
135+
max_retries=self.retry_policy,
136+
)
192137

193-
def close(self):
194-
"""Close the connection pool."""
195-
if self._pool:
196-
self._pool.clear()
138+
self.session.mount("https://", adapter)
139+
self.session.mount("http://", adapter)
197140

198-
def using_proxy(self) -> bool:
199-
"""Check if proxy is being used (for compatibility with Thrift client)."""
200-
return self.realhost is not None
141+
def close(self):
142+
"""Close the session."""
143+
self.session.close()
201144

202145
def set_retry_command_type(self, command_type: CommandType):
203146
"""Set the command type for retry policy decision making."""
@@ -236,46 +179,36 @@ def _make_request(
236179
RequestError: If the request fails after retries
237180
"""
238181

239-
# Prepare headers
240-
headers = {**self.headers, **self._get_auth_headers()}
241-
242-
# Prepare request body
243-
body = json.dumps(data).encode("utf-8") if data else b""
244-
if body:
245-
headers["Content-Length"] = str(len(body))
246-
247182
# Set command type for retry policy
248183
command_type = self._get_command_type_from_path(path, method)
249184
self.set_retry_command_type(command_type)
250185
self.start_retry_timer()
251186

252-
logger.debug(f"Making {method} request to {path}")
187+
# Prepare headers
188+
headers = self._get_auth_headers()
253189

254-
# When v3 retries are enabled, urllib3 handles retries internally via DatabricksRetryPolicy
255-
# When disabled, we let exceptions bubble up (similar to Thrift backend approach)
256-
if self._pool is None:
257-
raise RequestError("Connection pool not initialized", None)
190+
logger.debug(f"Making {method} request to {path}")
258191

259192
try:
260-
response = self._pool.request(
193+
response = self.session.request(
261194
method=method.upper(),
262-
url=path,
263-
body=body,
195+
url=f"{self.base_url}{path}",
196+
json=data, # requests handles JSON encoding automatically
264197
headers=headers,
265-
preload_content=False,
266-
retries=self.retry_policy,
267198
)
199+
200+
response.raise_for_status() # Raises requests.HTTPError for 4xx/5xx responses
201+
return response.json()
202+
268203
except MaxRetryDurationError as e:
269204
# MaxRetryDurationError is raised directly by DatabricksRetryPolicy
270-
# when duration limits are exceeded (like in test_retry_exponential_backoff)
205+
# when duration limits are exceeded
271206
error_message = f"Request failed due to retry duration limit: {e}"
272-
# Construct RequestError with message, context, and specific error
273207
raise RequestError(error_message, None, e)
274208
except (SessionAlreadyClosedError, CursorAlreadyClosedError) as e:
275209
# These exceptions are raised by DatabricksRetryPolicy when detecting
276210
# "already closed" scenarios (404 responses with retry history)
277211
error_message = f"Request failed: {e}"
278-
# Construct RequestError with proper 3-argument format (message, context, error)
279212
raise RequestError(error_message, None, e)
280213
except MaxRetryError as e:
281214
# urllib3 MaxRetryError should bubble up for redirect tests to catch
@@ -284,19 +217,8 @@ def _make_request(
284217
except Exception as e:
285218
logger.error(f"SEA HTTP request failed with exception: {e}")
286219
error_message = f"Error during request to server. {e}"
287-
# Construct RequestError with proper 3-argument format (message, context, error)
288220
raise RequestError(error_message, None, e)
289221

290-
logger.debug(f"Response status: {response.status}")
291-
292-
# Handle successful responses
293-
if 200 <= response.status < 300:
294-
return response.json()
295-
296-
error_message = f"SEA HTTP request failed with status {response.status}"
297-
298-
raise RequestError(error_message, None)
299-
300222
def _get_command_type_from_path(self, path: str, method: str) -> CommandType:
301223
"""
302224
Determine the command type based on the API path and method.

0 commit comments

Comments
 (0)