
Commit 9c19e11

Extract Spark Plan product values for Spark 2.12
1 parent be7ada4 commit 9c19e11

5 files changed: +160 -33 lines changed

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 43 additions & 0 deletions
@@ -8,8 +8,12 @@
 import datadog.trace.api.Config;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.HashMap;

 @AutoService(InstrumenterModule.class)
 public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@@ -27,9 +31,22 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".Spark212PlanUtils",
     };
   }

+  @Override
+  public String[] knownMatchingTypes() {
+    String[] res = new String[super.knownMatchingTypes().length + 1];
+    int idx = 0;
+    for (String match : super.knownMatchingTypes()) {
+      res[idx] = match;
+      idx++;
+    }
+    res[idx] = "org.apache.spark.sql.execution.SparkPlanInfo$";
+    return res;
+  }
+
   @Override
   public void methodAdvice(MethodTransformer transformer) {
     super.methodAdvice(transformer);
@@ -40,6 +57,13 @@ public void methodAdvice(MethodTransformer transformer) {
             .and(isDeclaredBy(named("org.apache.spark.SparkContext")))
             .and(takesNoArguments()),
         Spark212Instrumentation.class.getName() + "$InjectListener");
+
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("fromSparkPlan"))
+            .and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
+            .and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
+        Spark212Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
   }

   public static class InjectListener {
@@ -78,4 +102,23 @@ public static void enter(@Advice.This SparkContext sparkContext) {
       sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
     }
   }
+
+  public static class SparkPlanInfoAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(
+        @Advice.Return(readOnly = false) SparkPlanInfo planInfo,
+        @Advice.Argument(0) SparkPlan plan) {
+      if (planInfo.metadata().size() == 0) {
+        HashMap<String, String> args = new HashMap<>();
+        planInfo =
+            new SparkPlanInfo(
+                planInfo.nodeName(),
+                planInfo.simpleString(),
+                planInfo.children(),
+                args.$plus$plus(
+                    JavaConverters.mapAsScalaMap(Spark212PlanUtils.extractPlanProduct(plan))),
+                planInfo.metrics());
+      }
+    }
+  }
 }
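The new SparkPlanInfoAdvice only rebuilds the SparkPlanInfo when Spark left its metadata map empty, merging in the values extracted from the plan. A minimal sketch of the Java-to-Scala map bridge it relies on (assuming Scala 2.12 on the classpath; MapBridgeSketch is a hypothetical name, not part of this commit):

import java.util.Map;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

public class MapBridgeSketch {
  // Start from an empty immutable Scala HashMap and append the converted Java
  // entries; $plus$plus is how Scala's ++ operator is spelled from Java.
  public static scala.collection.immutable.Map<String, String> toScalaMap(
      Map<String, String> javaMap) {
    return new HashMap<String, String>().$plus$plus(JavaConverters.mapAsScalaMap(javaMap));
  }
}
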
dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+package datadog.trace.instrumentation.spark;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec;
+import scala.collection.JavaConverters;
+
+// An extension of how Spark translates `SparkPlan`s to `SparkPlanInfo`, see here:
+// https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala#L54
+public class Spark212PlanUtils {
+  public static ArrayList<SparkPlan> extractChildren(SparkPlan plan) {
+    /*
+    Get children of this node. Logic in Spark:
+
+    val children = plan match {
+      case ReusedExchangeExec(_, child) => child :: Nil
+      case _ => plan.children ++ plan.subqueries
+    }
+    */
+    ArrayList<SparkPlan> children = new ArrayList<>();
+    if (plan instanceof ReusedExchangeExec) {
+      children.add(((ReusedExchangeExec) plan).child());
+    }
+
+    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.subqueries().iterator());
+        it.hasNext(); ) {
+      children.add(it.next());
+    }
+    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.children().iterator());
+        it.hasNext(); ) {
+      children.add(it.next());
+    }
+
+    return children;
+  }
+
+  public static Map<String, String> extractPlanProduct(SparkPlan plan) {
+    HashMap<String, String> args = new HashMap<>();
+    HashMap<String, String> unparsed = new HashMap<>();
+
+    int i = 0;
+    for (Iterator<Object> it = JavaConverters.asJavaIterator(plan.productIterator());
+        it.hasNext(); ) {
+      Object obj = it.next();
+      String key = String.format("_dd.unknown_key.%d", i);
+
+      String val = CommonSparkPlanUtils.parsePlanProduct(obj);
+      if (val != null) {
+        args.put(key, val);
+      } else {
+        unparsed.put(key, obj.getClass().getName());
+      }
+
+      i++;
+    }
+
+    if (unparsed.size() > 0) {
+      // For now, place what we can't parse here with the types so we're aware of them
+      args.put("_dd.unparsed", unparsed.toString());
+    }
+    return args;
+  }
+}
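Scala 2.12 has no Product.productElementName (it was introduced in Scala 2.13), so the 2.12 extraction falls back to positional _dd.unknown_key.N keys instead of real field names. A hedged sketch of what a caller would see (hypothetical driver code; assumes a live SparkSession produced the Dataset df):

import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.SparkPlan;

public class ExtractionSketch {
  // Dump the extracted metadata of a DataFrame's executed plan: positional
  // _dd.unknown_key.N entries, plus _dd.unparsed listing any skipped types.
  public static void dump(Dataset<Row> df) {
    SparkPlan plan = df.queryExecution().executedPlan();
    Map<String, String> meta = Spark212PlanUtils.extractPlanProduct(plan);
    meta.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
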

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 3 additions & 2 deletions
@@ -31,7 +31,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
-      packageName + ".SparkPlanUtils",
+      packageName + ".Spark213PlanUtils",
+      packageName + ".CommonSparkPlanUtils",
     };
   }

@@ -116,7 +117,7 @@ public static void exit(
                 planInfo.simpleString(),
                 planInfo.children(),
                 HashMap.from(
-                    JavaConverters.asScala(SparkPlanUtils.extractPlanProduct(plan)).toList()),
+                    JavaConverters.asScala(Spark213PlanUtils.extractPlanProduct(plan)).toList()),
                 planInfo.metrics());
       }
     }

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/SparkPlanUtils.java renamed to dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java

Lines changed: 8 additions & 31 deletions
@@ -1,26 +1,20 @@
 package datadog.trace.instrumentation.spark;

 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import org.apache.spark.sql.catalyst.trees.TreeNode;
 import org.apache.spark.sql.execution.ReusedSubqueryExec;
 import org.apache.spark.sql.execution.SparkPlan;
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
 import org.apache.spark.sql.execution.adaptive.QueryStageExec;
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec;
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec;
-import scala.None$;
 import scala.collection.JavaConverters;
-import scala.collection.immutable.$colon$colon;
-import scala.collection.immutable.Iterable;
-import scala.collection.immutable.Nil$;

 // An extension of how Spark translates `SparkPlan`s to `SparkPlanInfo`, see here:
 // https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala#L54
-public class SparkPlanUtils {
+public class Spark213PlanUtils {
   public static ArrayList<SparkPlan> extractChildren(SparkPlan plan) {
     /*
     Get children of this node. Logic in Spark:
@@ -35,6 +29,8 @@ case EmptyRelationExec(logical) => (logical :: Nil)
     case _ => plan.children ++ plan.subqueries
     }
     */
+    // TODO: How does this interact with different versions of Spark? (specifically an older version
+    // that does not have those types)
    ArrayList<SparkPlan> children = new ArrayList<>();
     if (plan instanceof ReusedExchangeExec) {
       children.add(((ReusedExchangeExec) plan).child());
@@ -71,32 +67,13 @@ public static Map<String, String> extractPlanProduct(SparkPlan plan) {
     for (Iterator<Object> it = JavaConverters.asJavaIterator(plan.productIterator());
         it.hasNext(); ) {
       Object obj = it.next();
+      String key = plan.productElementName(i);

-      // TODO: improve parsing of certain types
-      // 1. Some() should be unwrapped
-      // 2. requiredSchema on Scan * (currently showing StructType)
-
-      // TODO: support a few more common types?
-      // condition=org.apache.spark.sql.catalyst.expressions.objects.Invoke
-      // joinType=org.apache.spark.sql.catalyst.plans.Inner$
-      // buildSide=org.apache.spark.sql.catalyst.optimizer.BuildRight$
-      // shuffleOrigin=org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS$
-      // outputPartitioning=org.apache.spark.sql.catalyst.plans.physical.SinglePartition$
-      if (obj instanceof String
-          || obj instanceof Boolean
-          || obj instanceof Collection
-          || obj instanceof None$
-          || obj instanceof Integer) {
-        args.put(plan.productElementName(i), obj.toString());
-      } else if (obj instanceof $colon$colon || obj instanceof Nil$) {
-        args.put(plan.productElementName(i), JavaConverters.asJava(((Iterable) obj)).toString());
-      } else if (obj instanceof TreeNode) {
-        // Filter out any potential child nodes
-        // TODO: Exempt conditions from this branch
-        // e.g. condition=class org.apache.spark.sql.catalyst.expressions.objects.Invoke
-        unparsed.put(plan.productElementName(i), obj.getClass().getName());
+      String val = CommonSparkPlanUtils.parsePlanProduct(obj);
+      if (val != null) {
+        args.put(key, val);
       } else {
-        args.put(plan.productElementName(i), obj.toString());
+        unparsed.put(key, obj.getClass().getName());
       }

       i++;
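On Scala 2.13 the renamed Spark213PlanUtils keeps the real field names by pairing productIterator() with productElementName(i). A minimal sketch of that pairing for any scala.Product (ProductNameSketch is a hypothetical illustration, not part of the commit):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import scala.Product;
import scala.collection.JavaConverters;

public class ProductNameSketch {
  // Map each constructor argument of a Scala case class (a Product) to its
  // declared field name; productElementName(i) exists only from Scala 2.13.
  public static Map<String, Object> fields(Product product) {
    Map<String, Object> out = new HashMap<>();
    int i = 0;
    for (Iterator<Object> it = JavaConverters.asJavaIterator(product.productIterator());
        it.hasNext(); ) {
      out.put(product.productElementName(i), it.next());
      i++;
    }
    return out;
  }
}
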
dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/CommonSparkPlanUtils.java

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+package datadog.trace.instrumentation.spark;
+
+import java.util.Collection;
+import org.apache.spark.sql.catalyst.trees.TreeNode;
+import scala.None$;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.$colon$colon;
+import scala.collection.immutable.Iterable;
+import scala.collection.immutable.Nil$;
+
+public class CommonSparkPlanUtils {
+  public static String parsePlanProduct(Object value) {
+    // TODO: improve parsing of certain types
+    // 1. Some() should be unwrapped
+    // 2. requiredSchema on Scan * (currently showing StructType)
+
+    // TODO: support a few more common types?
+    // condition=org.apache.spark.sql.catalyst.expressions.objects.Invoke
+    // joinType=org.apache.spark.sql.catalyst.plans.Inner$
+    // buildSide=org.apache.spark.sql.catalyst.optimizer.BuildRight$
+    // shuffleOrigin=org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS$
+    // outputPartitioning=org.apache.spark.sql.catalyst.plans.physical.SinglePartition$
+    if (value instanceof String
+        || value instanceof Boolean
+        || value instanceof Collection
+        || value instanceof None$
+        || value instanceof Integer) {
+      return value.toString();
+    } else if (value instanceof $colon$colon || value instanceof Nil$) {
+      return JavaConverters.asJavaIterable(((Iterable) value)).toString();
+    } else if (value instanceof TreeNode) {
+      // Filter out any potential child nodes
+      // TODO: Exempt conditions from this branch
+      // e.g. condition=class org.apache.spark.sql.catalyst.expressions.objects.Invoke
+      return null;
+    } else {
+      return value.toString();
+    }
+  }
+}
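A few representative inputs and what the shared parser returns, as a hedged sketch (ParseSketch is hypothetical; TreeNode cases need Spark on the classpath, so they are only described in comments):

public class ParseSketch {
  public static void main(String[] args) {
    // Plain scalars and Scala's None pass through via toString().
    System.out.println(CommonSparkPlanUtils.parsePlanProduct("ENSURE_REQUIREMENTS")); // ENSURE_REQUIREMENTS
    System.out.println(CommonSparkPlanUtils.parsePlanProduct(Boolean.TRUE)); // true
    System.out.println(CommonSparkPlanUtils.parsePlanProduct(scala.None$.MODULE$)); // None
    // TreeNode subclasses (potential child plans) yield null; the callers then
    // record their class names under the _dd.unparsed key instead.
  }
}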
