Skip to content

Commit 0cae20c

Browse files
committed
reflection against SparkPlanInfoAdvice
1 parent 73c3ac0 commit 0cae20c

File tree

2 files changed

+27
-10
lines changed

2 files changed

+27
-10
lines changed

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.trace.agent.tooling.InstrumenterModule;
88
import datadog.trace.api.Config;
99
import net.bytebuddy.asm.Advice;
10+
import de.thetaphi.forbiddenapis.SuppressForbidden;
1011
import org.apache.spark.SparkContext;
1112
import org.apache.spark.sql.execution.SparkPlan;
1213
import org.apache.spark.sql.execution.SparkPlanInfo;
@@ -95,20 +96,37 @@ public static void enter(@Advice.This SparkContext sparkContext) {
9596

9697
public static class SparkPlanInfoAdvice {
9798
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
99+
@SuppressForbidden
98100
public static void exit(
99101
@Advice.Return(readOnly = false) SparkPlanInfo planInfo,
100102
@Advice.Argument(0) SparkPlan plan) {
101103
if (planInfo.metadata().size() == 0
102104
&& (Config.get().isDataJobsParseSparkPlanEnabled()
103105
|| Config.get().isDataJobsExperimentalFeaturesEnabled())) {
104106
Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
105-
planInfo =
106-
new SparkPlanInfo(
107-
planInfo.nodeName(),
108-
planInfo.simpleString(),
109-
planInfo.children(),
110-
HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan))),
111-
planInfo.metrics());
107+
scala.collection.immutable.Map<String, String> meta =
108+
HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan)));
109+
try {
110+
Class<?> spiClass = Class.forName("org.apache.spark.sql.execution.SparkPlanInfo");
111+
java.lang.reflect.Constructor<?> targetCtor = null;
112+
for (java.lang.reflect.Constructor<?> c : spiClass.getConstructors()) {
113+
if (c.getParameterCount() == 5) {
114+
targetCtor = c;
115+
break;
116+
}
117+
}
118+
if (targetCtor != null) {
119+
Object newInst =
120+
targetCtor.newInstance(
121+
planInfo.nodeName(),
122+
planInfo.simpleString(),
123+
planInfo.children(),
124+
meta,
125+
planInfo.metrics());
126+
planInfo = (SparkPlanInfo) newInst;
127+
}
128+
} catch (Throwable ignored) {
129+
}
112130
}
113131
}
114132
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
66
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
77
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
8-
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
98

109
import datadog.trace.agent.tooling.Instrumenter;
1110
import datadog.trace.agent.tooling.InstrumenterModule;
@@ -69,8 +68,8 @@ public void methodAdvice(MethodTransformer transformer) {
6968
// LiveListenerBus class is used to manage spark listeners
7069
transformer.applyAdvice(
7170
isMethod()
72-
.and(named("addToSharedQueue").or(named("addToQueue")))
73-
.and(takesArguments(1))
71+
.and(named("addToSharedQueue"))
72+
.and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface")))
7473
.and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
7574
AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
7675
}

0 commit comments

Comments
 (0)