
Commit 8a2f333

[PECOBLR-1408] [PECOBLR-1407] [PECOBLR-1409][PECOBLR-1411] Address telemetry code audit comments- part1 (databricks#1163)
## Description

Four improvements to telemetry:

- Use a common object mapper across telemetry use cases (the mapper is thread-safe and expensive to create, i.e., good to re-use).
- Make the `flushIntervalMillis` config the same across both telemetry clients (un-auth and auth).
- Clear the connection param cache when a connection is closed: this was previously a memory leak.
- Rather than creating a scheduledExecutor for each telemetry client, share one across a factory.

## Testing

- Unit tests

## Additional Notes to the Reviewer

When the LAST connection to a host is closed, all pending telemetry events for that host are flushed across all prior connections (since they all shared the same TelemetryClient). In other words, if you have 5 connections to `host-A`, closing connections 1-4 does nothing (it just decrements the refCount). Only when you close connection 5 (the last one) does the flush occur, sending the accumulated telemetry from all 5 connections. A minimal sketch of this ref-counting pattern follows below.

NO_CHANGELOG=true
1 parent 9c9ffe2 commit 8a2f333
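
For reviewers unfamiliar with the flush-on-last-close behavior described in the notes, here is a minimal, self-contained sketch of the ref-counting pattern. Class and method names here (`TelemetryClientRegistry`, `acquire`, `release`) are illustrative, not the driver's actual identifiers:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TelemetryClientRegistry {
  // One shared client per host, plus a reference count of open connections.
  private static final Map<String, Entry> CLIENTS = new ConcurrentHashMap<>();

  static final class Entry {
    final StubClient client = new StubClient();
    int refCount;
  }

  // Stand-in for TelemetryClient; only the flush-on-last-close behavior matters here.
  static final class StubClient {
    void flush() {
      System.out.println("Flushing all accumulated telemetry for this host");
    }
  }

  // Each new connection to a host shares the same client and bumps the count.
  static synchronized StubClient acquire(String host) {
    Entry e = CLIENTS.computeIfAbsent(host, h -> new Entry());
    e.refCount++;
    return e.client;
  }

  // Closing a connection only flushes when the LAST reference is released.
  static synchronized void release(String host) {
    Entry e = CLIENTS.get(host);
    if (e != null && --e.refCount == 0) {
      e.client.flush(); // only the last close triggers the flush
      CLIENTS.remove(host);
    }
  }

  public static void main(String[] args) {
    for (int i = 0; i < 5; i++) acquire("host-A"); // 5 connections share one client
    for (int i = 0; i < 4; i++) release("host-A"); // closing 1-4 just decrements
    release("host-A"); // closing connection 5: refCount hits 0, flush happens
  }
}
```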

24 files changed (+968, -101 lines)

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 3 additions & 4 deletions

@@ -30,7 +30,7 @@
 import com.databricks.jdbc.model.core.ResultManifest;
 import com.databricks.jdbc.model.core.StatementStatus;
 import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
-import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
+import com.databricks.jdbc.telemetry.TelemetryHelper;
 import com.databricks.sdk.support.ToStringer;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.InputStream;
@@ -271,9 +271,8 @@ public DatabricksResultSet(
   public boolean next() throws SQLException {
     checkIfClosed();
     boolean hasNext = this.executionResult.next();
-    TelemetryCollector.getInstance()
-        .recordResultSetIteration(
-            statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), hasNext);
+    TelemetryHelper.recordResultSetIteration(
+        parentStatement, statementId, resultSetMetaData.getChunkCount(), hasNext);
     return hasNext;
   }

src/main/java/com/databricks/jdbc/api/impl/ExecutionResultFactory.java

Lines changed: 4 additions & 3 deletions

@@ -15,7 +15,7 @@
 import com.databricks.jdbc.model.core.ResultData;
 import com.databricks.jdbc.model.core.ResultManifest;
 import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
-import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
+import com.databricks.jdbc.telemetry.TelemetryHelper;
 import java.sql.SQLException;
 import java.util.List;
@@ -50,7 +50,8 @@ private static IExecutionResult getResultHandler(
       throw new DatabricksParsingException(
           "Empty response format", DatabricksDriverErrorCode.INVALID_STATE);
     }
-    TelemetryCollector.getInstance().setResultFormat(statementId, manifest.getFormat());
+    TelemetryHelper.setResultFormat(
+        session.getConnectionContext(), statementId, manifest.getFormat());
     LOGGER.info("Processing result of format {} from SQL Execution API", manifest.getFormat());
     // We use JSON_ARRAY for metadata and update commands, and ARROW_STREAM for query results
     switch (manifest.getFormat()) {
@@ -91,7 +92,7 @@ private static IExecutionResult getResultHandler(
       IDatabricksSession session)
       throws SQLException {
     TSparkRowSetType resultFormat = resultsResp.getResultSetMetadata().getResultFormat();
-    TelemetryCollector.getInstance().setResultFormat(parentStatement, resultFormat);
+    TelemetryHelper.setResultFormat(session.getConnectionContext(), parentStatement, resultFormat);
     LOGGER.info("Processing result of format {} from Thrift server", resultFormat);
     switch (resultFormat) {
       case COLUMN_BASED_SET:

src/main/java/com/databricks/jdbc/api/impl/arrow/AbstractRemoteChunkProvider.java

Lines changed: 3 additions & 3 deletions

@@ -17,7 +17,7 @@
 import com.databricks.jdbc.model.core.ResultData;
 import com.databricks.jdbc.model.core.ResultManifest;
 import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
-import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
+import com.databricks.jdbc.telemetry.TelemetryHelper;
 import com.databricks.sdk.service.sql.BaseChunkInfo;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -83,7 +83,7 @@ protected AbstractRemoteChunkProvider(
         chunkCount,
         chunkIndexToChunksMap,
         resultData.getExternalLinks() != null ? resultData.getExternalLinks().size() : 1);
-    TelemetryCollector.getInstance().recordTotalChunks(statementId, chunkCount);
+    TelemetryHelper.recordTotalChunks(session.getConnectionContext(), statementId, chunkCount);
     initializeData();
   }
@@ -271,7 +271,7 @@ private ConcurrentMap<Long, T> initializeChunksMap(
       resultsResp = session.getDatabricksClient().getMoreResults(parentStatement);
       populateChunkIndexMap(resultsResp.getResults(), chunkIndexMap);
     }
-    TelemetryCollector.getInstance().recordTotalChunks(statementId, chunkCount);
+    TelemetryHelper.recordTotalChunks(session.getConnectionContext(), statementId, chunkCount);
     return chunkIndexMap;
   }
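
The call sites above suggest TelemetryHelper is a thin static facade that resolves a per-connection collector from the connection context, rather than the process-wide TelemetryCollector.getInstance() singleton. The helper's body is not part of this diff, so the following is only a sketch, inferred from the `TelemetryCollectorManager.getInstance().getOrCreateCollector(context)` usage that appears in the TelemetryClient changes below:

```java
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import com.databricks.jdbc.telemetry.latency.TelemetryCollectorManager;

// Sketch only: the real TelemetryHelper implementation is not shown in this commit.
public final class TelemetryHelperSketch {
  public static void recordTotalChunks(
      IDatabricksConnectionContext context, StatementId statementId, long chunkCount) {
    // Resolve the collector scoped to this connection so that closing the
    // connection can clear exactly its own cached state (the old global
    // singleton cache was the memory leak this commit fixes).
    TelemetryCollectorManager.getInstance()
        .getOrCreateCollector(context)
        .recordTotalChunks(statementId, chunkCount);
  }
}
```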

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowResultChunk.java

Lines changed: 13 additions & 5 deletions

@@ -4,6 +4,7 @@
 import static com.databricks.jdbc.common.util.ValidationUtil.checkHTTPError;
 import static com.databricks.jdbc.telemetry.TelemetryHelper.getStatementIdString;

+import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
 import com.databricks.jdbc.common.CompressionCodec;
 import com.databricks.jdbc.common.DatabricksJdbcUrlParams;
 import com.databricks.jdbc.common.util.DecompressionUtil;
@@ -15,7 +16,7 @@
 import com.databricks.jdbc.log.JdbcLoggerFactory;
 import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
 import com.databricks.jdbc.model.core.ExternalLink;
-import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
+import com.databricks.jdbc.telemetry.TelemetryHelper;
 import com.databricks.sdk.service.sql.BaseChunkInfo;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,6 +29,7 @@

 public class ArrowResultChunk extends AbstractArrowResultChunk {
   private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ArrowResultChunk.class);
+  private final IDatabricksConnectionContext connectionContext;

   private ArrowResultChunk(Builder builder) throws DatabricksParsingException {
     super(
@@ -39,6 +41,7 @@ private ArrowResultChunk(Builder builder) throws DatabricksParsingException {
         builder.chunkLink,
         builder.expiryTime,
         builder.chunkReadyTimeoutSeconds);
+    this.connectionContext = builder.connectionContext;
     if (builder.inputStream != null) {
       // Data is already available
       try {
@@ -80,10 +83,9 @@ protected void downloadData(
       checkHTTPError(response);
       long downloadTimeMs = (System.nanoTime() - startTime) / 1_000_000;

-      // Add telemetry for the time to first byte for chunk download
-      TelemetryCollector.getInstance()
-          .recordChunkDownloadLatency(
-              getStatementIdString(statementId), chunkIndex, downloadTimeMs);
+      // Record chunk download latency telemetry
+      TelemetryHelper.recordChunkDownloadLatency(
+          connectionContext, getStatementIdString(statementId), chunkIndex, downloadTimeMs);

       // Read compressed stream fully (download latency excludes decompression)
       byte[] compressed = IOUtils.toByteArray(response.getEntity().getContent());
@@ -183,12 +185,18 @@ public static class Builder {
     private InputStream inputStream;
     private int chunkReadyTimeoutSeconds =
         Integer.parseInt(DatabricksJdbcUrlParams.CHUNK_READY_TIMEOUT_SECONDS.getDefaultValue());
+    private IDatabricksConnectionContext connectionContext;

     public Builder withStatementId(StatementId statementId) {
       this.statementId = statementId;
       return this;
     }

+    public Builder withConnectionContext(IDatabricksConnectionContext connectionContext) {
+      this.connectionContext = connectionContext;
+      return this;
+    }
+
     public Builder withChunkInfo(BaseChunkInfo baseChunkInfo) {
       this.chunkIndex = baseChunkInfo.getChunkIndex();
       this.numRows = baseChunkInfo.getRowCount();

src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java

Lines changed: 2 additions & 0 deletions

@@ -128,6 +128,7 @@ private static ChunkProvider createRemoteChunkProvider(
           linkPrefetchWindow,
           chunkReadyTimeoutSeconds,
           cloudFetchSpeedThreshold,
+          connectionContext,
           initialLinks);
     } else {
       // Use the original RemoteChunkProvider
@@ -208,6 +209,7 @@ private static ChunkProvider createThriftRemoteChunkProvider(
           linkPrefetchWindow,
           chunkReadyTimeoutSeconds,
           cloudFetchSpeedThreshold,
+          connectionContext,
           initialLinks);
     } else {
       // Use the original RemoteChunkProvider

src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java

Lines changed: 2 additions & 0 deletions

@@ -66,6 +66,7 @@ protected ArrowResultChunk createChunk(
         .withStatementId(statementId)
         .withChunkInfo(chunkInfo)
         .withChunkReadyTimeoutSeconds(chunkReadyTimeoutSeconds)
+        .withConnectionContext(session.getConnectionContext())
         .build();
   }
@@ -78,6 +79,7 @@ protected ArrowResultChunk createChunk(
         .withStatementId(statementId)
         .withThriftChunkInfo(chunkIndex, resultLink)
         .withChunkReadyTimeoutSeconds(chunkReadyTimeoutSeconds)
+        .withConnectionContext(session.getConnectionContext())
         .build();
   }

src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java

Lines changed: 5 additions & 0 deletions

@@ -1,5 +1,6 @@
 package com.databricks.jdbc.api.impl.arrow;

+import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
 import com.databricks.jdbc.common.CompressionCodec;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.dbclient.impl.common.StatementId;
@@ -60,6 +61,7 @@ public class StreamingChunkProvider implements ChunkProvider {
   private final CompressionCodec compressionCodec;
   private final StatementId statementId;
   private final double cloudFetchSpeedThreshold;
+  private final IDatabricksConnectionContext connectionContext;

   // Chunk storage
   private final ConcurrentMap<Long, ArrowResultChunk> chunks = new ConcurrentHashMap<>();
@@ -125,6 +127,7 @@ public StreamingChunkProvider(
       int linkPrefetchWindow,
       int chunkReadyTimeoutSeconds,
       double cloudFetchSpeedThreshold,
+      IDatabricksConnectionContext connectionContext,
       ChunkLinkFetchResult initialLinks)
       throws DatabricksParsingException {

@@ -136,6 +139,7 @@ public StreamingChunkProvider(
     this.linkPrefetchWindow = linkPrefetchWindow;
     this.chunkReadyTimeoutSeconds = chunkReadyTimeoutSeconds;
     this.cloudFetchSpeedThreshold = cloudFetchSpeedThreshold;
+    this.connectionContext = connectionContext;

     LOGGER.info(
         "Creating StreamingChunkProvider for statement {}: maxChunksInMemory={}, linkPrefetchWindow={}",
@@ -461,6 +465,7 @@ private void createChunkFromLink(ExternalLink link) throws DatabricksParsingException {
         .withStatementId(statementId)
         .withChunkMetadata(chunkIndex, rowCount, rowOffset)
         .withChunkReadyTimeoutSeconds(chunkReadyTimeoutSeconds)
+        .withConnectionContext(connectionContext)
         .build();

     chunk.setChunkLink(link);
src/main/java/com/databricks/jdbc/common/util/JsonUtil.java

Lines changed: 9 additions & 1 deletion

@@ -1,13 +1,21 @@
 package com.databricks.jdbc.common.util;

+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.databind.ObjectMapper;

 public class JsonUtil {
   // Thread-safe singleton instance
   private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final ObjectMapper TELEMETRY_MAPPER =
+      new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

-  // Use the shared instance for your operations
+  // Use the shared instance for driver operations
   public static ObjectMapper getMapper() {
     return MAPPER;
   }
+
+  // Use the shared instance for telemetry operations
+  public static ObjectMapper getTelemetryMapper() {
+    return TELEMETRY_MAPPER;
+  }
 }
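
A quick illustration of why the dedicated telemetry mapper matters: ObjectMapper is thread-safe once configured and costly to construct, so a shared static instance beats building one per call, and NON_NULL inclusion keeps unset fields out of telemetry payloads. The event class below is hypothetical, purely to show the serialization difference:

```java
import com.databricks.jdbc.common.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;

public class TelemetryMapperDemo {
  // Hypothetical event POJO, not a driver class.
  public static class Event {
    public String statementId = "stmt-123";
    public Long chunkCount; // left unset
  }

  public static void main(String[] args) throws JsonProcessingException {
    Event e = new Event();
    // Default mapper keeps nulls: {"statementId":"stmt-123","chunkCount":null}
    System.out.println(JsonUtil.getMapper().writeValueAsString(e));
    // Telemetry mapper drops them: {"statementId":"stmt-123"}
    System.out.println(JsonUtil.getTelemetryMapper().writeValueAsString(e));
  }
}
```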

src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java

Lines changed: 3 additions & 3 deletions

@@ -22,7 +22,7 @@
 import com.databricks.jdbc.model.client.thrift.generated.*;
 import com.databricks.jdbc.model.core.StatementStatus;
 import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
-import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
+import com.databricks.jdbc.telemetry.TelemetryHelper;
 import com.databricks.sdk.core.DatabricksConfig;
 import com.databricks.sdk.service.sql.StatementState;
 import java.sql.SQLException;
@@ -849,8 +849,8 @@ private TGetOperationStatusResp getOperationStatus(
         "Statement [{}] Thrift operation status latency: {}ms",
         statementId,
         operationStatusLatencyMillis);
-    TelemetryCollector.getInstance()
-        .recordGetOperationStatus(statementId.toSQLExecStatementId(), operationStatusLatencyMillis);
+    TelemetryHelper.recordGetOperationStatus(
+        connectionContext, statementId.toSQLExecStatementId(), operationStatusLatencyMillis);
     return operationStatus;
   }
 }

src/main/java/com/databricks/jdbc/telemetry/TelemetryClient.java

Lines changed: 25 additions & 35 deletions

@@ -5,12 +5,12 @@
 import com.databricks.jdbc.log.JdbcLoggerFactory;
 import com.databricks.jdbc.model.telemetry.TelemetryFrontendLog;
 import com.databricks.jdbc.telemetry.latency.TelemetryCollector;
+import com.databricks.jdbc.telemetry.latency.TelemetryCollectorManager;
 import com.databricks.sdk.core.DatabricksConfig;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;

 public class TelemetryClient implements ITelemetryClient {
   private static final int MINIMUM_TELEMETRY_FLUSH_MILLISECONDS = 1000;
@@ -26,31 +26,27 @@ public class TelemetryClient implements ITelemetryClient {
   private ScheduledFuture<?> flushTask;
   private final int flushIntervalMillis;

-  private static ThreadFactory createSchedulerThreadFactory() {
-    return new ThreadFactory() {
-      private final AtomicInteger threadNumber = new AtomicInteger(1);
-
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread thread = new Thread(r, "Telemetry-Scheduler-" + threadNumber.getAndIncrement());
-        thread.setDaemon(true);
-        return thread;
-      }
-    };
-  }
-
   public TelemetryClient(
       IDatabricksConnectionContext connectionContext,
       ExecutorService executorService,
+      ScheduledExecutorService scheduledExecutorService,
       DatabricksConfig config) {
     this.eventsBatch = new LinkedList<>();
     this.eventsBatchSize = connectionContext.getTelemetryBatchSize();
     this.context = connectionContext;
     this.databricksConfig = config;
     this.executorService = executorService;
-    this.scheduledExecutorService =
-        Executors.newSingleThreadScheduledExecutor(createSchedulerThreadFactory());
-    this.flushIntervalMillis = context.getTelemetryFlushIntervalInMilliseconds();
+    /*
+     * The scheduledExecutorService is shared across all telemetry clients and only schedules
+     * periodic flush checks. The actual flush work (network I/O) is submitted asynchronously
+     * to the executorService (10-thread pool), so a slow flush on one statement does not block
+     * flushes for other statements as long as worker threads are available in the pool.
+     */
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.flushIntervalMillis =
+        Math.max(
+            context.getTelemetryFlushIntervalInMilliseconds(),
+            MINIMUM_TELEMETRY_FLUSH_MILLISECONDS); // To avoid illegalArgument exception in any case
     this.lastFlushedTime = System.currentTimeMillis();
     this.telemetryPushClient =
         TelemetryClientFactory.getTelemetryPushClient(
@@ -59,14 +55,15 @@ public TelemetryClient(
   }

   public TelemetryClient(
-      IDatabricksConnectionContext connectionContext, ExecutorService executorService) {
+      IDatabricksConnectionContext connectionContext,
+      ExecutorService executorService,
+      ScheduledExecutorService scheduledExecutorService) {
     this.eventsBatch = new LinkedList<>();
     eventsBatchSize = connectionContext.getTelemetryBatchSize();
     this.context = connectionContext;
     this.databricksConfig = null;
     this.executorService = executorService;
-    this.scheduledExecutorService =
-        Executors.newSingleThreadScheduledExecutor(createSchedulerThreadFactory());
+    this.scheduledExecutorService = scheduledExecutorService;
     this.flushIntervalMillis =
         Math.max(
             context.getTelemetryFlushIntervalInMilliseconds(),
@@ -108,8 +105,10 @@ public void exportEvent(TelemetryFrontendLog event) {

   @Override
   public void close() {
-    // Export any pending latency telemetry before flushing
-    TelemetryCollector.getInstance().exportAllPendingTelemetryDetails();
+    // Export any pending latency telemetry before flushing for this connection
+    TelemetryCollector collector =
+        TelemetryCollectorManager.getInstance().getOrCreateCollector(context);
+    collector.exportAllPendingTelemetryDetails();

     try {
       // Synchronously flush the remaining events and wait for the task to complete
@@ -129,22 +128,13 @@ public void close() {
       flushTask.cancel(false);
     }

-    // Shut down the scheduler.
-    // The executorService is assumed to be a shared resource and is not shut down here.
-    scheduledExecutorService.shutdown();
-    try {
-      if (!scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        scheduledExecutorService.shutdownNow();
-      }
-    } catch (InterruptedException ie) {
-      LOGGER.trace("Interrupted while waiting for flush to finish. Error: {}", ie);
-      Thread.currentThread().interrupt();
-      scheduledExecutorService.shutdownNow();
-    }
+    // Note: Both executorService and scheduledExecutorService are shared resources
+    // managed by TelemetryClientFactory and should not be shut down here.
   }

   /**
-   * Submits a flush task to the executor service.
+   * Submits a flush task to the executor service. Non-blocking: uses a shared thread pool (10
+   * threads) so slow flushes don't block other statements.
    *
    * @param forceFlush - Flushes the eventsBatch for all size variations if forceFlush, otherwise
    *     only flushes if eventsBatch size has breached
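
The diff above moves scheduler ownership from each TelemetryClient into the factory. A minimal sketch of that shared-scheduler pattern follows; the holder class and method names are assumptions (the daemon-thread naming comes from the deleted createSchedulerThreadFactory), and the clamp mirrors the Math.max in the constructor, since scheduleAtFixedRate throws IllegalArgumentException for a non-positive period:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a factory-owned scheduler replacing the per-client
// single-thread schedulers deleted above. Names are illustrative.
final class SharedTelemetryScheduler {
  private static final AtomicInteger THREAD_NUMBER = new AtomicInteger(1);

  // One daemon scheduler for all clients; it only fires lightweight flush
  // checks, while real network I/O goes to a separate worker pool.
  private static final ScheduledExecutorService SCHEDULER =
      Executors.newSingleThreadScheduledExecutor(
          r -> {
            Thread t = new Thread(r, "Telemetry-Scheduler-" + THREAD_NUMBER.getAndIncrement());
            t.setDaemon(true); // daemon: never blocks JVM shutdown
            return t;
          });

  static ScheduledExecutorService get() {
    return SCHEDULER;
  }

  public static void main(String[] args) throws InterruptedException {
    long configuredIntervalMillis = 0; // a bad config value
    long period = Math.max(configuredIntervalMillis, 1000); // clamp, as the diff does
    // Two "clients" schedule periodic flush checks on the same scheduler.
    get().scheduleAtFixedRate(
        () -> System.out.println("flush check A"), 0, period, TimeUnit.MILLISECONDS);
    get().scheduleAtFixedRate(
        () -> System.out.println("flush check B"), 0, period, TimeUnit.MILLISECONDS);
    Thread.sleep(2500);
  }
}
```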
