Skip to content

Commit 1007cae

Browse files
MaxGekk authored and HyukjinKwon
committed
[SPARK-25447][SQL] Support JSON options by schema_of_json()
## What changes were proposed in this pull request? In the PR, I propose to extended the `schema_of_json()` function, and accept JSON options since they can impact on schema inferring. Purpose is to support the same options that `from_json` can use during schema inferring. ## How was this patch tested? Added SQL, Python and Scala tests (`JsonExpressionsSuite` and `JsonFunctionsSuite`) that checks JSON options are used. Closes apache#22442 from MaxGekk/schema_of_json-options. Authored-by: Maxim Gekk <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
1 parent 1e43783 commit 1007cae

File tree

7 files changed

+85
-11
lines changed

7 files changed

+85
-11
lines changed

python/pyspark/sql/functions.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,11 +2348,15 @@ def to_json(col, options={}):
23482348

23492349
@ignore_unicode_prefix
23502350
@since(2.4)
2351-
def schema_of_json(col):
2351+
def schema_of_json(col, options={}):
23522352
"""
23532353
Parses a column containing a JSON string and infers its schema in DDL format.
23542354
23552355
:param col: string column in json format
2356+
:param options: options to control parsing. accepts the same options as the JSON datasource
2357+
2358+
.. versionchanged:: 2.5
2359+
It accepts `options` parameter to control schema inferring.
23562360
23572361
>>> from pyspark.sql.types import *
23582362
>>> data = [(1, '{"a": 1}')]
@@ -2361,10 +2365,13 @@ def schema_of_json(col):
23612365
[Row(json=u'struct<a:bigint>')]
23622366
>>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
23632367
[Row(json=u'struct<a:bigint>')]
2368+
>>> schema = schema_of_json(lit('{a: 1}'), {'allowUnquotedFieldNames':'true'})
2369+
>>> df.select(schema.alias("json")).collect()
2370+
[Row(json=u'struct<a:bigint>')]
23642371
"""
23652372

23662373
sc = SparkContext._active_spark_context
2367-
jc = sc._jvm.functions.schema_of_json(_to_java_column(col))
2374+
jc = sc._jvm.functions.schema_of_json(_to_java_column(col), options)
23682375
return Column(jc)
23692376

23702377

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -740,15 +740,31 @@ case class StructsToJson(
740740
examples = """
741741
Examples:
742742
> SELECT _FUNC_('[{"col":0}]');
743-
array<struct<col:int>>
743+
array<struct<col:bigint>>
744+
> SELECT _FUNC_('[{"col":01}]', map('allowNumericLeadingZeros', 'true'));
745+
array<struct<col:bigint>>
744746
""",
745747
since = "2.4.0")
746-
case class SchemaOfJson(child: Expression)
748+
case class SchemaOfJson(
749+
child: Expression,
750+
options: Map[String, String])
747751
extends UnaryExpression with String2StringExpression with CodegenFallback {
748752

749-
private val jsonOptions = new JSONOptions(Map.empty, "UTC")
750-
private val jsonFactory = new JsonFactory()
751-
jsonOptions.setJacksonOptions(jsonFactory)
753+
def this(child: Expression) = this(child, Map.empty[String, String])
754+
755+
def this(child: Expression, options: Expression) = this(
756+
child = child,
757+
options = JsonExprUtils.convertToMapData(options))
758+
759+
@transient
760+
private lazy val jsonOptions = new JSONOptions(options, "UTC")
761+
762+
@transient
763+
private lazy val jsonFactory = {
764+
val factory = new JsonFactory()
765+
jsonOptions.setJacksonOptions(factory)
766+
factory
767+
}
752768

753769
override def convert(v: UTF8String): UTF8String = {
754770
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser =>
@@ -764,7 +780,7 @@ object JsonExprUtils {
764780

765781
def evalSchemaExpr(exp: Expression): DataType = exp match {
766782
case Literal(s, StringType) => DataType.fromDDL(s.toString)
767-
case e @ SchemaOfJson(_: Literal) =>
783+
case e @ SchemaOfJson(_: Literal, _) =>
768784
val ddlSchema = e.eval().asInstanceOf[UTF8String]
769785
DataType.fromDDL(ddlSchema.toString)
770786
case e => throw new AnalysisException(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -707,9 +707,17 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
707707
}
708708

709709
test("SPARK-24709: infer schema of json strings") {
710-
checkEvaluation(SchemaOfJson(Literal.create("""{"col":0}""")), "struct<col:bigint>")
710+
checkEvaluation(new SchemaOfJson(Literal.create("""{"col":0}""")),
711+
"struct<col:bigint>")
711712
checkEvaluation(
712-
SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")),
713+
new SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")),
713714
"struct<col0:array<string>,col1:struct<col2:string>>")
714715
}
716+
717+
test("infer schema of JSON strings by using options") {
718+
checkEvaluation(
719+
new SchemaOfJson(Literal.create("""{"col":01}"""),
720+
CreateMap(Seq(Literal.create("allowNumericLeadingZeros"), Literal.create("true")))),
721+
"struct<col:bigint>")
722+
}
715723
}

sql/core/src/main/scala/org/apache/spark/sql/functions.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3611,6 +3611,21 @@ object functions {
36113611
*/
36123612
def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))
36133613

3614+
/**
3615+
* Parses a column containing a JSON string and infers its schema using options.
3616+
*
3617+
* @param e a string column containing JSON data.
3618+
* @param options options to control how the json is parsed. accepts the same options and the
3619+
* json data source. See [[DataFrameReader#json]].
3620+
* @return a column with string literal containing schema in DDL format.
3621+
*
3622+
* @group collection_funcs
3623+
* @since 2.5.0
3624+
*/
3625+
def schema_of_json(e: Column, options: java.util.Map[String, String]): Column = {
3626+
withExpr(SchemaOfJson(e.expr, options.asScala.toMap))
3627+
}
3628+
36143629
/**
36153630
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or
36163631
* a `MapType` into a JSON string with the specified schema.

sql/core/src/test/resources/sql-tests/inputs/json-functions.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,7 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');
5656
select to_json(array('1', '2', '3'));
5757
select to_json(array(array(1, 2, 3), array(4)));
5858

59+
-- infer schema of json literal using options
60+
select schema_of_json('{"c1":1}', map('primitivesAsString', 'true'));
61+
select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'true', 'prefersDecimal', 'true'));
62+

sql/core/src/test/resources/sql-tests/results/json-functions.sql.out

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
-- Automatically generated by SQLQueryTestSuite
2-
-- Number of queries: 40
2+
-- Number of queries: 42
33

44

55
-- !query 0
@@ -370,3 +370,19 @@ select to_json(array(array(1, 2, 3), array(4)))
370370
struct<structstojson(array(array(1, 2, 3), array(4))):string>
371371
-- !query 39 output
372372
[[1,2,3],[4]]
373+
374+
375+
-- !query 40
376+
select schema_of_json('{"c1":1}', map('primitivesAsString', 'true'))
377+
-- !query 40 schema
378+
struct<schemaofjson({"c1":1}):string>
379+
-- !query 40 output
380+
struct<c1:string>
381+
382+
383+
-- !query 41
384+
select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'true', 'prefersDecimal', 'true'))
385+
-- !query 41 schema
386+
struct<schemaofjson({"c1":01, "c2":0.1}):string>
387+
-- !query 41 output
388+
struct<c1:bigint,c2:decimal(1,1)>

sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20+
import collection.JavaConverters._
21+
2022
import org.apache.spark.sql.functions._
2123
import org.apache.spark.sql.test.SharedSQLContext
2224
import org.apache.spark.sql.types._
@@ -402,6 +404,12 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
402404
assert(out.schema == expected)
403405
}
404406

407+
test("infers schemas using options") {
408+
val df = spark.range(1)
409+
.select(schema_of_json(lit("{a:1}"), Map("allowUnquotedFieldNames" -> "true").asJava))
410+
checkAnswer(df, Seq(Row("struct<a:bigint>")))
411+
}
412+
405413
test("from_json - array of primitive types") {
406414
val df = Seq("[1, 2, 3]").toDF("a")
407415
val schema = new ArrayType(IntegerType, false)

0 commit comments

Comments
 (0)