Skip to content

Commit 3ad22eb

Browse files
authored
chore: Refactor Parquet/DataFrame fuzz data generators (#2629)
1 parent eeb1566 commit 3ad22eb

File tree

11 files changed

+171
-173
lines changed

11 files changed

+171
-173
lines changed

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -268,6 +268,7 @@ These settings can be used to determine which parts of the plan are accelerated
268268
| `spark.comet.expression.Reverse.enabled` | Enable Comet acceleration for `Reverse` | true |
269269
| `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true |
270270
| `spark.comet.expression.Second.enabled` | Enable Comet acceleration for `Second` | true |
271+
| `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true |
271272
| `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true |
272273
| `spark.comet.expression.ShiftLeft.enabled` | Enable Comet acceleration for `ShiftLeft` | true |
273274
| `spark.comet.expression.ShiftRight.enabled` | Enable Comet acceleration for `ShiftRight` | true |

fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.rogach.scallop.ScallopOption
2626

2727
import org.apache.spark.sql.SparkSession
2828

29-
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
29+
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
3030

3131
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
3232
object generateData extends Subcommand("data") {
@@ -78,19 +78,19 @@ object Main {
7878
case Some(seed) => new Random(seed)
7979
case None => new Random()
8080
}
81-
val options = DataGenOptions(
82-
allowNull = true,
83-
generateArray = conf.generateData.generateArrays(),
84-
generateStruct = conf.generateData.generateStructs(),
85-
generateMap = conf.generateData.generateMaps(),
86-
generateNegativeZero = !conf.generateData.excludeNegativeZero())
8781
for (i <- 0 until conf.generateData.numFiles()) {
8882
ParquetGenerator.makeParquetFile(
8983
r,
9084
spark,
9185
s"test$i.parquet",
9286
numRows = conf.generateData.numRows(),
93-
options)
87+
SchemaGenOptions(
88+
generateArray = conf.generateData.generateArrays(),
89+
generateStruct = conf.generateData.generateStructs(),
90+
generateMap = conf.generateData.generateMaps()),
91+
DataGenOptions(
92+
allowNull = true,
93+
generateNegativeZero = !conf.generateData.excludeNegativeZero()))
9494
}
9595
case Some(conf.generateQueries) =>
9696
val r = conf.generateQueries.randomSeed.toOption match {

spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -44,50 +44,16 @@ object FuzzDataGenerator {
4444
/**
 * Default base timestamp, in epoch milliseconds, for the literal date-time
 * `3333-05-25 12:34:56`. The formatter is created without an explicit time zone, so the
 * literal is interpreted in the JVM default time zone. This value is the default for
 * `DataGenOptions.baseDate`.
 */
val defaultBaseDate: Long =
  // SimpleDateFormat pattern letters are case-sensitive: `yyyy` is the calendar year (`YYYY`
  // is the week-based year), `dd` is the day of month (`DD` is the day of year) and `HH` is
  // the 0-23 hour (`hh` is the 1-12 hour, which reads "12" as midnight). The lowercase/`HH`
  // form is required for the literal below to be parsed as written.
  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("3333-05-25 12:34:56").getTime
4646

47-
private val primitiveTypes = Seq(
48-
DataTypes.BooleanType,
49-
DataTypes.ByteType,
50-
DataTypes.ShortType,
51-
DataTypes.IntegerType,
52-
DataTypes.LongType,
53-
DataTypes.FloatType,
54-
DataTypes.DoubleType,
55-
DataTypes.createDecimalType(10, 2),
56-
DataTypes.createDecimalType(36, 18),
57-
DataTypes.DateType,
58-
DataTypes.TimestampType,
59-
DataTypes.TimestampNTZType,
60-
DataTypes.StringType,
61-
DataTypes.BinaryType)
62-
63-
private def filteredPrimitives(excludeTypes: Seq[DataType]) = {
64-
65-
primitiveTypes.filterNot { dataType =>
66-
excludeTypes.exists {
67-
case _: DecimalType =>
68-
// For DecimalType, match if the type is also a DecimalType (ignore precision/scale)
69-
dataType.isInstanceOf[DecimalType]
70-
case excludeType =>
71-
dataType == excludeType
72-
}
73-
}
74-
}
75-
76-
def generateDataFrame(
77-
r: Random,
78-
spark: SparkSession,
79-
numRows: Int,
80-
options: DataGenOptions): DataFrame = {
81-
82-
val filteredPrimitiveTypes = filteredPrimitives(options.excludeTypes)
47+
def generateSchema(options: SchemaGenOptions): StructType = {
48+
val primitiveTypes = options.primitiveTypes
8349
val dataTypes = ListBuffer[DataType]()
84-
dataTypes.appendAll(filteredPrimitiveTypes)
50+
dataTypes.appendAll(primitiveTypes)
8551

86-
val arraysOfPrimitives = filteredPrimitiveTypes.map(DataTypes.createArrayType)
52+
val arraysOfPrimitives = primitiveTypes.map(DataTypes.createArrayType)
8753

8854
if (options.generateStruct) {
89-
dataTypes += StructType(filteredPrimitiveTypes.zipWithIndex.map(x =>
90-
StructField(s"c${x._2}", x._1, nullable = true)))
55+
dataTypes += StructType(
56+
primitiveTypes.zipWithIndex.map(x => StructField(s"c${x._2}", x._1, nullable = true)))
9157

9258
if (options.generateArray) {
9359
dataTypes += StructType(arraysOfPrimitives.zipWithIndex.map(x =>
@@ -103,9 +69,8 @@ object FuzzDataGenerator {
10369
dataTypes.appendAll(arraysOfPrimitives)
10470

10571
if (options.generateStruct) {
106-
dataTypes += DataTypes.createArrayType(
107-
StructType(filteredPrimitiveTypes.zipWithIndex.map(x =>
108-
StructField(s"c${x._2}", x._1, nullable = true))))
72+
dataTypes += DataTypes.createArrayType(StructType(primitiveTypes.zipWithIndex.map(x =>
73+
StructField(s"c${x._2}", x._1, nullable = true))))
10974
}
11075

11176
if (options.generateMap) {
@@ -117,7 +82,15 @@ object FuzzDataGenerator {
11782
// generate schema using random data types
11883
val fields = dataTypes.zipWithIndex
11984
.map(i => StructField(s"c${i._2}", i._1, nullable = true))
120-
val schema = StructType(fields.toSeq)
85+
StructType(fields.toSeq)
86+
}
87+
88+
def generateDataFrame(
89+
r: Random,
90+
spark: SparkSession,
91+
schema: StructType,
92+
numRows: Int,
93+
options: DataGenOptions): DataFrame = {
12194

12295
// generate columnar data
12396
val cols: Seq[Seq[Any]] =
@@ -247,11 +220,31 @@ object FuzzDataGenerator {
247220
}
248221
}
249222

250-
case class DataGenOptions(
251-
allowNull: Boolean = true,
252-
generateNegativeZero: Boolean = true,
253-
baseDate: Long = FuzzDataGenerator.defaultBaseDate,
223+
object SchemaGenOptions {
224+
val defaultPrimitiveTypes: Seq[DataType] = Seq(
225+
DataTypes.BooleanType,
226+
DataTypes.ByteType,
227+
DataTypes.ShortType,
228+
DataTypes.IntegerType,
229+
DataTypes.LongType,
230+
DataTypes.FloatType,
231+
DataTypes.DoubleType,
232+
DataTypes.createDecimalType(10, 2),
233+
DataTypes.createDecimalType(36, 18),
234+
DataTypes.DateType,
235+
DataTypes.TimestampType,
236+
DataTypes.TimestampNTZType,
237+
DataTypes.StringType,
238+
DataTypes.BinaryType)
239+
}
240+
241+
case class SchemaGenOptions(
254242
generateArray: Boolean = false,
255243
generateStruct: Boolean = false,
256244
generateMap: Boolean = false,
257-
excludeTypes: Seq[DataType] = Seq.empty)
245+
primitiveTypes: Seq[DataType] = SchemaGenOptions.defaultPrimitiveTypes)
246+
247+
case class DataGenOptions(
248+
allowNull: Boolean = true,
249+
generateNegativeZero: Boolean = true,
250+
baseDate: Long = FuzzDataGenerator.defaultBaseDate)

spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,32 @@ package org.apache.comet.testing
2222
import scala.util.Random
2323

2424
import org.apache.spark.sql.{SaveMode, SparkSession}
25+
import org.apache.spark.sql.types.StructType
2526

2627
object ParquetGenerator {
2728

29+
/** Generate a Parquet file using a generated schema */
2830
def makeParquetFile(
2931
r: Random,
3032
spark: SparkSession,
3133
filename: String,
3234
numRows: Int,
33-
options: DataGenOptions): Unit = {
34-
35-
val df = FuzzDataGenerator.generateDataFrame(r, spark, numRows, options)
35+
schemaGenOptions: SchemaGenOptions,
36+
dataGenOptions: DataGenOptions): Unit = {
37+
val schema = FuzzDataGenerator.generateSchema(schemaGenOptions)
38+
makeParquetFile(r, spark, filename, schema, numRows, dataGenOptions)
39+
}
3640

41+
/** Generate a Parquet file using the provided schema */
42+
def makeParquetFile(
43+
r: Random,
44+
spark: SparkSession,
45+
filename: String,
46+
schema: StructType,
47+
numRows: Int,
48+
options: DataGenOptions): Unit = {
49+
val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, numRows, options)
3750
df.write.mode(SaveMode.Overwrite).parquet(filename)
3851
}
52+
3953
}

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 41 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.sql.functions._
3030
import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
3131
import org.apache.comet.DataTypeSupport.isComplexType
3232
import org.apache.comet.serde.{CometArrayExcept, CometArrayRemove, CometArrayReverse, CometFlatten}
33-
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
33+
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
3434

3535
class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
3636

@@ -64,12 +64,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
6464
spark,
6565
filename,
6666
100,
67-
DataGenOptions(
68-
allowNull = true,
69-
generateNegativeZero = true,
70-
generateArray = false,
71-
generateStruct = false,
72-
generateMap = false))
67+
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
68+
DataGenOptions(allowNull = true, generateNegativeZero = true))
7369
}
7470
val table = spark.read.parquet(filename)
7571
table.createOrReplaceTempView("t1")
@@ -95,13 +91,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
9591
val filename = path.toString
9692
val random = new Random(42)
9793
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
98-
val options = DataGenOptions(
99-
allowNull = true,
100-
generateNegativeZero = true,
101-
generateArray = true,
102-
generateStruct = true,
103-
generateMap = false)
104-
ParquetGenerator.makeParquetFile(random, spark, filename, 100, options)
94+
ParquetGenerator.makeParquetFile(
95+
random,
96+
spark,
97+
filename,
98+
100,
99+
SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
100+
DataGenOptions(allowNull = true, generateNegativeZero = true))
105101
}
106102
withSQLConf(
107103
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
@@ -266,12 +262,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
266262
spark,
267263
filename,
268264
100,
269-
DataGenOptions(
270-
allowNull = true,
271-
generateNegativeZero = true,
272-
generateArray = true,
273-
generateStruct = true,
274-
generateMap = false))
265+
SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
266+
DataGenOptions(allowNull = true, generateNegativeZero = true))
275267
}
276268
val table = spark.read.parquet(filename)
277269
table.createOrReplaceTempView("t1")
@@ -310,12 +302,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
310302
spark,
311303
filename,
312304
100,
313-
DataGenOptions(
314-
allowNull = true,
315-
generateNegativeZero = true,
316-
generateArray = false,
317-
generateStruct = false,
318-
generateMap = false))
305+
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
306+
DataGenOptions(allowNull = true, generateNegativeZero = true))
319307
}
320308
val table = spark.read.parquet(filename)
321309
table.createOrReplaceTempView("t2")
@@ -340,12 +328,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
340328
spark,
341329
filename,
342330
100,
343-
DataGenOptions(
344-
allowNull = true,
345-
generateNegativeZero = true,
346-
generateArray = true,
347-
generateStruct = true,
348-
generateMap = false))
331+
SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
332+
DataGenOptions(allowNull = true, generateNegativeZero = true))
349333
}
350334
withSQLConf(
351335
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
@@ -588,12 +572,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
588572
spark,
589573
filename,
590574
100,
591-
DataGenOptions(
592-
allowNull = true,
593-
generateNegativeZero = true,
594-
generateArray = false,
595-
generateStruct = false,
596-
generateMap = false))
575+
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
576+
DataGenOptions(allowNull = true, generateNegativeZero = true))
597577
}
598578
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
599579
withTempView("t1", "t2") {
@@ -622,13 +602,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
622602
val filename = path.toString
623603
val random = new Random(42)
624604
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
625-
val options = DataGenOptions(
626-
allowNull = true,
627-
generateNegativeZero = true,
628-
generateArray = true,
629-
generateStruct = true,
630-
generateMap = false)
631-
ParquetGenerator.makeParquetFile(random, spark, filename, 100, options)
605+
ParquetGenerator.makeParquetFile(
606+
random,
607+
spark,
608+
filename,
609+
100,
610+
SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
611+
DataGenOptions(allowNull = true, generateNegativeZero = true))
632612
}
633613
withSQLConf(
634614
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
@@ -692,12 +672,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
692672
spark,
693673
filename,
694674
100,
695-
DataGenOptions(
696-
allowNull = true,
697-
generateNegativeZero = true,
698-
generateArray = false,
699-
generateStruct = false,
700-
generateMap = false))
675+
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
676+
DataGenOptions(allowNull = true, generateNegativeZero = true))
701677
}
702678
val table = spark.read.parquet(filename)
703679
table.createOrReplaceTempView("t1")
@@ -720,13 +696,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
720696
val filename = path.toString
721697
val random = new Random(42)
722698
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
723-
val options = DataGenOptions(
724-
allowNull = true,
725-
generateNegativeZero = true,
726-
generateArray = true,
727-
generateStruct = true,
728-
generateMap = false)
729-
ParquetGenerator.makeParquetFile(random, spark, filename, 100, options)
699+
ParquetGenerator.makeParquetFile(
700+
random,
701+
spark,
702+
filename,
703+
100,
704+
SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
705+
DataGenOptions(allowNull = true, generateNegativeZero = true))
730706
}
731707
withSQLConf(
732708
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
@@ -773,13 +749,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
773749
val filename = path.toString
774750
val random = new Random(42)
775751
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
776-
val options = DataGenOptions(
777-
allowNull = true,
778-
generateNegativeZero = true,
779-
generateArray = true,
780-
generateStruct = true,
781-
generateMap = false)
782-
ParquetGenerator.makeParquetFile(random, spark, filename, 100, options)
752+
ParquetGenerator.makeParquetFile(
753+
random,
754+
spark,
755+
filename,
756+
100,
757+
SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
758+
DataGenOptions(allowNull = true, generateNegativeZero = true))
783759
}
784760
withSQLConf(
785761
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",

0 commit comments

Comments
 (0)