Commit 94f3139

Remove unused logic to parse children; enrich product parsing to support more types and use JSON arrays
1 parent 3f9c26b commit 94f3139

4 files changed (+28, -109 lines)


dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java

Lines changed: 0 additions & 28 deletions
@@ -1,42 +1,14 @@
 package datadog.trace.instrumentation.spark;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.spark.sql.execution.SparkPlan;
-import org.apache.spark.sql.execution.exchange.ReusedExchangeExec;
 import scala.collection.JavaConverters;
 
 // An extension of how Spark translates `SparkPlan`s to `SparkPlanInfo`, see here:
 // https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala#L54
 public class Spark212PlanUtils {
-  public static ArrayList<SparkPlan> extractChildren(SparkPlan plan) {
-    /*
-    Get children of this node. Logic in Spark:
-
-    val children = plan match {
-      case ReusedExchangeExec(_, child) => child :: Nil
-      case _ => plan.children ++ plan.subqueries
-    }
-    */
-    ArrayList<SparkPlan> children = new ArrayList<>();
-    if (plan instanceof ReusedExchangeExec) {
-      children.add(((ReusedExchangeExec) plan).child());
-    }
-
-    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.subqueries().iterator());
-        it.hasNext(); ) {
-      children.add(it.next());
-    }
-    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.children().iterator());
-        it.hasNext(); ) {
-      children.add(it.next());
-    }
-
-    return children;
-  }
-
   public static Map<String, String> extractPlanProduct(SparkPlan plan) {
     HashMap<String, String> args = new HashMap<>();
     HashMap<String, String> unparsed = new HashMap<>();

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java

Lines changed: 0 additions & 50 deletions
@@ -1,64 +1,14 @@
 package datadog.trace.instrumentation.spark;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import org.apache.spark.sql.execution.ReusedSubqueryExec;
 import org.apache.spark.sql.execution.SparkPlan;
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
-import org.apache.spark.sql.execution.adaptive.QueryStageExec;
-import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec;
-import org.apache.spark.sql.execution.exchange.ReusedExchangeExec;
 import scala.collection.JavaConverters;
 
 // An extension of how Spark translates `SparkPlan`s to `SparkPlanInfo`, see here:
 // https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala#L54
 public class Spark213PlanUtils {
-  public static ArrayList<SparkPlan> extractChildren(SparkPlan plan) {
-    /*
-    Get children of this node. Logic in Spark:
-
-    val children = plan match {
-      case ReusedExchangeExec(_, child) => child :: Nil
-      case ReusedSubqueryExec(child) => child :: Nil
-      case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
-      case stage: QueryStageExec => stage.plan :: Nil
-      case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
-      case EmptyRelationExec(logical) => (logical :: Nil)
-      case _ => plan.children ++ plan.subqueries
-    }
-    */
-    // TODO: How does this interact with different versions of Spark? (specifically an older version
-    // that does not have those types)
-    ArrayList<SparkPlan> children = new ArrayList<>();
-    if (plan instanceof ReusedExchangeExec) {
-      children.add(((ReusedExchangeExec) plan).child());
-    } else if (plan instanceof ReusedSubqueryExec) {
-      children.add(((ReusedSubqueryExec) plan).child());
-    } else if (plan instanceof AdaptiveSparkPlanExec) {
-      children.add(((AdaptiveSparkPlanExec) plan).executedPlan());
-    } else if (plan instanceof QueryStageExec) {
-      children.add(((QueryStageExec) plan).plan());
-    } else if (plan instanceof InMemoryTableScanExec) {
-      children.add(((InMemoryTableScanExec) plan).relation().cachedPlan());
-      // New as of Spark 4.0.0
-      // } else if (plan instanceof EmptyRelationExec) {
-      //   children.add(((EmptyRelationExec) plan).logical);
-    }
-
-    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.subqueries().iterator());
-        it.hasNext(); ) {
-      children.add(it.next());
-    }
-    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.children().iterator());
-        it.hasNext(); ) {
-      children.add(it.next());
-    }
-
-    return children;
-  }
-
   public static Map<String, String> extractPlanProduct(SparkPlan plan) {
     HashMap<String, String> args = new HashMap<>();
     HashMap<String, String> unparsed = new HashMap<>();

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/CommonSparkPlanUtils.java

Lines changed: 13 additions & 29 deletions
@@ -1,37 +1,21 @@
 package datadog.trace.instrumentation.spark;
 
-import java.util.Collection;
-import org.apache.spark.sql.catalyst.trees.TreeNode;
-import scala.None$;
-import scala.collection.JavaConverters;
-import scala.collection.immutable.$colon$colon;
-import scala.collection.immutable.Iterable;
-import scala.collection.immutable.Nil$;
+import java.util.ArrayList;
+import org.apache.spark.sql.catalyst.plans.QueryPlan;
+import scala.Option;
+import scala.collection.Iterable;
 
 public class CommonSparkPlanUtils {
   public static String parsePlanProduct(Object value) {
-    // TODO: improve parsing of certain types
-    // 1. Some() should be unwrapped
-    // 2. requiredSchema on Scan * (currently showing StructType)
-
-    // TODO: support a few more common types?
-    // condition=org.apache.spark.sql.catalyst.expressions.objects.Invoke
-    // joinType=org.apache.spark.sql.catalyst.plans.Inner$
-    // buildSide=org.apache.spark.sql.catalyst.optimizer.BuildRight$
-    // shuffleOrigin=org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS$
-    // outputPartitioning=org.apache.spark.sql.catalyst.plans.physical.SinglePartition$
-    if (value instanceof String
-        || value instanceof Boolean
-        || value instanceof Collection
-        || value instanceof None$
-        || value instanceof Integer) {
-      return value.toString();
-    } else if (value instanceof $colon$colon || value instanceof Nil$) {
-      return JavaConverters.asJavaIterable(((Iterable) value)).toString();
-    } else if (value instanceof TreeNode) {
-      // Filter out any potential child nodes
-      // TODO: Exempt conditions from this branch
-      // e.g. condition=class org.apache.spark.sql.catalyst.expressions.objects.Invoke
+    if (value == null) {
+      return "null";
+    } else if (value instanceof Iterable) {
+      ArrayList<String> list = new ArrayList<>();
+      ((Iterable) value).foreach(item -> list.add(parsePlanProduct(item)));
+      return "[\"" + String.join("\", \"", list) + "\"]";
+    } else if (value instanceof Option) {
+      return parsePlanProduct(((Option) value).getOrElse(() -> "none"));
+    } else if (value instanceof QueryPlan) { // Filter out values referencing child nodes
      return null;
     } else {
       return value.toString();
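
To illustrate the reworked parser, here is a minimal, hypothetical harness (not part of the commit; the class name ParsePlanProductDemo is invented) that exercises parsePlanProduct. It assumes scala-library is on the classpath, as it already is for this instrumentation:

import datadog.trace.instrumentation.spark.CommonSparkPlanUtils;
import scala.Option;

public class ParsePlanProductDemo {
  public static void main(String[] args) {
    // Plain values fall through to toString()
    System.out.println(CommonSparkPlanUtils.parsePlanProduct(42)); // 42

    // Scala Options are unwrapped recursively; None falls back to "none"
    System.out.println(CommonSparkPlanUtils.parsePlanProduct(Option.apply("parquet"))); // parquet
    System.out.println(CommonSparkPlanUtils.parsePlanProduct(Option.empty())); // none

    // null is rendered as the literal string "null"
    System.out.println(CommonSparkPlanUtils.parsePlanProduct(null)); // null

    // A scala.collection.Iterable such as Seq("id", "name") would be rendered
    // element by element into the JSON-style string: ["id", "name"]
  }
}

The exact ["a", "b"] rendering matters: SparkSQLUtils (below) keys off the leading [" and trailing "] to convert these strings back into native JSON arrays.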

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 15 additions & 2 deletions
@@ -173,13 +173,26 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
       generator.writeStringField("nodeDetailString", nodeDetails);
     }
 
-    // Metadata is only present for FileSourceScan nodes
+    // Metadata is only added natively by Spark for FileSourceScan nodes
+    // We leverage this to extract & inject additional argument-level data
     if (!plan.metadata().isEmpty()) {
       generator.writeFieldName("meta");
       generator.writeStartObject();
 
       for (Tuple2<String, String> metadata : JavaConverters.asJavaCollection(plan.metadata())) {
-        generator.writeStringField(metadata._1, metadata._2);
+        // If it looks like a string array, break apart and write as native JSON array
+        if (metadata._2.startsWith("[\"") && metadata._2.endsWith("\"]")) {
+          String[] list = metadata._2.substring(2, metadata._2.length() - 2).split("\", \"");
+
+          generator.writeFieldName(metadata._1);
+          generator.writeStartArray();
+          for (String entry : list) {
+            generator.writeString(entry);
+          }
+          generator.writeEndArray();
+        } else {
+          generator.writeStringField(metadata._1, metadata._2);
+        }
       }
 
       generator.writeEndObject();
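
A self-contained sketch of the array-detection branch in isolation (illustrative only: MetaArrayDemo is an invented class and ReadSchema is just an example metadata key; Jackson is assumed on the classpath, as it already is for SparkSQLUtils):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;

public class MetaArrayDemo {
  public static void main(String[] args) throws Exception {
    // Shape produced by the new parsePlanProduct for list-valued fields
    String value = "[\"id\", \"name\"]";

    StringWriter out = new StringWriter();
    try (JsonGenerator generator = new JsonFactory().createGenerator(out)) {
      generator.writeStartObject();
      if (value.startsWith("[\"") && value.endsWith("\"]")) {
        // Same transformation as the commit: strip the outer [" and "], split on ", "
        String[] entries = value.substring(2, value.length() - 2).split("\", \"");
        generator.writeFieldName("ReadSchema");
        generator.writeStartArray();
        for (String entry : entries) {
          generator.writeString(entry);
        }
        generator.writeEndArray();
      } else {
        generator.writeStringField("ReadSchema", value);
      }
      generator.writeEndObject();
    }
    System.out.println(out); // {"ReadSchema":["id","name"]}
  }
}

One trade-off of the simple prefix/suffix check: an element whose text itself contains the sequence ", " would be split into two entries, which is acceptable for plan metadata values of this shape.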
