Skip to content

Commit 7789009

Browse files
authored
Add Circuit breaker implementation for Telemetry Push (#925)
## Description <!-- Provide a brief summary of the changes made and the issue they aim to address.--> Added CircuitBreaker implementation using resilience4j library. Started with hard coded values for CircuitBreaker config. ## Testing <!-- Describe how the changes have been tested--> Added thorough unit tests for states: open, closed, closed -> open, closed -> open -> closed ## Additional Notes to the Reviewer <!-- Share any additional context or insights that may help the reviewer understand the changes better. This could include challenges faced, limitations, or compromises made during the development process. Also, mention any areas of the code that you would like the reviewer to focus on specifically. --> --------- Signed-off-by: Gopal Lal <[email protected]>
1 parent f4cc384 commit 7789009

14 files changed

+516
-72
lines changed

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- Added support for providing custom HTTP options: `HttpMaxConnectionsPerRoute` and `HttpConnectionRequestTimeout`.
77
- Add V2 of chunk download using async http client with corresponding implementations of AbstractRemoteChunkProvider and
88
AbstractArrowResultChunk
9+
- Added CircuitBreaker support to handle transient failures in the Telemetry.
910

1011
### Updated
1112

pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
<async-httpclient.version>5.3.1</async-httpclient.version>
7070
<netty.version>4.2.0.Final</netty.version>
7171
<grpc.version>1.71.0</grpc.version>
72+
<resilience4j.version>2.2.0</resilience4j.version>
7273
</properties>
7374
<dependencyManagement>
7475
<!-- Force safe version of commons-lang3 https://nvd.nist.gov/vuln/detail/CVE-2025-48924 -->
@@ -269,6 +270,16 @@
269270
<artifactId>httpcore5</artifactId>
270271
<version>${async-httpclient.version}</version>
271272
</dependency>
273+
<dependency>
274+
<groupId>io.github.resilience4j</groupId>
275+
<artifactId>resilience4j-circuitbreaker</artifactId>
276+
<version>${resilience4j.version}</version>
277+
</dependency>
278+
<dependency>
279+
<groupId>io.github.resilience4j</groupId>
280+
<artifactId>resilience4j-core</artifactId>
281+
<version>${resilience4j.version}</version>
282+
</dependency>
272283
</dependencies>
273284

274285
<build>

src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,6 +926,11 @@ public int getChunkReadyTimeoutSeconds() {
926926
return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.CHUNK_READY_TIMEOUT_SECONDS));
927927
}
928928

929+
@Override
930+
public boolean isTelemetryCircuitBreakerEnabled() {
931+
return getParameter(DatabricksJdbcUrlParams.TELEMETRY_CIRCUIT_BREAKER_ENABLED).equals("1");
932+
}
933+
929934
@Override
930935
public int getHttpMaxConnectionsPerRoute() {
931936
int maxConnectionsPerRoute = DEFAULT_MAX_HTTP_CONNECTIONS_PER_ROUTE;

src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,9 @@ public interface IDatabricksConnectionContext {
342342
/** Returns the flush interval in milliseconds for telemetry */
343343
int getTelemetryFlushIntervalInMilliseconds();
344344

345+
/** Returns whether circuit breaker is enabled for telemetry */
346+
boolean isTelemetryCircuitBreakerEnabled();
347+
345348
/** Returns the maximum number of HTTP connections per route */
346349
int getHttpMaxConnectionsPerRoute();
347350

src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ public enum DatabricksJdbcUrlParams {
146146
"MaxVolumeOperationConcurrentPresignedRequests",
147147
"Maximum number of concurrent presigned requests",
148148
"50"),
149+
TELEMETRY_CIRCUIT_BREAKER_ENABLED(
150+
"TelemetryCircuitBreakerEnabled", "Enable circuit breaker for telemetry", "0"),
149151
HTTP_MAX_CONNECTIONS_PER_ROUTE(
150152
"HttpMaxConnectionsPerRoute", "Maximum connections per route for HTTP client", "1000"),
151153
HTTP_CONNECTION_REQUEST_TIMEOUT(
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package com.databricks.jdbc.telemetry;
2+
3+
import com.databricks.jdbc.exception.DatabricksHttpException;
4+
import com.databricks.jdbc.exception.DatabricksParsingException;
5+
import com.databricks.jdbc.log.JdbcLogger;
6+
import com.databricks.jdbc.log.JdbcLoggerFactory;
7+
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
8+
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
9+
import java.net.ConnectException;
10+
import java.net.NoRouteToHostException;
11+
import java.net.SocketTimeoutException;
12+
import java.net.UnknownHostException;
13+
import java.time.Duration;
14+
import java.util.Map;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.RejectedExecutionException;
17+
import org.apache.arrow.util.VisibleForTesting;
18+
import org.apache.http.client.HttpResponseException;
19+
20+
/**
21+
* CircuitBreakerManager is a singleton that manages circuit breakers for different hosts. It
22+
* initializes circuit breakers with a predefined configuration and provides methods to retrieve or
23+
* reset them.
24+
*/
25+
public class CircuitBreakerManager {
26+
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(CircuitBreakerManager.class);
27+
28+
// Singleton instance of CircuitBreakerManager
29+
private static final CircuitBreakerManager INSTANCE = new CircuitBreakerManager();
30+
31+
// Method to get the singleton instance
32+
public static CircuitBreakerManager getInstance() {
33+
return INSTANCE;
34+
}
35+
36+
private final Map<String, CircuitBreaker> breakerPerHost = new ConcurrentHashMap<>();
37+
private final CircuitBreakerConfig config;
38+
39+
// Private constructor to prevent instantiation
40+
private CircuitBreakerManager() {
41+
// Initialize config hard coded for now
42+
config =
43+
CircuitBreakerConfig.custom()
44+
.failureRateThreshold(50) // Opens if 50%+ fail
45+
.minimumNumberOfCalls(20) // Minimum sample size
46+
.slidingWindowSize(30) // Keep recent 30 calls in window
47+
.waitDurationInOpenState(Duration.ofSeconds(30)) // Cool-down before retrying
48+
.permittedNumberOfCallsInHalfOpenState(3) // Retry with 3 test calls
49+
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
50+
// Exceptions that should be treated as failures
51+
.recordExceptions(
52+
// Server unavailability errors
53+
ConnectException.class,
54+
SocketTimeoutException.class,
55+
NoRouteToHostException.class,
56+
UnknownHostException.class,
57+
// Resource exhausted errors
58+
RejectedExecutionException.class,
59+
// Memory errors also can lean to open state
60+
OutOfMemoryError.class,
61+
// HTTP errors that indicate server issues
62+
HttpResponseException.class,
63+
// Databricks specific exceptions
64+
DatabricksHttpException.class)
65+
.ignoreExceptions(
66+
// Exceptions that can be ignored, the logic is that due to these exception,
67+
// the execution will not even reach the server, so no point in opening the
68+
// circuit breaker
69+
DatabricksParsingException.class,
70+
IllegalArgumentException.class,
71+
NullPointerException.class)
72+
.build();
73+
LOGGER.debug("CircuitBreakerManager initialized");
74+
}
75+
76+
/**
77+
* Retrieves the CircuitBreaker for the specified host. If it does not exist, it creates a new one
78+
* with the default configuration.
79+
*
80+
* @param host The host for which to retrieve the CircuitBreaker.
81+
* @return The CircuitBreaker instance for the specified host.
82+
*/
83+
public CircuitBreaker getCircuitBreaker(String host) {
84+
return breakerPerHost.computeIfAbsent(
85+
host,
86+
h -> {
87+
CircuitBreaker breaker = CircuitBreaker.of("telemetry-client-" + h, config);
88+
breaker
89+
.getEventPublisher()
90+
.onStateTransition(
91+
event -> {
92+
LOGGER.debug(
93+
"CircuitBreaker for host [{}] transitioned from {} to {}",
94+
h,
95+
event.getStateTransition().getFromState(),
96+
event.getStateTransition().getToState());
97+
});
98+
return breaker;
99+
});
100+
}
101+
102+
@VisibleForTesting
103+
void resetCircuitBreaker(String host) {
104+
breakerPerHost.remove(host);
105+
}
106+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.databricks.jdbc.telemetry;
2+
3+
import com.databricks.jdbc.log.JdbcLogger;
4+
import com.databricks.jdbc.log.JdbcLoggerFactory;
5+
import com.databricks.jdbc.model.telemetry.TelemetryRequest;
6+
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
7+
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
8+
import org.apache.arrow.util.VisibleForTesting;
9+
10+
/**
11+
* TelemetryClient wrapper that implements circuit breaker pattern using Resilience4j. This wrapper
12+
* handles server unavailability and resource exhausted errors by temporarily stopping telemetry
13+
* requests when the service is experiencing issues.
14+
*/
15+
public class CircuitBreakerTelemetryPushClient implements ITelemetryPushClient {
16+
17+
private static final JdbcLogger LOGGER =
18+
JdbcLoggerFactory.getLogger(CircuitBreakerTelemetryPushClient.class);
19+
private static final String TEST_CLIENT_NAME = "telemetry-client-test";
20+
21+
private final ITelemetryPushClient delegate;
22+
private final CircuitBreaker circuitBreaker;
23+
private final String host;
24+
25+
CircuitBreakerTelemetryPushClient(ITelemetryPushClient delegate, String host) {
26+
this.delegate = delegate;
27+
this.host = host;
28+
this.circuitBreaker = CircuitBreakerManager.getInstance().getCircuitBreaker(host);
29+
}
30+
31+
@VisibleForTesting
32+
CircuitBreakerTelemetryPushClient(
33+
ITelemetryPushClient delegate, String host, CircuitBreakerConfig config) {
34+
this.delegate = delegate;
35+
this.host = host;
36+
this.circuitBreaker = CircuitBreaker.of(TEST_CLIENT_NAME, config);
37+
}
38+
39+
@Override
40+
public void pushEvent(TelemetryRequest request) {
41+
try {
42+
circuitBreaker.executeCallable(
43+
() -> {
44+
delegate.pushEvent(request);
45+
return null;
46+
});
47+
} catch (Exception e) {
48+
LOGGER.debug("Failed to export telemetry for host [{}]: {}", host, e.getMessage());
49+
50+
if (circuitBreaker.getState() == CircuitBreaker.State.OPEN) {
51+
LOGGER.debug("CircuitBreaker for host [{}] is OPEN - dropping telemetry", host);
52+
}
53+
}
54+
}
55+
56+
/**
57+
* Get the current state of the circuit breaker.
58+
*
59+
* @return The current circuit breaker state
60+
*/
61+
@VisibleForTesting
62+
CircuitBreaker.State getCircuitBreakerState() {
63+
return circuitBreaker.getState();
64+
}
65+
66+
/**
67+
* Get metrics about the circuit breaker.
68+
*
69+
* @return Circuit breaker metrics
70+
*/
71+
@VisibleForTesting
72+
CircuitBreaker.Metrics getCircuitBreakerMetrics() {
73+
return circuitBreaker.getMetrics();
74+
}
75+
}

src/main/java/com/databricks/jdbc/telemetry/ITelemetryClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,22 @@
22

33
import com.databricks.jdbc.model.telemetry.TelemetryFrontendLog;
44

5+
/**
6+
* Interface for telemetry clients that handle the export of telemetry events. The implementation
7+
* further handles batching and flushing of telemetry events to the backend service.
8+
*/
59
public interface ITelemetryClient {
10+
11+
/**
12+
* Exports a telemetry event to the backend service.
13+
*
14+
* @param event The telemetry event to be exported.
15+
*/
616
void exportEvent(TelemetryFrontendLog event);
717

18+
/**
19+
* Closes the telemetry client, releasing any resources it holds. This method should be called on
20+
* closure of connection or JVM shutdown.
21+
*/
822
void close();
923
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.databricks.jdbc.telemetry;
2+
3+
import com.databricks.jdbc.model.telemetry.TelemetryRequest;
4+
5+
/**
6+
* Interface for pushing telemetry events. Implementations should handle the actual transmission of
7+
* telemetry data. This actually pushes the data to the telemetry service. The circuit breaker is
8+
* engaged here to prevent overwhelming the service with requests during outages or high error
9+
* rates.
10+
*/
11+
interface ITelemetryPushClient {
12+
13+
/**
14+
* Pushes a telemetry request to the telemetry service.
15+
*
16+
* @param request The telemetry request to be sent.
17+
* @throws Exception If there is an error while pushing the event.
18+
*/
19+
void pushEvent(TelemetryRequest request) throws Exception;
20+
}

src/main/java/com/databricks/jdbc/telemetry/TelemetryClient.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ public class TelemetryClient implements ITelemetryClient {
1919
private final IDatabricksConnectionContext context;
2020
private final DatabricksConfig databricksConfig;
2121
private final int eventsBatchSize;
22-
private final boolean isAuthEnabled;
2322
private final ExecutorService executorService;
23+
private final ITelemetryPushClient telemetryPushClient;
2424
private final ScheduledExecutorService scheduledExecutorService;
2525
private List<TelemetryFrontendLog> eventsBatch;
2626
private volatile long lastFlushedTime;
@@ -46,29 +46,33 @@ public TelemetryClient(
4646
DatabricksConfig config) {
4747
this.eventsBatch = new LinkedList<>();
4848
this.eventsBatchSize = connectionContext.getTelemetryBatchSize();
49-
this.isAuthEnabled = true;
5049
this.context = connectionContext;
5150
this.databricksConfig = config;
5251
this.executorService = executorService;
5352
this.scheduledExecutorService =
5453
Executors.newSingleThreadScheduledExecutor(createSchedulerThreadFactory());
5554
this.flushIntervalMillis = context.getTelemetryFlushIntervalInMilliseconds();
5655
this.lastFlushedTime = System.currentTimeMillis();
56+
this.telemetryPushClient =
57+
TelemetryClientFactory.getTelemetryPushClient(
58+
true /* isAuthEnabled */, context, databricksConfig);
5759
schedulePeriodicFlush();
5860
}
5961

6062
public TelemetryClient(
6163
IDatabricksConnectionContext connectionContext, ExecutorService executorService) {
6264
this.eventsBatch = new LinkedList<>();
6365
this.eventsBatchSize = connectionContext.getTelemetryBatchSize();
64-
this.isAuthEnabled = false;
6566
this.context = connectionContext;
6667
this.databricksConfig = null;
6768
this.executorService = executorService;
6869
this.scheduledExecutorService =
6970
Executors.newSingleThreadScheduledExecutor(createSchedulerThreadFactory());
7071
this.flushIntervalMillis = context.getTelemetryFlushIntervalInMilliseconds();
7172
this.lastFlushedTime = System.currentTimeMillis();
73+
this.telemetryPushClient =
74+
TelemetryClientFactory.getTelemetryPushClient(
75+
false /* isAuthEnabled */, context, null /* databricksConfig */);
7276
schedulePeriodicFlush();
7377
}
7478

@@ -118,8 +122,7 @@ private void flush(boolean forceFlush) {
118122
synchronized (this) {
119123
if (!forceFlush ? isBatchFull() : !eventsBatch.isEmpty()) {
120124
List<TelemetryFrontendLog> logsToBeFlushed = eventsBatch;
121-
executorService.submit(
122-
new TelemetryPushTask(logsToBeFlushed, isAuthEnabled, context, databricksConfig));
125+
executorService.submit(new TelemetryPushTask(logsToBeFlushed, telemetryPushClient));
123126
eventsBatch = new LinkedList<>();
124127
}
125128
lastFlushedTime = System.currentTimeMillis();

0 commit comments

Comments
 (0)