@@ -19,11 +19,8 @@

package org.apache.spark.sql.benchmark

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.types._

import org.apache.comet.CometConf

/**
* Benchmark to measure Comet expression evaluation performance. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -35,10 +32,6 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {

def integerArithmeticBenchmark(values: Int, op: BinaryOp, useDictionary: Boolean): Unit = {
val dataType = IntegerType
val benchmark = new Benchmark(
s"Binary op ${dataType.sql}, dictionary = $useDictionary",
values,
output = output)

withTempPath { dir =>
withTempTable(table) {
@@ -48,25 +41,10 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
s"SELECT CAST(value AS ${dataType.sql}) AS c1, " +
s"CAST(value AS ${dataType.sql}) c2 FROM $tbl"))

benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
}

benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
}
}

benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
}
}
val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
val query = s"SELECT c1 ${op.sig} c2 FROM $table"

benchmark.run()
runExpressionBenchmark(name, values, query)
}
}
}
@@ -76,36 +54,17 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
dataType: DecimalType,
op: BinaryOp,
useDictionary: Boolean): Unit = {
val benchmark = new Benchmark(
s"Binary op ${dataType.sql}, dictionary = $useDictionary",
values,
output = output)
val df = makeDecimalDataFrame(values, dataType, useDictionary)

withTempPath { dir =>
withTempTable(table) {
df.createOrReplaceTempView(tbl)
prepareTable(dir, spark.sql(s"SELECT dec AS c1, dec AS c2 FROM $tbl"))

benchmark.addCase(s"$op ($dataType) - Spark") { _ =>
spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
}

benchmark.addCase(s"$op ($dataType) - Comet (Scan)") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
}
}

benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop()
}
}
val name = s"Binary op ${dataType.sql}, dictionary = $useDictionary"
val query = s"SELECT c1 ${op.sig} c2 FROM $table"

benchmark.run()
runExpressionBenchmark(name, values, query)
}
}
}
@@ -110,6 +110,54 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
benchmark.run()
}

/**
* Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
* This provides a consistent benchmark structure for expression evaluation.
*
* @param name
* Benchmark name
* @param cardinality
* Number of rows being processed
* @param query
* SQL query to benchmark
* @param extraCometConfigs
* Additional configurations to apply for Comet cases (optional)
*/
final def runExpressionBenchmark(
name: String,
cardinality: Long,
query: String,
extraCometConfigs: Map[String, String] = Map.empty): Unit = {
val benchmark = new Benchmark(name, cardinality, output = output)

benchmark.addCase("Spark") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
spark.sql(query).noop()
}
}

benchmark.addCase("Comet (Scan)") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "false") {
spark.sql(query).noop()
}
}
Contributor: Minor: Would we be needing exec and scan exec for all types of benchmarks, @andygrove?

Member (author): Yes, I only updated the expression benchmarks in this PR. I will follow up on the other ones (shuffle, exec, aggregate) in separate PRs, since those are a little more involved. My short-term goal is to add benchmarks for all expressions, so I wanted to refactor the expression benchmarks before doing that.


val cometExecConfigs = Map(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
"spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs

benchmark.addCase("Comet (Scan + Exec)") { _ =>
withSQLConf(cometExecConfigs.toSeq: _*) {
spark.sql(query).noop()
}
}

benchmark.run()
}

protected def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
val testDf = if (partition.isDefined) {
df.write.partitionBy(partition.get)
Expand Down
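For reference, here is a minimal sketch of what a new expression benchmark looks like on top of this helper. The upper() expression and the object name are illustrative only, not part of this PR; the helpers it calls (withTempPath, withTempTable, prepareTable, runExpressionBenchmark) are the ones defined in CometBenchmarkBase above.

object CometUpperExprBenchmark extends CometBenchmarkBase {

  def upperExprBenchmark(values: Int): Unit = {
    withTempPath { dir =>
      withTempTable("parquetV1Table") {
        // Materialize a Parquet table with a single string column.
        prepareTable(dir, spark.sql(s"SELECT CAST(value AS STRING) AS c1 FROM $tbl"))

        // One call adds the three standard cases (Spark, Comet (Scan),
        // Comet (Scan + Exec)) and runs the benchmark.
        runExpressionBenchmark("Upper Expr", values, "SELECT upper(c1) FROM parquetV1Table")
      }
    }
  }
}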
@@ -19,14 +19,10 @@

package org.apache.spark.sql.benchmark

import scala.util.Try

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, LongType}

import org.apache.comet.CometConf
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{Compatible, Incompatible, Unsupported}

@@ -81,48 +77,20 @@ object CometCastBenchmark extends CometBenchmarkBase {
toDataType: DataType,
isAnsiMode: Boolean): Unit = {

val benchmark =
new Benchmark(
s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}",
values,
output = output)

withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(dir, spark.sql(s"SELECT value FROM $tbl"))

val functionSQL = castExprSQL(toDataType, "value")
val query = s"SELECT $functionSQL FROM parquetV1Table"
val name =
s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}"

benchmark.addCase(
s"SQL Parquet - Spark Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
s"ansi mode enabled : ${isAnsiMode}") { _ =>
withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
if (isAnsiMode) {
Try { spark.sql(query).noop() }
} else {
spark.sql(query).noop()
}
}
}
val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString)

benchmark.addCase(
s"SQL Parquet - Comet Cast expr from ${fromDataType.sql} to : ${toDataType.sql} , " +
s"ansi mode enabled : ${isAnsiMode}") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) {
if (isAnsiMode) {
Try { spark.sql(query).noop() }
} else {
spark.sql(query).noop()
}
}
}
benchmark.run()
runExpressionBenchmark(name, values, query, extraConfigs)
}
}

}

}
@@ -19,10 +19,6 @@

package org.apache.spark.sql.benchmark

import org.apache.spark.benchmark.Benchmark

import org.apache.comet.CometConf

/**
* Benchmark to measure Comet execution performance. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
@@ -32,65 +28,26 @@ import org.apache.comet.CometConf
object CometConditionalExpressionBenchmark extends CometBenchmarkBase {

def caseWhenExprBenchmark(values: Int): Unit = {
val benchmark = new Benchmark("Case When Expr", values, output = output)

withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))

val query =
"select CASE WHEN c1 < 0 THEN '<0' WHEN c1 = 0 THEN '=0' ELSE '>0' END from parquetV1Table"

benchmark.addCase("SQL Parquet - Spark") { _ =>
spark.sql(query).noop()
}

benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
spark.sql(query).noop()
}
}

benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
spark.sql(query).noop()
}
}

benchmark.run()
runExpressionBenchmark("Case When Expr", values, query)
}
}
}

def ifExprBenchmark(values: Int): Unit = {
val benchmark = new Benchmark("If Expr", values, output = output)

withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl"))
val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"

benchmark.addCase("SQL Parquet - Spark") { _ =>
spark.sql(query).noop()
}

benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
spark.sql(query).noop()
}
}

benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
spark.sql(query).noop()
}
}
val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table"

benchmark.run()
runExpressionBenchmark("If Expr", values, query)
}
}
}
@@ -39,9 +39,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl"))
Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM").foreach { level =>
val isDictionary = if (useDictionary) "(Dictionary)" else ""
runWithComet(s"Date Truncate $isDictionary - $level", values) {
spark.sql(s"select trunc(dt, '$level') from parquetV1Table").noop()
}
val name = s"Date Truncate $isDictionary - $level"
val query = s"select trunc(dt, '$level') from parquetV1Table"
runExpressionBenchmark(name, values, query)
}
}
}
@@ -68,9 +68,9 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
"WEEK",
"QUARTER").foreach { level =>
val isDictionary = if (useDictionary) "(Dictionary)" else ""
runWithComet(s"Timestamp Truncate $isDictionary - $level", values) {
spark.sql(s"select date_trunc('$level', ts) from parquetV1Table").noop()
}
val name = s"Timestamp Truncate $isDictionary - $level"
val query = s"select date_trunc('$level', ts) from parquetV1Table"
runExpressionBenchmark(name, values, query)
}
}
}
@@ -19,7 +19,6 @@

package org.apache.spark.sql.benchmark

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.catalyst.expressions.JsonToStructs

import org.apache.comet.CometConf
@@ -54,8 +53,6 @@ object CometJsonExpressionBenchmark extends CometBenchmarkBase {
* Generic method to run a JSON expression benchmark with the given configuration.
*/
def runJsonExprBenchmark(config: JsonExprConfig, values: Int): Unit = {
val benchmark = new Benchmark(config.name, values, output = output)

withTempPath { dir =>
withTempTable("parquetV1Table") {
// Generate data with specified JSON patterns
@@ -119,31 +116,11 @@

prepareTable(dir, jsonData)

benchmark.addCase("SQL Parquet - Spark") { _ =>
spark.sql(config.query).noop()
}

benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
spark.sql(config.query).noop()
}
}

benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
val baseConfigs =
Map(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
"spark.sql.optimizer.constantFolding.enabled" -> "false")
val allConfigs = baseConfigs ++ config.extraCometConfigs

withSQLConf(allConfigs.toSeq: _*) {
spark.sql(config.query).noop()
}
}
val extraConfigs = Map(
CometConf.getExprAllowIncompatConfigKey(
classOf[JsonToStructs]) -> "true") ++ config.extraCometConfigs

benchmark.run()
runExpressionBenchmark(config.name, values, config.query, extraConfigs)
}
}
}