Skip to content

Commit 1a79375

Browse files
databricks support changes
1 parent 2e10824 commit 1a79375

File tree

3 files changed

+25
-7
lines changed

3 files changed

+25
-7
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
142142
this.appId = appId;
143143
this.sparkVersion = sparkVersion;
144144

145-
isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
145+
// isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
146+
isRunningOnDatabricks = false;
147+
log.info("Overriding is Running on Databricks. Setting to false");
146148
databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
147149
databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
148150
sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
@@ -167,7 +169,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
167169
}
168170

169171
public void setupOpenLineage(DDTraceId traceId) {
170-
log.debug("Setting up OpenLineage configuration with trace id {}", traceId);
172+
log.info("Setting up OpenLineage configuration with trace id {}", traceId);
171173
if (openLineageSparkListener != null) {
172174
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
173175
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -190,10 +192,10 @@ public void setupOpenLineage(DDTraceId traceId) {
190192
setupOpenLineageCircuitBreaker();
191193
return;
192194
}
193-
log.debug(
195+
log.info(
194196
"There is no OpenLineage Spark Listener in the context. Skipping setting tags. {}",
195197
openLineageSparkListener);
196-
log.debug(
198+
log.info(
197199
"There is no OpenLineage SparkConf in the context. Skipping setting tags. {}",
198200
openLineageSparkConf);
199201
}
@@ -231,6 +233,8 @@ public synchronized void onApplicationStart(SparkListenerApplicationStart applic
231233
OpenlineageParentContext.from(sparkConf)
232234
.map(context -> context.getTraceId())
233235
.orElse(predeterminedTraceIdContext.getTraceId()));
236+
} else {
237+
log.info("Skipping openlineage setup. What a shame...");
234238
}
235239
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
236240
}
@@ -1301,9 +1305,9 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat
13011305

13021306
private static String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
13031307
// Service for OpenLineage in Databricks is not supported yet
1304-
if (isRunningOnDatabricks) {
1305-
return null;
1306-
}
1308+
// if (isRunningOnDatabricks) {
1309+
// return null;
1310+
// }
13071311

13081312
// Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
13091313
String serviceName = Config.get().getServiceName();

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import datadog.trace.api.Config;
1212
import datadog.trace.bootstrap.InstanceStore;
13+
import java.lang.reflect.InvocationTargetException;
1314
import net.bytebuddy.asm.Advice;
15+
import org.apache.spark.SparkConf;
1416
import org.apache.spark.deploy.SparkSubmitArguments;
1517
import org.apache.spark.scheduler.SparkListenerInterface;
1618
import org.slf4j.Logger;
@@ -127,6 +129,17 @@ public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener)
127129
&& "io.openlineage.spark.agent.OpenLineageSparkListener"
128130
.equals(listener.getClass().getCanonicalName())) {
129131
log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus");
132+
133+
// for some reason on databricks env, the config was not captured
134+
try {
135+
log.info("Getting openlineage conf from the listener");
136+
Object openLineageConf = listener.getClass().getMethod("getConf").invoke(listener);
137+
InstanceStore.of(SparkConf.class)
138+
.put("openLineageSparkConf", (SparkConf) openLineageConf);
139+
} catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
140+
log.error("Something got wrong with getting openlineage conf: {}", e.getMessage());
141+
}
142+
130143
InstanceStore.of(SparkListenerInterface.class).put("openLineageListener", listener);
131144
return true;
132145
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenLineageInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public static class OpenLineageSparkListenerAdvice {
6464
public static void exit(@Advice.This Object self, @Advice.FieldValue("conf") SparkConf conf)
6565
throws IllegalAccessException {
6666
Logger log = LoggerFactory.getLogger("OpenLineageSparkListenerAdvice");
67+
log.info("Applying OpenLineageSparkListenerAdvice");
6768
if (!Config.get().isDataJobsOpenLineageEnabled()) {
6869
log.debug(
6970
"OpenLineage - Data Jobs integration disabled. Not manipulating OpenLineageSparkListener");

0 commit comments

Comments
 (0)