Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 371e4e2

Browse files
goldmedal authored and HyukjinKwon committed
[SPARK-21513][SQL] Allow UDF to_json support converting MapType to json
# What changes were proposed in this pull request? UDF to_json only supports converting `StructType` or `ArrayType` of `StructType`s to a json output string now. According to the discussion of JIRA SPARK-21513, I allow `to_json` to support converting `MapType` and `ArrayType` of `MapType`s to a json output string. This PR is for the SQL and Scala APIs only. # How was this patch tested? Adding unit test cases. cc viirya HyukjinKwon Author: goldmedal <[email protected]> Author: Jia-Xuan Liu <[email protected]> Closes apache#18875 from goldmedal/SPARK-21513.
1 parent 1a98574 commit 371e4e2

File tree

8 files changed

+378
-69
lines changed

8 files changed

+378
-69
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
2929
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
3030
import org.apache.spark.sql.catalyst.json._
3131
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
32-
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData}
32+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData, MapData}
3333
import org.apache.spark.sql.types._
3434
import org.apache.spark.unsafe.types.UTF8String
3535
import org.apache.spark.util.Utils
@@ -604,7 +604,8 @@ case class JsonToStructs(
604604
}
605605

606606
/**
607-
* Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json output string.
607+
* Converts a [[StructType]], [[ArrayType]] of [[StructType]]s, [[MapType]]
608+
* or [[ArrayType]] of [[MapType]]s to a json output string.
608609
*/
609610
// scalastyle:off line.size.limit
610611
@ExpressionDescription(
@@ -617,6 +618,14 @@ case class JsonToStructs(
617618
{"time":"26/08/2015"}
618619
> SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)));
619620
[{"a":1,"b":2}]
621+
> SELECT _FUNC_(map('a',named_struct('b',1)));
622+
{"a":{"b":1}}
623+
> SELECT _FUNC_(map(named_struct('a',1),named_struct('b',2)));
624+
{"[1]":{"b":2}}
625+
> SELECT _FUNC_(map('a',1));
626+
{"a":1}
627+
> SELECT _FUNC_(array((map('a',1))));
628+
[{"a":1}]
620629
""",
621630
since = "2.2.0")
622631
// scalastyle:on line.size.limit
@@ -648,6 +657,8 @@ case class StructsToJson(
648657
lazy val rowSchema = child.dataType match {
649658
case st: StructType => st
650659
case ArrayType(st: StructType, _) => st
660+
case mt: MapType => mt
661+
case ArrayType(mt: MapType, _) => mt
651662
}
652663

653664
// This converts rows to the JSON output according to the given schema.
@@ -669,6 +680,14 @@ case class StructsToJson(
669680
(arr: Any) =>
670681
gen.write(arr.asInstanceOf[ArrayData])
671682
getAndReset()
683+
case _: MapType =>
684+
(map: Any) =>
685+
gen.write(map.asInstanceOf[MapData])
686+
getAndReset()
687+
case ArrayType(_: MapType, _) =>
688+
(arr: Any) =>
689+
gen.write(arr.asInstanceOf[ArrayData])
690+
getAndReset()
672691
}
673692
}
674693

@@ -677,14 +696,25 @@ case class StructsToJson(
677696
override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
678697
case _: StructType | ArrayType(_: StructType, _) =>
679698
try {
680-
JacksonUtils.verifySchema(rowSchema)
699+
JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
700+
TypeCheckResult.TypeCheckSuccess
701+
} catch {
702+
case e: UnsupportedOperationException =>
703+
TypeCheckResult.TypeCheckFailure(e.getMessage)
704+
}
705+
case _: MapType | ArrayType(_: MapType, _) =>
706+
// TODO: let `JacksonUtils.verifySchema` verify a `MapType`
707+
try {
708+
val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil)
709+
JacksonUtils.verifySchema(st)
681710
TypeCheckResult.TypeCheckSuccess
682711
} catch {
683712
case e: UnsupportedOperationException =>
684713
TypeCheckResult.TypeCheckFailure(e.getMessage)
685714
}
686715
case _ => TypeCheckResult.TypeCheckFailure(
687-
s"Input type ${child.dataType.simpleString} must be a struct or array of structs.")
716+
s"Input type ${child.dataType.simpleString} must be a struct, array of structs or " +
717+
"a map or array of maps.")
688718
}
689719

690720
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,50 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
2626
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
2727
import org.apache.spark.sql.types._
2828

29+
/**
30+
* `JacksonGenerator` can only be initialized with a `StructType` or a `MapType`.
31+
* Once it is initialized with `StructType`, it can be used to write out a struct or an array of
32+
* struct. Once it is initialized with `MapType`, it can be used to write out a map or an array
33+
* of map. An exception will be thrown if trying to write out a struct if it is initialized with
34+
* a `MapType`, and vice versa.
35+
*/
2936
private[sql] class JacksonGenerator(
30-
schema: StructType,
37+
dataType: DataType,
3138
writer: Writer,
3239
options: JSONOptions) {
3340
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
3441
// JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
3542
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
3643
private type ValueWriter = (SpecializedGetters, Int) => Unit
3744

45+
// `JacksonGenerator` can only be initialized with a `StructType` or a `MapType`.
46+
require(dataType.isInstanceOf[StructType] | dataType.isInstanceOf[MapType],
47+
"JacksonGenerator only supports to be initialized with a StructType " +
48+
s"or MapType but got ${dataType.simpleString}")
49+
3850
// `ValueWriter`s for all fields of the schema
39-
private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
51+
private lazy val rootFieldWriters: Array[ValueWriter] = dataType match {
52+
case st: StructType => st.map(_.dataType).map(makeWriter).toArray
53+
case _ => throw new UnsupportedOperationException(
54+
s"Initial type ${dataType.simpleString} must be a struct")
55+
}
56+
4057
// `ValueWriter` for array data storing rows of the schema.
41-
private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: Int) => {
42-
writeObject(writeFields(arr.getStruct(i, schema.length), schema, rootFieldWriters))
58+
private lazy val arrElementWriter: ValueWriter = dataType match {
59+
case st: StructType =>
60+
(arr: SpecializedGetters, i: Int) => {
61+
writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters))
62+
}
63+
case mt: MapType =>
64+
(arr: SpecializedGetters, i: Int) => {
65+
writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter))
66+
}
67+
}
68+
69+
private lazy val mapElementWriter: ValueWriter = dataType match {
70+
case mt: MapType => makeWriter(mt.valueType)
71+
case _ => throw new UnsupportedOperationException(
72+
s"Initial type ${dataType.simpleString} must be a map")
4373
}
4474

4575
private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
@@ -189,18 +219,37 @@ private[sql] class JacksonGenerator(
189219
def flush(): Unit = gen.flush()
190220

191221
/**
192-
* Transforms a single `InternalRow` to JSON object using Jackson
222+
* Transforms a single `InternalRow` to JSON object using Jackson.
223+
* This api calling will be validated through accessing `rootFieldWriters`.
193224
*
194225
* @param row The row to convert
195226
*/
196-
def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
227+
def write(row: InternalRow): Unit = {
228+
writeObject(writeFields(
229+
fieldWriters = rootFieldWriters,
230+
row = row,
231+
schema = dataType.asInstanceOf[StructType]))
232+
}
197233

198234
/**
199-
* Transforms multiple `InternalRow`s to JSON array using Jackson
235+
* Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson
200236
*
201-
* @param array The array of rows to convert
237+
* @param array The array of rows or maps to convert
202238
*/
203239
def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
204240

241+
/**
242+
* Transforms a single `MapData` to JSON object using Jackson
243+
* This API call will be validated through accessing `mapElementWriter`.
244+
*
245+
* @param map a map to convert
246+
*/
247+
def write(map: MapData): Unit = {
248+
writeObject(writeMapData(
249+
fieldWriter = mapElementWriter,
250+
map = map,
251+
mapType = dataType.asInstanceOf[MapType]))
252+
}
253+
205254
def writeLineEnding(): Unit = gen.writeRaw('\n')
206255
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.Calendar
2222
import org.apache.spark.SparkFunSuite
2323
import org.apache.spark.sql.catalyst.InternalRow
2424
import org.apache.spark.sql.catalyst.errors.TreeNodeException
25-
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
25+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
2626
import org.apache.spark.sql.types._
2727
import org.apache.spark.unsafe.types.UTF8String
2828

@@ -612,6 +612,53 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
612612
)
613613
}
614614

615+
test("SPARK-21513: to_json support map[string, struct] to json") {
616+
val schema = MapType(StringType, StructType(StructField("a", IntegerType) :: Nil))
617+
val input = Literal.create(ArrayBasedMapData(Map("test" -> InternalRow(1))), schema)
618+
checkEvaluation(
619+
StructsToJson(Map.empty, input),
620+
"""{"test":{"a":1}}"""
621+
)
622+
}
623+
624+
test("SPARK-21513: to_json support map[struct, struct] to json") {
625+
val schema = MapType(StructType(StructField("a", IntegerType) :: Nil),
626+
StructType(StructField("b", IntegerType) :: Nil))
627+
val input = Literal.create(ArrayBasedMapData(Map(InternalRow(1) -> InternalRow(2))), schema)
628+
checkEvaluation(
629+
StructsToJson(Map.empty, input),
630+
"""{"[1]":{"b":2}}"""
631+
)
632+
}
633+
634+
test("SPARK-21513: to_json support map[string, integer] to json") {
635+
val schema = MapType(StringType, IntegerType)
636+
val input = Literal.create(ArrayBasedMapData(Map("a" -> 1)), schema)
637+
checkEvaluation(
638+
StructsToJson(Map.empty, input),
639+
"""{"a":1}"""
640+
)
641+
}
642+
643+
test("to_json - array with maps") {
644+
val inputSchema = ArrayType(MapType(StringType, IntegerType))
645+
val input = new GenericArrayData(ArrayBasedMapData(
646+
Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil)
647+
val output = """[{"a":1},{"b":2}]"""
648+
checkEvaluation(
649+
StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
650+
output)
651+
}
652+
653+
test("to_json - array with single map") {
654+
val inputSchema = ArrayType(MapType(StringType, IntegerType))
655+
val input = new GenericArrayData(ArrayBasedMapData(Map("a" -> 1)) :: Nil)
656+
val output = """[{"a":1}]"""
657+
checkEvaluation(
658+
StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
659+
output)
660+
}
661+
615662
test("to_json: verify MapType's value type instead of key type") {
616663
// Keys in map are treated as strings when converting to JSON. The type doesn't matter at all.
617664
val mapType1 = MapType(CalendarIntervalType, IntegerType)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.json
19+
20+
import java.io.CharArrayWriter
21+
22+
import org.apache.spark.SparkFunSuite
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
25+
import org.apache.spark.sql.types._
26+
27+
class JacksonGeneratorSuite extends SparkFunSuite {
28+
29+
val gmtId = DateTimeUtils.TimeZoneGMT.getID
30+
val option = new JSONOptions(Map.empty, gmtId)
31+
32+
test("initial with StructType and write out a row") {
33+
val dataType = StructType(StructField("a", IntegerType) :: Nil)
34+
val input = InternalRow(1)
35+
val writer = new CharArrayWriter()
36+
val gen = new JacksonGenerator(dataType, writer, option)
37+
gen.write(input)
38+
gen.flush()
39+
assert(writer.toString === """{"a":1}""")
40+
}
41+
42+
test("initial with StructType and write out rows") {
43+
val dataType = StructType(StructField("a", IntegerType) :: Nil)
44+
val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
45+
val writer = new CharArrayWriter()
46+
val gen = new JacksonGenerator(dataType, writer, option)
47+
gen.write(input)
48+
gen.flush()
49+
assert(writer.toString === """[{"a":1},{"a":2}]""")
50+
}
51+
52+
test("initial with StructType and write out an array with single empty row") {
53+
val dataType = StructType(StructField("a", IntegerType) :: Nil)
54+
val input = new GenericArrayData(InternalRow(null) :: Nil)
55+
val writer = new CharArrayWriter()
56+
val gen = new JacksonGenerator(dataType, writer, option)
57+
gen.write(input)
58+
gen.flush()
59+
assert(writer.toString === """[{}]""")
60+
}
61+
62+
test("initial with StructType and write out an empty array") {
63+
val dataType = StructType(StructField("a", IntegerType) :: Nil)
64+
val input = new GenericArrayData(Nil)
65+
val writer = new CharArrayWriter()
66+
val gen = new JacksonGenerator(dataType, writer, option)
67+
gen.write(input)
68+
gen.flush()
69+
assert(writer.toString === """[]""")
70+
}
71+
72+
test("initial with Map and write out a map data") {
73+
val dataType = MapType(StringType, IntegerType)
74+
val input = ArrayBasedMapData(Map("a" -> 1))
75+
val writer = new CharArrayWriter()
76+
val gen = new JacksonGenerator(dataType, writer, option)
77+
gen.write(input)
78+
gen.flush()
79+
assert(writer.toString === """{"a":1}""")
80+
}
81+
82+
test("initial with Map and write out an array of maps") {
83+
val dataType = MapType(StringType, IntegerType)
84+
val input = new GenericArrayData(
85+
ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil)
86+
val writer = new CharArrayWriter()
87+
val gen = new JacksonGenerator(dataType, writer, option)
88+
gen.write(input)
89+
gen.flush()
90+
assert(writer.toString === """[{"a":1},{"b":2}]""")
91+
}
92+
93+
test("error handling: initial with StructType but error calling write a map") {
94+
val dataType = StructType(StructField("a", IntegerType) :: Nil)
95+
val input = ArrayBasedMapData(Map("a" -> 1))
96+
val writer = new CharArrayWriter()
97+
val gen = new JacksonGenerator(dataType, writer, option)
98+
intercept[UnsupportedOperationException] {
99+
gen.write(input)
100+
}
101+
}
102+
103+
test("error handling: initial with MapType and write out a row") {
104+
val dataType = MapType(StringType, IntegerType)
105+
val input = InternalRow(1)
106+
val writer = new CharArrayWriter()
107+
val gen = new JacksonGenerator(dataType, writer, option)
108+
intercept[UnsupportedOperationException] {
109+
gen.write(input)
110+
}
111+
}
112+
113+
}

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3119,9 +3119,9 @@ object functions {
31193119
}
31203120

31213121
/**
3122-
* (Scala-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
3123-
* into a JSON string with the specified schema. Throws an exception, in the case of an
3124-
* unsupported type.
3122+
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
3123+
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
3124+
* Throws an exception, in the case of an unsupported type.
31253125
*
31263126
* @param e a column containing a struct or array of the structs.
31273127
* @param options options to control how the struct column is converted into a json string.
@@ -3135,9 +3135,9 @@ object functions {
31353135
}
31363136

31373137
/**
3138-
* (Java-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
3139-
* into a JSON string with the specified schema. Throws an exception, in the case of an
3140-
* unsupported type.
3138+
* (Java-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
3139+
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
3140+
* Throws an exception, in the case of an unsupported type.
31413141
*
31423142
* @param e a column containing a struct or array of the structs.
31433143
* @param options options to control how the struct column is converted into a json string.
@@ -3150,8 +3150,9 @@ object functions {
31503150
to_json(e, options.asScala.toMap)
31513151

31523152
/**
3153-
* Converts a column containing a `StructType` or `ArrayType` of `StructType`s into a JSON string
3154-
* with the specified schema. Throws an exception, in the case of an unsupported type.
3153+
* Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
3154+
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
3155+
* Throws an exception, in the case of an unsupported type.
31553156
*
31563157
* @param e a column containing a struct or array of the structs.
31573158
*

0 commit comments

Comments
 (0)