From e365ac31574b943356848e68eaa6d5c364ca3bab Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 22 Oct 2025 09:34:53 -0600 Subject: [PATCH 01/22] add version of generateDataFrame that accepts a custom schema --- .../org/apache/comet/testing/FuzzDataGenerator.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala index 7c7a6727fb..a90d179700 100644 --- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala @@ -78,7 +78,6 @@ object FuzzDataGenerator { spark: SparkSession, numRows: Int, options: DataGenOptions): DataFrame = { - val filteredPrimitiveTypes = filteredPrimitives(options.excludeTypes) val dataTypes = ListBuffer[DataType]() dataTypes.appendAll(filteredPrimitiveTypes) @@ -119,6 +118,16 @@ object FuzzDataGenerator { .map(i => StructField(s"c${i._2}", i._1, nullable = true)) val schema = StructType(fields.toSeq) + generateDataFrame(r, spark, schema, numRows, options) + } + + def generateDataFrame( + r: Random, + spark: SparkSession, + schema: StructType, + numRows: Int, + options: DataGenOptions): DataFrame = { + // generate columnar data val cols: Seq[Seq[Any]] = schema.fields.map(f => generateColumn(r, f.dataType, numRows, options)).toSeq From 10c960e805eb75996a3d842cb35277b9a3404b7e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 22 Oct 2025 09:49:41 -0600 Subject: [PATCH 02/22] refactor --- .../scala/org/apache/comet/fuzz/Main.scala | 4 +- .../comet/testing/FuzzDataGenerator.scala | 89 +------------------ .../comet/testing/ParquetGenerator.scala | 77 +++++++++++++++- .../comet/CometArrayExpressionSuite.scala | 22 ++--- .../comet/CometBitwiseExpressionSuite.scala | 6 +- .../org/apache/comet/CometFuzzTestBase.scala | 4 +- .../org/apache/comet/CometFuzzTestSuite.scala | 4 +- .../comet/CometMapExpressionSuite.scala | 4 +- .../comet/exec/CometAggregateSuite.scala | 4 +- .../apache/comet/exec/CometExecSuite.scala | 4 +- 10 files changed, 105 insertions(+), 113 deletions(-) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala index 9b9a4b6f3e..74edd94008 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala @@ -26,7 +26,7 @@ import org.rogach.scallop.ScallopOption import org.apache.spark.sql.SparkSession -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { object generateData extends Subcommand("data") { @@ -78,7 +78,7 @@ object Main { case Some(seed) => new Random(seed) case None => new Random() } - val options = DataGenOptions( + val options = ParquetDataGenOptions( allowNull = true, generateArray = conf.generateData.generateArrays(), generateStruct = conf.generateData.generateStructs(), diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala index a90d179700..d75c53259f 100644 --- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala @@ -44,89 +44,12 @@ object FuzzDataGenerator { val defaultBaseDate: Long 
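// Usage sketch (not part of the patch): the overload added in PATCH 01 lets a
// caller supply an explicit schema instead of the generated one. The names
// `r` and `spark` and the two-column schema below are assumptions for
// illustration; the options type follows the post-PATCH-03 naming
// (`DataGenOptions`).
//
//   val schema = StructType(Seq(
//     StructField("id", DataTypes.LongType, nullable = true),
//     StructField("name", DataTypes.StringType, nullable = true)))
//   val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 100, DataGenOptions())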
= new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("3333-05-25 12:34:56").getTime - private val primitiveTypes = Seq( - DataTypes.BooleanType, - DataTypes.ByteType, - DataTypes.ShortType, - DataTypes.IntegerType, - DataTypes.LongType, - DataTypes.FloatType, - DataTypes.DoubleType, - DataTypes.createDecimalType(10, 2), - DataTypes.createDecimalType(36, 18), - DataTypes.DateType, - DataTypes.TimestampType, - DataTypes.TimestampNTZType, - DataTypes.StringType, - DataTypes.BinaryType) - - private def filteredPrimitives(excludeTypes: Seq[DataType]) = { - - primitiveTypes.filterNot { dataType => - excludeTypes.exists { - case _: DecimalType => - // For DecimalType, match if the type is also a DecimalType (ignore precision/scale) - dataType.isInstanceOf[DecimalType] - case excludeType => - dataType == excludeType - } - } - } - - def generateDataFrame( - r: Random, - spark: SparkSession, - numRows: Int, - options: DataGenOptions): DataFrame = { - val filteredPrimitiveTypes = filteredPrimitives(options.excludeTypes) - val dataTypes = ListBuffer[DataType]() - dataTypes.appendAll(filteredPrimitiveTypes) - - val arraysOfPrimitives = filteredPrimitiveTypes.map(DataTypes.createArrayType) - - if (options.generateStruct) { - dataTypes += StructType(filteredPrimitiveTypes.zipWithIndex.map(x => - StructField(s"c${x._2}", x._1, nullable = true))) - - if (options.generateArray) { - dataTypes += StructType(arraysOfPrimitives.zipWithIndex.map(x => - StructField(s"c${x._2}", x._1, nullable = true))) - } - } - - if (options.generateMap) { - dataTypes += MapType(DataTypes.IntegerType, DataTypes.StringType) - } - - if (options.generateArray) { - dataTypes.appendAll(arraysOfPrimitives) - - if (options.generateStruct) { - dataTypes += DataTypes.createArrayType( - StructType(filteredPrimitiveTypes.zipWithIndex.map(x => - StructField(s"c${x._2}", x._1, nullable = true)))) - } - - if (options.generateMap) { - dataTypes += DataTypes.createArrayType( - MapType(DataTypes.IntegerType, DataTypes.StringType)) - } - } - - // generate schema using random data types - val fields = dataTypes.zipWithIndex - .map(i => StructField(s"c${i._2}", i._1, nullable = true)) - val schema = StructType(fields.toSeq) - - generateDataFrame(r, spark, schema, numRows, options) - } - def generateDataFrame( r: Random, spark: SparkSession, schema: StructType, numRows: Int, - options: DataGenOptions): DataFrame = { + options: DataGenOptions2): DataFrame = { // generate columnar data val cols: Seq[Seq[Any]] = @@ -144,7 +67,7 @@ object FuzzDataGenerator { r: Random, dataType: DataType, numRows: Int, - options: DataGenOptions): Seq[Any] = { + options: DataGenOptions2): Seq[Any] = { dataType match { case ArrayType(elementType, _) => val values = generateColumn(r, elementType, numRows, options) @@ -256,11 +179,7 @@ object FuzzDataGenerator { } } -case class DataGenOptions( +case class DataGenOptions2( allowNull: Boolean = true, generateNegativeZero: Boolean = true, - baseDate: Long = FuzzDataGenerator.defaultBaseDate, - generateArray: Boolean = false, - generateStruct: Boolean = false, - generateMap: Boolean = false, - excludeTypes: Seq[DataType] = Seq.empty) + baseDate: Long = FuzzDataGenerator.defaultBaseDate) diff --git a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala index 27e40c9d74..8d63bf02df 100644 --- a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala @@ -19,21 
+19,94 @@ package org.apache.comet.testing +import scala.collection.mutable.ListBuffer import scala.util.Random import org.apache.spark.sql.{SaveMode, SparkSession} +import org.apache.spark.sql.types.{DataType, DataTypes, MapType, StructField, StructType} object ParquetGenerator { + def makeParquetSchema(options: ParquetDataGenOptions): StructType = { + val primitiveTypes = options.primitiveTypes + val dataTypes = ListBuffer[DataType]() + dataTypes.appendAll(primitiveTypes) + + val arraysOfPrimitives = primitiveTypes.map(DataTypes.createArrayType) + + if (options.generateStruct) { + dataTypes += StructType( + primitiveTypes.zipWithIndex.map(x => StructField(s"c${x._2}", x._1, nullable = true))) + + if (options.generateArray) { + dataTypes += StructType(arraysOfPrimitives.zipWithIndex.map(x => + StructField(s"c${x._2}", x._1, nullable = true))) + } + } + + if (options.generateMap) { + dataTypes += MapType(DataTypes.IntegerType, DataTypes.StringType) + } + + if (options.generateArray) { + dataTypes.appendAll(arraysOfPrimitives) + + if (options.generateStruct) { + dataTypes += DataTypes.createArrayType(StructType(primitiveTypes.zipWithIndex.map(x => + StructField(s"c${x._2}", x._1, nullable = true)))) + } + + if (options.generateMap) { + dataTypes += DataTypes.createArrayType( + MapType(DataTypes.IntegerType, DataTypes.StringType)) + } + } + + // generate schema using random data types + val fields = dataTypes.zipWithIndex + .map(i => StructField(s"c${i._2}", i._1, nullable = true)) + StructType(fields.toSeq) + } + def makeParquetFile( r: Random, spark: SparkSession, filename: String, numRows: Int, - options: DataGenOptions): Unit = { + options: ParquetDataGenOptions): Unit = { - val df = FuzzDataGenerator.generateDataFrame(r, spark, numRows, options) + val schema = makeParquetSchema(options) + + val x = DataGenOptions2( + allowNull = options.allowNull, + generateNegativeZero = options.generateNegativeZero, + baseDate = options.baseDate) + + val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, numRows, x) df.write.mode(SaveMode.Overwrite).parquet(filename) } } + +case class ParquetDataGenOptions( + allowNull: Boolean = true, + generateNegativeZero: Boolean = true, + baseDate: Long = FuzzDataGenerator.defaultBaseDate, + generateArray: Boolean = false, + generateStruct: Boolean = false, + generateMap: Boolean = false, + primitiveTypes: Seq[DataType] = Seq( + DataTypes.BooleanType, + DataTypes.ByteType, + DataTypes.ShortType, + DataTypes.IntegerType, + DataTypes.LongType, + DataTypes.FloatType, + DataTypes.DoubleType, + DataTypes.createDecimalType(10, 2), + DataTypes.createDecimalType(36, 18), + DataTypes.DateType, + DataTypes.TimestampType, + DataTypes.TimestampNTZType, + DataTypes.StringType, + DataTypes.BinaryType)) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 2adb7a9ed6..1313e3e6b8 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.functions._ import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.serde.{CometArrayExcept, CometArrayRemove, CometArrayReverse, CometFlatten} -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} 
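// Usage sketch (illustrative, not part of the patch): besides the rename,
// ParquetDataGenOptions exposes the primitive type list, so a test can narrow
// the generated column types. The output path below is hypothetical.
//
//   val options = ParquetDataGenOptions(
//     generateArray = true,
//     primitiveTypes = Seq(DataTypes.IntegerType, DataTypes.StringType))
//   ParquetGenerator.makeParquetFile(new Random(42), spark, "/tmp/fuzz.parquet", 100, options)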
class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -64,7 +64,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -95,7 +95,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = DataGenOptions( + val options = ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -266,7 +266,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -310,7 +310,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -340,7 +340,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -588,7 +588,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -622,7 +622,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = DataGenOptions( + val options = ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -692,7 +692,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -720,7 +720,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = DataGenOptions( + val options = ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -773,7 +773,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = DataGenOptions( + val options = ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = true, diff --git a/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala index d89e81b0fd..8eaec24aae 100644 --- a/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, 
ParquetGenerator} class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -99,7 +99,7 @@ class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHe spark, filename, 100, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -174,7 +174,7 @@ class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHe spark, filename, 10, - DataGenOptions( + ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = false, diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala index a69080e446..9068e5b74d 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { @@ -59,7 +59,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { CometConf.COMET_ENABLED.key -> "false", SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) { val options = - DataGenOptions( + ParquetDataGenOptions( generateArray = true, generateStruct = true, generateNegativeZero = false, diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index 398d960136..a6e55446b4 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types._ import org.apache.comet.DataTypeSupport.isComplexType -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} class CometFuzzTestSuite extends CometFuzzTestBase { @@ -262,7 +262,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { generateStruct: Boolean = true): Unit = { val options = - DataGenOptions( + ParquetDataGenOptions( generateArray = generateArray, generateStruct = generateStruct, generateNegativeZero = false) diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 3557ad348b..7283b696b4 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} class CometMapExpressionSuite extends CometTestBase { @@ -108,7 +108,7 @@ class CometMapExpressionSuite extends CometTestBase { val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = DataGenOptions( + val options = ParquetDataGenOptions( allowNull = false, generateNegativeZero = false, 
generateArray = true, diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index d0b1dfb362..1bf817bd39 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.functions.{avg, count_distinct, sum} import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} /** * Test suite dedicated to Comet native aggregate operator @@ -45,7 +45,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - ParquetGenerator.makeParquetFile(random, spark, filename, 10000, DataGenOptions()) + ParquetGenerator.makeParquetFile(random, spark, filename, 10000, ParquetDataGenOptions()) } val tableName = "avg_decimal" withTable(tableName) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index ab7081e10e..747bcff79a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -50,7 +50,7 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.{CometConf, ExtendedExplainInfo} import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} class CometExecSuite extends CometTestBase { @@ -2052,7 +2052,7 @@ class CometExecSuite extends CometTestBase { val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = DataGenOptions( + val options = ParquetDataGenOptions( allowNull = true, generateNegativeZero = true, generateArray = true, From 0ebd1439d4dea7b778e216fefd9ebd242a566662 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 22 Oct 2025 09:54:55 -0600 Subject: [PATCH 03/22] refactor --- docs/source/user-guide/latest/configs.md | 1 + .../scala/org/apache/comet/fuzz/Main.scala | 4 +- .../comet/testing/FuzzDataGenerator.scala | 6 +-- .../comet/testing/ParquetGenerator.scala | 53 +++++++++++-------- .../comet/CometArrayExpressionSuite.scala | 22 ++++---- .../comet/CometBitwiseExpressionSuite.scala | 6 +-- .../org/apache/comet/CometFuzzTestBase.scala | 4 +- .../org/apache/comet/CometFuzzTestSuite.scala | 4 +- .../comet/CometMapExpressionSuite.scala | 4 +- .../comet/exec/CometAggregateSuite.scala | 9 +++- .../apache/comet/exec/CometExecSuite.scala | 4 +- 11 files changed, 67 insertions(+), 50 deletions(-) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index c4c3343722..bc8fdcb982 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -268,6 +268,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.Reverse.enabled` | Enable Comet acceleration for `Reverse` | true | | `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true | | `spark.comet.expression.Second.enabled` | Enable Comet 
acceleration for `Second` | true | +| `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true | | `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true | | `spark.comet.expression.ShiftLeft.enabled` | Enable Comet acceleration for `ShiftLeft` | true | | `spark.comet.expression.ShiftRight.enabled` | Enable Comet acceleration for `ShiftRight` | true | diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala index 74edd94008..633a10ba95 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala @@ -26,7 +26,7 @@ import org.rogach.scallop.ScallopOption import org.apache.spark.sql.SparkSession -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { object generateData extends Subcommand("data") { @@ -78,7 +78,7 @@ object Main { case Some(seed) => new Random(seed) case None => new Random() } - val options = ParquetDataGenOptions( + val options = ParquetGeneratorOptions( allowNull = true, generateArray = conf.generateData.generateArrays(), generateStruct = conf.generateData.generateStructs(), diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala index d75c53259f..bba38964d6 100644 --- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala @@ -49,7 +49,7 @@ object FuzzDataGenerator { spark: SparkSession, schema: StructType, numRows: Int, - options: DataGenOptions2): DataFrame = { + options: DataGenOptions): DataFrame = { // generate columnar data val cols: Seq[Seq[Any]] = @@ -67,7 +67,7 @@ object FuzzDataGenerator { r: Random, dataType: DataType, numRows: Int, - options: DataGenOptions2): Seq[Any] = { + options: DataGenOptions): Seq[Any] = { dataType match { case ArrayType(elementType, _) => val values = generateColumn(r, elementType, numRows, options) @@ -179,7 +179,7 @@ object FuzzDataGenerator { } } -case class DataGenOptions2( +case class DataGenOptions( allowNull: Boolean = true, generateNegativeZero: Boolean = true, baseDate: Long = FuzzDataGenerator.defaultBaseDate) diff --git a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala index 8d63bf02df..279c91a1e2 100644 --- a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala @@ -27,7 +27,36 @@ import org.apache.spark.sql.types.{DataType, DataTypes, MapType, StructField, St object ParquetGenerator { - def makeParquetSchema(options: ParquetDataGenOptions): StructType = { + /** Generate a Parquet file using a generated schema */ + def makeParquetFile( + r: Random, + spark: SparkSession, + filename: String, + numRows: Int, + options: ParquetGeneratorOptions): Unit = { + val schema = generateSchema(options) + + val dataGenOptions = DataGenOptions( + allowNull = options.allowNull, + generateNegativeZero = options.generateNegativeZero, + baseDate = options.baseDate) + + makeParquetFile(r, spark, filename, schema, numRows, dataGenOptions) + } + + /** Generate a Parquet file using the provided schema */ + def 
makeParquetFile( + r: Random, + spark: SparkSession, + filename: String, + schema: StructType, + numRows: Int, + options: DataGenOptions): Unit = { + val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, numRows, options) + df.write.mode(SaveMode.Overwrite).parquet(filename) + } + + private def generateSchema(options: ParquetGeneratorOptions): StructType = { val primitiveTypes = options.primitiveTypes val dataTypes = ListBuffer[DataType]() dataTypes.appendAll(primitiveTypes) @@ -67,28 +96,10 @@ object ParquetGenerator { .map(i => StructField(s"c${i._2}", i._1, nullable = true)) StructType(fields.toSeq) } - - def makeParquetFile( - r: Random, - spark: SparkSession, - filename: String, - numRows: Int, - options: ParquetDataGenOptions): Unit = { - - val schema = makeParquetSchema(options) - - val x = DataGenOptions2( - allowNull = options.allowNull, - generateNegativeZero = options.generateNegativeZero, - baseDate = options.baseDate) - - val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, numRows, x) - - df.write.mode(SaveMode.Overwrite).parquet(filename) - } } -case class ParquetDataGenOptions( +/** Schema and Data generation options */ +case class ParquetGeneratorOptions( allowNull: Boolean = true, generateNegativeZero: Boolean = true, baseDate: Long = FuzzDataGenerator.defaultBaseDate, diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 1313e3e6b8..bd76400857 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.functions._ import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.serde.{CometArrayExcept, CometArrayRemove, CometArrayReverse, CometFlatten} -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -64,7 +64,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -95,7 +95,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetDataGenOptions( + val options = ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -266,7 +266,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -310,7 +310,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -340,7 +340,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -588,7 +588,7 
@@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -622,7 +622,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetDataGenOptions( + val options = ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -692,7 +692,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -720,7 +720,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetDataGenOptions( + val options = ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = true, @@ -773,7 +773,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetDataGenOptions( + val options = ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = true, diff --git a/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala index 8eaec24aae..d2e214e29e 100644 --- a/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -99,7 +99,7 @@ class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHe spark, filename, 100, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = false, @@ -174,7 +174,7 @@ class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHe spark, filename, 10, - ParquetDataGenOptions( + ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = false, diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala index 9068e5b74d..9ab35c822f 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { @@ -59,7 +59,7 @@ class 
CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { CometConf.COMET_ENABLED.key -> "false", SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) { val options = - ParquetDataGenOptions( + ParquetGeneratorOptions( generateArray = true, generateStruct = true, generateNegativeZero = false, diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index a6e55446b4..1a4196710a 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types._ import org.apache.comet.DataTypeSupport.isComplexType -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} class CometFuzzTestSuite extends CometFuzzTestBase { @@ -262,7 +262,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase { generateStruct: Boolean = true): Unit = { val options = - ParquetDataGenOptions( + ParquetGeneratorOptions( generateArray = generateArray, generateStruct = generateStruct, generateNegativeZero = false) diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 7283b696b4..a9eb057f82 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} class CometMapExpressionSuite extends CometTestBase { @@ -108,7 +108,7 @@ class CometMapExpressionSuite extends CometTestBase { val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetDataGenOptions( + val options = ParquetGeneratorOptions( allowNull = false, generateNegativeZero = false, generateArray = true, diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 1bf817bd39..f64d8b02cd 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.functions.{avg, count_distinct, sum} import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} /** * Test suite dedicated to Comet native aggregate operator @@ -45,7 +45,12 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - ParquetGenerator.makeParquetFile(random, spark, filename, 10000, ParquetDataGenOptions()) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 10000, + ParquetGeneratorOptions()) } val tableName = "avg_decimal" withTable(tableName) { diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 747bcff79a..0fcc9c68e3 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -50,7 +50,7 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.{CometConf, ExtendedExplainInfo} import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} -import org.apache.comet.testing.{ParquetDataGenOptions, ParquetGenerator} +import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} class CometExecSuite extends CometTestBase { @@ -2052,7 +2052,7 @@ class CometExecSuite extends CometTestBase { val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetDataGenOptions( + val options = ParquetGeneratorOptions( allowNull = true, generateNegativeZero = true, generateArray = true, From 3ffa0dfdca3039155b2a46642194946c7ffa70c1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 22 Oct 2025 10:43:02 -0600 Subject: [PATCH 04/22] Refactor --- .../comet/testing/FuzzDataGenerator.scala | 65 ++++++++++++++++++ .../comet/testing/ParquetGenerator.scala | 67 +++---------------- 2 files changed, 74 insertions(+), 58 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala index bba38964d6..087221e1a3 100644 --- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala @@ -44,6 +44,47 @@ object FuzzDataGenerator { val defaultBaseDate: Long = new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("3333-05-25 12:34:56").getTime + def generateSchema(options: SchemaGenOptions): StructType = { + val primitiveTypes = options.primitiveTypes + val dataTypes = ListBuffer[DataType]() + dataTypes.appendAll(primitiveTypes) + + val arraysOfPrimitives = primitiveTypes.map(DataTypes.createArrayType) + + if (options.generateStruct) { + dataTypes += StructType( + primitiveTypes.zipWithIndex.map(x => StructField(s"c${x._2}", x._1, nullable = true))) + + if (options.generateArray) { + dataTypes += StructType(arraysOfPrimitives.zipWithIndex.map(x => + StructField(s"c${x._2}", x._1, nullable = true))) + } + } + + if (options.generateMap) { + dataTypes += MapType(DataTypes.IntegerType, DataTypes.StringType) + } + + if (options.generateArray) { + dataTypes.appendAll(arraysOfPrimitives) + + if (options.generateStruct) { + dataTypes += DataTypes.createArrayType(StructType(primitiveTypes.zipWithIndex.map(x => + StructField(s"c${x._2}", x._1, nullable = true)))) + } + + if (options.generateMap) { + dataTypes += DataTypes.createArrayType( + MapType(DataTypes.IntegerType, DataTypes.StringType)) + } + } + + // generate schema using random data types + val fields = dataTypes.zipWithIndex + .map(i => StructField(s"c${i._2}", i._1, nullable = true)) + StructType(fields.toSeq) + } + def generateDataFrame( r: Random, spark: SparkSession, @@ -179,6 +220,30 @@ object FuzzDataGenerator { } } +object SchemaGenOptions { + val defaultPrimitiveTypes: Seq[DataType] = Seq( + DataTypes.BooleanType, + DataTypes.ByteType, + DataTypes.ShortType, + DataTypes.IntegerType, + DataTypes.LongType, + DataTypes.FloatType, + DataTypes.DoubleType, + DataTypes.createDecimalType(10, 2), + DataTypes.createDecimalType(36, 18), 
+ DataTypes.DateType, + DataTypes.TimestampType, + DataTypes.TimestampNTZType, + DataTypes.StringType, + DataTypes.BinaryType) +} + +case class SchemaGenOptions( + generateArray: Boolean = false, + generateStruct: Boolean = false, + generateMap: Boolean = false, + primitiveTypes: Seq[DataType] = SchemaGenOptions.defaultPrimitiveTypes) + case class DataGenOptions( allowNull: Boolean = true, generateNegativeZero: Boolean = true, diff --git a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala index 279c91a1e2..7591216991 100644 --- a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala @@ -19,11 +19,10 @@ package org.apache.comet.testing -import scala.collection.mutable.ListBuffer import scala.util.Random import org.apache.spark.sql.{SaveMode, SparkSession} -import org.apache.spark.sql.types.{DataType, DataTypes, MapType, StructField, StructType} +import org.apache.spark.sql.types.{DataType, StructType} object ParquetGenerator { @@ -34,7 +33,13 @@ object ParquetGenerator { filename: String, numRows: Int, options: ParquetGeneratorOptions): Unit = { - val schema = generateSchema(options) + + val schemaGenOptions = SchemaGenOptions( + generateArray = options.generateArray, + generateStruct = options.generateStruct, + generateMap = options.generateMap, + primitiveTypes = options.primitiveTypes) + val schema = FuzzDataGenerator.generateSchema(schemaGenOptions) val dataGenOptions = DataGenOptions( allowNull = options.allowNull, @@ -56,46 +61,6 @@ object ParquetGenerator { df.write.mode(SaveMode.Overwrite).parquet(filename) } - private def generateSchema(options: ParquetGeneratorOptions): StructType = { - val primitiveTypes = options.primitiveTypes - val dataTypes = ListBuffer[DataType]() - dataTypes.appendAll(primitiveTypes) - - val arraysOfPrimitives = primitiveTypes.map(DataTypes.createArrayType) - - if (options.generateStruct) { - dataTypes += StructType( - primitiveTypes.zipWithIndex.map(x => StructField(s"c${x._2}", x._1, nullable = true))) - - if (options.generateArray) { - dataTypes += StructType(arraysOfPrimitives.zipWithIndex.map(x => - StructField(s"c${x._2}", x._1, nullable = true))) - } - } - - if (options.generateMap) { - dataTypes += MapType(DataTypes.IntegerType, DataTypes.StringType) - } - - if (options.generateArray) { - dataTypes.appendAll(arraysOfPrimitives) - - if (options.generateStruct) { - dataTypes += DataTypes.createArrayType(StructType(primitiveTypes.zipWithIndex.map(x => - StructField(s"c${x._2}", x._1, nullable = true)))) - } - - if (options.generateMap) { - dataTypes += DataTypes.createArrayType( - MapType(DataTypes.IntegerType, DataTypes.StringType)) - } - } - - // generate schema using random data types - val fields = dataTypes.zipWithIndex - .map(i => StructField(s"c${i._2}", i._1, nullable = true)) - StructType(fields.toSeq) - } } /** Schema and Data generation options */ @@ -106,18 +71,4 @@ case class ParquetGeneratorOptions( generateArray: Boolean = false, generateStruct: Boolean = false, generateMap: Boolean = false, - primitiveTypes: Seq[DataType] = Seq( - DataTypes.BooleanType, - DataTypes.ByteType, - DataTypes.ShortType, - DataTypes.IntegerType, - DataTypes.LongType, - DataTypes.FloatType, - DataTypes.DoubleType, - DataTypes.createDecimalType(10, 2), - DataTypes.createDecimalType(36, 18), - DataTypes.DateType, - DataTypes.TimestampType, - DataTypes.TimestampNTZType, - 
DataTypes.StringType, - DataTypes.BinaryType)) + primitiveTypes: Seq[DataType] = SchemaGenOptions.defaultPrimitiveTypes) From fccb7e697b261be7425e0a1e471e19a826907aa2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 22 Oct 2025 11:01:50 -0600 Subject: [PATCH 05/22] simplify --- .../scala/org/apache/comet/fuzz/Main.scala | 16 +-- .../comet/testing/ParquetGenerator.scala | 27 +---- .../comet/CometArrayExpressionSuite.scala | 106 +++++++----------- .../comet/CometBitwiseExpressionSuite.scala | 18 +-- .../org/apache/comet/CometFuzzTestBase.scala | 25 +++-- .../org/apache/comet/CometFuzzTestSuite.scala | 19 ++-- .../comet/CometMapExpressionSuite.scala | 19 ++-- .../comet/exec/CometAggregateSuite.scala | 5 +- .../apache/comet/exec/CometExecSuite.scala | 19 ++-- 9 files changed, 109 insertions(+), 145 deletions(-) diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala index 633a10ba95..1f81dc7791 100644 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala +++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala @@ -26,7 +26,7 @@ import org.rogach.scallop.ScallopOption import org.apache.spark.sql.SparkSession -import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { object generateData extends Subcommand("data") { @@ -78,19 +78,19 @@ object Main { case Some(seed) => new Random(seed) case None => new Random() } - val options = ParquetGeneratorOptions( - allowNull = true, - generateArray = conf.generateData.generateArrays(), - generateStruct = conf.generateData.generateStructs(), - generateMap = conf.generateData.generateMaps(), - generateNegativeZero = !conf.generateData.excludeNegativeZero()) for (i <- 0 until conf.generateData.numFiles()) { ParquetGenerator.makeParquetFile( r, spark, s"test$i.parquet", numRows = conf.generateData.numRows(), - options) + SchemaGenOptions( + generateArray = conf.generateData.generateArrays(), + generateStruct = conf.generateData.generateStructs(), + generateMap = conf.generateData.generateMaps()), + DataGenOptions( + allowNull = true, + generateNegativeZero = !conf.generateData.excludeNegativeZero())) } case Some(conf.generateQueries) => val r = conf.generateQueries.randomSeed.toOption match { diff --git a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala index 7591216991..a43100a69d 100644 --- a/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala +++ b/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala @@ -22,7 +22,7 @@ package org.apache.comet.testing import scala.util.Random import org.apache.spark.sql.{SaveMode, SparkSession} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.StructType object ParquetGenerator { @@ -32,20 +32,9 @@ object ParquetGenerator { spark: SparkSession, filename: String, numRows: Int, - options: ParquetGeneratorOptions): Unit = { - - val schemaGenOptions = SchemaGenOptions( - generateArray = options.generateArray, - generateStruct = options.generateStruct, - generateMap = options.generateMap, - primitiveTypes = options.primitiveTypes) + schemaGenOptions: SchemaGenOptions, + dataGenOptions: DataGenOptions): Unit = { val schema = FuzzDataGenerator.generateSchema(schemaGenOptions) - - val 
dataGenOptions = DataGenOptions( - allowNull = options.allowNull, - generateNegativeZero = options.generateNegativeZero, - baseDate = options.baseDate) - makeParquetFile(r, spark, filename, schema, numRows, dataGenOptions) } @@ -62,13 +51,3 @@ object ParquetGenerator { } } - -/** Schema and Data generation options */ -case class ParquetGeneratorOptions( - allowNull: Boolean = true, - generateNegativeZero: Boolean = true, - baseDate: Long = FuzzDataGenerator.defaultBaseDate, - generateArray: Boolean = false, - generateStruct: Boolean = false, - generateMap: Boolean = false, - primitiveTypes: Seq[DataType] = SchemaGenOptions.defaultPrimitiveTypes) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index bd76400857..c346dc2e95 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.functions._ import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.serde.{CometArrayExcept, CometArrayRemove, CometArrayReverse, CometFlatten} -import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -64,12 +64,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = false, - generateStruct = false, - generateMap = false)) + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } val table = spark.read.parquet(filename) table.createOrReplaceTempView("t1") @@ -95,13 +91,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = true, - generateStruct = true, - generateMap = false) - ParquetGenerator.makeParquetFile(random, spark, filename, 100, options) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } withSQLConf( CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", @@ -266,12 +262,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = true, - generateStruct = true, - generateMap = false)) + SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } val table = spark.read.parquet(filename) table.createOrReplaceTempView("t1") @@ -310,12 +302,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = false, - generateStruct = false, - 
generateMap = false)) + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } val table = spark.read.parquet(filename) table.createOrReplaceTempView("t2") @@ -340,12 +328,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = true, - generateStruct = true, - generateMap = false)) + SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } withSQLConf( CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", @@ -588,12 +572,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = false, - generateStruct = false, - generateMap = false)) + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { withTempView("t1", "t2") { @@ -622,13 +602,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = true, - generateStruct = true, - generateMap = false) - ParquetGenerator.makeParquetFile(random, spark, filename, 100, options) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } withSQLConf( CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", @@ -692,12 +672,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp spark, filename, 100, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = false, - generateStruct = false, - generateMap = false)) + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } val table = spark.read.parquet(filename) table.createOrReplaceTempView("t1") @@ -720,13 +696,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = true, - generateStruct = true, - generateMap = false) - ParquetGenerator.makeParquetFile(random, spark, filename, 100, options) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } withSQLConf( CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", @@ -773,13 +749,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetGeneratorOptions( - allowNull = true, - 
generateNegativeZero = true, - generateArray = true, - generateStruct = true, - generateMap = false) - ParquetGenerator.makeParquetFile(random, spark, filename, 100, options) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } withSQLConf( CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false", diff --git a/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala index d2e214e29e..02c003ede8 100644 --- a/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometBitwiseExpressionSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -99,12 +99,8 @@ class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHe spark, filename, 100, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = false, - generateStruct = false, - generateMap = false)) + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } val table = spark.read.parquet(filename) checkSparkAnswerAndOperator( @@ -174,12 +170,8 @@ class CometBitwiseExpressionSuite extends CometTestBase with AdaptiveSparkPlanHe spark, filename, 10, - ParquetGeneratorOptions( - allowNull = true, - generateNegativeZero = true, - generateArray = false, - generateStruct = false, - generateMap = false)) + SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false), + DataGenOptions(allowNull = true, generateNegativeZero = true)) } val table = spark.read.parquet(filename) val df = diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala index 9ab35c822f..1c0636780e 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal.SQLConf -import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { @@ -58,15 +58,20 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "false", SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) { - val options = - ParquetGeneratorOptions( - generateArray = true, - generateStruct = true, - generateNegativeZero = false, - // override base date due to known issues with experimental scans - baseDate = - new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime) - ParquetGenerator.makeParquetFile(random, spark, filename, 1000, options) + val schemaGenOptions = + 
SchemaGenOptions(generateArray = true, generateStruct = true) + val dataGenOptions = DataGenOptions( + generateNegativeZero = false, + // override base date due to known issues with experimental scans + baseDate = + new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 1000, + schemaGenOptions, + dataGenOptions) } } diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index 1a4196710a..006112d2b0 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types._ import org.apache.comet.DataTypeSupport.isComplexType -import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometFuzzTestSuite extends CometFuzzTestBase { @@ -261,11 +261,10 @@ class CometFuzzTestSuite extends CometFuzzTestBase { generateArray: Boolean = true, generateStruct: Boolean = true): Unit = { - val options = - ParquetGeneratorOptions( - generateArray = generateArray, - generateStruct = generateStruct, - generateNegativeZero = false) + val schemaGenOptions = + SchemaGenOptions(generateArray = generateArray, generateStruct = generateStruct) + + val dataGenOptions = DataGenOptions(generateNegativeZero = false) withTempPath { filename => val random = new Random(42) @@ -273,7 +272,13 @@ class CometFuzzTestSuite extends CometFuzzTestBase { CometConf.COMET_ENABLED.key -> "false", SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outputTimestampType.toString, SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) { - ParquetGenerator.makeParquetFile(random, spark, filename.toString, 100, options) + ParquetGenerator.makeParquetFile( + random, + spark, + filename.toString, + 100, + schemaGenOptions, + dataGenOptions) } Seq(defaultTimezone, "UTC", "America/Denver").foreach { tz => diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index a9eb057f82..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions} +import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -108,13 +108,16 @@ class CometMapExpressionSuite extends CometTestBase { val filename = path.toString val random = new Random(42) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val options = ParquetGeneratorOptions( - allowNull = false, - generateNegativeZero = false, - generateArray = true, - generateStruct = false, - generateMap = false) - ParquetGenerator.makeParquetFile(random, spark, filename, 100, options) + val schemaGenOptions = + SchemaGenOptions(generateArray = true, generateStruct = false, generateMap = false) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, 
+          100,
+          schemaGenOptions,
+          dataGenOptions)
       }
       spark.read.parquet(filename).createOrReplaceTempView("t1")
       val df = spark.sql("SELECT map_from_arrays(array(c12), array(c3)) FROM t1")
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index f64d8b02cd..211cc16d05 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.functions.{avg, count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
-import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions}
+import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
 
 /**
  * Test suite dedicated to Comet native aggregate operator
@@ -50,7 +50,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
             spark,
             filename,
             10000,
-            ParquetGeneratorOptions())
+            SchemaGenOptions(),
+            DataGenOptions())
         }
         val tableName = "avg_decimal"
         withTable(tableName) {
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 0fcc9c68e3..1b15c39cac 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -50,7 +50,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
 import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus}
-import org.apache.comet.testing.{ParquetGenerator, ParquetGeneratorOptions}
+import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
 
 class CometExecSuite extends CometTestBase {
 
@@ -2052,13 +2052,16 @@ class CometExecSuite extends CometTestBase {
       val filename = path.toString
       val random = new Random(42)
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
-        val options = ParquetGeneratorOptions(
-          allowNull = true,
-          generateNegativeZero = true,
-          generateArray = true,
-          generateStruct = true,
-          generateMap = true)
-        ParquetGenerator.makeParquetFile(random, spark, filename, 100, options)
+        val schemaGenOptions =
+          SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = true)
+        val dataGenOptions = DataGenOptions(allowNull = true, generateNegativeZero = true)
+        ParquetGenerator.makeParquetFile(
+          random,
+          spark,
+          filename,
+          100,
+          schemaGenOptions,
+          dataGenOptions)
       }
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
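
The refactor above splits the old single ParquetGeneratorOptions into two concerns: SchemaGenOptions decides which column types appear in a generated schema, while DataGenOptions decides how values are generated for that schema. The following is a minimal, uncommitted sketch of the resulting call shape; the driver object, master setting, output path, and row count are assumptions, not part of the patches.

    import scala.util.Random

    import org.apache.spark.sql.SparkSession

    import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}

    object GenerateFuzzFile {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").getOrCreate()
        // Schema shape and value generation are now configured independently.
        ParquetGenerator.makeParquetFile(
          new Random(42),
          spark,
          "/tmp/fuzz-test.parquet",
          1000,
          SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = false),
          DataGenOptions(allowNull = true, generateNegativeZero = false))
        spark.read.parquet("/tmp/fuzz-test.parquet").printSchema()
      }
    }
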
From aeca95bed6330f85a43bc3ee12b4877cf107f9a5 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 12:04:50 -0600
Subject: [PATCH 06/22] fall back to Spark if lpad/rpad pad argument is not a
 literal

---
 .../org/apache/comet/serde/strings.scala   | 29 +++++++++----------
 .../comet/testing/FuzzDataGenerator.scala  |  7 +++--
 .../comet/CometStringExpressionSuite.scala | 26 +++++++++++++++++
 3 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index c6f5a85089..e9ca91920a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -162,6 +162,13 @@ object CometRLike extends CometExpressionSerde[RLike] {
 
 object CometStringRPad extends CometExpressionSerde[StringRPad] {
 
+  override def getSupportLevel(expr: StringRPad): SupportLevel = {
+    if (!expr.pad.isInstanceOf[Literal]) {
+      return Unsupported(Some("Only scalar values are supported for the pad argument"))
+    }
+    Compatible()
+  }
+
   override def convert(
       expr: StringRPad,
       inputs: Seq[Attribute],
@@ -177,21 +184,13 @@ object CometStringRPad extends CometExpressionSerde[StringRPad] {
 
 object CometStringLPad extends CometExpressionSerde[StringLPad] {
 
-  /**
-   * Convert a Spark expression into a protocol buffer representation that can be passed into
-   * native code.
-   *
-   * @param expr
-   *   The Spark expression.
-   * @param inputs
-   *   The input attributes.
-   * @param binding
-   *   Whether the attributes are bound (this is only relevant in aggregate expressions).
-   * @return
-   *   Protocol buffer representation, or None if the expression could not be converted. In this
-   *   case it is expected that the input expression will have been tagged with reasons why it
-   *   could not be converted.
-   */
+  override def getSupportLevel(expr: StringLPad): SupportLevel = {
+    if (!expr.pad.isInstanceOf[Literal]) {
+      return Unsupported(Some("Only scalar values are supported for the pad argument"))
+    }
+    Compatible()
+  }
+
   override def convert(
       expr: StringLPad,
       inputs: Seq[Attribute],
diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
index 087221e1a3..a2832838d8 100644
--- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
+++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
@@ -194,8 +194,8 @@ object FuzzDataGenerator {
           case 1 => r.nextInt().toByte.toString
           case 2 => r.nextLong().toString
           case 3 => r.nextDouble().toString
-          case 4 => RandomStringUtils.randomAlphabetic(8)
-          case _ => r.nextString(8)
+          case 4 => RandomStringUtils.randomAlphabetic(options.maxStringLength)
+          case _ => r.nextString(options.maxStringLength)
         }
       })
     case DataTypes.BinaryType =>
@@ -247,4 +247,5 @@ case class SchemaGenOptions(
 case class DataGenOptions(
     allowNull: Boolean = true,
     generateNegativeZero: Boolean = true,
-    baseDate: Long = FuzzDataGenerator.defaultBaseDate)
+    baseDate: Long = FuzzDataGenerator.defaultBaseDate,
+    maxStringLength: Int = 8)
diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 44d40cf1c1..4fcba47c43 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -19,12 +19,38 @@
 
 package org.apache.comet
 
+import scala.util.Random
+
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql.{CometTestBase, DataFrame}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+
+import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
 
 class CometStringExpressionSuite extends CometTestBase {
 
+  test("lpad") {
+    testPadding("lpad")
+  }
+
+  test("rpad") {
+    testPadding("rpad")
+  }
+
+  private def testPadding(expr: String): Unit = {
+    val r = new Random()
+    val schema = StructType(
+      Seq(
+        StructField("str", DataTypes.StringType, true),
+        StructField("pad", DataTypes.StringType, true)))
+    val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 100, DataGenOptions())
+    df.createOrReplaceTempView("t1")
+
+    checkSparkAnswer(s"SELECT str, $expr(str, 4, pad) FROM t1 ORDER BY str, pad")
+    checkSparkAnswerAndOperator(s"SELECT str, $expr(str, 4, 'x') FROM t1 ORDER BY str")
+  }
+
   test("Various String scalar functions") {
     val table = "names"
     withTable(table) {
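
Patch 06 gates lpad/rpad on the shape of the pad argument instead of letting unsupported cases reach native code. As a rough, self-contained illustration of the distinction the getSupportLevel check draws (the table, column names, and values below are made up for the demo):

    import org.apache.spark.sql.SparkSession

    object PadFallbackDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").getOrCreate()
        import spark.implicits._

        Seq(("abc", "x"), ("de", "yz")).toDF("str", "pad").createOrReplaceTempView("t")

        // Literal pad argument: eligible for Comet's native implementation.
        spark.sql("SELECT rpad(str, 5, '_') FROM t").show()

        // Column-valued pad argument: Comet now reports
        // "Only scalar values are supported for the pad argument"
        // and leaves the expression to Spark.
        spark.sql("SELECT rpad(str, 5, pad) FROM t").show()
      }
    }
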
From cb536a832a9e54e57e52ee1be347c0e3724c6393 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:05:07 -0600
Subject: [PATCH 07/22] save progress

---
 .../char_varchar_utils/read_side_padding.rs | 22 +++++++++++--------
 .../comet/CometStringExpressionSuite.scala  |  8 +++++--
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index d969b6279b..6891ab40b6 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -204,15 +204,19 @@ fn spark_read_side_padding_internal(
     );
 
     for (string, length) in string_array.iter().zip(int_pad_array) {
-        match string {
-            Some(string) => builder.append_value(add_padding_string(
-                string.parse().unwrap(),
-                length.unwrap() as usize,
-                truncate,
-                pad_string,
-                is_left_pad,
-            )?),
-            _ => builder.append_null(),
+        if length.unwrap() < 0 {
+            builder.append_null();
+        } else {
+            match string {
+                Some(string) => builder.append_value(add_padding_string(
+                    string.parse().unwrap(),
+                    length.unwrap() as usize,
+                    truncate,
+                    pad_string,
+                    is_left_pad,
+                )?),
+                _ => builder.append_null(),
+            }
         }
     }
     Ok(ColumnarValue::Array(Arc::new(builder.finish())))
diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 4fcba47c43..687c75a45a 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -43,12 +43,16 @@ class CometStringExpressionSuite extends CometTestBase {
     val schema = StructType(
       Seq(
         StructField("str", DataTypes.StringType, true),
+        StructField("len", DataTypes.IntegerType, true),
         StructField("pad", DataTypes.StringType, true)))
     val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 100, DataGenOptions())
     df.createOrReplaceTempView("t1")
 
-    checkSparkAnswer(s"SELECT str, $expr(str, 4, pad) FROM t1 ORDER BY str, pad")
-    checkSparkAnswerAndOperator(s"SELECT str, $expr(str, 4, 'x') FROM t1 ORDER BY str")
+    // we expect Comet to fall back to Spark if the pad argument is not a literal
+//    checkSparkAnswer(s"SELECT str, $expr(str, 4, pad) FROM t1 ORDER BY str, pad")
+//    checkSparkAnswerAndOperator(s"SELECT str, $expr(str, 4, 'x') FROM t1 ORDER BY str")
+    checkSparkAnswerAndOperator(s"SELECT str, len % 10, $expr(str, len % 10, 'x') FROM t1 ORDER BY str, len")
+//    checkSparkAnswerAndOperator(s"SELECT len, len % 10 FROM t1 ORDER BY len")
   }
 
From 3bda19cf9dafb46e12f27590d9b909d6a6dc422e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:19:57 -0600
Subject: [PATCH 08/22] fix

---
 .../char_varchar_utils/read_side_padding.rs | 27 ++++++++++---------
 .../comet/testing/FuzzDataGenerator.scala   |  2 +-
 .../comet/CometStringExpressionSuite.scala  |  3 ++-
 3 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index 6891ab40b6..07dee1e7bf 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -204,19 +204,22 @@ fn spark_read_side_padding_internal(
     );
 
     for (string, length) in string_array.iter().zip(int_pad_array) {
-        if length.unwrap() < 0 {
-            builder.append_null();
-        } else {
-            match string {
-                Some(string) => builder.append_value(add_padding_string(
-                    string.parse().unwrap(),
-                    length.unwrap() as usize,
-                    truncate,
-                    pad_string,
-                    is_left_pad,
-                )?),
-                _ => builder.append_null(),
+        let length = length.unwrap();
+        match string {
+            Some(string) => {
+                if length < 0 {
+                    builder.append_value("".to_owned());
+                } else {
+                    builder.append_value(add_padding_string(
+                        string.parse().unwrap(),
+                        length as usize,
+                        truncate,
+                        pad_string,
+                        is_left_pad,
+                    )?)
+                }
             }
+            _ => builder.append_null(),
         }
     }
     Ok(ColumnarValue::Array(Arc::new(builder.finish())))
diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
index d4b12c7c10..a2832838d8 100644
--- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
+++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
@@ -248,4 +248,4 @@ case class DataGenOptions(
     allowNull: Boolean = true,
     generateNegativeZero: Boolean = true,
     baseDate: Long = FuzzDataGenerator.defaultBaseDate,
-    maxStringLength: Int = 8)
\ No newline at end of file
+    maxStringLength: Int = 8)
diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 687c75a45a..bfa31247d9 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -51,7 +51,8 @@ class CometStringExpressionSuite extends CometTestBase {
     // we expect Comet to fall back to Spark if the pad argument is not a literal
 //    checkSparkAnswer(s"SELECT str, $expr(str, 4, pad) FROM t1 ORDER BY str, pad")
 //    checkSparkAnswerAndOperator(s"SELECT str, $expr(str, 4, 'x') FROM t1 ORDER BY str")
-    checkSparkAnswerAndOperator(s"SELECT str, len % 10, $expr(str, len % 10, 'x') FROM t1 ORDER BY str, len")
+    checkSparkAnswerAndOperator(
+      s"SELECT str, len % 10, $expr(str, len % 10, 'x') FROM t1 ORDER BY str, len")
 //    checkSparkAnswerAndOperator(s"SELECT len, len % 10 FROM t1 ORDER BY len")
   }
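
The native change in patch 08 makes a non-null input with a negative target length produce an empty string rather than a null, matching Spark's semantics. A quick way to see the behaviour being matched, assuming an active SparkSession named `spark`:

    // lpad/rpad truncate to the target length; a negative or zero length
    // therefore yields an empty string, while a null input stays null.
    spark.sql("SELECT rpad('abc', -1, 'x')").show()                 // ""
    spark.sql("SELECT lpad('abc', 0, 'x')").show()                  // ""
    spark.sql("SELECT rpad(CAST(NULL AS STRING), -1, 'x')").show()  // NULL
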
From 7adfff416167f51ffc4c04d2ebe01869c9a2a056 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:32:20 -0600
Subject: [PATCH 09/22] fix

---
 .../org/apache/comet/serde/strings.scala   |  6 +++++
 .../comet/CometStringExpressionSuite.scala | 25 +++++++++++++------
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala
index e9ca91920a..3d4bacfa26 100644
--- a/spark/src/main/scala/org/apache/comet/serde/strings.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala
@@ -163,6 +163,9 @@ object CometRLike extends CometExpressionSerde[RLike] {
 object CometStringRPad extends CometExpressionSerde[StringRPad] {
 
   override def getSupportLevel(expr: StringRPad): SupportLevel = {
+    if (expr.str.isInstanceOf[Literal]) {
+      return Unsupported(Some("Scalar values are not supported for the str argument"))
+    }
     if (!expr.pad.isInstanceOf[Literal]) {
       return Unsupported(Some("Only scalar values are supported for the pad argument"))
     }
@@ -185,6 +188,9 @@ object CometStringRPad extends CometExpressionSerde[StringRPad] {
 object CometStringLPad extends CometExpressionSerde[StringLPad] {
 
   override def getSupportLevel(expr: StringLPad): SupportLevel = {
+    if (expr.str.isInstanceOf[Literal]) {
+      return Unsupported(Some("Scalar values are not supported for the str argument"))
+    }
     if (!expr.pad.isInstanceOf[Literal]) {
       return Unsupported(Some("Only scalar values are supported for the pad argument"))
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index bfa31247d9..95f99d830c 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -20,13 +20,13 @@ package org.apache.comet
 
 import scala.util.Random
-
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql.{CometTestBase, DataFrame}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
-
 import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
+import org.apache.spark.sql.comet.CometProjectExec
+import org.apache.spark.sql.execution.ProjectExec
 
 class CometStringExpressionSuite extends CometTestBase {
 
@@ -48,12 +48,21 @@ class CometStringExpressionSuite extends CometTestBase {
     val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 100, DataGenOptions())
     df.createOrReplaceTempView("t1")
 
-    // we expect Comet to fall back to Spark if the pad argument is not a literal
-//    checkSparkAnswer(s"SELECT str, $expr(str, 4, pad) FROM t1 ORDER BY str, pad")
-//    checkSparkAnswerAndOperator(s"SELECT str, $expr(str, 4, 'x') FROM t1 ORDER BY str")
-    checkSparkAnswerAndOperator(
-      s"SELECT str, len % 10, $expr(str, len % 10, 'x') FROM t1 ORDER BY str, len")
-//    checkSparkAnswerAndOperator(s"SELECT len, len % 10 FROM t1 ORDER BY len")
+    // test all combinations of scalar and array arguments
+    for (str <- Seq("'hello'", "str")) {
+      for (len <- Seq("6", "len % 10")) {
+        for (pad <- Seq("'x'", "pad")) {
+          val sql = s"SELECT $str, $len, $expr($str, $len, $pad) FROM t1 ORDER BY str, len, pad"
+          if (str == "'hello'" || pad == "pad") {
+            // Comet does not support literal for str argument
+            // Comet only supports literals for pad argument
+            checkSparkAnswer(sql)
+          } else {
+            checkSparkAnswerAndOperator(sql)
+          }
+        }
+      }
+    }
   }
 
   test("Various String scalar functions") {
From fb3cdebf694ea93a937a8a30e18813e7e1bdd376 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:38:35 -0600
Subject: [PATCH 10/22] remove old tests

---
 .../apache/comet/CometExpressionSuite.scala | 62 -------------------
 .../comet/CometStringExpressionSuite.scala  |  6 +-
 2 files changed, 3 insertions(+), 65 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 1eca17dccc..ddbe7d14e2 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -414,41 +414,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
-  test("Verify rpad expr support for second arg instead of just literal") {
-    val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("తెలుగు", 2))
-    withParquetTable(data, "t1") {
-      val res = sql("select rpad(_1,_2) , rpad(_1,2) from t1 order by _1")
-      checkSparkAnswerAndOperator(res)
-    }
-  }
-
-  test("RPAD with character support other than default space") {
-    val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("hi", 2))
-    withParquetTable(data, "t1") {
-      val res = sql(
-        """ select rpad(_1,_2,'?'), rpad(_1,_2,'??') , rpad(_1,2, '??'), hex(rpad(unhex('aabb'), 5)),
-            rpad(_1, 5, '??') from t1 order by _1 """.stripMargin)
-      checkSparkAnswerAndOperator(res)
-    }
-  }
-
-  test("test lpad expression support") {
-    val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("తెలుగు", 2))
-    withParquetTable(data, "t1") {
-      val res = sql("select lpad(_1,_2) , lpad(_1,2) from t1 order by _1")
-      checkSparkAnswerAndOperator(res)
-    }
-  }
-
-  test("LPAD with character support other than default space") {
-    val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("hi", 2))
-    withParquetTable(data, "t1") {
-      val res = sql(
-        """ select lpad(_1,_2,'?'), lpad(_1,_2,'??') , lpad(_1,2, '??'), hex(lpad(unhex('aabb'), 5)),
-            rpad(_1, 5, '??') from t1 order by _1 """.stripMargin)
-      checkSparkAnswerAndOperator(res)
-    }
-  }
 
   test("dictionary arithmetic") {
     // TODO: test ANSI mode
@@ -2292,33 +2257,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("rpad") {
-    val table = "rpad"
-    val gen = new DataGenerator(new Random(42))
-    withTable(table) {
-      // generate some data
-      val dataChars = "abc123"
-      sql(s"create table $table(id int, name1 char(8), name2 varchar(8)) using parquet")
-      val testData = gen.generateStrings(100, dataChars, 6) ++ Seq(
-        "é", // unicode 'e\\u{301}'
-        "é" // unicode '\\u{e9}'
-      )
-      testData.zipWithIndex.foreach { x =>
-        sql(s"insert into $table values(${x._2}, '${x._1}', '${x._1}')")
-      }
-      // test 2-arg version
-      checkSparkAnswerAndOperator(
-        s"SELECT id, rpad(name1, 10), rpad(name2, 10) FROM $table ORDER BY id")
-      // test 3-arg version
-      for (length <- Seq(2, 10)) {
-        checkSparkAnswerAndOperator(
-          s"SELECT id, name1, rpad(name1, $length, ' ') FROM $table ORDER BY id")
-        checkSparkAnswerAndOperator(
-          s"SELECT id, name2, rpad(name2, $length, ' ') FROM $table ORDER BY id")
-      }
-    }
-  }
-
   test("isnan") {
     Seq("true", "false").foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary) {
diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 95f99d830c..1ec96ea397 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -20,13 +20,13 @@ package org.apache.comet
 
 import scala.util.Random
+
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql.{CometTestBase, DataFrame}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+
 import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
-import org.apache.spark.sql.comet.CometProjectExec
-import org.apache.spark.sql.execution.ProjectExec
 
 class CometStringExpressionSuite extends CometTestBase {
 
@@ -39,7 +39,7 @@ class CometStringExpressionSuite extends CometTestBase {
   }
 
   private def testPadding(expr: String): Unit = {
-    val r = new Random()
+    val r = new Random(42)
     val schema = StructType(
       Seq(
         StructField("str", DataTypes.StringType, true),
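
Patch 10 drops the hand-written lpad/rpad tests in favour of the combination-driven test introduced in patch 09. A small sketch of the idea behind that loop -- enumerate the literal and column-backed form of each argument and build one query per combination (expression and view names follow the suite; this snippet only prints the generated SQL):

    val expr = "rpad"
    for {
      str <- Seq("'hello'", "str")
      len <- Seq("6", "len % 10")
      pad <- Seq("'x'", "pad")
    } println(s"SELECT $str, $len, $expr($str, $len, $pad) FROM t1 ORDER BY str, len, pad")
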
From d8ac26a35775d18293b503f7cb7f7fdd718cfa90 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:41:12 -0600
Subject: [PATCH 11/22] test 2-arg version

---
 .../apache/comet/CometStringExpressionSuite.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 1ec96ea397..6ea4c1824a 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -51,9 +51,16 @@ class CometStringExpressionSuite extends CometTestBase {
     // test all combinations of scalar and array arguments
     for (str <- Seq("'hello'", "str")) {
       for (len <- Seq("6", "len % 10")) {
-        for (pad <- Seq("'x'", "pad")) {
-          val sql = s"SELECT $str, $len, $expr($str, $len, $pad) FROM t1 ORDER BY str, len, pad"
-          if (str == "'hello'" || pad == "pad") {
+        for (pad <- Seq(Some("'x'"), Some("'zzz'"), Some("pad"), None)) {
+          val sql = pad match {
+            case Some(p) =>
+              // 3 args
+              s"SELECT $str, $len, $expr($str, $len, $p) FROM t1 ORDER BY str, len, pad"
+            case _ =>
+              // 2 args (default pad of ' ')
+              s"SELECT $str, $len, $expr($str, $len) FROM t1 ORDER BY str, len, pad"
+          }
+          if (str == "'hello'" || pad.contains("pad")) {
             // Comet does not support literal for str argument
             // Comet only supports literals for pad argument
             checkSparkAnswer(sql)
From e1f78499b9021d12e60d7b00c110b6e0ccb2be9c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:43:34 -0600
Subject: [PATCH 12/22] max string length

---
 .../org/apache/comet/CometStringExpressionSuite.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 6ea4c1824a..f8e6d2f9da 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -45,7 +45,12 @@ class CometStringExpressionSuite extends CometTestBase {
         StructField("str", DataTypes.StringType, true),
         StructField("len", DataTypes.IntegerType, true),
         StructField("pad", DataTypes.StringType, true)))
-    val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 100, DataGenOptions())
+    val df = FuzzDataGenerator.generateDataFrame(
+      r,
+      spark,
+      schema,
+      100,
+      DataGenOptions(maxStringLength = 6))
     df.createOrReplaceTempView("t1")
 
     // test all combinations of scalar and array arguments
From 99518850e7138b26c2255c27ebbc982a97599c03 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:49:48 -0600
Subject: [PATCH 13/22] test negative literal length

---
 .../scala/org/apache/comet/CometStringExpressionSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index f8e6d2f9da..e2046db46f 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -55,7 +55,7 @@ class CometStringExpressionSuite extends CometTestBase {
 
     // test all combinations of scalar and array arguments
     for (str <- Seq("'hello'", "str")) {
-      for (len <- Seq("6", "len % 10")) {
+      for (len <- Seq("6", "-6", "0", "len % 10")) {
         for (pad <- Seq(Some("'x'"), Some("'zzz'"), Some("pad"), None)) {
           val sql = pad match {
             case Some(p) =>
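
Patch 13 relies on Scala's remainder keeping the sign of the dividend, so a fuzzed Int column reaches negative target lengths through `len % 10`, while the new `-6` and `0` literals cover the constant cases directly. A one-line check of that range:

    println(Seq(-27, -9, -1, 0, 4, 16).map(_ % 10)) // List(-7, -9, -1, 0, 4, 6)
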
From ed3648e998f9480985820cfc1b5a907996b469 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:55:00 -0600
Subject: [PATCH 14/22] extra unicode chars

---
 .../scala/org/apache/comet/testing/FuzzDataGenerator.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
index a2832838d8..e1be4e1493 100644
--- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
+++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
@@ -44,6 +44,11 @@ object FuzzDataGenerator {
   val defaultBaseDate: Long =
     new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("3333-05-25 12:34:56").getTime
 
+  val unicodeSpecialChars: String = Seq(
+    "é", // unicode 'e\\u{301}'
+    "é" // unicode '\\u{e9}'
+  ).mkString
+
   def generateSchema(options: SchemaGenOptions): StructType = {
     val primitiveTypes = options.primitiveTypes
     val dataTypes = ListBuffer[DataType]()
@@ -195,6 +200,7 @@ object FuzzDataGenerator {
           case 2 => r.nextLong().toString
           case 3 => r.nextDouble().toString
           case 4 => RandomStringUtils.randomAlphabetic(options.maxStringLength)
+          case 5 => unicodeSpecialChars
           case _ => r.nextString(options.maxStringLength)
         }
       })
From d0efb261f91a04a037155d97b6b95f64aaba6ae4 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 13:58:37 -0600
Subject: [PATCH 15/22] more unicode

---
 .../scala/org/apache/comet/testing/FuzzDataGenerator.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
index e1be4e1493..3c13822020 100644
--- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
+++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
@@ -46,7 +46,8 @@ object FuzzDataGenerator {
 
   val unicodeSpecialChars: String = Seq(
     "é", // unicode 'e\\u{301}'
-    "é" // unicode '\\u{e9}'
+    "é", // unicode '\\u{e9}'
+    "తెలుగు"
   ).mkString
 
   def generateSchema(options: SchemaGenOptions): StructType = {
From ca151c263e6911a43f1bf31be6cb356e5466cc90 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 14:07:08 -0600
Subject: [PATCH 16/22] clippy

---
 .../src/static_invoke/char_varchar_utils/read_side_padding.rs | 2 +-
 .../scala/org/apache/comet/CometStringExpressionSuite.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index 07dee1e7bf..a0f83fefa7 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -208,7 +208,7 @@ fn spark_read_side_padding_internal(
         match string {
             Some(string) => {
                 if length < 0 {
-                    builder.append_value("".to_owned());
+                    builder.append_value("");
                 } else {
                     builder.append_value(add_padding_string(
                         string.parse().unwrap(),
diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index e2046db46f..e002029662 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -49,7 +49,7 @@ class CometStringExpressionSuite extends CometTestBase {
       r,
       spark,
       schema,
-      100,
+      1000,
       DataGenOptions(maxStringLength = 6))
     df.createOrReplaceTempView("t1")
 
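
The two "é" entries added in patches 14-15 are distinct at the codepoint level -- a combining sequence versus a precomposed character -- which matters here because padding is length-based. A tiny demonstration:

    val decomposed = "e\u0301"  // 'e' followed by a combining acute accent
    val precomposed = "\u00e9"  // single precomposed codepoint
    println((decomposed.length, precomposed.length)) // (2,1)
    println(decomposed == precomposed)               // false
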
From 40130f4032c778a41e145e376eec3549394d39d9 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 15:06:34 -0600
Subject: [PATCH 17/22] scalastyle

---
 .../main/scala/org/apache/comet/testing/FuzzDataGenerator.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
index 3c13822020..68eef48c33 100644
--- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
+++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
@@ -44,11 +44,13 @@ object FuzzDataGenerator {
   val defaultBaseDate: Long =
     new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("3333-05-25 12:34:56").getTime
 
+  // scalastyle:off
   val unicodeSpecialChars: String = Seq(
     "é", // unicode 'e\\u{301}'
     "é", // unicode '\\u{e9}'
     "తెలుగు"
   ).mkString
+  // scalastyle:on
 
   def generateSchema(options: SchemaGenOptions): StructType = {
From 6a7f4eb41d0de744174fd66cfc6584d2b3290b43 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 22 Oct 2025 15:33:58 -0600
Subject: [PATCH 18/22] scalastyle

---
 .../scala/org/apache/comet/testing/FuzzDataGenerator.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
index 68eef48c33..bec26b727d 100644
--- a/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
+++ b/spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala
@@ -48,8 +48,7 @@ object FuzzDataGenerator {
   val unicodeSpecialChars: String = Seq(
     "é", // unicode 'e\\u{301}'
     "é", // unicode '\\u{e9}'
-    "తెలుగు"
-  ).mkString
+    "తెలుగు").mkString
   // scalastyle:on
 
   def generateSchema(options: SchemaGenOptions): StructType = {
From af0bb81e0687795339983dfee367645f7ed9a214 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 23 Oct 2025 15:38:50 -0600
Subject: [PATCH 19/22] fix

---
 .../org/apache/comet/CometStringExpressionSuite.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index d00fad5b25..757f39d9a1 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -71,11 +71,18 @@ class CometStringExpressionSuite extends CometTestBase {
               // 2 args (default pad of ' ')
               s"SELECT $str, $len, $expr($str, $len) FROM t1 ORDER BY str, len, pad"
           }
-          if (str == "'hello'") {
+          val isLiteralStr = str == "'hello'"
+          val isLiteralLen = !len.contains("len")
+          val isLiteralPad = !pad.contains("pad")
+          if (isLiteralStr && isLiteralLen && isLiteralPad) {
+            // all arguments are literal, so Spark constant folding will kick in
+            // and pad function will not be evaluated by Comet
+            checkSparkAnswer(sql)
+          } else if (isLiteralStr) {
             checkSparkAnswerAndFallbackReason(
               sql,
               "Scalar values are not supported for the str argument")
-          } else if (pad.contains("pad")) {
+          } else if (!isLiteralPad) {
             checkSparkAnswerAndFallbackReason(
               sql,
               "Only scalar values are supported for the pad argument")
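
Patch 19 accounts for Spark's ConstantFolding rule: when str, len, and pad are all literals, the optimizer evaluates the call before Comet's planner sees it, so there is no operator to assert on -- only the answer. A rough sketch of how one can observe the folding (local session and master setting assumed):

    import org.apache.spark.sql.SparkSession

    object FoldingDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").getOrCreate()
        // The optimized plan already contains the padded literal, not an rpad call.
        println(spark.sql("SELECT rpad('hello', 6, 'x')").queryExecution.optimizedPlan)
      }
    }
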
From 27cc67e649672f53b3bbd707718192190caa99aa Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 23 Oct 2025 15:39:05 -0600
Subject: [PATCH 20/22] fix

---
 .../scala/org/apache/comet/CometStringExpressionSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index 757f39d9a1..a04c66903b 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -46,7 +46,7 @@ class CometStringExpressionSuite extends CometTestBase {
         StructField("len", DataTypes.IntegerType, nullable = true),
         StructField("pad", DataTypes.StringType, nullable = true)))
     // scalastyle:off
-    val customStrings = Seq(
+    val edgeCases = Seq(
       "é", // unicode 'e\\u{301}'
       "é", // unicode '\\u{e9}'
       "తెలుగు")
@@ -56,7 +56,7 @@ class CometStringExpressionSuite extends CometTestBase {
       spark,
       schema,
       1000,
-      DataGenOptions(maxStringLength = 6, customStrings = customStrings))
+      DataGenOptions(maxStringLength = 6, customStrings = edgeCases))
     df.createOrReplaceTempView("t1")
 
     // test all combinations of scalar and array arguments
From 170b7001e522329ac18bbc3d8d76b6f00514cf17 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 23 Oct 2025 16:15:02 -0600
Subject: [PATCH 21/22] address feedback

---
 .../static_invoke/char_varchar_utils/read_side_padding.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index a0f83fefa7..89485ddec4 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -207,9 +207,7 @@ fn spark_read_side_padding_internal(
         let length = length.unwrap();
         match string {
             Some(string) => {
-                if length < 0 {
-                    builder.append_value("");
-                } else {
+                if length >= 0 {
                     builder.append_value(add_padding_string(
                         string.parse().unwrap(),
                         length as usize,
                         truncate,
                         pad_string,
                         is_left_pad,
                     )?)
+                } else {
+                    builder.append_value("");
                 }
             }
             _ => builder.append_null(),
From a2655951511d67b182ba18a71f16a14256de9959 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 23 Oct 2025 17:53:55 -0600
Subject: [PATCH 22/22] add tests for binary input

---
 .../comet/CometStringExpressionSuite.scala | 62 +++++++++++++++++--
 1 file changed, 57 insertions(+), 5 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
index a04c66903b..a63aba8da9 100644
--- a/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala
@@ -30,15 +30,23 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
 
 class CometStringExpressionSuite extends CometTestBase {
 
-  test("lpad") {
-    testPadding("lpad")
+  test("lpad string") {
+    testStringPadding("lpad")
   }
 
-  test("rpad") {
-    testPadding("rpad")
+  test("rpad string") {
+    testStringPadding("rpad")
   }
 
+  test("lpad binary") {
+    testBinaryPadding("lpad")
+  }
+
+  test("rpad binary") {
+    testBinaryPadding("rpad")
+  }
+
-  private def testPadding(expr: String): Unit = {
+  private def testStringPadding(expr: String): Unit = {
     val r = new Random(42)
     val schema = StructType(
       Seq(
@@ -94,6 +102,50 @@ class CometStringExpressionSuite extends CometTestBase {
       }
     }
   }
 
+  private def testBinaryPadding(expr: String): Unit = {
+    val r = new Random(42)
+    val schema = StructType(
+      Seq(
+        StructField("str", DataTypes.BinaryType, nullable = true),
+        StructField("len", DataTypes.IntegerType, nullable = true),
+        StructField("pad", DataTypes.BinaryType, nullable = true)))
+    val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
+    df.createOrReplaceTempView("t1")
+
+    // test all combinations of scalar and array arguments
+    for (str <- Seq("unhex('DDEEFF')", "str")) {
+      // Spark does not support negative length for lpad/rpad with binary input and Comet does
+      // not support abs yet, so use `10 + len % 10` to avoid negative length
+      for (len <- Seq("6", "0", "10 + len % 10")) {
+        for (pad <- Seq(Some("unhex('CAFE')"), Some("pad"), None)) {
+
+          val sql = pad match {
+            case Some(p) =>
+              // 3 args
+              s"SELECT $str, $len, $expr($str, $len, $p) FROM t1 ORDER BY str, len, pad"
+            case _ =>
+              // 2 args (default pad of ' ')
+              s"SELECT $str, $len, $expr($str, $len) FROM t1 ORDER BY str, len, pad"
+          }
+
+          val isLiteralStr = str != "str"
+          val isLiteralLen = !len.contains("len")
+          val isLiteralPad = !pad.contains("pad")
+
+          if (isLiteralStr && isLiteralLen && isLiteralPad) {
+            // all arguments are literal, so Spark constant folding will kick in
+            // and pad function will not be evaluated by Comet
+            checkSparkAnswer(sql)
+          } else {
+            // Comet will fall back to Spark because the plan contains a staticinvoke instruction
+            // which is not supported
+            checkSparkAnswerAndFallbackReason(sql, "staticinvoke is not supported")
+          }
+        }
+      }
+    }
+  }
+
   test("Various String scalar functions") {
     val table = "names"
     withTable(table) {
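
For binary input, Spark pads with zero bytes when no pad argument is given, which is the case patch 22's 2-arg queries exercise before asserting the staticinvoke fallback. Illustrative queries, assuming an active SparkSession on a Spark version with native binary lpad/rpad (3.3+); the hex/unhex pattern mirrors the tests removed in patch 10:

    spark.sql("SELECT hex(rpad(unhex('AABB'), 5))").show()  // AABB000000
    spark.sql("SELECT hex(lpad(unhex('AABB'), 5))").show()  // 000000AABB
    // 3-arg form with an explicit binary pad argument:
    spark.sql("SELECT hex(rpad(unhex('AABB'), 5, unhex('CAFE')))").show()
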