Skip to content

Commit 4740e94

Browse files
authored
chore: Parquet fuzz testing (#1623)
1 parent 6a100ad commit 4740e94

File tree

2 files changed

+90
-8
lines changed

2 files changed

+90
-8
lines changed

spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType,
3434
object ParquetGenerator {
3535

3636
/**
37-
* Arbitrary date to use as base for generating temporal columns. Random integers will be added
38-
* to or subtracted from this value.
37+
* Date to use as base for generating temporal columns. Random integers will be added to or
38+
* subtracted from this value.
39+
*
40+
* Date was chosen to trigger generating a timestamp that's larger than a 64-bit nanosecond
41+
* timestamp can represent so that we test support for INT96 timestamps.
3942
*/
40-
private val baseDate =
41-
new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime
43+
val defaultBaseDate: Long =
44+
new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("3333-05-25 12:34:56").getTime
4245

4346
private val primitiveTypes = Seq(
4447
DataTypes.BooleanType,
@@ -217,13 +220,13 @@ object ParquetGenerator {
217220
null
218221
}
219222
case DataTypes.DateType =>
220-
Range(0, numRows).map(_ => new java.sql.Date(baseDate + r.nextInt()))
223+
Range(0, numRows).map(_ => new java.sql.Date(options.baseDate + r.nextInt()))
221224
case DataTypes.TimestampType =>
222-
Range(0, numRows).map(_ => new Timestamp(baseDate + r.nextInt()))
225+
Range(0, numRows).map(_ => new Timestamp(options.baseDate + r.nextInt()))
223226
case DataTypes.TimestampNTZType =>
224227
Range(0, numRows).map(_ =>
225228
LocalDateTime.ofInstant(
226-
Instant.ofEpochMilli(baseDate + r.nextInt()),
229+
Instant.ofEpochMilli(options.baseDate + r.nextInt()),
227230
ZoneId.systemDefault()))
228231
case _ => throw new IllegalStateException(s"Cannot generate data for $dataType yet")
229232
}
@@ -234,6 +237,7 @@ object ParquetGenerator {
234237
/**
 * Options controlling the data generated by [[ParquetGenerator]].
 *
 * @param allowNull
 *   whether generated columns may contain null values
 * @param generateNegativeZero
 *   whether generated floating-point data may include negative zero
 * @param baseDate
 *   epoch milliseconds used as the base value for generated temporal columns; random integers
 *   are added to or subtracted from it
 * @param generateArray
 *   whether to generate array-typed columns
 * @param generateStruct
 *   whether to generate struct-typed columns
 * @param generateMap
 *   whether to generate map-typed columns
 */
case class DataGenOptions(
    allowNull: Boolean = true,
    generateNegativeZero: Boolean = true,
    baseDate: Long = ParquetGenerator.defaultBaseDate,
    generateArray: Boolean = false,
    generateStruct: Boolean = false,
    generateMap: Boolean = false)

spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.comet
2121

2222
import java.io.File
23+
import java.text.SimpleDateFormat
2324

2425
import scala.util.Random
2526

@@ -32,6 +33,8 @@ import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
3233
import org.apache.spark.sql.execution.SparkPlan
3334
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3435
import org.apache.spark.sql.internal.SQLConf
36+
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
37+
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, StructType}
3538

3639
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
3740

@@ -57,7 +60,13 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
5760
CometConf.COMET_ENABLED.key -> "false",
5861
SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
5962
val options =
60-
DataGenOptions(generateArray = true, generateStruct = true, generateNegativeZero = false)
63+
DataGenOptions(
64+
generateArray = true,
65+
generateStruct = true,
66+
generateNegativeZero = false,
67+
// override base date due to known issues with experimental scans
68+
baseDate =
69+
new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime)
6170
ParquetGenerator.makeParquetFile(random, spark, filename, 1000, options)
6271
}
6372
}
@@ -166,6 +175,75 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
166175
}
167176
}
168177

178+
  // All three tests delegate to testParquetTemporalTypes, varying only the
  // Parquet timestamp representation used when writing the test file.

  test("Parquet temporal types written as INT96") {

    // there are known issues with INT96 support in the new experimental scans
    // https://github.com/apache/datafusion-comet/issues/1441
    assume(!CometConf.isExperimentalNativeScan)

    testParquetTemporalTypes(ParquetOutputTimestampType.INT96)
  }

  test("Parquet temporal types written as TIMESTAMP_MICROS") {
    testParquetTemporalTypes(ParquetOutputTimestampType.TIMESTAMP_MICROS)
  }

  test("Parquet temporal types written as TIMESTAMP_MILLIS") {
    testParquetTemporalTypes(ParquetOutputTimestampType.TIMESTAMP_MILLIS)
  }
194+
195+
private def testParquetTemporalTypes(
196+
outputTimestampType: ParquetOutputTimestampType.Value): Unit = {
197+
198+
val options =
199+
DataGenOptions(generateArray = true, generateStruct = true, generateNegativeZero = false)
200+
201+
withTempPath { filename =>
202+
val random = new Random(42)
203+
withSQLConf(
204+
CometConf.COMET_ENABLED.key -> "false",
205+
SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString,
206+
SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
207+
ParquetGenerator.makeParquetFile(random, spark, filename.toString, 100, options)
208+
}
209+
210+
Seq(defaultTimezone, "UTC", "America/Denver").foreach { tz =>
211+
Seq(true, false).foreach { inferTimestampNtzEnabled =>
212+
Seq(true, false).foreach { int96TimestampConversion =>
213+
Seq(true, false).foreach { int96AsTimestamp =>
214+
withSQLConf(
215+
CometConf.COMET_ENABLED.key -> "true",
216+
SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz,
217+
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> int96AsTimestamp.toString,
218+
SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> int96TimestampConversion.toString,
219+
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> inferTimestampNtzEnabled.toString) {
220+
221+
val df = spark.read.parquet(filename.toString)
222+
df.createOrReplaceTempView("t1")
223+
224+
def hasTemporalType(t: DataType): Boolean = t match {
225+
case DataTypes.DateType | DataTypes.TimestampType |
226+
DataTypes.TimestampNTZType =>
227+
true
228+
case t: StructType => t.exists(f => hasTemporalType(f.dataType))
229+
case t: ArrayType => hasTemporalType(t.elementType)
230+
case _ => false
231+
}
232+
233+
val columns =
234+
df.schema.fields.filter(f => hasTemporalType(f.dataType)).map(_.name)
235+
236+
for (col <- columns) {
237+
checkSparkAnswer(s"SELECT $col FROM t1 ORDER BY $col")
238+
}
239+
}
240+
}
241+
}
242+
}
243+
}
244+
}
245+
}
246+
169247
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
170248
pos: Position): Unit = {
171249
Seq("native", "jvm").foreach { shuffleMode =>

0 commit comments

Comments
 (0)