Merged

Changes from 10 commits (26 commits total):
be7ada4  Extract Spark Plan product with keys for Scala 2.13 (charlesmyu, Oct 10, 2025)
9c19e11  Extract Spark Plan product values for Spark 2.12 (charlesmyu, Oct 10, 2025)
3f9c26b  Update tests for meta field in Spark SQL plans (charlesmyu, Oct 14, 2025)
94f3139  Remove unused logic to parse children, enrich product parsing to supp… (charlesmyu, Oct 15, 2025)
beb4d5f  Update tests to assert on meta values (charlesmyu, Oct 16, 2025)
750c68f  Use Abstract class for common functions (charlesmyu, Oct 16, 2025)
0279fff  Use Jackson JSON parser instead of rolling own parsing (charlesmyu, Oct 16, 2025)
50fa41a  Refactor AbstractSparkPlanUtils to only require key generation on impl (charlesmyu, Oct 17, 2025)
51835fa  Default to returning null if class not recognized, limit recursion de… (charlesmyu, Oct 20, 2025)
c68b356  Improve testing scheme for Spark32 on Scala 212 with unknown keys (charlesmyu, Oct 20, 2025)
8bf8488  Improve method & class naming, reuse ObjectMapper from listener (charlesmyu, Oct 21, 2025)
55b917b  Gate Spark Plan parsing with flag (charlesmyu, Oct 21, 2025)
047880a  Match classes by string comparison, add negative cache (charlesmyu, Oct 21, 2025)
3619b77  Add unit tests for AbstractSparkPlanSerializer (charlesmyu, Oct 23, 2025)
53918a3  Make ObjectMapper protected on AbstractDatadogSparkListener instead o… (charlesmyu, Oct 23, 2025)
6e68c69  Specify correct helper class names (charlesmyu, Oct 23, 2025)
5483ba8  Add dd.data.jobs.experimental_features.enabled FF (charlesmyu, Oct 23, 2025)
e4973fc  Remove knownMatchingTypes override from version-specific impls (charlesmyu, Oct 24, 2025)
5527ad0  Catch NullPointerException for getDeclaredMethod calls (charlesmyu, Oct 24, 2025)
1f31add  Adjust more gates to match classes using string comparison (charlesmyu, Oct 24, 2025)
18e51d5  Revert "Catch NullPointerException for getDeclaredMethod calls" (charlesmyu, Oct 24, 2025)
de336b9  Explicit cast to String on simpleString calls (charlesmyu, Oct 27, 2025)
160558e  Use toMap to convert mutable to immutable map in Scala 2.12 (charlesmyu, Oct 27, 2025)
d4c8264  Improvements from comments: use singleton, string concat instead of f… (charlesmyu, Oct 27, 2025)
ef18062  Avoid merge conflict, reorder flags (charlesmyu, Oct 27, 2025)
34528dc  Exit loop early, store reflected methods as class fields (charlesmyu, Oct 28, 2025)
Spark212Instrumentation.java
@@ -8,15 +8,20 @@
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

@AutoService(InstrumenterModule.class)
public class Spark212Instrumentation extends AbstractSparkInstrumentation {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".AbstractSparkPlanUtils",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark212Listener",
@@ -27,9 +32,22 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".Spark212PlanUtils"
};
}

@Override
public String[] knownMatchingTypes() {
String[] res = new String[super.knownMatchingTypes().length + 1];
int idx = 0;
for (String match : super.knownMatchingTypes()) {
res[idx] = match;
idx++;
}
res[idx] = "org.apache.spark.sql.execution.SparkPlanInfo$";
return res;
}

@Override
public void methodAdvice(MethodTransformer transformer) {
super.methodAdvice(transformer);
@@ -40,6 +58,13 @@ public void methodAdvice(MethodTransformer transformer) {
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark212Instrumentation.class.getName() + "$InjectListener");

transformer.applyAdvice(
isMethod()
.and(named("fromSparkPlan"))
.and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
.and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
Spark212Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
}

public static class InjectListener {
@@ -78,4 +103,24 @@ public static void enter(@Advice.This SparkContext sparkContext) {
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}

public static class SparkPlanInfoAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(
@Advice.Return(readOnly = false) SparkPlanInfo planInfo,
@Advice.Argument(0) SparkPlan plan) {
if (planInfo.metadata().size() == 0) {
Spark212PlanUtils planUtils = new Spark212PlanUtils();
HashMap<String, String> args = new HashMap<>();
planInfo =
new SparkPlanInfo(
planInfo.nodeName(),
planInfo.simpleString(),
planInfo.children(),
args.$plus$plus(
JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))),
Contributor: Do we need args here? It seems like it would always be an empty map, so there isn't a need to concatenate.

Contributor (author): Same comment as above: to be frank, I was struggling with this a lot. I was trying to convert scala.collection.mutable.Map to scala.collection.immutable.Map, but I didn't quite know how to do that with all of the Scala implicits. Updated now to use toMap instead (figured out how to get the <:< implicit sorted properly). Let me know if this looks better! (160558e)

Contributor: Oops, didn't see the review earlier. Thanks for the update!

planInfo.metrics());
}
}
}
}
Spark212PlanUtils.java
@@ -0,0 +1,9 @@
package datadog.trace.instrumentation.spark;

import org.apache.spark.sql.catalyst.trees.TreeNode;

public class Spark212PlanUtils extends AbstractSparkPlanUtils {
public String getKey(int idx, TreeNode node) {
return String.format("_dd.unknown_key.%d", idx);
}
}
Spark213Instrumentation.java
@@ -8,15 +8,20 @@
import datadog.trace.api.Config;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.immutable.HashMap;

@AutoService(InstrumenterModule.class)
public class Spark213Instrumentation extends AbstractSparkInstrumentation {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".AbstractSparkPlanUtils",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark213Listener",
@@ -27,9 +32,22 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".Spark213PlanUtils"
};
}

@Override
public String[] knownMatchingTypes() {
String[] res = new String[super.knownMatchingTypes().length + 1];
int idx = 0;
for (String match : super.knownMatchingTypes()) {
res[idx] = match;
idx++;
}
res[idx] = "org.apache.spark.sql.execution.SparkPlanInfo$";
return res;
}

@Override
public void methodAdvice(MethodTransformer transformer) {
super.methodAdvice(transformer);
@@ -40,6 +58,13 @@ public void methodAdvice(MethodTransformer transformer) {
.and(isDeclaredBy(named("org.apache.spark.SparkContext")))
.and(takesNoArguments()),
Spark213Instrumentation.class.getName() + "$InjectListener");

transformer.applyAdvice(
isMethod()
.and(named("fromSparkPlan"))
.and(takesArgument(0, named("org.apache.spark.sql.execution.SparkPlan")))
.and(isDeclaredBy(named("org.apache.spark.sql.execution.SparkPlanInfo$"))),
Spark213Instrumentation.class.getName() + "$SparkPlanInfoAdvice");
}

public static class InjectListener {
@@ -79,4 +104,23 @@ public static void enter(@Advice.This SparkContext sparkContext) {
sparkContext.listenerBus().addToSharedQueue(AbstractDatadogSparkListener.listener);
}
}

public static class SparkPlanInfoAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(
@Advice.Return(readOnly = false) SparkPlanInfo planInfo,
@Advice.Argument(0) SparkPlan plan) {
if (planInfo.metadata().size() == 0) {
Contributor (author): By using the existing metadata on the DataSourceScanExec nodes, we open ourselves to a bit of inconsistency in the JSON parsing:

"meta": {
    "Format": "Parquet",
    "Batched": true,
    ...,
    "DataFilters": "[CASE WHEN PULocationID#28 IN (236,132,161) THEN true ELSE isnotnull(PULocationID#28) END]"
},

Specifically the lists are not quoted & escaped, which means when we read out the field it's treated as a string rather than a JSON native array. Ideally we would parse this ourselves and upsert it so we can control that formatting, but obviously there's a risk of the parsing going wrong and impacting something that actually uses the field. Leaning slightly towards keeping the formatting as-is in favour of not touching existing fields but happy to hear any other thoughts on this...

Spark213PlanUtils planUtils = new Spark213PlanUtils();
planInfo =
new SparkPlanInfo(
planInfo.nodeName(),
planInfo.simpleString(),
planInfo.children(),
HashMap.from(
JavaConverters.asScala(planUtils.extractFormattedProduct(plan)).toList()),
Contributor: Suggested change, replacing
    JavaConverters.asScala(planUtils.extractFormattedProduct(plan)).toList()),
with
    JavaConverters.asScala(planUtils.extractFormattedProduct(plan))),
Do we need to convert to a List first before converting to a HashMap?

Contributor (author): Good point, updated. (d4c8264)

planInfo.metrics());
}
}
}
}
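
The formatting concern raised in the author's comment above (DataFilters being stored as an unquoted, unescaped string) can be illustrated with a small Jackson sketch. This is not part of the PR; the class and variable names are made up for the example.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;

class MetadataFormattingSketch {
  public static void main(String[] args) throws Exception {
    String filter =
        "CASE WHEN PULocationID#28 IN (236,132,161) THEN true ELSE isnotnull(PULocationID#28) END";

    // Spark's existing metadata renders the filter list as plain text, so consumers of the
    // meta field see one opaque string:
    String sparkStyle = "[" + filter + "]";

    // Values going through AbstractSparkPlanUtils.writeObjectToString are serialized by
    // Jackson, so a list becomes a native JSON array with quoted, escaped elements:
    String jacksonStyle = new ObjectMapper().writeValueAsString(Collections.singletonList(filter));

    System.out.println(sparkStyle);   // [CASE WHEN PULocationID#28 IN (236,132,161) THEN ...]
    System.out.println(jacksonStyle); // ["CASE WHEN PULocationID#28 IN (236,132,161) THEN ..."]
  }
}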
Spark213PlanUtils.java
@@ -0,0 +1,9 @@
package datadog.trace.instrumentation.spark;

import org.apache.spark.sql.catalyst.trees.TreeNode;

public class Spark213PlanUtils extends AbstractSparkPlanUtils {
public String getKey(int idx, TreeNode node) {
return node.productElementName(idx);
}
}
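
For context (not stated in the diff): Product#productElementName only exists from Scala 2.13 onward, which is why Spark213PlanUtils can emit real field names while Spark212PlanUtils falls back to _dd.unknown_key.N placeholders. A minimal, hypothetical sketch of the 2.13 behaviour, assuming a plan node is at hand:

import org.apache.spark.sql.catalyst.trees.TreeNode;

class PlanKeySketch {
  // Hypothetical helper, not part of the PR: for a Scala case class like Spark's
  // FilterExec(condition, child), index 0 resolves to "condition" on Scala 2.13,
  // whereas Spark212PlanUtils would return "_dd.unknown_key.0" for the same node.
  static String metaKey(TreeNode<?> node, int idx) {
    return new Spark213PlanUtils().getKey(idx, node);
  }
}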
AbstractSparkPlanUtils.java
@@ -0,0 +1,171 @@
package datadog.trace.instrumentation.spark;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.spark.Partitioner;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.plans.JoinType;
import org.apache.spark.sql.catalyst.plans.QueryPlan;
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode;
import org.apache.spark.sql.catalyst.plans.physical.Partitioning;
import org.apache.spark.sql.catalyst.trees.TreeNode;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters;

// An extension of how Spark translates `SparkPlan`s to `SparkPlanInfo`, see here:
// https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala#L54
public abstract class AbstractSparkPlanUtils {
private final int MAX_DEPTH = 4;
private final int MAX_LENGTH = 50;
private final ObjectMapper mapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final Class[] SAFE_CLASSES = {
Attribute.class, // simpleString appends data type, avoid by using toString
JoinType.class, // enum
Partitioner.class, // not a product or TreeNode
BroadcastMode.class, // not a product or TreeNode
maybeGetClass("org.apache.spark.sql.execution.exchange.ShuffleOrigin"), // enum (v3+)
maybeGetClass("org.apache.spark.sql.catalyst.optimizer.BuildSide"), // enum (v3+)
maybeGetClass(
"org.apache.spark.sql.execution.ShufflePartitionSpec"), // not a product or TreeNode (v3+)
};

public abstract String getKey(int idx, TreeNode node);

public Map<String, String> extractFormattedProduct(TreeNode plan) {
HashMap<String, String> result = new HashMap<>();
extractPlanProduct(plan, 0)
.forEach(
(key, value) -> {
result.put(key, writeObjectToString(value));
});
return result;
}

protected Map<String, Object> extractPlanProduct(TreeNode node, int depth) {
HashMap<String, Object> args = new HashMap<>();
HashMap<String, String> unparsed = new HashMap<>();

int i = 0;
for (Iterator<Object> it = JavaConverters.asJavaIterator(node.productIterator());
it.hasNext(); ) {
Object obj = it.next();

Object val = parsePlanProduct(obj, depth);
if (val != null) {
args.put(getKey(i, node), val);
} else {
unparsed.put(getKey(i, node), obj.getClass().getName());
}

i++;
}

if (unparsed.size() > 0) {
// For now, place what we can't parse here with the types so we're aware of them
args.put("_dd.unparsed", unparsed);
}
return args;
}

// Should only call on final values being written to `meta`
protected String writeObjectToString(Object value) {
try {
return mapper.writeValueAsString(value);
} catch (IOException e) {
return null;
}
}

protected Object parsePlanProduct(Object value, int depth) {
// This function MUST not arbitrarily serialize the object as we can't be sure what it is.
// A null return indicates object is unserializable, otherwise it should really only return
// valid JSON types (Array, Map, String, Boolean, Number, null)

if (value == null) {
return "null";
} else if (value instanceof String
|| value instanceof Boolean
|| Number.class.isInstance(value)) {
return value;
} else if (value instanceof Option) {
return parsePlanProduct(((Option) value).getOrElse(() -> "none"), depth);
} else if (value instanceof QueryPlan) {
// don't duplicate child nodes
return null;
} else if (value instanceof Iterable && depth < MAX_DEPTH) {
ArrayList<Object> list = new ArrayList<>();
for (Object item : JavaConverters.asJavaIterable((Iterable) value)) {
Object res = parsePlanProduct(item, depth + 1);
if (list.size() < MAX_LENGTH && res != null) {
list.add(res);
}
}
return list;
} else if (value instanceof Partitioning) {
if (value instanceof TreeNode && depth + 1 < MAX_DEPTH) {
HashMap<String, Object> inner = new HashMap<>();
inner.put(
value.getClass().getSimpleName(), extractPlanProduct(((TreeNode) value), depth + 2));
return inner;
} else {
return value.toString();
}
} else if (instanceOf(value, SAFE_CLASSES)) {
return value.toString();
} else if (value instanceof TreeNode) {
// fallback case, leave at bottom
return getSimpleString((TreeNode) value);
}

return null;
}

private String getSimpleString(TreeNode value) {
try {
// in Spark v3+, the signature of `simpleString` includes an int parameter for `maxFields`
return TreeNode.class
.getDeclaredMethod("simpleString", new Class[] {int.class})
.invoke(value, MAX_LENGTH)
.toString();
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException exception) {
try {
// Attempt the Spark v2 `simpleString` signature
return TreeNode.class.getDeclaredMethod("simpleString").invoke(value).toString();
} catch (NoSuchMethodException
| IllegalAccessException
| InvocationTargetException innerException) {
}

return null;
}
}

// Use reflection rather than native `instanceof` for classes added in later Spark versions
private boolean instanceOf(Object value, Class[] classes) {
for (Class cls : classes) {
if (cls != null && cls.isInstance(value)) {
return true;
}
}

return false;
}

private Class maybeGetClass(String cls) {
try {
return Class.forName(cls);
} catch (ClassNotFoundException e) {
return null;
}
}
}
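
A minimal usage sketch of the new helper, assuming a SparkPlan instance is available; the wrapper class and method name below are illustrative and not part of the PR (the actual call site is SparkPlanInfoAdvice in the instrumentations above).

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.spark.sql.execution.SparkPlan;

class PlanMetaSketch {
  // Extract the plan node's product fields as a flat String-to-String map (values are
  // already JSON fragments) and serialize the whole map, roughly what ends up as the
  // enriched metadata on the rebuilt SparkPlanInfo.
  static String planMetaJson(SparkPlan plan) throws IOException {
    AbstractSparkPlanUtils utils = new Spark213PlanUtils(); // or Spark212PlanUtils on Scala 2.12
    Map<String, String> meta = utils.extractFormattedProduct(plan);
    return new ObjectMapper().writeValueAsString(meta);
  }
}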