diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
index c6fe55b56b..a513aa1a77 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
@@ -19,11 +19,8 @@

 package org.apache.spark.sql.benchmark

-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.types._

-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet expression evaluation performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -35,10 +32,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {

   def integerArithmeticBenchmark(values: Int, op: BinaryOp, useDictionary: Boolean): Unit = {
     val dataType = IntegerType
-    val benchmark = new Benchmark(
-      s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
-      output = output)

     withTempPath { dir =>
       withTempTable(table) {
@@ -48,25 +41,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
             s"SELECT CAST(value AS ${dataType.sql}) AS c1, " +
               s"CAST(value AS ${dataType.sql}) c2 FROM $tbl"))

-        benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
-          spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
+        val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+        val query = s"SELECT c1 ${op.sig} c2 FROM $table"

-        benchmark.run()
+        runExpressionBenchmark(name, values, query)
       }
     }
   }
@@ -76,10 +54,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
       dataType: DecimalType,
       op: BinaryOp,
       useDictionary: Boolean): Unit = {
-    val benchmark = new Benchmark(
-      s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
-      output = output)
     val df = makeDecimalDataFrame(values, dataType, useDictionary)

     withTempPath { dir =>
@@ -87,25 +61,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
         df.createOrReplaceTempView(tbl)
         prepareTable(dir, spark.sql(s"SELECT dec AS c1, dec AS c2 FROM $tbl"))

-        benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
-          spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
+        val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+        val query = s"SELECT c1 ${op.sig} c2 FROM $table"

-        benchmark.run()
+        runExpressionBenchmark(name, values, query)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 5ee787ad97..8d56cefa05 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -110,6 +110,54 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
     benchmark.run()
   }

+  /**
+   * Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
+   * This provides a consistent benchmark structure for expression evaluation.
+   *
+   * @param name
+   *   Benchmark name
+   * @param cardinality
+   *   Number of rows being processed
+   * @param query
+   *   SQL query to benchmark
+   * @param extraCometConfigs
+   *   Additional configurations applied to the Comet (Scan + Exec) case (optional)
+   */
+  final def runExpressionBenchmark(
+      name: String,
+      cardinality: Long,
+      query: String,
+      extraCometConfigs: Map[String, String] = Map.empty): Unit = {
+    val benchmark = new Benchmark(name, cardinality, output = output)
+
+    benchmark.addCase("Spark") { _ =>
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        spark.sql(query).noop()
+      }
+    }
+
+    benchmark.addCase("Comet (Scan)") { _ =>
+      withSQLConf(
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "false") {
+        spark.sql(query).noop()
+      }
+    }
+
+    val cometExecConfigs = Map(
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      "spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs
+
+    benchmark.addCase("Comet (Scan + Exec)") { _ =>
+      withSQLConf(cometExecConfigs.toSeq: _*) {
+        spark.sql(query).noop()
+      }
+    }
+
+    benchmark.run()
+  }
+
   protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
     val testDf = if (partition.isDefined) {
       df.write.partitionBy(partition.get)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
index b2212dfd06..975abd632f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala
@@ -19,14 +19,10 @@

 package org.apache.spark.sql.benchmark

-import scala.util.Try
-
-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, LongType}

-import org.apache.comet.CometConf
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
 import org.apache.comet.serde.{Compatible, Incompatible, Unsupported}

@@ -81,48 +77,20 @@
       toDataType: DataType,
       isAnsiMode: Boolean): Unit = {

-    val benchmark =
-      new Benchmark(
-        s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}",
-        values,
-        output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value FROM $tbl"))
+
         val functionSQL = castExprSQL(toDataType, "value")
         val query = s"SELECT $functionSQL FROM parquetV1Table"
+        val name =
+          s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}"

-        benchmark.addCase(
-          s"SQL Parquet - Spark Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
-            s"ansi mode enabled : ${isAnsiMode}") { _ =>
-          withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-            if (isAnsiMode) {
-              Try { spark.sql(query).noop() }
-            } else {
-              spark.sql(query).noop()
-            }
-          }
-        }
+        val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString)

-        benchmark.addCase(
-          s"SQL Parquet - Comet Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
-            s"ansi mode enabled : ${isAnsiMode}") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-            if (isAnsiMode) {
-              Try { spark.sql(query).noop() }
-            } else {
-              spark.sql(query).noop()
-            }
-          }
-        }
-        benchmark.run()
+        runExpressionBenchmark(name, values, query, extraConfigs)
       }
     }
-  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
index 0dddfb36a5..c5eb9ea390 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
@@ -19,10 +19,6 @@

 package org.apache.spark.sql.benchmark

-import org.apache.spark.benchmark.Benchmark
-
-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet execution performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,8 +28,6 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase {

   def caseWhenExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("Case When Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
@@ -41,56 +35,19 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
         val query =
           "select CASE WHEN c1 < 0 THEN '<0' WHEN c1 = 0 THEN '=0' ELSE '>0' END from parquetV1Table"

-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.run()
+        runExpressionBenchmark("Case When Expr", values, query)
       }
     }
   }

   def ifExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("If Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
-        val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"
-
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
+        val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"

-        benchmark.run()
+        runExpressionBenchmark("If Expr", values, query)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
index 0af1ecade5..47eff41bbd 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
@@ -39,9 +39,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
           s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl"))
         Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM").foreach { level =>
           val isDictionary = if (useDictionary) "(Dictionary)" else ""
-          runWithComet(s"Date Truncate $isDictionary - $level", values) {
-            spark.sql(s"select trunc(dt, '$level') from parquetV1Table").noop()
-          }
+          val name = s"Date Truncate $isDictionary - $level"
+          val query = s"select trunc(dt, '$level') from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
         }
       }
     }
@@ -68,9 +68,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
             "WEEK",
             "QUARTER").foreach { level =>
           val isDictionary = if (useDictionary) "(Dictionary)" else ""
-          runWithComet(s"Timestamp Truncate $isDictionary - $level", values) {
-            spark.sql(s"select date_trunc('$level', ts) from parquetV1Table").noop()
-          }
+          val name = s"Timestamp Truncate $isDictionary - $level"
+          val query = s"select date_trunc('$level', ts) from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
         }
       }
     }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
index e8bd00bd9c..5b4741ba68 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala
@@ -19,7 +19,6 @@

 package org.apache.spark.sql.benchmark

-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.catalyst.expressions.JsonToStructs

 import org.apache.comet.CometConf
@@ -54,8 +53,6 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
    * Generic method to run a JSON expression benchmark with the given configuration.
    */
   def runJsonExprBenchmark(config: JsonExprConfig, values: Int): Unit = {
-    val benchmark = new Benchmark(config.name, values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         // Generate data with specified JSON patterns
@@ -119,31 +116,11 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {

         prepareTable(dir, jsonData)

-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(config.query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(config.query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          val baseConfigs =
-            Map(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
-              "spark.sql.optimizer.constantFolding.enabled" -> "false")
-          val allConfigs = baseConfigs ++ config.extraCometConfigs
-
-          withSQLConf(allConfigs.toSeq: _*) {
-            spark.sql(config.query).noop()
-          }
-        }
+        val extraConfigs = Map(
+          CometConf.getExprAllowIncompatConfigKey(
+            classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs

-        benchmark.run()
+        runExpressionBenchmark(config.name, values, config.query, extraConfigs)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
index 2ca924821c..6506c5665d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
@@ -19,10 +19,6 @@

 package org.apache.spark.sql.benchmark

-import org.apache.spark.benchmark.Benchmark
-
-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet execution performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,8 +28,6 @@ object CometPredicateExpressionBenchmark extends CometBenchmarkBase {

   def inExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("in Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(
@@ -41,27 +35,10 @@ object CometPredicateExpressionBenchmark extends CometBenchmarkBase {
           spark.sql(
             "select CASE WHEN value < 0 THEN 'negative'" +
               s" WHEN value = 0 THEN 'zero' ELSE 'positive' END c1 from $tbl"))
-        val query = "select * from parquetV1Table where c1 in ('positive', 'zero')"
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
+        val query = "select * from parquetV1Table where c1 in ('positive', 'zero')"

-        benchmark.run()
+        runExpressionBenchmark("in Expr", values, query)
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
index d1ed8702a7..41eabb8513 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
@@ -19,8 +19,6 @@

 package org.apache.spark.sql.benchmark

-import org.apache.spark.benchmark.Benchmark
-
 import org.apache.comet.CometConf

 /**
@@ -50,37 +48,14 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
    * Generic method to run a string expression benchmark with the given configuration.
    */
   def runStringExprBenchmark(config: StringExprConfig, values: Int): Unit = {
-    val benchmark = new Benchmark(config.name, values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT REPEAT(CAST(value AS STRING), 100) AS c1 FROM $tbl"))

-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(config.query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(config.query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          val baseConfigs =
-            Map(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true",
-              "spark.sql.optimizer.constantFolding.enabled" -> "false")
-          val allConfigs = baseConfigs ++ config.extraCometConfigs
-
-          withSQLConf(allConfigs.toSeq: _*) {
-            spark.sql(config.query).noop()
-          }
-        }
+        val extraConfigs =
+          Map(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") ++ config.extraCometConfigs

-        benchmark.run()
+        runExpressionBenchmark(config.name, values, config.query, extraConfigs)
       }
     }
   }
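
With the shared helper in place, a new expression benchmark reduces to building a query string and calling runExpressionBenchmark. The sketch below is a minimal illustration, not part of the patch: the object name, the upper(c1) queries, and the runCometBenchmark/runBenchmarkWithTable entry point are assumed from the surrounding codebase, while runExpressionBenchmark, prepareTable, withTempPath, withTempTable, tbl, and CometConf.COMET_CASE_CONVERSION_ENABLED all appear in the diff above.

package org.apache.spark.sql.benchmark

import org.apache.comet.CometConf

// Hypothetical benchmark showing the call pattern introduced by this patch.
object CometExampleExpressionBenchmark extends CometBenchmarkBase {

  def upperExprBenchmark(values: Int): Unit = {
    withTempPath { dir =>
      withTempTable("parquetV1Table") {
        // Write a single string column to Parquet, mirroring the string
        // expression benchmark in this patch.
        prepareTable(dir, spark.sql(s"SELECT CAST(value AS STRING) AS c1 FROM $tbl"))

        // Default cases: Spark, Comet (Scan), Comet (Scan + Exec).
        runExpressionBenchmark("Upper Expr", values, "select upper(c1) from parquetV1Table")

        // extraCometConfigs is merged into the Comet (Scan + Exec) case only,
        // on top of the baseline that already disables constant folding.
        runExpressionBenchmark(
          "Upper Expr (case conversion)",
          values,
          "select upper(c1) from parquetV1Table",
          Map(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true"))
      }
    }
  }

  // Entry point; assumes the runBenchmarkWithTable helper that the existing
  // benchmarks in this module use to create the source table.
  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
    runBenchmarkWithTable("upperExpr", 1024 * 1024 * 10) { v =>
      upperExprBenchmark(v)
    }
  }
}

One design point worth noting from the diff itself: the helper pins each case's configuration explicitly (COMET_ENABLED set to "false" for the Spark baseline, COMET_EXEC_ENABLED set to "false" for the Scan-only case), so results no longer depend on whatever Comet settings the session happens to inherit.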