Skip to content

Commit a700298

Browse files
committed
Compile SparkListener on Spark 2.4 and use reflection to parse the logical plan from Spark 3
1 parent 9292a6c commit a700298

File tree

4 files changed

+148
-33
lines changed

4 files changed

+148
-33
lines changed

dd-java-agent/instrumentation/spark/build.gradle

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ configurations.all {
77
resolutionStrategy.deactivateDependencyLocking()
88
}
99
dependencies {
10-
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '3.5.1'
11-
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '3.5.1'
10+
compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
11+
compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
1212

1313
testFixturesImplementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.2'
1414
testFixturesImplementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.14.0'
@@ -17,7 +17,7 @@ dependencies {
1717
testFixturesApi project(':dd-java-agent:instrumentation:trace-annotation')
1818
testFixturesApi project(':dd-java-agent:testing')
1919

20-
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '3.5.1'
21-
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '3.5.1'
22-
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '3.5.1'
20+
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
21+
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'
22+
testFixturesCompileOnly group: 'org.apache.spark', name: 'spark-yarn_2.12', version: '2.4.0'
2323
}

dd-java-agent/instrumentation/spark/spark_2.12/build.gradle

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ plugins {
22
id 'java-test-fixtures'
33
}
44

5-
def sparkVersion = '3.5.1'
5+
def sparkVersion = '2.4.0'
66
def scalaVersion = '2.12'
77

88
muzzle {
@@ -41,13 +41,13 @@ dependencies {
4141
testImplementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "$sparkVersion"
4242
testImplementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "$sparkVersion"
4343

44-
test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.5.1"
45-
test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.5.1"
46-
test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.5.1"
44+
test_spark24Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "2.4.8"
45+
test_spark24Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "2.4.8"
46+
test_spark24Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "2.4.8"
4747

48-
test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.5.1"
49-
test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.5.1"
50-
test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.5.1"
48+
test_spark32Implementation group: 'org.apache.spark', name: "spark-core_$scalaVersion", version: "3.2.4"
49+
test_spark32Implementation group: 'org.apache.spark', name: "spark-sql_$scalaVersion", version: "3.2.4"
50+
test_spark32Implementation group: 'org.apache.spark', name: "spark-yarn_$scalaVersion", version: "3.2.4"
5151
// We do not support netty versions older than this because of a change to the number of parameters to the
5252
// PooledByteBufAllocator constructor. See this PR where the new constructor (the only one we support) was introduced:
5353
// https://github.com/netty/netty/pull/10267

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,10 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
817817
for (int i = 0; i < datasets.size(); i++) {
818818
SparkSQLUtils.LineageDataset dataset = datasets.get(i);
819819

820+
if (dataset == null) {
821+
continue;
822+
}
823+
820824
span.setTag("dataset." + i + ".name", dataset.name);
821825
span.setTag("dataset." + i + ".schema", dataset.schema);
822826
span.setTag("dataset." + i + ".stats", dataset.stats);

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 132 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import com.fasterxml.jackson.databind.DeserializationFeature;
55
import com.fasterxml.jackson.databind.ObjectMapper;
66
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
7+
import de.thetaphi.forbiddenapis.SuppressForbidden;
78
import java.io.ByteArrayOutputStream;
89
import java.io.IOException;
10+
import java.lang.invoke.MethodHandle;
11+
import java.lang.invoke.MethodHandles;
12+
import java.lang.invoke.MethodType;
913
import java.nio.charset.StandardCharsets;
1014
import java.util.ArrayList;
1115
import java.util.Collection;
@@ -14,11 +18,13 @@
1418
import java.util.Map;
1519
import java.util.Set;
1620
import org.apache.spark.scheduler.AccumulableInfo;
21+
import org.apache.spark.sql.catalyst.analysis.NamedRelation;
1722
import org.apache.spark.sql.catalyst.plans.logical.AppendData;
1823
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
1924
import org.apache.spark.sql.execution.SparkPlanInfo;
2025
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
2126
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
27+
import org.apache.spark.sql.types.StructType;
2228
import org.slf4j.Logger;
2329
import org.slf4j.LoggerFactory;
2430
import scala.PartialFunction;
@@ -28,6 +34,54 @@
2834
public class SparkSQLUtils {
2935
private static final Logger log = LoggerFactory.getLogger(SparkSQLUtils.class);
3036

37+
private static final Class<?> dataSourceV2RelationClass;
38+
private static final MethodHandle schemaMethod;
39+
private static final MethodHandle nameMethod;
40+
private static final MethodHandle propertiesMethod;
41+
42+
private static final Class<?> tableClass;
43+
44+
@SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
45+
private static Class<?> findDataSourceV2Relation() throws ClassNotFoundException {
46+
return Class.forName("org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation");
47+
}
48+
49+
@SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
50+
private static Class<?> findTable() throws ClassNotFoundException {
51+
return Class.forName("org.apache.spark.sql.connector.catalog.Table");
52+
}
53+
54+
static {
55+
Class<?> relationClassFound = null;
56+
Class<?> tableClassFound = null;
57+
58+
MethodHandle nameMethodFound = null;
59+
MethodHandle schemaMethodFound = null;
60+
MethodHandle propertiesMethodFound = null;
61+
62+
try {
63+
MethodHandles.Lookup lookup = MethodHandles.lookup();
64+
65+
relationClassFound = findDataSourceV2Relation();
66+
tableClassFound = findTable();
67+
68+
schemaMethodFound =
69+
lookup.findVirtual(tableClassFound, "schema", MethodType.methodType(StructType.class));
70+
nameMethodFound =
71+
lookup.findVirtual(tableClassFound, "name", MethodType.methodType(String.class));
72+
propertiesMethodFound =
73+
lookup.findVirtual(tableClassFound, "properties", MethodType.methodType(Map.class));
74+
75+
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException ignored) {
76+
}
77+
78+
dataSourceV2RelationClass = relationClassFound;
79+
tableClass = tableClassFound;
80+
schemaMethod = schemaMethodFound;
81+
nameMethod = nameMethodFound;
82+
propertiesMethod = propertiesMethodFound;
83+
}
84+
3185
public static void addSQLPlanToStageSpan(
3286
AgentSpan span,
3387
SparkPlanInfo plan,
@@ -264,31 +318,88 @@ public boolean isDefinedAt(LogicalPlan x) {
264318

265319
@Override
266320
public LineageDataset apply(LogicalPlan x) {
321+
if (dataSourceV2RelationClass != null && dataSourceV2RelationClass.isInstance(x)) {
322+
log.info(
323+
"class {} is instance of {}",
324+
x.getClass().getName(),
325+
dataSourceV2RelationClass.getName());
326+
return parseDataSourceV2Relation(x, "input");
327+
} else if (x instanceof AppendData) {
328+
log.info(
329+
"class {} is instance of {}", x.getClass().getName(), AppendData.class.getName());
330+
AppendData appendData = (AppendData) x;
331+
NamedRelation table = appendData.table();
332+
if (dataSourceV2RelationClass != null && dataSourceV2RelationClass.isInstance(table)) {
333+
log.info(
334+
"class {} is instance of {}",
335+
table.getClass().getName(),
336+
dataSourceV2RelationClass.getName());
337+
return parseDataSourceV2Relation(table, "output");
338+
}
339+
}
340+
return null;
341+
}
342+
343+
private LineageDataset parseDataSourceV2Relation(Object logicalPlan, String datasetType) {
267344
try {
268-
if (x instanceof DataSourceV2Relation) {
269-
DataSourceV2Relation relation = (DataSourceV2Relation) x;
270-
return new LineageDataset(
271-
relation.table().name(),
272-
relation.schema().json(),
273-
"",
274-
relation.table().properties().toString(),
275-
"input");
276-
} else if (x instanceof AppendData) {
277-
AppendData appendData = (AppendData) x;
278-
DataSourceV2Relation relation = (DataSourceV2Relation) appendData.table();
279-
return new LineageDataset(
280-
relation.table().name(),
281-
relation.schema().json(),
282-
"",
283-
relation.table().properties().toString(),
284-
"output");
345+
String tableName = null;
346+
String tableSchema = null;
347+
String properties = null;
348+
349+
if (logicalPlan.getClass().getMethod("table") == null) {
350+
log.info(
351+
"method table does not exist for {}, cannot parse current LogicalPlan",
352+
logicalPlan.getClass().getName());
353+
return null;
354+
}
355+
356+
Object table = logicalPlan.getClass().getMethod("table").invoke(logicalPlan);
357+
if (table == null) {
358+
log.info(
359+
"table is null for {}, cannot parse current LogicalPlan",
360+
logicalPlan.getClass().getName());
361+
return null;
362+
}
363+
364+
if (tableClass == null || !tableClass.isInstance(table)) {
365+
log.info("table is not instance of a Table class, cannot parse current LogicalPlan");
366+
return null;
367+
}
368+
369+
if (table.getClass().getMethod("name") != null) {
370+
tableName = (String) nameMethod.invoke(table);
371+
log.info(
372+
"method name exists for {} with table name {}",
373+
table.getClass().getName(),
374+
tableName);
375+
} else {
376+
log.info("method name does not exist for {}", table.getClass().getName());
285377
}
286-
} catch (Exception e) {
287-
log.debug("Error while converting logical plan to dataset", e);
378+
379+
if (table.getClass().getMethod("schema") != null) {
380+
StructType schema = (StructType) schemaMethod.invoke(table);
381+
log.info(
382+
"method schema exists for {} with schema {}", table.getClass().getName(), schema);
383+
tableSchema = schema.json();
384+
} else {
385+
log.info("method schema does not exist for {}", table.getClass().getName());
386+
}
387+
388+
if (table.getClass().getMethod("properties") != null) {
389+
Map<String, String> propertyMap =
390+
(Map<String, String>) propertiesMethod.invoke(table);
391+
properties = propertyMap.toString();
392+
393+
log.info("method properties found with content of {}", properties);
394+
} else {
395+
log.info("method properties does not exist for {}", table.getClass().getName());
396+
}
397+
398+
return new LineageDataset(tableName, tableSchema, "", properties, datasetType);
399+
} catch (Throwable ignored) {
400+
log.info("Error while converting logical plan to dataset", ignored);
288401
return null;
289402
}
290-
291-
return null;
292403
}
293404
};
294405
}

0 commit comments

Comments (0)