Skip to content

Commit 77899ee

Browse files
authored
chore: Add checks to microbenchmarks for plan running natively in Comet (#3045)
1 parent 91f0f7b commit 77899ee

File tree

3 files changed

+100
-53
lines changed

3 files changed

+100
-53
lines changed

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.{ExampleParquetWriter, GroupWriteSuppor
3737
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
3838
import org.apache.spark._
3939
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
40-
import org.apache.spark.sql.comet._
40+
import org.apache.spark.sql.comet.CometPlanChecker
4141
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec}
4242
import org.apache.spark.sql.execution._
4343
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -58,7 +58,8 @@ abstract class CometTestBase
5858
with BeforeAndAfterEach
5959
with AdaptiveSparkPlanHelper
6060
with ShimCometSparkSessionExtensions
61-
with ShimCometTestBase {
61+
with ShimCometTestBase
62+
with CometPlanChecker {
6263
import testImplicits._
6364

6465
protected val shuffleManager: String =
@@ -396,26 +397,6 @@ abstract class CometTestBase
396397
checkPlanNotMissingInput(plan)
397398
}
398399

399-
protected def findFirstNonCometOperator(
400-
plan: SparkPlan,
401-
excludedClasses: Class[_]*): Option[SparkPlan] = {
402-
val wrapped = wrapCometSparkToColumnar(plan)
403-
wrapped.foreach {
404-
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
405-
_: CometIcebergNativeScanExec =>
406-
case _: CometSinkPlaceHolder | _: CometScanWrapper =>
407-
case _: CometColumnarToRowExec =>
408-
case _: CometSparkToColumnarExec =>
409-
case _: CometExec | _: CometShuffleExchangeExec =>
410-
case _: CometBroadcastExchangeExec =>
411-
case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter =>
412-
case op if !excludedClasses.exists(c => c.isAssignableFrom(op.getClass)) =>
413-
return Some(op)
414-
case _ =>
415-
}
416-
None
417-
}
418-
419400
// checks the plan node has no missing inputs
420401
// such nodes represented in plan with exclamation mark !
421402
// example: !CometWindowExec
@@ -449,14 +430,6 @@ abstract class CometTestBase
449430
}
450431
}
451432

452-
/** Wraps the CometRowToColumn as ScanWrapper, so the child operators will not be checked */
453-
private def wrapCometSparkToColumnar(plan: SparkPlan): SparkPlan = {
454-
plan.transformDown {
455-
// don't care the native operators
456-
case p: CometSparkToColumnarExec => CometScanWrapper(null, p)
457-
}
458-
}
459-
460433
private var _spark: SparkSessionType = _
461434
override protected implicit def spark: SparkSessionType = _spark
462435
protected implicit def sqlContext: SQLContext = _spark.sqlContext

spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,19 @@ import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS
3131
import org.apache.spark.SparkConf
3232
import org.apache.spark.benchmark.Benchmark
3333
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
34+
import org.apache.spark.sql.comet.CometPlanChecker
35+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3436
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
3537
import org.apache.spark.sql.internal.SQLConf
3638
import org.apache.spark.sql.types.DecimalType
3739

3840
import org.apache.comet.CometConf
3941
import org.apache.comet.CometSparkSessionExtensions
4042

41-
trait CometBenchmarkBase extends SqlBasedBenchmark {
43+
trait CometBenchmarkBase
44+
extends SqlBasedBenchmark
45+
with AdaptiveSparkPlanHelper
46+
with CometPlanChecker {
4247
override def getSparkSession: SparkSession = {
4348
val conf = new SparkConf()
4449
.setAppName("CometReadBenchmark")
@@ -88,28 +93,6 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
8893
}
8994
}
9095

91-
/** Runs function `f` with Comet on and off. */
92-
final def runWithComet(name: String, cardinality: Long)(f: => Unit): Unit = {
93-
val benchmark = new Benchmark(name, cardinality, output = output)
94-
95-
benchmark.addCase(s"$name - Spark ") { _ =>
96-
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
97-
f
98-
}
99-
}
100-
101-
benchmark.addCase(s"$name - Comet") { _ =>
102-
withSQLConf(
103-
CometConf.COMET_ENABLED.key -> "true",
104-
CometConf.COMET_EXEC_ENABLED.key -> "true",
105-
SQLConf.ANSI_ENABLED.key -> "false") {
106-
f
107-
}
108-
}
109-
110-
benchmark.run()
111-
}
112-
11396
/**
11497
* Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
11598
* This provides a consistent benchmark structure for expression evaluation.
@@ -149,6 +132,29 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
149132
CometConf.COMET_EXEC_ENABLED.key -> "true",
150133
"spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs
151134

135+
// Check that the plan is fully Comet native before running the benchmark
136+
withSQLConf(cometExecConfigs.toSeq: _*) {
137+
val df = spark.sql(query)
138+
df.noop()
139+
val plan = stripAQEPlan(df.queryExecution.executedPlan)
140+
findFirstNonCometOperator(plan) match {
141+
case Some(op) =>
142+
// scalastyle:off println
143+
println()
144+
println("=" * 80)
145+
println("WARNING: Benchmark plan is NOT fully Comet native!")
146+
println(s"First non-Comet operator: ${op.nodeName}")
147+
println("=" * 80)
148+
println("Query plan:")
149+
println(plan.treeString)
150+
println("=" * 80)
151+
println()
152+
// scalastyle:on println
153+
case None =>
154+
// All operators are Comet native, no warning needed
155+
}
156+
}
157+
152158
benchmark.addCase("Comet (Scan + Exec)") { _ =>
153159
withSQLConf(cometExecConfigs.toSeq: _*) {
154160
spark.sql(query).noop()
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet
21+
22+
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
23+
import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, SparkPlan, WholeStageCodegenExec}
24+
25+
/**
26+
* Trait providing utilities to check if a Spark plan is fully running on Comet native operators.
27+
* Used by both CometTestBase and CometBenchmarkBase.
28+
*/
29+
trait CometPlanChecker {

  /**
   * Finds the first non-Comet operator in the plan, if any.
   *
   * The plan is searched in pre-order (a node before its children, children left to right), so
   * the operator returned is the first offending one encountered from the root. Subtrees rooted
   * at a `CometSparkToColumnarExec` are not inspected.
   *
   * @param plan
   *   The SparkPlan to check
   * @param excludedClasses
   *   Classes to exclude from the check (these are allowed to be non-Comet). An operator is
   *   excluded when its class is the same as, or a subclass of, one of these classes.
   * @return
   *   Some(operator) if a non-Comet operator is found, None otherwise
   */
  protected def findFirstNonCometOperator(
      plan: SparkPlan,
      excludedClasses: Class[_]*): Option[SparkPlan] = {
    // True for operators that are acceptable in a "fully Comet" plan: Comet scans, Comet
    // placeholders/wrappers, Comet row<->columnar transitions, Comet execs and exchanges, the
    // Spark codegen/transition wrappers, and anything the caller explicitly excluded.
    def isAccepted(op: SparkPlan): Boolean = op match {
      case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
          _: CometIcebergNativeScanExec =>
        true
      case _: CometSinkPlaceHolder | _: CometScanWrapper => true
      case _: CometColumnarToRowExec | _: CometSparkToColumnarExec => true
      case _: CometExec | _: CometShuffleExchangeExec | _: CometBroadcastExchangeExec => true
      case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter => true
      case other => excludedClasses.exists(_.isAssignableFrom(other.getClass))
    }

    // `find` walks the tree in the same pre-order as `foreach`, but stops at the first match
    // without needing a nonlocal `return` from inside a lambda.
    wrapCometSparkToColumnar(plan).find(op => !isAccepted(op))
  }

  /**
   * Wraps every CometSparkToColumnarExec in a CometScanWrapper, so that the operators beneath it
   * are not checked.
   */
  private def wrapCometSparkToColumnar(plan: SparkPlan): SparkPlan = {
    plan.transformDown {
      // The wrapper is only used as a marker for the check above, so no native operator is
      // supplied (hence the null first argument).
      case p: CometSparkToColumnarExec => CometScanWrapper(null, p)
    }
  }
}

0 commit comments

Comments
 (0)