
Commit 6bfd76b

Service name override for DSM checkpoints in Spark context
1 parent 88c9405 commit 6bfd76b
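
The change below moves the service-name resolution out of AbstractDatadogSparkListener into a new SparkConfUtils helper, so the override computed for Spark spans can also be attached to Data Streams Monitoring (DSM) checkpoints. As a reading aid, here is a minimal sketch of the resolution order implemented by SparkConfUtils.getServiceNameOverride in the diff below; the class name and the databricksRunName parameter are illustrative only (the real code parses the run name out of spark.databricks.clusterUsageTags.clusterAllTags and also checks whether the user set a service name explicitly).

import org.apache.spark.SparkConf;

// Sketch only, assuming no user-configured service name and the app-name-as-service config enabled.
final class ServiceNameOverrideSketch {
  static String resolve(SparkConf conf, String databricksRunName) {
    boolean onDatabricks = conf.contains("spark.databricks.sparkContextId");
    String clusterName = conf.get("spark.databricks.clusterUsageTags.clusterName", null);

    // 1. Databricks job cluster: name the service after the job's RunName tag.
    if (databricksRunName != null) {
      return "databricks.job-cluster." + databricksRunName;
    }
    // 2. Databricks all-purpose cluster: name the service after the cluster name.
    if (clusterName != null) {
      return "databricks.all-purpose-cluster." + clusterName;
    }
    // 3. Plain Spark: fall back to spark.app.name; never override on Databricks itself.
    return onDatabricks ? null : conf.get("spark.app.name", null);
  }
}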

File tree

15 files changed: +363 -138 lines changed


dd-java-agent/instrumentation/spark/spark_2.12/build.gradle

Lines changed: 12 additions & 0 deletions
@@ -41,13 +41,24 @@ dependencies {
   testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
   testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"
 
+  testImplementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "$sparkVersion"
+  testImplementation group: 'org.apache.kafka', name: "kafka_$scalaVersion", version: '2.4.0'
+  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.0'
+  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.4.0.RELEASE'
+  testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.4.0.RELEASE'
+
+  testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-0.11')
+
   test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.8"
   test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.8"
   test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.8"
+  test_spark24Implementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "2.4.8"
 
   test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.2.4"
   test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.2.4"
   test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.2.4"
+  test_spark32Implementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "3.2.4"
+
   // We do not support netty versions older than this because of a change to the number of parameters to the
   // PooledByteBufAllocator constructor. See this PR where the new constructor (the only one we support) was introduced:
   // https://github.com/netty/netty/pull/10267
@@ -56,6 +67,7 @@ dependencies {
   latestDepTestImplementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: '+'
   latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: '+'
   latestDepTestImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: '+'
+  latestDepTestImplementation group: 'org.apache.spark', name: "spark-sql-kafka-0-10_$scalaVersion", version: "+"
 }
 
 tasks.named("test").configure {

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ public String[] helperClassNames() {
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",
       packageName + ".SparkConfAllowList",
+      packageName + ".SparkConfUtils",
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+import datadog.trace.agent.test.AgentTestRunner
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.spark.api.java.function.VoidFunction2
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.Trigger
+import org.junit.Rule
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.test.EmbeddedKafkaBroker
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule
+import org.springframework.kafka.test.utils.KafkaTestUtils
+
+class SparkStreamingKafkaTest extends AgentTestRunner {
+  static final SOURCE_TOPIC = "source"
+  static final SINK_TOPIC = "sink"
+
+  @Override
+  boolean isDataStreamsEnabled() {
+    return true
+  }
+
+  @Rule
+  EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, false, 1, SOURCE_TOPIC, SINK_TOPIC)
+  EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka
+
+  @Override
+  void configurePreAgent() {
+    super.configurePreAgent()
+    injectSysConfig("dd.integration.spark.enabled", "true")
+    injectSysConfig("dd.integration.kafka.enabled", "true")
+  }
+
+  def "test dsm checkpoints are correctly set"() {
+    setup:
+    def appName = "test-app"
+    def sparkSession = SparkSession.builder()
+      .config("spark.master", "local[2]")
+      .config("spark.driver.bindAddress", "localhost")
+      .appName(appName)
+      .getOrCreate()
+
+    def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
+    def producer = new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer()
+
+    when:
+    for (int i = 0; i < 100; i++) {
+      producer.send(new ProducerRecord<>(SOURCE_TOPIC, i, i.toString()))
+    }
+    producer.flush()
+
+    def df = sparkSession
+      .readStream()
+      .format("kafka")
+      .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+      .option("subscribe", SOURCE_TOPIC)
+      .load()
+
+    def query = df
+      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
+      .writeStream()
+      .format("kafka")
+      .option("kafka.bootstrap.servers", embeddedKafka.getBrokersAsString())
+      .option("checkpointLocation", "/tmp/" + System.currentTimeMillis().toString())
+      .option("topic", SINK_TOPIC)
+      .trigger(Trigger.Once())
+      .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
+        @Override
+        void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
+          rowDataset.show()
+          rowDataset.write()
+        }
+      })
+      .start()
+
+    query.processAllAvailable()
+
+    then:
+    query.stop()
+    producer.close()
+
+    // check that DSM checkpoints were written with the service name overridden to the Spark app name
+    assert TEST_DATA_STREAMS_WRITER.payloads.size() > 0
+    assert TEST_DATA_STREAMS_WRITER.services.size() == 1
+    assert TEST_DATA_STREAMS_WRITER.services.get(0) == appName
+  }
+}

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ public String[] helperClassNames() {
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",
       packageName + ".SparkConfAllowList",
+      packageName + ".SparkConfUtils",
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 9 additions & 80 deletions
@@ -4,10 +4,12 @@
 import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
 import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
+import static datadog.trace.instrumentation.spark.SparkConfUtils.getDatabricksClusterName;
+import static datadog.trace.instrumentation.spark.SparkConfUtils.getIsRunningOnDatabricks;
+import static datadog.trace.instrumentation.spark.SparkConfUtils.getServiceNameOverride;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import datadog.trace.api.Config;
 import datadog.trace.api.DDTags;
 import datadog.trace.api.DDTraceId;
 import datadog.trace.api.sampling.PrioritySampling;
@@ -110,8 +112,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
 
   private final boolean isRunningOnDatabricks;
   private final String databricksClusterName;
-  private final String databricksServiceName;
-  private final String sparkServiceName;
+  private final String serviceNameOverride;
 
   private boolean lastJobFailed = false;
   private String lastJobFailedMessage;
@@ -130,10 +131,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
     this.appId = appId;
     this.sparkVersion = sparkVersion;
 
-    isRunningOnDatabricks = sparkConf.contains("spark.databricks.sparkContextId");
-    databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
-    databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
-    sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
+    isRunningOnDatabricks = getIsRunningOnDatabricks(sparkConf);
+    databricksClusterName = getDatabricksClusterName(sparkConf);
+    serviceNameOverride = getServiceNameOverride(sparkConf);
 
     // If JVM exiting with System.exit(code), it bypass the code closing the application span
     //
@@ -924,10 +924,8 @@ private AgentTracer.SpanBuilder buildSparkSpan(String spanName, Properties prope
     AgentTracer.SpanBuilder builder =
         tracer.buildSpan(spanName).withSpanType("spark").withTag("app_id", appId);
 
-    if (databricksServiceName != null) {
-      builder.withServiceName(databricksServiceName);
-    } else if (sparkServiceName != null) {
-      builder.withServiceName(sparkServiceName);
+    if (serviceNameOverride != null) {
+      builder.withServiceName(serviceNameOverride);
     }
 
     addPropertiesTags(builder, properties);
@@ -1153,45 +1151,6 @@ private static String getBatchIdFromBatchKey(String batchKey) {
     return batchKey.substring(batchKey.lastIndexOf(".") + 1);
   }
 
-  private static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) {
-    if (Config.get().isServiceNameSetByUser()) {
-      return null;
-    }
-
-    String serviceName = null;
-    String runName = getDatabricksRunName(conf);
-    if (runName != null) {
-      serviceName = "databricks.job-cluster." + runName;
-    } else if (databricksClusterName != null) {
-      serviceName = "databricks.all-purpose-cluster." + databricksClusterName;
-    }
-
-    return serviceName;
-  }
-
-  private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
-    // If config is not set or running on databricks, not changing the service name
-    if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) {
-      return null;
-    }
-
-    // Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
-    String serviceName = Config.get().getServiceName();
-    if (Config.get().isServiceNameSetByUser()
-        && !"spark".equals(serviceName)
-        && !"hadoop".equals(serviceName)) {
-      log.debug("Service '{}' explicitly set by user, not using the application name", serviceName);
-      return null;
-    }
-
-    String sparkAppName = conf.get("spark.app.name", null);
-    if (sparkAppName != null) {
-      log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName);
-    }
-
-    return sparkAppName;
-  }
-
   private static void reportKafkaOffsets(
       final String appName, final AgentSpan span, final SourceProgress progress) {
     if (!span.traceConfig().isDataStreamsEnabled()
@@ -1234,34 +1193,4 @@ private static void reportKafkaOffsets(
       }
     }
   }
-
-  private static String getDatabricksRunName(SparkConf conf) {
-    String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null);
-    if (allTags == null) {
-      return null;
-    }
-
-    try {
-      // Using the jackson JSON lib used by spark
-      // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
-      JsonNode jsonNode = objectMapper.readTree(allTags);
-
-      for (JsonNode node : jsonNode) {
-        String key = node.get("key").asText();
-        if ("RunName".equals(key)) {
-          // Databricks jobs launched by Azure Data Factory have an uuid at the end of the name
-          return removeUuidFromEndOfString(node.get("value").asText());
-        }
-      }
-    } catch (Exception ignored) {
-    }
-
-    return null;
-  }
-
-  @SuppressForbidden // called at most once per spark application
-  private static String removeUuidFromEndOfString(String input) {
-    return input.replaceAll(
-        "_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
-  }
 }
Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+package datadog.trace.instrumentation.spark;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import datadog.trace.api.Config;
+import de.thetaphi.forbiddenapis.SuppressForbidden;
+import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkConfUtils {
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private static final Logger log = LoggerFactory.getLogger(SparkConfUtils.class);
+
+  public static boolean getIsRunningOnDatabricks(SparkConf sparkConf) {
+    return sparkConf.contains("spark.databricks.sparkContextId");
+  }
+
+  public static String getDatabricksClusterName(SparkConf sparkConf) {
+    return sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
+  }
+
+  public static String getDatabricksServiceName(SparkConf conf, String databricksClusterName) {
+    if (Config.get().isServiceNameSetByUser()) {
+      return null;
+    }
+
+    String serviceName = null;
+    String runName = getDatabricksRunName(conf);
+    if (runName != null) {
+      serviceName = "databricks.job-cluster." + runName;
+    } else if (databricksClusterName != null) {
+      serviceName = "databricks.all-purpose-cluster." + databricksClusterName;
+    }
+
+    return serviceName;
+  }
+
+  public static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
+    // If config is not set or running on databricks, not changing the service name
+    if (!Config.get().useSparkAppNameAsService() || isRunningOnDatabricks) {
+      return null;
+    }
+
+    // Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
+    String serviceName = Config.get().getServiceName();
+    if (Config.get().isServiceNameSetByUser()
+        && !"spark".equals(serviceName)
+        && !"hadoop".equals(serviceName)) {
+      log.debug("Service '{}' explicitly set by user, not using the application name", serviceName);
+      return null;
+    }
+
+    String sparkAppName = conf.get("spark.app.name", null);
+    if (sparkAppName != null) {
+      log.info("Using Spark application name '{}' as the Datadog service name", sparkAppName);
+    }
+
+    return sparkAppName;
+  }
+
+  public static String getServiceNameOverride(SparkConf conf) {
+    boolean isRunningOnDatabricks = getIsRunningOnDatabricks(conf);
+    String databricksClusterName = getDatabricksClusterName(conf);
+    String databricksServiceName = getDatabricksServiceName(conf, databricksClusterName);
+    String sparkServiceName = getSparkServiceName(conf, isRunningOnDatabricks);
+
+    return databricksServiceName != null ? databricksServiceName : sparkServiceName;
+  }
+
+  private static String getDatabricksRunName(SparkConf conf) {
+    String allTags = conf.get("spark.databricks.clusterUsageTags.clusterAllTags", null);
+    if (allTags == null) {
+      return null;
+    }
+
+    try {
+      // Using the jackson JSON lib used by spark
+      // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
+      JsonNode jsonNode = objectMapper.readTree(allTags);
+
+      for (JsonNode node : jsonNode) {
+        String key = node.get("key").asText();
+        if ("RunName".equals(key)) {
+          // Databricks jobs launched by Azure Data Factory have an uuid at the end of the name
+          return removeUuidFromEndOfString(node.get("value").asText());
+        }
+      }
+    } catch (Exception ignored) {
+    }
+
+    return null;
+  }
+
+  @SuppressForbidden // called at most once per spark application
+  private static String removeUuidFromEndOfString(String input) {
+    return input.replaceAll(
+        "_[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$", "");
+  }
+}
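
For reference, a hedged usage sketch of the new helper. The configuration values ("nightly-etl", "analytics-cluster", the context id) are made up for illustration, and an application would not normally call this agent-internal class directly; on a real cluster the Databricks keys are populated by the platform.

import datadog.trace.instrumentation.spark.SparkConfUtils;
import org.apache.spark.SparkConf;

class ServiceNameOverrideUsage {
  public static void main(String[] args) {
    // Hypothetical values for illustration only.
    SparkConf conf = new SparkConf()
        .set("spark.app.name", "nightly-etl")
        .set("spark.databricks.sparkContextId", "1234567890")
        .set("spark.databricks.clusterUsageTags.clusterName", "analytics-cluster");

    // With no RunName tag and no user-set service name, this resolves to
    // "databricks.all-purpose-cluster.analytics-cluster". Without the two Databricks keys,
    // it would fall back to "nightly-etl" only when the app-name-as-service config is enabled.
    String override = SparkConfUtils.getServiceNameOverride(conf);
    System.out.println(override);
  }
}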
