Skip to content

Commit edec461

Browse files
Chore: to_json unit/benchmark tests (apache#3011)
1 parent 069681a commit edec461

File tree

4 files changed

+123
-30
lines changed

4 files changed

+123
-30
lines changed

spark/src/main/scala/org/apache/comet/serde/structs.scala

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,26 +111,6 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
111111
withInfo(expr, "StructsToJson with options is not supported")
112112
None
113113
} else {
114-
115-
def isSupportedType(dt: DataType): Boolean = {
116-
dt match {
117-
case StructType(fields) =>
118-
fields.forall(f => isSupportedType(f.dataType))
119-
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
120-
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
121-
DataTypes.DoubleType | DataTypes.StringType =>
122-
true
123-
case DataTypes.DateType | DataTypes.TimestampType =>
124-
// TODO implement these types with tests for formatting options and timezone
125-
false
126-
case _: MapType | _: ArrayType =>
127-
// Spark supports map and array in StructsToJson but this is not yet
128-
// implemented in Comet
129-
false
130-
case _ => false
131-
}
132-
}
133-
134114
val isSupported = expr.child.dataType match {
135115
case s: StructType =>
136116
s.fields.forall(f => isSupportedType(f.dataType))
@@ -166,6 +146,25 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
166146
}
167147
}
168148
}
149+
150+
/**
 * Reports whether a value of type `dt` can appear inside a struct that is serialized by
 * `StructsToJson` in Comet.
 *
 * Supported: boolean, byte, short, integer, long, float, double and string, plus structs
 * whose fields are all (recursively) supported. Date, timestamp, map, array and every other
 * type are reported as unsupported.
 *
 * @param dt
 *   the Spark data type to check
 * @return
 *   `true` only if `dt` is one of the supported primitives or a struct made up entirely of
 *   supported types
 */
def isSupportedType(dt: DataType): Boolean = {
151+
dt match {
152+
case StructType(fields) =>
153+
fields.forall(f => isSupportedType(f.dataType))
154+
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
155+
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
156+
DataTypes.DoubleType | DataTypes.StringType =>
157+
true
158+
case DataTypes.DateType | DataTypes.TimestampType =>
159+
// TODO implement these types with tests for formatting options and timezone
160+
false
161+
case _: MapType | _: ArrayType =>
162+
// Spark supports map and array in StructsToJson but this is not yet
163+
// implemented in Comet
164+
false
165+
case _ => false // any type not listed above is treated as unsupported
166+
}
167+
}
169168
}
170169

171170
object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] {

spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ object FuzzDataGenerator {
229229
Range(0, numRows).map(_ => {
230230
r.nextInt(20) match {
231231
case 0 if options.allowNull => null
232-
case 1 => Float.NegativeInfinity
233-
case 2 => Float.PositiveInfinity
232+
case 1 if options.generateInfinity => Float.NegativeInfinity
233+
case 2 if options.generateInfinity => Float.PositiveInfinity
234234
case 3 => Float.MinValue
235235
case 4 => Float.MaxValue
236236
case 5 => 0.0f
@@ -243,8 +243,8 @@ object FuzzDataGenerator {
243243
Range(0, numRows).map(_ => {
244244
r.nextInt(20) match {
245245
case 0 if options.allowNull => null
246-
case 1 => Double.NegativeInfinity
247-
case 2 => Double.PositiveInfinity
246+
case 1 if options.generateInfinity => Double.NegativeInfinity
247+
case 2 if options.generateInfinity => Double.PositiveInfinity
248248
case 3 => Double.MinValue
249249
case 4 => Double.MaxValue
250250
case 5 => 0.0
@@ -329,4 +329,5 @@ case class DataGenOptions(
329329
generateNaN: Boolean = true,
330330
baseDate: Long = FuzzDataGenerator.defaultBaseDate,
331331
customStrings: Seq[String] = Seq.empty,
332-
maxStringLength: Int = 8)
332+
maxStringLength: Int = 8,
333+
generateInfinity: Boolean = true)

spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,59 @@
1919

2020
package org.apache.comet
2121

22+
import scala.util.Random
23+
2224
import org.scalactic.source.Position
2325
import org.scalatest.Tag
2426

27+
import org.apache.hadoop.fs.Path
2528
import org.apache.spark.sql.CometTestBase
26-
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
29+
import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
2730
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
31+
import org.apache.spark.sql.functions._
32+
33+
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
34+
import org.apache.comet.serde.CometStructsToJson
35+
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
2836

2937
class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
3038

3139
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
3240
pos: Position): Unit = {
3341
super.test(testName, testTags: _*) {
34-
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true") {
42+
withSQLConf(
43+
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
44+
CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
3545
testFun
3646
}
3747
}
3848
}
3949

50+
// Writes a randomly generated Parquet file, keeps only the columns whose type
// CometStructsToJson.isSupportedType accepts, wraps them in a single struct and runs
// to_json over it. The schema generator is told not to produce array, struct or map
// columns, and the data generator is told not to produce NaN or infinity values.
test("to_json - all supported types") {
51+
assume(!isSpark40Plus) // not run when isSpark40Plus is true; the reason is not stated here
52+
withTempDir { dir =>
53+
val path = new Path(dir.toURI.toString, "test.parquet")
54+
val filename = path.toString
55+
val random = new Random(42) // fixed seed, so the same Random sequence is used on every run
56+
withSQLConf(CometConf.COMET_ENABLED.key -> "false") { // input file is written with Comet disabled
57+
ParquetGenerator.makeParquetFile(
58+
random,
59+
spark,
60+
filename,
61+
100, // presumably the number of rows to generate -- TODO confirm against makeParquetFile
62+
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
63+
DataGenOptions(generateNaN = false, generateInfinity = false))
64+
}
65+
val table = spark.read.parquet(filename)
66+
// Keep only the columns whose data type is accepted by isSupportedType.
67+
val fieldsNames = table.schema.fields
68+
.filter(sf => CometStructsToJson.isSupportedType(sf.dataType))
69+
.map(sf => col(sf.name))
70+
.toSeq
71+
val df = table.select(to_json(struct(fieldsNames: _*)))
72+
// NOTE(review): presumably compares the Comet result with Spark's and checks that the
// plan ran on Comet operators -- confirm in CometTestBase.
checkSparkAnswerAndOperator(df)
73+
}
74+
}
74+
4075
test("from_json - basic primitives") {
4176
Seq(true, false).foreach { dictionaryEnabled =>
4277
withParquetTable(

spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.apache.spark.sql.benchmark
2121

22-
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
22+
import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
2323

2424
import org.apache.comet.CometConf
2525

@@ -106,6 +106,44 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
106106
FROM $tbl
107107
""")
108108

109+
case "to_json - simple primitives" =>
110+
spark.sql(
111+
s"""SELECT named_struct("a", CAST(value AS INT), "b", concat("str_", CAST(value AS STRING))) AS json_struct FROM $tbl""")
112+
113+
case "to_json - all primitive types" =>
114+
spark.sql(s"""
115+
SELECT named_struct(
116+
"i32", CAST(value % 1000 AS INT),
117+
"i64", CAST(value * 1000000000L AS LONG),
118+
"f32", CAST(value * 1.5 AS FLOAT),
119+
"f64", CAST(value * 2.5 AS DOUBLE),
120+
"bool", CASE WHEN value % 2 = 0 THEN true ELSE false END,
121+
"str", concat("value_", CAST(value AS STRING))
122+
) AS json_struct FROM $tbl
123+
""")
124+
125+
case "to_json - with nulls" =>
126+
spark.sql(s"""
127+
SELECT
128+
CASE
129+
WHEN value % 10 = 0 THEN CAST(NULL AS STRUCT<a: INT, b: STRING>)
130+
WHEN value % 5 = 0 THEN named_struct("a", CAST(NULL AS INT), "b", "test")
131+
WHEN value % 3 = 0 THEN named_struct("a", CAST(123 AS INT), "b", CAST(NULL AS STRING))
132+
ELSE named_struct("a", CAST(value AS INT), "b", concat("str_", CAST(value AS STRING)))
133+
END AS json_struct
134+
FROM $tbl
135+
""")
136+
137+
case "to_json - nested struct" =>
138+
spark.sql(s"""
139+
SELECT named_struct(
140+
"outer", named_struct(
141+
"inner_a", CAST(value AS INT),
142+
"inner_b", concat("nested_", CAST(value AS STRING))
143+
)
144+
) AS json_struct FROM $tbl
145+
""")
146+
109147
case _ =>
110148
spark.sql(s"""
111149
SELECT
@@ -117,8 +155,9 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
117155
prepareTable(dir, jsonData)
118156

119157
val extraConfigs = Map(
158+
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
120159
CometConf.getExprAllowIncompatConfigKey(
121-
classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs
160+
classOf[StructsToJson]) -> "true") ++ config.extraCometConfigs
122161

123162
runExpressionBenchmark(config.name, values, config.query, extraConfigs)
124163
}
@@ -127,6 +166,7 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
127166

128167
// Configuration for all JSON expression benchmarks
129168
private val jsonExpressions = List(
169+
// from_json tests
130170
JsonExprConfig(
131171
"from_json - simple primitives",
132172
"a INT, b STRING",
@@ -146,7 +186,25 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
146186
JsonExprConfig(
147187
"from_json - field access",
148188
"a INT, b STRING",
149-
"SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"))
189+
"SELECT from_json(json_str, 'a INT, b STRING').a FROM parquetV1Table"),
190+
191+
// to_json tests
192+
JsonExprConfig(
193+
"to_json - simple primitives",
194+
"a INT, b STRING",
195+
"SELECT to_json(json_struct) FROM parquetV1Table"),
196+
JsonExprConfig(
197+
"to_json - all primitive types",
198+
"i32 INT, i64 BIGINT, f32 FLOAT, f64 DOUBLE, bool BOOLEAN, str STRING",
199+
"SELECT to_json(json_struct) FROM parquetV1Table"),
200+
JsonExprConfig(
201+
"to_json - with nulls",
202+
"a INT, b STRING",
203+
"SELECT to_json(json_struct) FROM parquetV1Table"),
204+
JsonExprConfig(
205+
"to_json - nested struct",
206+
"outer STRUCT<inner_a: INT, inner_b: STRING>",
207+
"SELECT to_json(json_struct) FROM parquetV1Table"))
150208

151209
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
152210
val values = 1024 * 1024

0 commit comments

Comments
 (0)