Skip to content

Commit 304e9f7

Browse files
committed
remove AQE integration
1 parent 14ff5a5 commit 304e9f7

File tree

4 files changed

+15
-113
lines changed

4 files changed

+15
-113
lines changed

spark/src/main/scala/org/apache/comet/cost/CometCostEvaluator.scala

Lines changed: 0 additions & 100 deletions
This file was deleted.

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import org.apache.spark.sql.types._
4949
import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
5050
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
5151
import org.apache.comet.CometSparkSessionExtensions._
52+
import org.apache.comet.cost.DefaultCometCostModel
5253
import org.apache.comet.rules.CometExecRule.allExecs
5354
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, Unsupported}
5455
import org.apache.comet.serde.operator._
@@ -344,7 +345,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
344345
}
345346

346347
override def apply(plan: SparkPlan): SparkPlan = {
347-
val newPlan = _apply(plan)
348+
val candidatePlan = _apply(plan)
349+
350+
// TODO load cost model via config and reflection
351+
val costModel = new DefaultCometCostModel
352+
val costBefore = costModel.estimateCost(plan)
353+
val costAfter = costModel.estimateCost(candidatePlan)
354+
355+
val newPlan = if (costAfter.acceleration > costBefore.acceleration) {
356+
candidatePlan
357+
} else {
358+
plan
359+
}
360+
348361
if (showTransformations && !newPlan.fastEquals(plan)) {
349362
logInfo(s"""
350363
|=== Applying Rule $ruleName ===

spark/src/main/scala/org/apache/spark/Plugins.scala

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
2828
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
2929
import org.apache.spark.sql.internal.StaticSQLConf
3030

31-
import org.apache.comet.CometConf.{COMET_COST_BASED_OPTIMIZATION_ENABLED, COMET_ONHEAP_ENABLED}
31+
import org.apache.comet.CometConf.COMET_ONHEAP_ENABLED
3232
import org.apache.comet.CometSparkSessionExtensions
3333

3434
/**
@@ -57,15 +57,6 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
5757
// register CometSparkSessionExtensions if it isn't already registered
5858
CometDriverPlugin.registerCometSessionExtension(sc.conf)
5959

60-
// Enable cost-based optimization if configured
61-
if (sc.getConf.getBoolean(COMET_COST_BASED_OPTIMIZATION_ENABLED.key, false)) {
62-
// Set the custom cost evaluator for Spark's adaptive query execution
63-
sc.conf.set(
64-
"spark.sql.adaptive.customCostEvaluatorClass",
65-
"org.apache.comet.cost.CometCostEvaluator")
66-
logInfo("Enabled Comet cost-based optimization with CometCostEvaluator")
67-
}
68-
6960
if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
7061
val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
7162
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)

spark/src/test/scala/org/apache/comet/CometCostModelSuite.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ class CometCostModelSuite extends CometTestBase {
7373
CometConf.COMET_EXEC_PROJECT_ENABLED.key -> "true",
7474
CometConf.COMET_EXEC_AGGREGATE_ENABLED.key -> "true", // Enable aggregation for GROUP BY
7575
CometConf.COMET_EXEC_HASH_JOIN_ENABLED.key -> "true", // Enable joins
76-
// Manually set the custom cost evaluator since plugin might not be loaded
77-
"spark.sql.adaptive.customCostEvaluatorClass" -> "org.apache.comet.cost.CometCostEvaluator",
7876
// Lower AQE thresholds to ensure it triggers on small test data
7977
"spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "1KB",
8078
"spark.sql.adaptive.coalescePartitions.minPartitionSize" -> "1B") {

0 commit comments

Comments
 (0)