
Commit be7ada4

Extract Spark Plan product with keys for Scala 2.13
1 parent d19bf60 commit be7ada4

File tree: 2 files changed (+153 -0 lines changed)

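The commit hooks the creation of SparkPlanInfo so that, when a plan node carries no metadata, the metadata map is rebuilt from the node's Scala Product fields (its case-class constructor arguments, keyed by field name). A minimal sketch of that Product-to-map idea, using only the standard scala.Product methods the new helper relies on; the class and method names below are illustrative and not part of the commit:

import java.util.LinkedHashMap;
import java.util.Map;
import scala.Product;

class ProductFieldsSketch {
  // Read a Scala case class's constructor arguments by position and name.
  // Compiler-generated case classes (SparkPlan nodes included) override
  // productElementName on Scala 2.13 to return the real field names.
  static Map<String, String> fieldsOf(Product product) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (int i = 0; i < product.productArity(); i++) {
      fields.put(product.productElementName(i), String.valueOf(product.productElement(i)));
    }
    return fields;
  }
}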

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 42 additions & 0 deletions
@@ -8,8 +8,12 @@
 import datadog.trace.api.Config;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.SparkPlanInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.HashMap;
 
 @AutoService(InstrumenterModule.class)
 public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@@ -27,9 +31,22 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
+      packageName + ".SparkPlanUtils",
     };
   }
 
+  @Override
+  public String[] knownMatchingTypes() {
+    String[] res = new String[super.knownMatchingTypes().length + 1];
+    int idx = 0;
+    for (String match : super.knownMatchingTypes()) {
+      res[idx] = match;
+      idx++;
+    }
+    res[idx] = "org.apache.spark.sql.execution.SparkPlanInfo$";
+    return res;
+  }
+
   @Override
   public void methodAdvice(MethodTransformer transformer) {
     super.methodAdvice(transformer);
@@ -40,6 +57,13 @@ public void methodAdvice(MethodTransformer transformer) {
         .and(isDeclaredBy(named("org.apache.spark.SparkContext")))
         .and(takesNoArguments()),
         Spark213Instrumentation.class.getName() + "$InjectListener");
+
+    transformer.applyAdvice(
+        isMethod()
+            .and(named("fromSparkPlan"))
+            .and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
+            .and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
+        Spark213Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
   }
 
   public static class InjectListener {
@@ -79,4 +103,22 @@ public static void enter(@Advice.This SparkContext sparkContext) {
       sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
     }
   }
+
+  public static class SparkPlanInfoAdvice {
+    @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
+    public static void exit(
+        @Advice.Return(readOnly = false) SparkPlanInfo planInfo,
+        @Advice.Argument(0) SparkPlan plan) {
+      if (planInfo.metadata().size() == 0) {
+        planInfo =
+            new SparkPlanInfo(
+                planInfo.nodeName(),
+                planInfo.simpleString(),
+                planInfo.children(),
+                HashMap.from(
+                    JavaConverters.asScala(SparkPlanUtils.extractPlanProduct(plan)).toList()),
+                planInfo.metrics());
+      }
+    }
+  }
 }
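SparkPlanInfoAdvice relies on ByteBuddy's exit advice with a writable @Advice.Return parameter: assigning to the annotated parameter replaces the value the instrumented method returns to its caller. A minimal standalone sketch of that pattern; the advice class and the String example are illustrative only, not part of this commit:

import net.bytebuddy.asm.Advice;

class ReplaceReturnValueSketch {
  @Advice.OnMethodExit(suppress = Throwable.class)
  public static void exit(@Advice.Return(readOnly = false) String returned) {
    // Assigning to the writable @Advice.Return parameter swaps the value
    // the instrumented method is about to return.
    if (returned != null && returned.isEmpty()) {
      returned = "placeholder";
    }
  }
}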
SparkPlanUtils.java (new file)

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+package datadog.trace.instrumentation.spark;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.spark.sql.catalyst.trees.TreeNode;
+import org.apache.spark.sql.execution.ReusedSubqueryExec;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
+import org.apache.spark.sql.execution.adaptive.QueryStageExec;
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec;
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec;
+import scala.None$;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.$colon$colon;
+import scala.collection.immutable.Iterable;
+import scala.collection.immutable.Nil$;
+
+// An extension of how Spark translates `SparkPlan`s to `SparkPlanInfo`, see here:
+// https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala#L54
+public class SparkPlanUtils {
+  public static ArrayList<SparkPlan> extractChildren(SparkPlan plan) {
+    /*
+    Get children of this node. Logic in Spark:
+
+    val children = plan match {
+      case ReusedExchangeExec(_, child) => child :: Nil
+      case ReusedSubqueryExec(child) => child :: Nil
+      case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
+      case stage: QueryStageExec => stage.plan :: Nil
+      case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
+      case EmptyRelationExec(logical) => (logical :: Nil)
+      case _ => plan.children ++ plan.subqueries
+    }
+    */
+    ArrayList<SparkPlan> children = new ArrayList<>();
+    if (plan instanceof ReusedExchangeExec) {
+      children.add(((ReusedExchangeExec) plan).child());
+    } else if (plan instanceof ReusedSubqueryExec) {
+      children.add(((ReusedSubqueryExec) plan).child());
+    } else if (plan instanceof AdaptiveSparkPlanExec) {
+      children.add(((AdaptiveSparkPlanExec) plan).executedPlan());
+    } else if (plan instanceof QueryStageExec) {
+      children.add(((QueryStageExec) plan).plan());
+    } else if (plan instanceof InMemoryTableScanExec) {
+      children.add(((InMemoryTableScanExec) plan).relation().cachedPlan());
+      // New as of Spark 4.0.0
+      // } else if (plan instanceof EmptyRelationExec) {
+      //   children.add(((EmptyRelationExec) plan).logical);
+    }
+
+    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.subqueries().iterator());
+        it.hasNext(); ) {
+      children.add(it.next());
+    }
+    for (Iterator<SparkPlan> it = JavaConverters.asJavaIterator(plan.children().iterator());
+        it.hasNext(); ) {
+      children.add(it.next());
+    }
+
+    return children;
+  }
+
+  public static Map<String, String> extractPlanProduct(SparkPlan plan) {
+    HashMap<String, String> args = new HashMap<>();
+    HashMap<String, String> unparsed = new HashMap<>();
+
+    int i = 0;
+    for (Iterator<Object> it = JavaConverters.asJavaIterator(plan.productIterator());
+        it.hasNext(); ) {
+      Object obj = it.next();
+
+      // TODO: improve parsing of certain types
+      // 1. Some() should be unwrapped
+      // 2. requiredSchema on Scan * (currently showing StructType)
+
+      // TODO: support a few more common types?
+      // condition=org.apache.spark.sql.catalyst.expressions.objects.Invoke
+      // joinType=org.apache.spark.sql.catalyst.plans.Inner$
+      // buildSide=org.apache.spark.sql.catalyst.optimizer.BuildRight$
+      // shuffleOrigin=org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS$
+      // outputPartitioning=org.apache.spark.sql.catalyst.plans.physical.SinglePartition$
+      if (obj instanceof String
+          || obj instanceof Boolean
+          || obj instanceof Collection
+          || obj instanceof None$
+          || obj instanceof Integer) {
+        args.put(plan.productElementName(i), obj.toString());
+      } else if (obj instanceof $colon$colon || obj instanceof Nil$) {
+        args.put(plan.productElementName(i), JavaConverters.asJava(((Iterable) obj)).toString());
+      } else if (obj instanceof TreeNode) {
+        // Filter out any potential child nodes
+        // TODO: Exempt conditions from this branch
+        // e.g. condition=class org.apache.spark.sql.catalyst.expressions.objects.Invoke
+        unparsed.put(plan.productElementName(i), obj.getClass().getName());
+      } else {
+        args.put(plan.productElementName(i), obj.toString());
+      }
+
+      i++;
+    }
+
+    if (unparsed.size() > 0) {
+      // For now, place what we can't parse here with the types so we're aware of them
+      args.put("_dd.unparsed", unparsed.toString());
+    }
+    return args;
+  }
+}
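extractPlanProduct returns a java.util.Map, while SparkPlanInfo's metadata field expects a Scala immutable Map, so the advice in Spark213Instrumentation bridges the two with the Scala 2.13 collection converters. A minimal sketch of that conversion, mirroring the calls used in SparkPlanInfoAdvice; the helper class below is illustrative only:

import java.util.Map;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

class ScalaMapConversionSketch {
  // Wrap the Java map as a Scala view, materialize it as a list of
  // (key, value) tuples, then copy it into an immutable HashMap, the same
  // sequence SparkPlanInfoAdvice uses to build the metadata map.
  static scala.collection.immutable.Map<String, String> toImmutable(Map<String, String> javaMap) {
    return HashMap.from(JavaConverters.asScala(javaMap).toList());
  }
}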
