
Commit 0279fff

Use Jackson JSON parser instead of rolling own parsing

1 parent 750c68f · commit 0279fff

File tree: 6 files changed, +85 −83 lines changed
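The commit message names the change: plan metadata was previously serialized by hand-concatenating quoted strings, which breaks as soon as a value contains a quote or any other character that needs escaping. Below is a minimal sketch (illustrative only, not code from this commit; JsonEscapingDemo and the sample values are made up) of the failure mode and the Jackson replacement:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.List;

class JsonEscapingDemo {
  public static void main(String[] args) throws Exception {
    List<String> values = Arrays.asList("plain", "has \"quotes\"");

    // Old approach (the "-" lines in the diffs below): naive join, no escaping.
    String handRolled = "[\"" + String.join("\", \"", values) + "\"]";
    System.out.println(handRolled); // ["plain", "has "quotes""] -- not valid JSON

    // New approach: Jackson escapes and types values correctly.
    String viaJackson = new ObjectMapper().writeValueAsString(values);
    System.out.println(viaJackson); // ["plain","has \"quotes\""]
  }
}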

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java

Lines changed: 3 additions & 3 deletions

@@ -19,9 +19,9 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {
       Object obj = it.next();
       String key = String.format("_dd.unknown_key.%d", i);

-      String val = parsePlanProduct(obj);
+      Object val = parsePlanProduct(obj);
       if (val != null) {
-        args.put(key, val);
+        args.put(key, writeObjectToString(val));
       } else {
         unparsed.put(key, obj.getClass().getName());
       }
@@ -31,7 +31,7 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {

     if (unparsed.size() > 0) {
       // For now, place what we can't parse here with the types so we're aware of them
-      args.put("_dd.unparsed", unparsed.toString());
+      args.put("_dd.unparsed", writeObjectToString(unparsed.toString()));
     }
     return args;
   }

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java

Lines changed: 3 additions & 3 deletions

@@ -19,9 +19,9 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {
       Object obj = it.next();
       String key = plan.productElementName(i);

-      String val = parsePlanProduct(obj);
+      Object val = parsePlanProduct(obj);
       if (val != null) {
-        args.put(key, val);
+        args.put(key, writeObjectToString(val));
       } else {
         unparsed.put(key, obj.getClass().getName());
       }
@@ -31,7 +31,7 @@ public Map<String, String> extractPlanProduct(TreeNode plan) {

     if (unparsed.size() > 0) {
       // For now, place what we can't parse here with the types so we're aware of them
-      args.put("_dd.unparsed", unparsed.toString());
+      args.put("_dd.unparsed", writeObjectToString(unparsed.toString()));
     }
     return args;
   }
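Both call sites above now funnel values through writeObjectToString. Note that the unparsed map is still stringified with toString() before serialization, so it lands in meta as one quoted JSON string rather than a nested object. A minimal sketch (assumed inputs; UnparsedDemo is a made-up name) of the resulting shape:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.Map;

class UnparsedDemo {
  public static void main(String[] args) throws Exception {
    Map<String, String> unparsed = new LinkedHashMap<>();
    unparsed.put("_dd.unknown_key.6", "org.apache.spark.sql.types.StructType");

    // toString() first, as the patch does: the whole map becomes one quoted string
    System.out.println(new ObjectMapper().writeValueAsString(unparsed.toString()));
    // "{_dd.unknown_key.6=org.apache.spark.sql.types.StructType}"
  }
}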

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java

Lines changed: 21 additions & 3 deletions

@@ -1,5 +1,8 @@
 package datadog.trace.instrumentation.spark;

+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
 import org.apache.spark.sql.catalyst.plans.QueryPlan;
@@ -8,19 +11,34 @@
 import scala.collection.Iterable;

 public abstract class AbstractSparkPlanUtils {
+  private final ObjectMapper mapper =
+      new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
   public abstract Map<String, String> extractPlanProduct(TreeNode node);

-  public String parsePlanProduct(Object value) {
+  // Should only call on final values being written to `meta`
+  public String writeObjectToString(Object value) {
+    try {
+      return mapper.writeValueAsString(value);
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  // Should really only return valid JSON types (Array, Map, String, Boolean, Number, null)
+  public Object parsePlanProduct(Object value) {
     if (value == null) {
       return "null";
     } else if (value instanceof Iterable) {
-      ArrayList<String> list = new ArrayList<>();
+      ArrayList<Object> list = new ArrayList<>();
       ((Iterable) value).foreach(item -> list.add(parsePlanProduct(item)));
-      return "[\"" + String.join("\", \"", list) + "\"]";
+      return list;
     } else if (value instanceof Option) {
       return parsePlanProduct(((Option) value).getOrElse(() -> "none"));
     } else if (value instanceof QueryPlan) { // Filter out values referencing child nodes
       return null;
+    } else if (value instanceof Boolean || Number.class.isInstance(value)) {
+      return value;
     } else {
       return value.toString();
     }
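With this split, parsePlanProduct builds JSON-friendly values (lists, booleans, numbers, strings, null) and writeObjectToString serializes the final value once, letting Jackson decide quoting per type. A minimal sketch (illustrative only; PlanProductDemo and the sample values are made up) of why booleans and numbers now appear unquoted in the expected test output further down:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;

class PlanProductDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // Lists serialize as JSON arrays, with per-element escaping
    System.out.println(mapper.writeValueAsString(Arrays.asList("string_col#0", "sum#18")));
    // ["string_col#0","sum#18"]
    System.out.println(mapper.writeValueAsString(0));     // 0 (a JSON number, unquoted)
    System.out.println(mapper.writeValueAsString(false)); // false (a JSON boolean, unquoted)
  }
}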

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java

Lines changed: 9 additions & 15 deletions

@@ -151,15 +151,16 @@ public String toJson(Map<Long, AccumulatorWithStage> accumulators) {

     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) {
-      this.toJson(generator, accumulators);
+      this.toJson(generator, accumulators, mapper);
     } catch (IOException e) {
       return null;
     }

     return new String(baos.toByteArray(), StandardCharsets.UTF_8);
   }

-  private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators)
+  private void toJson(
+      JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators, ObjectMapper mapper)
       throws IOException {
     generator.writeStartObject();
     generator.writeStringField("node", plan.nodeName());
@@ -180,18 +181,11 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
     generator.writeStartObject();

     for (Tuple2<String, String> metadata : JavaConverters.asJavaCollection(plan.metadata())) {
-      // If it looks like a string array, break apart and write as native JSON array
-      if (metadata._2.startsWith("[\"") && metadata._2.endsWith("\"]")) {
-        String[] list = metadata._2.substring(2, metadata._2.length() - 2).split("\", \"");
-
-        generator.writeFieldName(metadata._1);
-        generator.writeStartArray();
-        for (String entry : list) {
-          generator.writeString(entry);
-        }
-        generator.writeEndArray();
-      } else {
-        generator.writeStringField(metadata._1, metadata._2);
+      generator.writeFieldName(metadata._1);
+      try {
+        generator.writeTree(mapper.readTree(metadata._2));
+      } catch (IOException e) {
+        generator.writeString(metadata._2);
       }
     }

@@ -219,7 +213,7 @@ private void toJson(JsonGenerator generator, Map<Long, AccumulatorWithStage> acc
     generator.writeFieldName("children");
     generator.writeStartArray();
     for (SparkPlanInfoForStage child : children) {
-      child.toJson(generator, accumulators);
+      child.toJson(generator, accumulators, mapper);
     }
     generator.writeEndArray();
   }
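The writer side no longer guesses whether a metadata value "looks like" an array; it asks Jackson to parse the stored string and, on success, embeds the parsed tree natively, otherwise it falls back to writing a plain string field. A minimal standalone sketch of that round trip (WriteTreeDemo and the sample values are made up; the real code streams plan metadata instead):

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringWriter;

class WriteTreeDemo {
  public static void main(String[] args) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    StringWriter out = new StringWriter();
    try (JsonGenerator generator = mapper.getFactory().createGenerator(out)) {
      generator.writeStartObject();

      generator.writeFieldName("arr");
      try {
        generator.writeTree(mapper.readTree("[\"a\",\"b\"]")); // valid JSON: embed as-is
      } catch (IOException e) {
        generator.writeString("[\"a\",\"b\"]");
      }

      generator.writeFieldName("raw");
      try {
        generator.writeTree(mapper.readTree("not json")); // fails to parse
      } catch (IOException e) {
        generator.writeString("not json"); // fallback: quote it as a string
      }

      generator.writeEndObject();
    }
    System.out.println(out); // {"arr":["a","b"],"raw":"not json"}
  }
}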

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy

Lines changed: 22 additions & 32 deletions

@@ -1,5 +1,6 @@
 package datadog.trace.instrumentation.spark

+import com.fasterxml.jackson.databind.ObjectMapper
 import datadog.trace.agent.test.InstrumentationSpecification
 import groovy.json.JsonSlurper
 import org.apache.spark.sql.Dataset
@@ -180,32 +181,21 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
   }

   private static generateMetaExpectations(Object actual, String name) {
-    var simpleString = actual["nodeDetailString"]
-    var values = []
-    var child = "N/A"
+    ObjectMapper mapper = new ObjectMapper()
+
+    def simpleString = actual["nodeDetailString"]
+    def child = "N/A"
+    def values = [:]
+
     actual["meta"].each { key, value ->
       if (key == "_dd.unparsed") {
-        values.add("\"_dd.unparsed\": \"any\"")
+        values.put("_dd.unparsed", "any")
         child = value
-      } else if (value instanceof List) {
-        var list = []
-        value.each { it ->
-          list.add("\"$it\"")
-        }
-        def prettyList = "[\n " + list.join(", \n ") + "\n ]"
-        if (list.size() == 1) {
-          prettyList = "[" + list.join(", ") + "]"
-        }
-        values.add("\"$key\": $prettyList")
       } else {
-        values.add("\"$key\": \"$value\"")
+        values.put(key, value)
       }
     }
-    values.sort { it }
-    def prettyValues = "\n\"meta\": {\n " + values.join(", \n ") + "\n},"
-    if (values.size() == 1) {
-      prettyValues = "\n\"meta\": {" + values.join(", ") + "},"
-    }
+    def prettyValues = "\n\"meta\": " + mapper.writerWithDefaultPrettyPrinter().writeValueAsString(values.sort { it -> it.key }) + ","
     System.err.println("$actual.node\n\tname=$name\n\tchild=$child\n\tvalues=$prettyValues\n\tsimpleString=$simpleString")
   }
@@ -267,7 +257,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
         "sum#16",
         "count#17L"
       ],
-      "_dd.unknown_key.4": "0",
+      "_dd.unknown_key.4": 0,
       "_dd.unknown_key.5": [
         "string_col#0",
         "sum#18",
@@ -348,7 +338,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "_dd.unknown_key.1": ["string_col#0"],
       "_dd.unknown_key.2": ["avg(double_col#1)"],
       "_dd.unknown_key.3": ["avg(double_col#1)#4"],
-      "_dd.unknown_key.4": "1",
+      "_dd.unknown_key.4": 1,
       "_dd.unknown_key.5": [
         "string_col#0",
         "avg(double_col#1)#4 AS avg(double_col)#5"
@@ -552,10 +542,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "(keys=[], functions=[partial_count(1)])",
       "meta": {
         "_dd.unknown_key.0": "none",
-        "_dd.unknown_key.1": [""],
+        "_dd.unknown_key.1": [],
         "_dd.unknown_key.2": ["partial_count(1)"],
         "_dd.unknown_key.3": ["count#38L"],
-        "_dd.unknown_key.4": "0",
+        "_dd.unknown_key.4": 0,
         "_dd.unknown_key.5": ["count#39L"],
         "_dd.unparsed": "any"
       },
@@ -574,7 +564,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "node": "Project",
       "nodeId": 1355342585,
       "meta": {
-        "_dd.unknown_key.0": [""],
+        "_dd.unknown_key.0": [],
         "_dd.unparsed": "any"
       },
       "children": [
@@ -618,8 +608,8 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "[string_col#21 ASC NULLS FIRST], false, 0",
       "meta": {
         "_dd.unknown_key.0": ["string_col#21 ASC NULLS FIRST"],
-        "_dd.unknown_key.1": "false",
-        "_dd.unknown_key.3": "0",
+        "_dd.unknown_key.1": false,
+        "_dd.unknown_key.3": 0,
         "_dd.unparsed": "any"
       },
       "metrics": [
@@ -666,8 +656,8 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
       "nodeDetailString": "[string_col#25 ASC NULLS FIRST], false, 0",
       "meta": {
         "_dd.unknown_key.0": ["string_col#25 ASC NULLS FIRST"],
-        "_dd.unknown_key.1": "false",
-        "_dd.unknown_key.3": "0",
+        "_dd.unknown_key.1": false,
+        "_dd.unknown_key.3": 0,
         "_dd.unparsed": "any"
       },
       "metrics": [
@@ -716,11 +706,11 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification {
      "nodeId": 724815342,
      "nodeDetailString": "(keys=[], functions=[count(1)])",
      "meta": {
-        "_dd.unknown_key.0": [""],
-        "_dd.unknown_key.1": [""],
+        "_dd.unknown_key.0": [],
+        "_dd.unknown_key.1": [],
        "_dd.unknown_key.2": ["count(1)"],
        "_dd.unknown_key.3": ["count(1)#35L"],
-        "_dd.unknown_key.4": "0",
+        "_dd.unknown_key.4": 0,
        "_dd.unknown_key.5": ["count(1)#35L AS count#36L"],
        "_dd.unparsed": "any"
      },
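The test helper now collects expectations into a map and lets Jackson's default pretty printer render the "meta" block instead of hand-joining quoted fragments. A minimal Java sketch of the same idea (PrettyMetaDemo is a made-up name; the Groovy helper above sorts by key, mirrored here with a TreeMap):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

class PrettyMetaDemo {
  public static void main(String[] args) throws Exception {
    Map<String, Object> values = new TreeMap<>(); // iterates in key order
    values.put("_dd.unknown_key.1", false);
    values.put("_dd.unknown_key.0", Arrays.asList("string_col#21 ASC NULLS FIRST"));

    String pretty = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(values);
    System.out.println("\"meta\": " + pretty + ",");
  }
}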
