
Commit f458088

minor: Refactor expression microbenchmarks to remove duplicate code (#2956)
1 parent a6cfadb · commit f458088

8 files changed: +77 / -216 lines

spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala

Lines changed: 6 additions & 47 deletions
@@ -19,11 +19,8 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.types._
 
-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet expression evaluation performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -35,10 +32,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
 
   def integerArithmeticBenchmark(values: Int, op: BinaryOp, useDictionary: Boolean): Unit = {
     val dataType = IntegerType
-    val benchmark = new Benchmark(
-      s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
-      output = output)
 
     withTempPath { dir =>
       withTempTable(table) {
@@ -48,25 +41,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
           s"SELECT CAST(value AS ${dataType.sql}) AS c1, " +
             s"CAST(value AS ${dataType.sql}) c2 FROM $tbl"))
 
-        benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
-          spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
+        val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+        val query = s"SELECT c1 ${op.sig} c2 FROM $table"
 
-        benchmark.run()
+        runExpressionBenchmark(name, values, query)
       }
     }
   }
@@ -76,36 +54,17 @@
       dataType: DecimalType,
       op: BinaryOp,
       useDictionary: Boolean): Unit = {
-    val benchmark = new Benchmark(
-      s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
-      output = output)
     val df = makeDecimalDataFrame(values, dataType, useDictionary)
 
     withTempPath { dir =>
       withTempTable(table) {
         df.createOrReplaceTempView(tbl)
         prepareTable(dir, spark.sql(s"SELECT dec AS c1, dec AS c2 FROM $tbl"))
 
-        benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
-          spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
-
-        benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
-          }
-        }
+        val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
+        val query = s"SELECT c1 ${op.sig} c2 FROM $table"
 
-        benchmark.run()
+        runExpressionBenchmark(name, values, query)
      }
    }
  }

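Reassembled from the hunks above, the refactored integer benchmark reads roughly as follows. This is a sketch: the opening of the prepareTable call and the exact indentation are not visible in the diff and are reconstructed here.

def integerArithmeticBenchmark(values: Int, op: BinaryOp, useDictionary: Boolean): Unit = {
  val dataType = IntegerType

  withTempPath { dir =>
    withTempTable(table) {
      // The projection below appears as context in the diff; the call
      // opening `prepareTable(dir, spark.sql(` is inferred.
      prepareTable(
        dir,
        spark.sql(
          s"SELECT CAST(value AS ${dataType.sql}) AS c1, " +
            s"CAST(value AS ${dataType.sql}) c2 FROM $tbl"))

      val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
      val query = s"SELECT c1 ${op.sig} c2 FROM $table"

      // Shared helper introduced in CometBenchmarkBase (next file).
      runExpressionBenchmark(name, values, query)
    }
  }
}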
spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala

Lines changed: 48 additions & 0 deletions
@@ -110,6 +110,54 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
     benchmark.run()
   }
 
+  /**
+   * Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
+   * This provides a consistent benchmark structure for expression evaluation.
+   *
+   * @param name
+   *   Benchmark name
+   * @param cardinality
+   *   Number of rows being processed
+   * @param query
+   *   SQL query to benchmark
+   * @param extraCometConfigs
+   *   Additional configurations to apply for Comet cases (optional)
+   */
+  final def runExpressionBenchmark(
+      name: String,
+      cardinality: Long,
+      query: String,
+      extraCometConfigs: Map[String, String] = Map.empty): Unit = {
+    val benchmark = new Benchmark(name, cardinality, output = output)
+
+    benchmark.addCase("Spark") { _ =>
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        spark.sql(query).noop()
+      }
+    }
+
+    benchmark.addCase("Comet (Scan)") { _ =>
+      withSQLConf(
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "false") {
+        spark.sql(query).noop()
+      }
+    }
+
+    val cometExecConfigs = Map(
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      "spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs
+
+    benchmark.addCase("Comet (Scan + Exec)") { _ =>
+      withSQLConf(cometExecConfigs.toSeq: _*) {
+        spark.sql(query).noop()
+      }
+    }
+
+    benchmark.run()
+  }
+
   protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
     val testDf = if (partition.isDefined) {
       df.write.partitionBy(partition.get)

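The new helper standardizes every expression benchmark on the same three cases: Spark (Comet disabled), Comet (Scan) with exec off, and Comet (Scan + Exec) with constant folding disabled so expressions are not evaluated away at plan time. A minimal sketch of a caller; the benchmark name and query here are hypothetical, not part of this commit:

// Hypothetical subclass method; withTempPath, withTempTable, prepareTable
// and tbl come from CometBenchmarkBase, as in the real callers above.
def upperExprBenchmark(values: Int): Unit = {
  withTempPath { dir =>
    withTempTable("parquetV1Table") {
      prepareTable(dir, spark.sql(s"SELECT CAST(value AS STRING) AS c1 FROM $tbl"))

      // Registers and runs the three standard cases against one query.
      runExpressionBenchmark("Upper Expr", values, "SELECT upper(c1) FROM parquetV1Table")
    }
  }
}

Note that, as implemented above, extraCometConfigs is merged into the Comet (Scan + Exec) case only; the Spark and Comet (Scan) cases run without it.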
spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala

Lines changed: 5 additions & 37 deletions
@@ -19,14 +19,10 @@
 
 package org.apache.spark.sql.benchmark
 
-import scala.util.Try
-
-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, LongType}
 
-import org.apache.comet.CometConf
 import org.apache.comet.expressions.{CometCast, CometEvalMode}
 import org.apache.comet.serde.{Compatible, Incompatible, Unsupported}
 
@@ -81,48 +77,20 @@ object CometCastBenchmark extends CometBenchmarkBase {
       toDataType: DataType,
       isAnsiMode: Boolean): Unit = {
 
-    val benchmark =
-      new Benchmark(
-        s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}",
-        values,
-        output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value FROM $tbl"))
+
         val functionSQL = castExprSQL(toDataType, "value")
         val query = s"SELECT $functionSQL FROM parquetV1Table"
+        val name =
+          s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}"
 
-        benchmark.addCase(
-          s"SQL Parquet - Spark Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
-            s"ansi mode enabled : ${isAnsiMode}") { _ =>
-          withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-            if (isAnsiMode) {
-              Try { spark.sql(query).noop() }
-            } else {
-              spark.sql(query).noop()
-            }
-          }
-        }
+        val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString)
 
-        benchmark.addCase(
-          s"SQL Parquet - Comet Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
-            s"ansi mode enabled : ${isAnsiMode}") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true",
-            SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
-            if (isAnsiMode) {
-              Try { spark.sql(query).noop() }
-            } else {
-              spark.sql(query).noop()
-            }
-          }
-        }
-        benchmark.run()
+        runExpressionBenchmark(name, values, query, extraConfigs)
       }
     }
-
   }
 
 }

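ANSI mode now travels through the helper's extraCometConfigs parameter rather than per-case withSQLConf blocks; the old Try wrapper that swallowed ANSI cast failures has no counterpart in the shared helper. A sketch of the resulting call shape, using the values defined in this file:

// ANSI mode rides along as an extra Comet config; per the helper above,
// it is applied on top of the Comet (Scan + Exec) settings.
val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString)
runExpressionBenchmark(name, values, query, extraConfigs)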
spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala

Lines changed: 3 additions & 46 deletions
@@ -19,10 +19,6 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
-
-import org.apache.comet.CometConf
-
 /**
  * Benchmark to measure Comet execution performance. To run this benchmark:
  * `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,65 +28,26 @@ import org.apache.comet.CometConf
 object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
 
   def caseWhenExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("Case When Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
 
         val query =
           "select CASE WHEN c1 < 0 THEN '<0' WHEN c1 = 0 THEN '=0' ELSE '>0' END from parquetV1Table"
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.run()
+        runExpressionBenchmark("Case When Expr", values, query)
       }
     }
   }
 
   def ifExprBenchmark(values: Int): Unit = {
-    val benchmark = new Benchmark("If Expr", values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
-        val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"
-
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(query).noop()
-        }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
+        val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"
 
-        benchmark.run()
+        runExpressionBenchmark("If Expr", values, query)
       }
     }
   }

spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala

Lines changed: 6 additions & 6 deletions
@@ -39,9 +39,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
           s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl"))
         Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM").foreach { level =>
           val isDictionary = if (useDictionary) "(Dictionary)" else ""
-          runWithComet(s"Date Truncate $isDictionary - $level", values) {
-            spark.sql(s"select trunc(dt, '$level') from parquetV1Table").noop()
-          }
+          val name = s"Date Truncate $isDictionary - $level"
+          val query = s"select trunc(dt, '$level') from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
         }
       }
     }
@@ -68,9 +68,9 @@
             "WEEK",
             "QUARTER").foreach { level =>
           val isDictionary = if (useDictionary) "(Dictionary)" else ""
-          runWithComet(s"Timestamp Truncate $isDictionary - $level", values) {
-            spark.sql(s"select date_trunc('$level', ts) from parquetV1Table").noop()
-          }
+          val name = s"Timestamp Truncate $isDictionary - $level"
+          val query = s"select date_trunc('$level', ts) from parquetV1Table"
+          runExpressionBenchmark(name, values, query)
         }
       }
     }

spark/src/test/scala/org/apache/spark/sql/benchmark/CometJsonExpressionBenchmark.scala

Lines changed: 4 additions & 27 deletions
@@ -19,7 +19,6 @@
 
 package org.apache.spark.sql.benchmark
 
-import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.catalyst.expressions.JsonToStructs
 
 import org.apache.comet.CometConf
@@ -54,8 +53,6 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
    * Generic method to run a JSON expression benchmark with the given configuration.
    */
   def runJsonExprBenchmark(config: JsonExprConfig, values: Int): Unit = {
-    val benchmark = new Benchmark(config.name, values, output = output)
-
     withTempPath { dir =>
       withTempTable("parquetV1Table") {
         // Generate data with specified JSON patterns
@@ -119,31 +116,11 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
 
         prepareTable(dir, jsonData)
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
-          spark.sql(config.query).noop()
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(config.query).noop()
-          }
-        }
-
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
-          val baseConfigs =
-            Map(
-              CometConf.COMET_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_ENABLED.key -> "true",
-              CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
-              "spark.sql.optimizer.constantFolding.enabled" -> "false")
-          val allConfigs = baseConfigs ++ config.extraCometConfigs
-
-          withSQLConf(allConfigs.toSeq: _*) {
-            spark.sql(config.query).noop()
-          }
-        }
+        val extraConfigs = Map(
+          CometConf.getExprAllowIncompatConfigKey(
+            classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs
 
-        benchmark.run()
+        runExpressionBenchmark(config.name, values, config.query, extraConfigs)
       }
     }
   }

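The JSON benchmark shows how per-benchmark settings compose with the helper: the allow-incompat flag for JsonToStructs is folded into extraCometConfigs along with any extras carried by the JsonExprConfig, while the helper itself supplies the standard Comet flags and disables constant folding for the exec case. A sketch of the composed call, using the names from this file:

// Expression-specific settings ride along with the standard Comet configs.
val extraConfigs = Map(
  CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true"
) ++ config.extraCometConfigs

runExpressionBenchmark(config.name, values, config.query, extraConfigs)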