Skip to content

Commit 876ed88

Browse files
committed
test
1 parent cca421b commit 876ed88

File tree

2 files changed

+165
-131
lines changed

2 files changed

+165
-131
lines changed

src/databricks/sql/telemetry/latency_logger.py

Lines changed: 131 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import time
22
import functools
3-
from typing import Optional
3+
from typing import Optional, Dict, Any
44
import logging
55
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
66
from databricks.sql.telemetry.models.event import (
@@ -11,127 +11,141 @@
1111
logger = logging.getLogger(__name__)
1212

1313

14-
def _extract_cursor_data(cursor) -> Dict[str, Any]:
    """Extract telemetry fields directly from a Cursor object.

    Uses direct attribute access (no wrapper/extractor objects) to minimize
    overhead on the hot path. Every field is read defensively so a partially
    initialized cursor can never break telemetry collection.

    Args:
        cursor: The Cursor object to extract data from.

    Returns:
        Dict with keys ``statement_id``, ``session_id_hex``, ``is_compressed``,
        ``execution_result``, ``retry_count`` and ``chunk_id``; values fall
        back to None / safe defaults when extraction fails.
    """
    data: Dict[str, Any] = {}

    # statement_id (query_id) - may be unset before the first execute()
    try:
        data["statement_id"] = cursor.query_id
    except Exception:
        # NOTE: AttributeError is already an Exception subclass, so a single
        # broad handler is sufficient here (telemetry must never raise).
        data["statement_id"] = None

    # session_id_hex comes from the owning connection
    try:
        data["session_id_hex"] = cursor.connection.get_session_id_hex()
    except Exception:
        data["session_id_hex"] = None

    # lz4 compression flag from the owning connection
    try:
        data["is_compressed"] = cursor.connection.lz4_compression
    except Exception:
        data["is_compressed"] = False

    # Result format is derived from the concrete type of the active result
    # queue; falls back to FORMAT_UNSPECIFIED on any failure.
    try:
        if cursor.active_result_set is None:
            data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
        else:
            # Imported lazily to avoid a circular import at module load time.
            from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue

            results = cursor.active_result_set.results
            if isinstance(results, ColumnQueue):
                data["execution_result"] = ExecutionResultFormat.COLUMNAR_INLINE
            elif isinstance(results, CloudFetchQueue):
                data["execution_result"] = ExecutionResultFormat.EXTERNAL_LINKS
            elif isinstance(results, ArrowQueue):
                data["execution_result"] = ExecutionResultFormat.INLINE_ARROW
            else:
                data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED
    except Exception:
        data["execution_result"] = ExecutionResultFormat.FORMAT_UNSPECIFIED

    # Retry count from the backend's retry policy history, when available.
    try:
        retry_policy = getattr(cursor.backend, "retry_policy", None)
        data["retry_count"] = len(retry_policy.history) if retry_policy else 0
    except Exception:
        data["retry_count"] = 0

    # chunk_id only applies to result-set download handlers.
    data["chunk_id"] = None

    return data
def _extract_result_set_handler_data(handler) -> Dict[str, Any]:
    """Extract telemetry fields directly from a ResultSetDownloadHandler.

    Uses direct attribute access (no wrapper/extractor objects) and
    defensive reads so a malformed handler never breaks telemetry.

    Args:
        handler: The ResultSetDownloadHandler object to extract data from.

    Returns:
        Dict with keys ``session_id_hex``, ``statement_id``, ``is_compressed``,
        ``execution_result``, ``retry_count`` and ``chunk_id``; values fall
        back to None / safe defaults when extraction fails.
    """
    data: Dict[str, Any] = {}

    # NOTE: AttributeError is an Exception subclass, so a single broad
    # handler suffices for each read (telemetry must never raise).
    try:
        data["session_id_hex"] = handler.session_id_hex
    except Exception:
        data["session_id_hex"] = None

    try:
        data["statement_id"] = handler.statement_id
    except Exception:
        data["statement_id"] = None

    try:
        data["is_compressed"] = handler.settings.is_lz4_compressed
    except Exception:
        data["is_compressed"] = False

    # Download handlers always fetch externally linked (cloud-fetch) results.
    data["execution_result"] = ExecutionResultFormat.EXTERNAL_LINKS

    # The underlying requests/urllib3 stack does not expose a retry count.
    data["retry_count"] = None

    try:
        data["chunk_id"] = handler.chunk_id
    except Exception:
        data["chunk_id"] = None

    return data

111127

112-
def _extract_telemetry_data(obj) -> Optional[Dict[str, Any]]:
    """Dispatch telemetry extraction based on the runtime type of *obj*.

    Returns a plain dict rather than a wrapper object so the
    SqlExecutionEvent can be built later (e.g. in a background thread)
    without holding a reference to the original object.

    Args:
        obj: Object to extract data from (Cursor, ResultSetDownloadHandler, ...).

    Returns:
        Dict with telemetry data, or None when the type is not supported.
    """
    type_name = obj.__class__.__name__

    if type_name == "Cursor":
        return _extract_cursor_data(obj)
    if type_name == "ResultSetDownloadHandler":
        return _extract_result_set_handler_data(obj)

    logger.debug("No telemetry extraction available for %s", type_name)
    return None

137151

@@ -143,11 +157,10 @@ def log_latency(statement_type: StatementType = StatementType.NONE):
143157
data about the operation, including latency, statement information, and
144158
execution context.
145159
146-
The decorator automatically:
147-
- Measures execution time using high-precision performance counters
148-
- Extracts telemetry information from the method's object (self)
149-
- Creates a SqlExecutionEvent with execution details
150-
- Sends the telemetry data asynchronously via TelemetryClient
160+
OPTIMIZATIONS APPLIED:
161+
- Uses time.monotonic() instead of time.perf_counter() for faster timing
162+
- Direct attribute access instead of wrapper extractor objects
163+
- Dict-based data collection to minimize object creation overhead
151164
152165
Args:
153166
statement_type (StatementType): The type of SQL statement being executed.
@@ -162,46 +175,41 @@ def execute(self, query):
162175
function: A decorator that wraps methods to add latency logging.
163176
164177
Note:
165-
The wrapped method's object (self) must be compatible with the
166-
telemetry extractor system (e.g., Cursor or ResultSet objects).
178+
The wrapped method's object (self) must be a Cursor or
179+
ResultSetDownloadHandler for telemetry data extraction.
167180
"""
168181

169182
def decorator(func):
170183
@functools.wraps(func)
171184
def wrapper(self, *args, **kwargs):
172-
start_time = time.perf_counter()
185+
# Use monotonic clock for faster timing, sufficient for telemetry
186+
start_time = time.monotonic()
173187
result = None
174188
try:
175189
result = func(self, *args, **kwargs)
176190
return result
177191
finally:
178-
179-
def _safe_call(func_to_call):
180-
"""Calls a function and returns a default value on any exception."""
181-
try:
182-
return func_to_call()
183-
except Exception:
184-
return None
185-
186-
end_time = time.perf_counter()
192+
# Calculate duration once
193+
end_time = time.monotonic()
187194
duration_ms = int((end_time - start_time) * 1000)
188195

189-
extractor = get_extractor(self)
196+
# Extract telemetry data directly without creating extractor objects
197+
telemetry_data = _extract_telemetry_data(self)
190198

191-
if extractor is not None:
192-
session_id_hex = _safe_call(extractor.get_session_id_hex)
193-
statement_id = _safe_call(extractor.get_statement_id)
199+
if telemetry_data is not None:
200+
session_id_hex = telemetry_data.get('session_id_hex')
201+
statement_id = telemetry_data.get('statement_id')
194202

203+
# Create event from extracted data
195204
sql_exec_event = SqlExecutionEvent(
196205
statement_type=statement_type,
197-
is_compressed=_safe_call(extractor.get_is_compressed),
198-
execution_result=_safe_call(
199-
extractor.get_execution_result_format
200-
),
201-
retry_count=_safe_call(extractor.get_retry_count),
202-
chunk_id=_safe_call(extractor.get_chunk_id),
206+
is_compressed=telemetry_data.get('is_compressed'),
207+
execution_result=telemetry_data.get('execution_result'),
208+
retry_count=telemetry_data.get('retry_count'),
209+
chunk_id=telemetry_data.get('chunk_id'),
203210
)
204211

212+
# Send telemetry asynchronously
205213
telemetry_client = TelemetryClientFactory.get_telemetry_client(
206214
session_id_hex
207215
)

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import time
33
import logging
44
import json
5+
from queue import Queue, Full
56
from concurrent.futures import ThreadPoolExecutor
67
from concurrent.futures import Future
78
from datetime import datetime, timezone
@@ -180,8 +181,11 @@ def __init__(
180181
self._session_id_hex = session_id_hex
181182
self._auth_provider = auth_provider
182183
self._user_agent = None
183-
self._events_batch = []
184-
self._lock = threading.RLock()
184+
185+
# OPTIMIZATION: Use lock-free Queue instead of list + lock
186+
# Queue is thread-safe internally and has better performance under concurrency
187+
self._events_queue = Queue(maxsize=batch_size * 2) # Allow some buffering
188+
185189
self._driver_connection_params = None
186190
self._host_url = host_url
187191
self._executor = executor
@@ -192,19 +196,41 @@ def __init__(
192196
def _export_event(self, event):
    """Add an event to the batch queue and flush when the batch is full.

    ``queue.Queue`` is internally synchronized, so no explicit lock is held
    here. NOTE: ``qsize()`` is only an approximate count under concurrency;
    an occasionally early or late flush is acceptable for telemetry.
    """
    logger.debug("Exporting event for connection %s", self._session_id_hex)

    try:
        self._events_queue.put_nowait(event)
    except Full:
        # Queue overflowed: flush to make room, then retry exactly once.
        logger.debug("Event queue full, triggering flush")
        self._flush()
        try:
            self._events_queue.put_nowait(event)
        except Full:
            # Still full - dropping is acceptable for telemetry data.
            # Return here: we just flushed, so re-checking qsize() below
            # would only trigger a redundant second flush.
            logger.debug("Dropped telemetry event - queue still full")
            return

    # Flush once the (approximate) queue size reaches the batch size.
    if self._events_queue.qsize() >= self._batch_size:
        logger.debug(
            "Batch size limit reached (%s), flushing events", self._batch_size
        )
        self._flush()

203222
def _flush(self):
204223
"""Flush the current batch of events to the server"""
205-
with self._lock:
206-
events_to_flush = self._events_batch.copy()
207-
self._events_batch = []
224+
# OPTIMIZATION: Drain queue without locks
225+
# Collect all events currently in the queue
226+
events_to_flush = []
227+
while not self._events_queue.empty():
228+
try:
229+
event = self._events_queue.get_nowait()
230+
events_to_flush.append(event)
231+
except:
232+
# Queue is empty
233+
break
208234

209235
if events_to_flush:
210236
logger.debug("Flushing %s telemetry events to server", len(events_to_flush))

0 commit comments

Comments
 (0)