Skip to content

Commit 593e004

Browse files
committed
add hive table parsing in spark listener for insert and select
1 parent 0433b59 commit 593e004

File tree

2 files changed

+61
-19
lines changed

2 files changed

+61
-19
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -765,19 +765,6 @@ private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sql
765765
log.info("Logical plan for query execution id {}: {}", sqlExecutionId, logicalPlan);
766766

767767
if (logicalPlan != null) {
768-
// Collection<DataSourceV2Relation> relations =
769-
// JavaConverters.asJavaCollection(logicalPlan.collect(SparkSQLUtils.pf));
770-
// List<SparkSQLUtils.LineageDataset> datasets = new ArrayList<>();
771-
//
772-
// for (DataSourceV2Relation relation : relations) {
773-
// String name = relation.table().name();
774-
// String schema = relation.schema().json();
775-
// String stats = relation.stats().toString();
776-
// String properties = relation.table().properties().toString();
777-
//
778-
// datasets.add(new SparkSQLUtils.LineageDataset(name, schema, stats, properties));
779-
// }
780-
781768
List<SparkSQLUtils.LineageDataset> datasets =
782769
JavaConverters.seqAsJavaList(logicalPlan.collect(SparkSQLUtils.logicalPlanToDataset));
783770
if (!datasets.isEmpty()) {
@@ -786,11 +773,6 @@ private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sql
786773
} else {
787774
log.info("No datasets found for query execution id {}", sqlExecutionId);
788775
}
789-
790-
// if (relations.isEmpty()) {
791-
// log.info("No DataSourceV2Relation found for query execution id {}",
792-
// sqlExecutionId);
793-
// }
794776
}
795777
} else {
796778
log.warn("Start: QueryExecution not found for sqlEnd queryExecutionId: {}", sqlExecutionId);

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
import java.nio.charset.StandardCharsets;
1414
import java.util.ArrayList;
1515
import java.util.Collection;
16+
import java.util.HashMap;
1617
import java.util.HashSet;
1718
import java.util.List;
1819
import java.util.Map;
1920
import java.util.Set;
2021
import org.apache.spark.scheduler.AccumulableInfo;
2122
import org.apache.spark.sql.catalyst.analysis.NamedRelation;
23+
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
24+
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation;
2225
import org.apache.spark.sql.catalyst.plans.logical.AppendData;
2326
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
2427
import org.apache.spark.sql.execution.SparkPlanInfo;
@@ -36,6 +39,7 @@ public class SparkSQLUtils {
3639

3740
private static final Class<?> dataSourceV2RelationClass;
3841
private static final Class<?> replaceDataClass;
42+
private static final Class<?> insertIntoHiveTableClass;
3943
private static final MethodHandle schemaMethod;
4044
private static final MethodHandle nameMethod;
4145
private static final MethodHandle propertiesMethod;
@@ -57,10 +61,16 @@ private static Class<?> findReplaceData() throws ClassNotFoundException {
5761
return Class.forName("org.apache.spark.sql.catalyst.plans.logical.ReplaceData");
5862
}
5963

64+
@SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
65+
private static Class<?> findInsertIntoHiveTable() throws ClassNotFoundException {
66+
return Class.forName("org.apache.spark.sql.hive.execution.InsertIntoHiveTable");
67+
}
68+
6069
static {
6170
Class<?> relationClassFound = null;
6271
Class<?> tableClassFound = null;
6372
Class<?> replaceDataClassFound = null;
73+
Class<?> insertIntoHiveTableClassFound = null;
6474

6575
MethodHandle nameMethodFound = null;
6676
MethodHandle schemaMethodFound = null;
@@ -72,6 +82,7 @@ private static Class<?> findReplaceData() throws ClassNotFoundException {
7282
relationClassFound = findDataSourceV2Relation();
7383
tableClassFound = findTable();
7484
replaceDataClassFound = findReplaceData();
85+
insertIntoHiveTableClassFound = findInsertIntoHiveTable();
7586

7687
schemaMethodFound =
7788
lookup.findVirtual(tableClassFound, "schema", MethodType.methodType(StructType.class));
@@ -85,7 +96,9 @@ private static Class<?> findReplaceData() throws ClassNotFoundException {
8596

8697
dataSourceV2RelationClass = relationClassFound;
8798
replaceDataClass = replaceDataClassFound;
99+
insertIntoHiveTableClass = insertIntoHiveTableClassFound;
88100
tableClass = tableClassFound;
101+
89102
schemaMethod = schemaMethodFound;
90103
nameMethod = nameMethodFound;
91104
propertiesMethod = propertiesMethodFound;
@@ -323,7 +336,9 @@ public boolean isDefinedAt(LogicalPlan x) {
323336
return x instanceof DataSourceV2Relation
324337
|| (x instanceof AppendData
325338
&& ((AppendData) x).table() instanceof DataSourceV2Relation)
326-
|| (replaceDataClass != null && replaceDataClass.isInstance(x));
339+
|| (replaceDataClass != null && replaceDataClass.isInstance(x))
340+
|| (insertIntoHiveTableClass != null && insertIntoHiveTableClass.isInstance(x))
341+
|| x instanceof HiveTableRelation;
327342
}
328343

329344
@Override
@@ -368,10 +383,55 @@ public LineageDataset apply(LogicalPlan x) {
368383
} catch (Throwable ignored) {
369384
log.info("Error while converting logical plan to dataset", ignored);
370385
}
386+
} else if (insertIntoHiveTableClass != null && insertIntoHiveTableClass.isInstance(x)) {
387+
log.info(
388+
"class {} is instance of {}",
389+
x.getClass().getName(),
390+
insertIntoHiveTableClass.getName());
391+
392+
try {
393+
return parseCatalogTable(
394+
(CatalogTable) x.getClass().getMethod("table").invoke(x), "output");
395+
} catch (Throwable ignored) {
396+
log.info("Error while converting logical plan to dataset", ignored);
397+
}
398+
} else if (x instanceof HiveTableRelation) {
399+
log.info(
400+
"class {} is instance of {}",
401+
x.getClass().getName(),
402+
HiveTableRelation.class.getName());
403+
return parseCatalogTable(((HiveTableRelation) x).tableMeta(), "input");
371404
}
372405
return null;
373406
}
374407

408+
private LineageDataset parseCatalogTable(CatalogTable table, String datasetType) {
409+
Map<String, String> properties = new HashMap<>();
410+
411+
if (table.provider().isDefined()) {
412+
properties.put("provider", table.provider().get());
413+
}
414+
415+
if (table.storage().locationUri().isDefined()) {
416+
properties.put("location", table.storage().locationUri().get().toString());
417+
}
418+
properties.put("storage", table.storage().toString());
419+
properties.put("created_time", Long.toString(table.createTime()));
420+
properties.put("last_access_time", Long.toString(table.lastAccessTime()));
421+
properties.put("owner", table.owner());
422+
properties.put("comment", table.comment().getOrElse(() -> ""));
423+
properties.put(
424+
"partition_columns",
425+
JavaConverters.asJavaCollection(table.partitionColumnNames()).toString());
426+
427+
return new LineageDataset(
428+
table.qualifiedName(),
429+
table.schema().json(),
430+
table.stats().toString(),
431+
properties.toString(),
432+
datasetType);
433+
}
434+
375435
private LineageDataset parseDataSourceV2Relation(Object logicalPlan, String datasetType) {
376436
try {
377437
String tableName = null;

0 commit comments

Comments
 (0)