
Commit 7cd5c4a

[SPARK-50641][SQL] Move GetJsonObjectEvaluator to JsonExpressionEvalUtils
### What changes were proposed in this pull request?
This PR moves `GetJsonObjectEvaluator` to `JsonExpressionEvalUtils`.

### Why are the changes needed?
To make the code clearer.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Passed GA.
- Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #49259 from panbingkun/SPARK-50641.

Authored-by: panbingkun <[email protected]>
Signed-off-by: panbingkun <[email protected]>
1 parent 876450c commit 7cd5c4a
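
For context, `GetJsonObjectEvaluator` is the evaluation logic behind the built-in `get_json_object` SQL function. A minimal sketch of the behavior it implements (illustrative values, not taken from this PR; assumes a running SparkSession named `spark`):

```scala
// get_json_object(json, path) extracts the value at a JSONPath-style path,
// returning NULL when the path does not match or the input is not valid JSON.
spark.sql("""SELECT get_json_object('{"a": {"b": "hello"}}', '$.a.b')""").show()
// prints a single row containing: hello
```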

File tree

3 files changed (+301 −308 lines)


sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionUtils.java

Lines changed: 0 additions & 1 deletion
@@ -24,7 +24,6 @@
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 
-import org.apache.spark.sql.catalyst.expressions.SharedFactory;
 import org.apache.spark.sql.catalyst.json.CreateJacksonParser;
 import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.unsafe.types.UTF8String;

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala

Lines changed: 300 additions & 2 deletions
@@ -16,13 +16,16 @@
  */
 package org.apache.spark.sql.catalyst.expressions.json
 
-import java.io.{ByteArrayOutputStream, CharArrayWriter}
+import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}
+
+import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
+import com.fasterxml.jackson.core.json.JsonReadFeature
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow, SharedFactory}
+import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
 import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonGenerator, JacksonParser, JsonInferSchema, JSONOptions}
 import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, MapData, PermissiveMode}
@@ -32,6 +35,79 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, St
 import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
 import org.apache.spark.util.Utils
 
+private[this] sealed trait PathInstruction
+private[this] object PathInstruction {
+  private[expressions] case object Subscript extends PathInstruction
+  private[expressions] case object Wildcard extends PathInstruction
+  private[expressions] case object Key extends PathInstruction
+  private[expressions] case class Index(index: Long) extends PathInstruction
+  private[expressions] case class Named(name: String) extends PathInstruction
+}
+
+private[this] sealed trait WriteStyle
+private[this] object WriteStyle {
+  private[expressions] case object RawStyle extends WriteStyle
+  private[expressions] case object QuotedStyle extends WriteStyle
+  private[expressions] case object FlattenStyle extends WriteStyle
+}
+
+private[this] object JsonPathParser extends RegexParsers {
+  import PathInstruction._
+
+  def root: Parser[Char] = '$'
+
+  def long: Parser[Long] = "\\d+".r ^? {
+    case x => x.toLong
+  }
+
+  // parse `[*]` and `[123]` subscripts
+  def subscript: Parser[List[PathInstruction]] =
+    for {
+      operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']'
+    } yield {
+      Subscript :: operand :: Nil
+    }
+
+  // parse `.name` or `['name']` child expressions
+  def named: Parser[List[PathInstruction]] =
+    for {
+      name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\']+".r <~ "']"
+    } yield {
+      Key :: Named(name) :: Nil
+    }
+
+  // child wildcards: `..`, `.*` or `['*']`
+  def wildcard: Parser[List[PathInstruction]] =
+    (".*" | "['*']") ^^^ List(Wildcard)
+
+  def node: Parser[List[PathInstruction]] =
+    wildcard |
+      named |
+      subscript
+
+  val expression: Parser[List[PathInstruction]] = {
+    phrase(root ~> rep(node) ^^ (x => x.flatten))
+  }
+
+  def parse(str: String): Option[List[PathInstruction]] = {
+    this.parseAll(expression, str) match {
+      case Success(result, _) =>
+        Some(result)
+
+      case _ =>
+        None
+    }
+  }
+}
+
+private[this] object SharedFactory {
+  val jsonFactory: JsonFactory = new JsonFactoryBuilder()
+    // The two options below enabled for Hive compatibility
+    .enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS)
+    .enable(JsonReadFeature.ALLOW_SINGLE_QUOTES)
+    .build()
+}
+
 case class JsonToStructsEvaluator(
     options: Map[String, String],
     nullableSchema: DataType,
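
The `JsonPathParser` added above flattens a JSON path string into a list of `PathInstruction` tokens, which `GetJsonObjectEvaluator` (added in the next hunk) walks against Jackson's token stream. A small sketch of that mapping (conceptual only, since the parser object is package-private):

```scala
// JsonPathParser.parse("$.a[0].b") would yield:
// Some(List(Key, Named("a"), Subscript, Index(0), Key, Named("b")))
```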
@@ -278,3 +354,225 @@ case class JsonTupleEvaluator(foldableFieldNames: Array[Option[String]]) {
     }
   }
 }
+
+/**
+ * The expression `GetJsonObject` will utilize it to support codegen.
+ */
+case class GetJsonObjectEvaluator(cachedPath: UTF8String) {
+  import com.fasterxml.jackson.core.JsonToken._
+  import PathInstruction._
+  import SharedFactory._
+  import WriteStyle._
+
+  def this() = this(null)
+
+  @transient
+  private lazy val parsedPath: Option[List[PathInstruction]] = parsePath(cachedPath)
+
+  @transient
+  private var jsonStr: UTF8String = _
+
+  @transient
+  private var pathStr: UTF8String = _
+
+  def setJson(arg: UTF8String): Unit = {
+    jsonStr = arg
+  }
+
+  def setPath(arg: UTF8String): Unit = {
+    pathStr = arg
+  }
+
+  def evaluate(): Any = {
+    if (jsonStr == null) return null
+
+    val parsed = if (cachedPath != null) {
+      parsedPath
+    } else {
+      parsePath(pathStr)
+    }
+
+    if (parsed.isDefined) {
+      try {
+        /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
+         detect character encoding which could fail for some malformed strings */
+        Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser =>
+          val output = new ByteArrayOutputStream()
+          val matched = Utils.tryWithResource(
+            jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
+            parser.nextToken()
+            evaluatePath(parser, generator, RawStyle, parsed.get)
+          }
+          if (matched) {
+            UTF8String.fromBytes(output.toByteArray)
+          } else {
+            null
+          }
+        }
+      } catch {
+        case _: JsonProcessingException => null
+      }
+    } else {
+      null
+    }
+  }
+
+  private def parsePath(path: UTF8String): Option[List[PathInstruction]] = {
+    if (path != null) {
+      JsonPathParser.parse(path.toString)
+    } else {
+      None
+    }
+  }
+
+  // advance to the desired array index, assumes to start at the START_ARRAY token
+  private def arrayIndex(p: JsonParser, f: () => Boolean): Long => Boolean = {
+    case _ if p.getCurrentToken == END_ARRAY =>
+      // terminate, nothing has been written
+      false
+
+    case 0 =>
+      // we've reached the desired index
+      val dirty = f()
+
+      while (p.nextToken() != END_ARRAY) {
+        // advance the token stream to the end of the array
+        p.skipChildren()
+      }
+
+      dirty
+
+    case i if i > 0 =>
+      // skip this token and evaluate the next
+      p.skipChildren()
+      p.nextToken()
+      arrayIndex(p, f)(i - 1)
+  }
+
+  /**
+   * Evaluate a list of JsonPath instructions, returning a bool that indicates if any leaf nodes
+   * have been written to the generator
+   */
+  private def evaluatePath(
+      p: JsonParser,
+      g: JsonGenerator,
+      style: WriteStyle,
+      path: List[PathInstruction]): Boolean = {
+    (p.getCurrentToken, path) match {
+      case (VALUE_STRING, Nil) if style == RawStyle =>
+        // there is no array wildcard or slice parent, emit this string without quotes
+        if (p.hasTextCharacters) {
+          g.writeRaw(p.getTextCharacters, p.getTextOffset, p.getTextLength)
+        } else {
+          g.writeRaw(p.getText)
+        }
+        true
+
+      case (START_ARRAY, Nil) if style == FlattenStyle =>
+        // flatten this array into the parent
+        var dirty = false
+        while (p.nextToken() != END_ARRAY) {
+          dirty |= evaluatePath(p, g, style, Nil)
+        }
+        dirty
+
+      case (_, Nil) =>
+        // general case: just copy the child tree verbatim
+        g.copyCurrentStructure(p)
+        true
+
+      case (START_OBJECT, Key :: xs) =>
+        var dirty = false
+        while (p.nextToken() != END_OBJECT) {
+          if (dirty) {
+            // once a match has been found we can skip other fields
+            p.skipChildren()
+          } else {
+            dirty = evaluatePath(p, g, style, xs)
+          }
+        }
+        dirty
+
+      case (START_ARRAY, Subscript :: Wildcard :: Subscript :: Wildcard :: xs) =>
+        // special handling for the non-structure preserving double wildcard behavior in Hive
+        var dirty = false
+        g.writeStartArray()
+        while (p.nextToken() != END_ARRAY) {
+          dirty |= evaluatePath(p, g, FlattenStyle, xs)
+        }
+        g.writeEndArray()
+        dirty
+
+      case (START_ARRAY, Subscript :: Wildcard :: xs) if style != QuotedStyle =>
+        // retain Flatten, otherwise use Quoted... cannot use Raw within an array
+        val nextStyle = style match {
+          case RawStyle => QuotedStyle
+          case FlattenStyle => FlattenStyle
+          case QuotedStyle => throw SparkException.internalError("Unexpected the quoted style.")
+        }
+
+        // temporarily buffer child matches, the emitted json will need to be
+        // modified slightly if there is only a single element written
+        val buffer = new StringWriter()
+
+        var dirty = 0
+        Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
+          flattenGenerator.writeStartArray()
+
+          while (p.nextToken() != END_ARRAY) {
+            // track the number of array elements and only emit an outer array if
+            // we've written more than one element, this matches Hive's behavior
+            dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+          }
+          flattenGenerator.writeEndArray()
+        }
+
+        val buf = buffer.getBuffer
+        if (dirty > 1) {
+          g.writeRawValue(buf.toString)
+        } else if (dirty == 1) {
+          // remove outer array tokens
+          g.writeRawValue(buf.substring(1, buf.length() - 1))
+        } // else do not write anything
+
+        dirty > 0
+
+      case (START_ARRAY, Subscript :: Wildcard :: xs) =>
+        var dirty = false
+        g.writeStartArray()
+        while (p.nextToken() != END_ARRAY) {
+          // wildcards can have multiple matches, continually update the dirty count
+          dirty |= evaluatePath(p, g, QuotedStyle, xs)
+        }
+        g.writeEndArray()
+
+        dirty
+
+      case (START_ARRAY, Subscript :: Index(idx) :: (xs@Subscript :: Wildcard :: _)) =>
+        p.nextToken()
+        // we're going to have 1 or more results, switch to QuotedStyle
+        arrayIndex(p, () => evaluatePath(p, g, QuotedStyle, xs))(idx)
+
+      case (START_ARRAY, Subscript :: Index(idx) :: xs) =>
+        p.nextToken()
+        arrayIndex(p, () => evaluatePath(p, g, style, xs))(idx)
+
+      case (FIELD_NAME, Named(name) :: xs) if p.currentName == name =>
+        // exact field match
+        if (p.nextToken() != JsonToken.VALUE_NULL) {
+          evaluatePath(p, g, style, xs)
+        } else {
+          false
+        }
+
+      case (FIELD_NAME, Wildcard :: xs) =>
+        // wildcard field match
+        p.nextToken()
+        evaluatePath(p, g, style, xs)
+
+      case _ =>
+        p.skipChildren()
+        false
+    }
+  }
+}
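
Putting the moved pieces together, the evaluator is driven by setting the JSON input (and, when the path is not foldable, the path) and then calling `evaluate()`. A minimal usage sketch with an illustrative input; the class is Spark-internal, so this only shows the call pattern:

```scala
import org.apache.spark.unsafe.types.UTF8String

// The path is known up front here, so it is parsed once and cached by the evaluator.
val evaluator = GetJsonObjectEvaluator(UTF8String.fromString("$.a[0].b"))
evaluator.setJson(UTF8String.fromString("""{"a": [{"b": 1}, {"b": 2}]}"""))
evaluator.evaluate()  // returns UTF8String "1"; null for a non-matching path or malformed JSON
```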
