Skip to content

Commit ca0d48c

Browse files
authored
[PECOBLR-703][PECOBLR-735] Improve telemetry performance + Force telemetry to use daemon threads (#931)
## Description - Performance improvements: - Centralised thread pool, to avoid OOM and improve performance - Force executors on every export event - There are 2 types of thread creation: - One for in-general flush - One for scheduling flush every 300 seconds. (Daemon threads were not being used for this, which prevented the JVM from exiting. This PR changes that too.) ## Testing - Manually tested E2E using `ForceEnableTelemetry=1` NO_CHANGELOG=true
1 parent 067cd57 commit ca0d48c

File tree

6 files changed

+53
-27
lines changed

6 files changed

+53
-27
lines changed

src/main/java/com/databricks/jdbc/common/util/ProcessNameUtil.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22

33
import static com.databricks.jdbc.common.util.WildcardUtil.isNullOrEmpty;
44

5-
import org.slf4j.Logger;
6-
import org.slf4j.LoggerFactory;
7-
85
/**
96
* Utility class for determining the current process name as it would appear in Activity Monitor.
7+
* Note: logging is intentionally omitted because this class's methods are called during static init, when logging might not be fully
8+
* configured.
109
*/
1110
public class ProcessNameUtil {
1211
private static final String FALL_BACK_PROCESS_NAME = "UnknownJavaProcess";
13-
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessNameUtil.class);
1412

1513
/**
1614
* Gets the current process name as it would appear in Activity Monitor.
@@ -22,14 +20,12 @@ public static String getProcessName() {
2220
// Step 1: Try ProcessHandle API (Java 9+)
2321
String processName = getProcessNameFromHandle();
2422
if (!isNullOrEmpty(processName)) {
25-
LOGGER.trace("getProcessNameFromHandle: {}", processName);
2623
return processName;
2724
}
2825

2926
// Fallback
3027
return FALL_BACK_PROCESS_NAME;
3128
} catch (Exception e) {
32-
LOGGER.trace("Error getting process name {}, returning fallback", e);
3329
return FALL_BACK_PROCESS_NAME;
3430
}
3531
}
@@ -62,7 +58,6 @@ public static String getProcessNameFromHandle() {
6258
}
6359
return null;
6460
} catch (Exception e) {
65-
LOGGER.trace("Error getting process name from handle {}, returning null", e);
6661
return null;
6762
}
6863
}

src/main/java/com/databricks/jdbc/model/telemetry/StatementTelemetryDetails.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
public class StatementTelemetryDetails {
1111
private boolean isInternalCall;
1212
private final ChunkDetails chunkDetails;
13-
private final ResultLatency resultLatency;
13+
private final ResultLatency resultLatency =
14+
new ResultLatency()
15+
.setResultSetReadyLatencyMillis(null)
16+
.setResultSetConsumptionLatencyMillis(null);
1417
private final OperationDetail operationDetail;
1518
private Long operationLatencyMillis;
1619
private final String statementId;
@@ -25,10 +28,6 @@ public class StatementTelemetryDetails {
2528

2629
public StatementTelemetryDetails(String statementId) {
2730
this.chunkDetails = new ChunkDetails();
28-
this.resultLatency =
29-
new ResultLatency()
30-
.setResultSetReadyLatencyMillis(null)
31-
.setResultSetConsumptionLatencyMillis(null);
3231
this.isInternalCall = false;
3332
this.operationDetail = new OperationDetail(isInternalCall);
3433
this.operationLatencyMillis = null;

src/main/java/com/databricks/jdbc/telemetry/TelemetryClient.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
import java.util.LinkedList;
88
import java.util.List;
99
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Executors;
1011
import java.util.concurrent.ScheduledExecutorService;
1112
import java.util.concurrent.ScheduledFuture;
13+
import java.util.concurrent.ThreadFactory;
1214
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicInteger;
1316

1417
public class TelemetryClient implements ITelemetryClient {
1518

@@ -24,6 +27,19 @@ public class TelemetryClient implements ITelemetryClient {
2427
private ScheduledFuture<?> flushTask;
2528
private final int flushIntervalMillis;
2629

30+
private static ThreadFactory createSchedulerThreadFactory() {
31+
return new ThreadFactory() {
32+
private final AtomicInteger threadNumber = new AtomicInteger(1);
33+
34+
@Override
35+
public Thread newThread(Runnable r) {
36+
Thread thread = new Thread(r, "Telemetry-Scheduler-" + threadNumber.getAndIncrement());
37+
thread.setDaemon(true);
38+
return thread;
39+
}
40+
};
41+
}
42+
2743
public TelemetryClient(
2844
IDatabricksConnectionContext connectionContext,
2945
ExecutorService executorService,
@@ -35,7 +51,7 @@ public TelemetryClient(
3551
this.databricksConfig = config;
3652
this.executorService = executorService;
3753
this.scheduledExecutorService =
38-
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
54+
Executors.newSingleThreadScheduledExecutor(createSchedulerThreadFactory());
3955
this.flushIntervalMillis = context.getTelemetryFlushIntervalInMilliseconds();
4056
this.lastFlushedTime = System.currentTimeMillis();
4157
schedulePeriodicFlush();
@@ -50,7 +66,7 @@ public TelemetryClient(
5066
this.databricksConfig = null;
5167
this.executorService = executorService;
5268
this.scheduledExecutorService =
53-
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
69+
Executors.newSingleThreadScheduledExecutor(createSchedulerThreadFactory());
5470
this.flushIntervalMillis = context.getTelemetryFlushIntervalInMilliseconds();
5571
this.lastFlushedTime = System.currentTimeMillis();
5672
schedulePeriodicFlush();

src/main/java/com/databricks/jdbc/telemetry/TelemetryClientFactory.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import java.util.LinkedHashMap;
1111
import java.util.concurrent.ExecutorService;
1212
import java.util.concurrent.Executors;
13+
import java.util.concurrent.ThreadFactory;
14+
import java.util.concurrent.atomic.AtomicInteger;
1315

1416
public class TelemetryClientFactory {
1517

@@ -26,8 +28,21 @@ public class TelemetryClientFactory {
2628

2729
private final ExecutorService telemetryExecutorService;
2830

31+
private static ThreadFactory createThreadFactory() {
32+
return new ThreadFactory() {
33+
private final AtomicInteger threadNumber = new AtomicInteger(1);
34+
35+
@Override
36+
public Thread newThread(Runnable r) {
37+
Thread thread = new Thread(r, "Telemetry-Thread-" + threadNumber.getAndIncrement());
38+
thread.setDaemon(true);
39+
return thread;
40+
}
41+
};
42+
}
43+
2944
private TelemetryClientFactory() {
30-
telemetryExecutorService = Executors.newFixedThreadPool(10);
45+
telemetryExecutorService = Executors.newFixedThreadPool(10, createThreadFactory());
3146
}
3247

3348
public static TelemetryClientFactory getInstance() {

src/main/java/com/databricks/jdbc/telemetry/TelemetryPushTask.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class TelemetryPushTask implements Runnable {
5252

5353
@Override
5454
public void run() {
55-
LOGGER.debug("Pushing Telemetry logs of size {}", queueToBePushed.size());
55+
LOGGER.trace("Pushing Telemetry logs of size {}", queueToBePushed.size());
5656
TelemetryRequest request = new TelemetryRequest();
5757
if (queueToBePushed.isEmpty()) {
5858
return;
@@ -67,7 +67,7 @@ public void run() {
6767
try {
6868
return objectMapper.writeValueAsString(event);
6969
} catch (JsonProcessingException e) {
70-
LOGGER.error(
70+
LOGGER.trace(
7171
"Failed to serialize Telemetry event {} with error: {}", event, e);
7272
return null; // Return null for failed serialization
7373
}
@@ -98,13 +98,13 @@ public void run() {
9898
TelemetryResponse telResponse =
9999
objectMapper.readValue(
100100
EntityUtils.toString(response.getEntity()), TelemetryResponse.class);
101-
LOGGER.debug(
101+
LOGGER.trace(
102102
"Pushed Telemetry logs with request-Id {} with events {} with error count {}",
103103
response.getFirstHeader(REQUEST_ID_HEADER),
104104
telResponse.getNumProtoSuccess(),
105105
telResponse.getErrors().size());
106106
if (!telResponse.getErrors().isEmpty()) {
107-
LOGGER.debug("Failed to push telemetry logs with error: {}", telResponse.getErrors());
107+
LOGGER.trace("Failed to push telemetry logs with error: {}", telResponse.getErrors());
108108
}
109109
if (queueToBePushed.size() != telResponse.getNumProtoSuccess()) {
110110
LOGGER.debug(

src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,14 @@
44
import static org.mockito.Mockito.*;
55

66
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
7-
import com.databricks.jdbc.model.telemetry.StatementTelemetryDetails;
87
import com.databricks.jdbc.model.telemetry.latency.ChunkDetails;
98
import com.databricks.jdbc.model.telemetry.latency.OperationType;
10-
import com.databricks.jdbc.telemetry.TelemetryHelper;
119
import org.junit.jupiter.api.AfterEach;
1210
import org.junit.jupiter.api.BeforeEach;
1311
import org.junit.jupiter.api.Test;
1412
import org.junit.jupiter.params.ParameterizedTest;
1513
import org.junit.jupiter.params.provider.CsvSource;
1614
import org.junit.jupiter.params.provider.EnumSource;
17-
import org.mockito.MockedStatic;
1815

1916
public class TelemetryCollectorTest {
2017
private final TelemetryCollector handler = TelemetryCollector.getInstance();
@@ -74,12 +71,16 @@ void testRecordOperationLatency_WithCloseOperation() {
7471
String methodName = "closeStatement";
7572
long latency = 100L;
7673

77-
try (MockedStatic<TelemetryHelper> mockedStatic = mockStatic(TelemetryHelper.class)) {
78-
handler.recordOperationLatency(latency, methodName);
79-
80-
mockedStatic.verify(
81-
() -> TelemetryHelper.exportTelemetryLog(any(StatementTelemetryDetails.class)));
74+
// Ensure telemetry is collected for the statement id prior to close
75+
handler.recordGetOperationStatus(TEST_STATEMENT_ID, 1L);
76+
handler.recordOperationLatency(latency, methodName);
77+
// Allow asynchronous export to complete and internal map to be cleared
78+
try {
79+
Thread.sleep(200);
80+
} catch (InterruptedException e) {
81+
Thread.currentThread().interrupt();
8282
}
83+
assertNull(handler.getTelemetryDetails(TEST_STATEMENT_ID));
8384
}
8485

8586
@ParameterizedTest

0 commit comments

Comments
 (0)