Skip to content

Commit 3991af2

Browse files
committed
shifted telemetry http client to http.py
Signed-off-by: Sai Shree Pradhan <[email protected]>
1 parent 87ef0c6 commit 3991af2

File tree

3 files changed

+75
-39
lines changed

3 files changed

+75
-39
lines changed

src/databricks/sql/common/http.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import threading
66
from dataclasses import dataclass
77
from contextlib import contextmanager
8-
from typing import Generator
8+
from typing import Generator, Optional
99
import logging
10+
from requests.adapters import HTTPAdapter
11+
from databricks.sql.auth.retry import DatabricksRetryPolicy, CommandType
1012

1113
logger = logging.getLogger(__name__)
1214

@@ -81,3 +83,70 @@ def execute(
8183

8284
def close(self):
8385
self.session.close()
86+
87+
88+
class TelemetryHTTPAdapter(HTTPAdapter):
    """
    HTTP adapter that primes its retry policy before every outgoing request.

    On each ``send`` the object stored in ``self.max_retries`` is tagged with
    ``CommandType.OTHER`` and its retry timer is started, and only then is the
    request handed to the parent adapter.
    """

    def send(self, request, **kwargs):
        """Prepare the retry policy, then send ``request`` via the parent adapter."""
        # NOTE(review): assumes max_retries is a DatabricksRetryPolicy (it must
        # expose command_type and start_retry_timer) — confirm at the mount site.
        retry_policy = self.max_retries
        retry_policy.command_type = CommandType.OTHER
        retry_policy.start_retry_timer()
        response = super().send(request, **kwargs)
        return response
99+
100+
101+
class TelemetryHttpClient:  # TODO: Unify all the http clients in the PySQL Connector
    """Singleton HTTP client for sending telemetry data.

    Callers obtain the shared client through ``get_instance()``. The client
    owns one ``requests.Session`` whose ``https://`` and ``http://`` prefixes
    are mounted with a ``TelemetryHTTPAdapter`` carrying a
    ``DatabricksRetryPolicy`` built from the class constants below.
    """

    # Currently registered singleton; None until get_instance() creates one
    # and again after that instance has been closed.
    _instance: Optional["TelemetryHttpClient"] = None
    # Guards creation and removal of _instance.
    _lock = threading.Lock()

    # Settings forwarded to DatabricksRetryPolicy in __init__.
    TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_COUNT = 3
    TELEMETRY_RETRY_DELAY_MIN = 1.0
    TELEMETRY_RETRY_DELAY_MAX = 10.0
    TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_DURATION = 30.0

    def __init__(self):
        """Initializes the session and mounts the custom retry adapter."""
        retry_policy = DatabricksRetryPolicy(
            delay_min=self.TELEMETRY_RETRY_DELAY_MIN,
            delay_max=self.TELEMETRY_RETRY_DELAY_MAX,
            stop_after_attempts_count=self.TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_COUNT,
            stop_after_attempts_duration=self.TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_DURATION,
            delay_default=1.0,
            force_dangerous_codes=[],
        )
        # The same adapter (and therefore the same retry policy object) serves
        # both URL schemes.
        adapter = TelemetryHTTPAdapter(max_retries=retry_policy)
        self.session = requests.Session()
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    @classmethod
    def get_instance(cls) -> "TelemetryHttpClient":
        """Get the singleton instance of the TelemetryHttpClient.

        The instance is created on first use. The first check avoids taking
        the lock once an instance exists; the second check, under the lock,
        keeps two racing callers from each constructing a client.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    logger.debug("Initializing singleton TelemetryHttpClient")
                    cls._instance = TelemetryHttpClient()
        return cls._instance

    def post(self, url: str, **kwargs) -> requests.Response:
        """
        Executes a POST request using the configured session.

        This is a blocking call intended to be run in a background thread.

        ``kwargs`` are passed unchanged to ``requests.Session.post``.
        """
        logger.debug("Executing telemetry POST request to: %s", url)
        return self.session.post(url, **kwargs)

    def close(self):
        """Unregister this client if it is the singleton, then close its session.

        The singleton slot is cleared only when it still refers to this
        object, so closing a stale or directly constructed client cannot
        discard a newer instance registered by ``get_instance()``. The slot is
        cleared before the session is closed, so later ``get_instance()``
        calls build a fresh client, and a failure while closing the session
        cannot leave this client registered.
        """
        logger.debug("Closing TelemetryHttpClient session.")
        # Clear the instance to allow for re-initialization if needed
        with TelemetryHttpClient._lock:
            if TelemetryHttpClient._instance is self:
                TelemetryHttpClient._instance = None
        self.session.close()

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
from concurrent.futures import ThreadPoolExecutor
66
from typing import Dict, Optional
7+
from databricks.sql.common.http import TelemetryHttpClient
78
from databricks.sql.telemetry.models.event import (
89
TelemetryEvent,
910
DriverSystemConfiguration,
@@ -25,8 +26,6 @@
2526
DatabricksOAuthProvider,
2627
ExternalAuthProvider,
2728
)
28-
from requests.adapters import HTTPAdapter
29-
from databricks.sql.auth.retry import DatabricksRetryPolicy, CommandType
3029
import sys
3130
import platform
3231
import uuid
@@ -36,19 +35,6 @@
3635
logger = logging.getLogger(__name__)
3736

3837

39-
class TelemetryHTTPAdapter(HTTPAdapter):
40-
"""
41-
Custom HTTP adapter to prepare our DatabricksRetryPolicy before each request.
42-
This ensures the retry timer is started and the command type is set correctly,
43-
allowing the policy to manage its state for the duration of the request retries.
44-
"""
45-
46-
def send(self, request, **kwargs):
47-
self.max_retries.command_type = CommandType.OTHER
48-
self.max_retries.start_retry_timer()
49-
return super().send(request, **kwargs)
50-
51-
5238
class TelemetryHelper:
5339
"""Helper class for getting telemetry related information."""
5440

@@ -143,11 +129,6 @@ class TelemetryClient(BaseTelemetryClient):
143129
It uses a thread pool to handle asynchronous operations, that it gets from the TelemetryClientFactory.
144130
"""
145131

146-
TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_COUNT = 3
147-
TELEMETRY_RETRY_DELAY_MIN = 1.0
148-
TELEMETRY_RETRY_DELAY_MAX = 10.0
149-
TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_DURATION = 30.0
150-
151132
# Telemetry endpoint paths
152133
TELEMETRY_AUTHENTICATED_PATH = "/telemetry-ext"
153134
TELEMETRY_UNAUTHENTICATED_PATH = "/telemetry-unauth"
@@ -173,21 +154,7 @@ def __init__(
173154
self._driver_connection_params = None
174155
self._host_url = host_url
175156
self._executor = executor
176-
177-
self._telemetry_retry_policy = DatabricksRetryPolicy(
178-
delay_min=self.TELEMETRY_RETRY_DELAY_MIN,
179-
delay_max=self.TELEMETRY_RETRY_DELAY_MAX,
180-
stop_after_attempts_count=self.TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_COUNT,
181-
stop_after_attempts_duration=self.TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_DURATION,
182-
delay_default=1.0,
183-
force_dangerous_codes=[],
184-
)
185-
self._session = (
186-
requests.Session()
187-
) # TODO: Use DatabricksHttpClient instead (unify all http clients)
188-
adapter = TelemetryHTTPAdapter(max_retries=self._telemetry_retry_policy)
189-
self._session.mount("https://", adapter)
190-
self._session.mount("http://", adapter)
157+
self._http_client = TelemetryHttpClient.get_instance()
191158

192159
def _export_event(self, event):
193160
"""Add an event to the batch queue and flush if batch is full"""
@@ -236,7 +203,7 @@ def _send_telemetry(self, events):
236203
try:
237204
logger.debug("Submitting telemetry request to thread pool")
238205
future = self._executor.submit(
239-
self._session.post,
206+
self._http_client.post,
240207
url,
241208
data=request.to_json(),
242209
headers=headers,
@@ -341,7 +308,6 @@ def close(self):
341308
"""Flush remaining events before closing"""
342309
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
343310
self._flush()
344-
self._session.close()
345311

346312

347313
class TelemetryClientFactory:
@@ -463,6 +429,7 @@ def close(session_id_hex):
463429
)
464430
try:
465431
TelemetryClientFactory._executor.shutdown(wait=True)
432+
TelemetryHttpClient.get_instance().close()
466433
except Exception as e:
467434
logger.debug("Failed to shutdown thread pool executor: %s", e)
468435
TelemetryClientFactory._executor = None

tests/unit/test_telemetry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def test_network_request_flow(self, mock_post, mock_telemetry_client):
9191
args, kwargs = client._executor.submit.call_args
9292

9393
# Verify correct function and URL
94-
assert args[0] == client._session.post
94+
assert args[0] == client._http_client.post
9595
assert args[1] == "https://test-host.com/telemetry-ext"
9696
assert kwargs["headers"]["Authorization"] == "Bearer test-token"
9797

0 commit comments

Comments
 (0)