Skip to content

Commit 4795725

Browse files
committed
Use SparkPlanInfo constructor that is compatible with Databricks' Spark fork
1 parent 9244e6b commit 4795725

File tree

2 files changed

+48
-14
lines changed

2 files changed

+48
-14
lines changed

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -110,21 +110,38 @@ public static void exit(
110110
.toMap(Predef.$conforms());
111111
try {
112112
Constructor<?> targetCtor = null;
113+
Object[] args = null;
113114
for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
114115
if (c.getParameterCount() == 5) {
115116
targetCtor = c;
117+
args =
118+
new Object[] {
119+
planInfo.nodeName(),
120+
planInfo.simpleString(),
121+
planInfo.children(),
122+
meta,
123+
planInfo.metrics()
124+
};
116125
break;
117-
}
118-
}
119-
if (targetCtor != null) {
120-
Object newInst =
121-
targetCtor.newInstance(
126+
} else if (c.getParameterCount() == 8) {
127+
// Databricks fork of Spark has a different SparkPlanInfo constructor
128+
targetCtor = c;
129+
args =
130+
new Object[] {
122131
planInfo.nodeName(),
123132
planInfo.simpleString(),
124133
planInfo.children(),
125134
meta,
126-
planInfo.metrics());
127-
planInfo = (SparkPlanInfo) newInst;
135+
planInfo.metrics(),
136+
SparkPlanInfo.class.getMethod("estRowCount").invoke(new Object[] {}),
137+
SparkPlanInfo.class.getMethod("rddScopeId").invoke(new Object[] {}),
138+
SparkPlanInfo.class.getMethod("explainId").invoke(new Object[] {})
139+
};
140+
break;
141+
}
142+
}
143+
if (targetCtor != null && args != null) {
144+
planInfo = (SparkPlanInfo) targetCtor.newInstance(args);
128145
}
129146
} catch (Throwable ignored) {
130147
}

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 24 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -110,21 +110,38 @@ public static void exit(
110110
HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan)));
111111
try {
112112
Constructor<?> targetCtor = null;
113+
Object[] args = null;
113114
for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
114115
if (c.getParameterCount() == 5) {
115116
targetCtor = c;
117+
args =
118+
new Object[] {
119+
planInfo.nodeName(),
120+
planInfo.simpleString(),
121+
planInfo.children(),
122+
meta,
123+
planInfo.metrics()
124+
};
116125
break;
117-
}
118-
}
119-
if (targetCtor != null) {
120-
Object newInst =
121-
targetCtor.newInstance(
126+
} else if (c.getParameterCount() == 8) {
127+
// Databricks fork of Spark has a different SparkPlanInfo constructor
128+
targetCtor = c;
129+
args =
130+
new Object[] {
122131
planInfo.nodeName(),
123132
planInfo.simpleString(),
124133
planInfo.children(),
125134
meta,
126-
planInfo.metrics());
127-
planInfo = (SparkPlanInfo) newInst;
135+
planInfo.metrics(),
136+
SparkPlanInfo.class.getMethod("estRowCount").invoke(new Object[] {}),
137+
SparkPlanInfo.class.getMethod("rddScopeId").invoke(new Object[] {}),
138+
SparkPlanInfo.class.getMethod("explainId").invoke(new Object[] {})
139+
};
140+
break;
141+
}
142+
}
143+
if (targetCtor != null && args != null) {
144+
planInfo = (SparkPlanInfo) targetCtor.newInstance(args);
128145
}
129146
} catch (Throwable ignored) {
130147
}

0 commit comments

Comments (0)