Skip to content

Commit 40ebe1b

Browse files
Configure Circuit breaker in OpenLineage instrumentation (#9554)
* configure circuit breaker in OpenLineage instrumentation
1 parent ffc2be9 commit 40ebe1b

File tree

6 files changed

+96
-0
lines changed

6 files changed

+96
-0
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
7676
private final int MAX_ACCUMULATOR_SIZE = 50000;
7777
private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
7878
private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";
79+
private static final int OL_CIRCUIT_BREAKER_TIMEOUT_IN_SECONDS = 60;
7980

8081
public volatile SparkListenerInterface openLineageSparkListener = null;
8182
public volatile SparkConf openLineageSparkConf = null;
@@ -186,6 +187,7 @@ public void setupOpenLineage(DDTraceId traceId) {
186187
+ ProcessTags.getTagsForSerialization()
187188
+ ";_dd.ol_app_id:"
188189
+ appId);
190+
setupOpenLineageCircuitBreaker();
189191
return;
190192
}
191193
log.debug(
@@ -1324,6 +1326,32 @@ private static String getServiceForOpenLineage(SparkConf conf, boolean isRunning
13241326
return sparkAppName;
13251327
}
13261328

1329+
private void setupOpenLineageCircuitBreaker() {
1330+
if (!Config.get().isDataJobsOpenLineageTimeoutEnabled()) {
1331+
log.debug("Data Jobs OpenLineage timeout is not enabled");
1332+
return;
1333+
}
1334+
if (!classIsLoadable("io.openlineage.client.circuitBreaker.TimeoutCircuitBreaker")) {
1335+
log.debug(
1336+
"OpenLineage version without timeout circuit breaker. Probably OL version < 1.35.0");
1337+
return;
1338+
}
1339+
if (openLineageSparkConf.contains("spark.openlineage.circuitBreaker.type")) {
1340+
log.debug(
1341+
"Other OpenLineage circuit breaker already configured: {}",
1342+
openLineageSparkConf.get("spark.openlineage.circuitBreaker.type"));
1343+
return;
1344+
}
1345+
1346+
openLineageSparkConf.set("spark.openlineage.circuitBreaker.type", "timeout");
1347+
openLineageSparkConf.setIfMissing(
1348+
"spark.openlineage.circuitBreaker.timeoutInSeconds",
1349+
String.valueOf(OL_CIRCUIT_BREAKER_TIMEOUT_IN_SECONDS));
1350+
log.debug(
1351+
"Setting OpenLineage circuit breaker with timeout {} seconds",
1352+
openLineageSparkConf.get("spark.openlineage.circuitBreaker.timeoutInSeconds"));
1353+
}
1354+
13271355
private static void reportKafkaOffsets(
13281356
final String appName, final AgentSpan span, final SourceProgress progress) {
13291357
if (!traceConfig().isDataStreamsEnabled()

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,50 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
568568
.contains("_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization())
569569
}
570570

571+
// Data-driven check of the circuit breaker entries left in openLineageSparkConf after
// setupOpenLineage runs, for each combination of the
// "data.jobs.openlineage.timeout.enabled" setting and a pre-set circuit breaker type.
// The timeout entry is expected to be "60" only in the row whose expected type is "timeout";
// in the other rows it is expected to be absent.
def "test setupOpenLineage fills circuit breaker config"(
572+
Boolean configEnabled,
573+
String sparkConfCircuitBreakerType,
574+
String expectedCircuitBreakerType
575+
) {
576+
setup:
577+
injectSysConfig("data.jobs.openlineage.timeout.enabled", configEnabled.toString())
578+
def listener = getTestDatadogSparkListener()
579+
listener.openLineageSparkListener = Mock(SparkListenerInterface)
580+
listener.openLineageSparkConf = new SparkConf()
581+
// Only pre-populate the type when the data row supplies one.
if (sparkConfCircuitBreakerType != null) {
582+
listener.openLineageSparkConf.set("spark.openlineage.circuitBreaker.type", sparkConfCircuitBreakerType)
583+
}
584+
listener.setupOpenLineage(Mock(DDTraceId))
585+
586+
expect:
587+
assert listener
588+
.openLineageSparkConf
589+
.getOption("spark.openlineage.circuitBreaker.type") == Option.apply(expectedCircuitBreakerType)
590+
assert listener
591+
.openLineageSparkConf
592+
.getOption("spark.openlineage.circuitBreaker.timeoutInSeconds") == ((expectedCircuitBreakerType == "timeout") ? Option.apply("60") : Option.apply(null))
593+
594+
where:
595+
configEnabled | sparkConfCircuitBreakerType | expectedCircuitBreakerType
596+
true | null | "timeout"
597+
true | "other" | "other"
598+
false | null | null
599+
}
600+
601+
// Checks that a "spark.openlineage.circuitBreaker.timeoutInSeconds" value already present in
// openLineageSparkConf ("120") is still there after setupOpenLineage runs.
// NOTE(review): the feature name below misspells "overridden"; left unchanged here because
// renaming it changes the reported test name.
def "test OpenLineage's circuit breaker timeout can be overriden"() {
602+
setup:
603+
def listener = getTestDatadogSparkListener()
604+
listener.openLineageSparkListener = Mock(SparkListenerInterface)
605+
listener.openLineageSparkConf = new SparkConf()
606+
listener.openLineageSparkConf.set("spark.openlineage.circuitBreaker.timeoutInSeconds", "120")
607+
listener.setupOpenLineage(Mock(DDTraceId))
608+
609+
expect:
610+
assert listener
611+
.openLineageSparkConf
612+
.getOption("spark.openlineage.circuitBreaker.timeoutInSeconds") == Option.apply("120")
613+
}
614+
571615
protected validateRelativeError(double value, double expected, double relativeAccuracy) {
572616
double relativeError = Math.abs(value - expected) / expected
573617
assert relativeError < relativeAccuracy
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.openlineage.client.circuitBreaker;
2+
3+
/**
4+
* Empty placeholder used only by tests of the OpenLineage circuit breaker setup. A class with
5+
* this fully qualified name must exist on the test classpath for that setup to be exercised;
6+
* it deliberately has no members.
*/
7+
public class TimeoutCircuitBreaker {}

dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ public final class ConfigDefaults {
228228

229229
static final boolean DEFAULT_DATA_JOBS_ENABLED = false;
230230
static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED = false;
231+
static final boolean DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED = true;
231232

232233
static final boolean DEFAULT_DATA_STREAMS_ENABLED = false;
233234
static final int DEFAULT_DATA_STREAMS_BUCKET_DURATION = 10; // seconds

dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public final class GeneralConfig {
7777
public static final String DATA_JOBS_ENABLED = "data.jobs.enabled";
7878
public static final String DATA_JOBS_COMMAND_PATTERN = "data.jobs.command.pattern";
7979
public static final String DATA_JOBS_OPENLINEAGE_ENABLED = "data.jobs.openlineage.enabled";
80+
public static final String DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED =
81+
"data.jobs.openlineage.timeout.enabled";
8082

8183
public static final String DATA_STREAMS_ENABLED = "data.streams.enabled";
8284
public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS =

internal-api/src/main/java/datadog/trace/api/Config.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import static datadog.trace.api.ConfigDefaults.DEFAULT_CWS_TLS_REFRESH;
4848
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_ENABLED;
4949
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED;
50+
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED;
5051
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_STREAMS_BUCKET_DURATION;
5152
import static datadog.trace.api.ConfigDefaults.DEFAULT_DATA_STREAMS_ENABLED;
5253
import static datadog.trace.api.ConfigDefaults.DEFAULT_DB_CLIENT_HOST_SPLIT_BY_HOST;
@@ -331,6 +332,7 @@
331332
import static datadog.trace.api.config.GeneralConfig.AZURE_APP_SERVICES;
332333
import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_ENABLED;
333334
import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_OPENLINEAGE_ENABLED;
335+
import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED;
334336
import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_BUCKET_DURATION_SECONDS;
335337
import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_ENABLED;
336338
import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_ARGS;
@@ -1172,6 +1174,7 @@ public static String getHostName() {
11721174

11731175
private final boolean dataJobsEnabled;
11741176
private final boolean dataJobsOpenLineageEnabled;
1177+
private final boolean dataJobsOpenLineageTimeoutEnabled;
11751178

11761179
private final boolean dataStreamsEnabled;
11771180
private final float dataStreamsBucketDurationSeconds;
@@ -2595,6 +2598,9 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
25952598
dataJobsOpenLineageEnabled =
25962599
configProvider.getBoolean(
25972600
DATA_JOBS_OPENLINEAGE_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_ENABLED);
2601+
dataJobsOpenLineageTimeoutEnabled =
2602+
configProvider.getBoolean(
2603+
DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED, DEFAULT_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED);
25982604

25992605
dataStreamsEnabled =
26002606
configProvider.getBoolean(DATA_STREAMS_ENABLED, DEFAULT_DATA_STREAMS_ENABLED);
@@ -4499,6 +4505,10 @@ public boolean isDataJobsOpenLineageEnabled() {
44994505
return dataJobsOpenLineageEnabled;
45004506
}
45014507

4508+
/**
 * Returns the value of the {@code dataJobsOpenLineageTimeoutEnabled} setting.
 *
 * @return the stored {@code dataJobsOpenLineageTimeoutEnabled} flag
 */
public boolean isDataJobsOpenLineageTimeoutEnabled() {
4509+
return dataJobsOpenLineageTimeoutEnabled;
4510+
}
4511+
45024512
public boolean isApmTracingEnabled() {
45034513
return apmTracingEnabled;
45044514
}
@@ -5827,6 +5837,10 @@ public String toString() {
58275837
+ appSecRaspEnabled
58285838
+ ", dataJobsEnabled="
58295839
+ dataJobsEnabled
5840+
+ ", dataJobsOpenLineageEnabled="
5841+
+ dataJobsOpenLineageEnabled
5842+
+ ", dataJobsOpenLineageTimeoutEnabled="
5843+
+ dataJobsOpenLineageTimeoutEnabled
58305844
+ ", apmTracingEnabled="
58315845
+ apmTracingEnabled
58325846
+ ", jdkSocketEnabled="

0 commit comments

Comments
 (0)