Skip to content

Commit 0433b59

Browse files
committed
Parse the ReplaceData node from the LogicalPlan to support datasets referenced in SQL UPDATE and DELETE statements
1 parent 4650049 commit 0433b59

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,10 @@ private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sql
781781
List<SparkSQLUtils.LineageDataset> datasets =
782782
JavaConverters.seqAsJavaList(logicalPlan.collect(SparkSQLUtils.logicalPlanToDataset));
783783
if (!datasets.isEmpty()) {
784+
log.info("Adding {} datasets to query execution id {}", datasets.size(), sqlExecutionId);
784785
lineageDatasets.put(sqlExecutionId, datasets);
786+
} else {
787+
log.info("No datasets found for query execution id {}", sqlExecutionId);
785788
}
786789

787790
// if (relations.isEmpty()) {
@@ -851,6 +854,8 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
851854
}
852855

853856
span.finish(sqlEnd.time() * 1000);
857+
} else {
858+
log.info("End: Span not found for query execution id {}", sqlEnd.executionId());
854859
}
855860
}
856861

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class SparkSQLUtils {
3535
private static final Logger log = LoggerFactory.getLogger(SparkSQLUtils.class);
3636

3737
private static final Class<?> dataSourceV2RelationClass;
38+
private static final Class<?> replaceDataClass;
3839
private static final MethodHandle schemaMethod;
3940
private static final MethodHandle nameMethod;
4041
private static final MethodHandle propertiesMethod;
@@ -51,9 +52,15 @@ private static Class<?> findTable() throws ClassNotFoundException {
5152
return Class.forName("org.apache.spark.sql.connector.catalog.Table");
5253
}
5354

55+
@SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
56+
private static Class<?> findReplaceData() throws ClassNotFoundException {
57+
return Class.forName("org.apache.spark.sql.catalyst.plans.logical.ReplaceData");
58+
}
59+
5460
static {
5561
Class<?> relationClassFound = null;
5662
Class<?> tableClassFound = null;
63+
Class<?> replaceDataClassFound = null;
5764

5865
MethodHandle nameMethodFound = null;
5966
MethodHandle schemaMethodFound = null;
@@ -64,6 +71,7 @@ private static Class<?> findTable() throws ClassNotFoundException {
6471

6572
relationClassFound = findDataSourceV2Relation();
6673
tableClassFound = findTable();
74+
replaceDataClassFound = findReplaceData();
6775

6876
schemaMethodFound =
6977
lookup.findVirtual(tableClassFound, "schema", MethodType.methodType(StructType.class));
@@ -76,6 +84,7 @@ private static Class<?> findTable() throws ClassNotFoundException {
7684
}
7785

7886
dataSourceV2RelationClass = relationClassFound;
87+
replaceDataClass = replaceDataClassFound;
7988
tableClass = tableClassFound;
8089
schemaMethod = schemaMethodFound;
8190
nameMethod = nameMethodFound;
@@ -313,7 +322,8 @@ public DataSourceV2Relation apply(LogicalPlan x) {
313322
public boolean isDefinedAt(LogicalPlan x) {
314323
return x instanceof DataSourceV2Relation
315324
|| (x instanceof AppendData
316-
&& ((AppendData) x).table() instanceof DataSourceV2Relation);
325+
&& ((AppendData) x).table() instanceof DataSourceV2Relation)
326+
|| (replaceDataClass != null && replaceDataClass.isInstance(x));
317327
}
318328

319329
@Override
@@ -336,6 +346,28 @@ public LineageDataset apply(LogicalPlan x) {
336346
dataSourceV2RelationClass.getName());
337347
return parseDataSourceV2Relation(table, "output");
338348
}
349+
} else if (replaceDataClass != null && replaceDataClass.isInstance(x)) {
350+
log.info(
351+
"class {} is instance of {}", x.getClass().getName(), replaceDataClass.getName());
352+
353+
try {
354+
if (x.getClass().getMethod("table") != null) {
355+
Object table = x.getClass().getMethod("table").invoke(x);
356+
if (table != null
357+
&& dataSourceV2RelationClass != null
358+
&& dataSourceV2RelationClass.isInstance(table)) {
359+
return parseDataSourceV2Relation(table, "output");
360+
} else {
361+
log.info(
362+
"table is null or not instance of {}, cannot parse current LogicalPlan",
363+
dataSourceV2RelationClass.getName());
364+
}
365+
} else {
366+
log.info("method table does not exist for {}", x.getClass().getName());
367+
}
368+
} catch (Throwable ignored) {
369+
log.info("Error while converting logical plan to dataset", ignored);
370+
}
339371
}
340372
return null;
341373
}

0 commit comments

Comments
 (0)