From 79310d015222f09aefbb21cce4211f154429b5ee Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 08:25:54 -0600
Subject: [PATCH 01/20] Add plan stats to extended explain

---
 .../apache/comet/ExtendedExplainInfo.scala    | 63 ++++++++++++++++---
 .../apache/comet/exec/CometExecSuite.scala    |  6 ++
 2 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 755c345717..e270b83c76 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.ExtendedExplainGenerator
 import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
-import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
 
 import org.apache.comet.CometExplainInfo.getActualPlan
@@ -81,9 +82,14 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
   // generates the extended info in a verbose manner, printing each node along with the
   // extended information in a tree display
   def generateVerboseExtendedInfo(plan: SparkPlan): String = {
+    val planStats = new PlanStats()
     val outString = new StringBuilder()
-    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString)
-    outString.toString()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    val eligible = planStats.sparkOperators + planStats.cometOperators
+    val converted =
+      if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
+    val summary = s"Comet accelerated ${converted.toInt}% of eligible operators ($planStats)."
+    s"${outString.toString()}\n\n$summary."
   }
 
   // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
@@ -92,7 +98,27 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       depth: Int,
       lastChildren: Seq[Boolean],
       indent: Int,
-      outString: StringBuilder): Unit = {
+      outString: StringBuilder,
+      planStats: PlanStats): Unit = {
+
+    // scalastyle:off
+    node match {
+      case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
+          _: WholeStageCodegenExec =>
+        println(s"ZZZ WRAPPER: ${node.nodeName}")
+        planStats.wrappers += 1
+      case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
+          _: CometSparkToColumnarExec =>
+        println(s"ZZZ TRANSITION: ${node.nodeName}")
+        planStats.transitions += 1
+      case _: CometPlan =>
+        println(s"ZZZ COMET: ${node.nodeName}")
+        planStats.cometOperators += 1
+      case _ =>
+        println(s"ZZZ SPARK: ${node.nodeName}")
+        planStats.sparkOperators += 1
+    }
+
     outString.append(" " * indent)
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
@@ -119,7 +145,8 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
             depth + 2,
             lastChildren :+ node.children.isEmpty :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
         case _ =>
       }
       generateTreeString(
@@ -127,7 +154,8 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
         depth + 2,
         lastChildren :+ node.children.isEmpty :+ true,
         indent,
-        outString)
+        outString,
+        planStats)
     }
     if (node.children.nonEmpty) {
       node.children.init.foreach {
@@ -137,18 +165,37 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
             depth + 1,
             lastChildren :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
        case _ =>
      }
      node.children.last match {
        case c @ (_: TreeNode[_]) =>
-          generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString)
+          generateTreeString(
+            getActualPlan(c),
+            depth + 1,
+            lastChildren :+ true,
+            indent,
+            outString,
+            planStats)
        case _ =>
      }
    }
  }
 }
 
+class PlanStats {
+  var sparkOperators: Int = 0
+  var cometOperators: Int = 0
+  var wrappers: Int = 0
+  var transitions: Int = 0
+
+  override def toString: String = {
+    s"sparkOperators=$sparkOperators, cometOperators=$cometOperators, " +
+      s"transitions=$transitions, wrappers=$wrappers"
+  }
+}
+
 object CometExplainInfo {
 
   val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 47d2205a08..601c9b645e 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -118,6 +118,12 @@ class CometExecSuite extends CometTestBase {
       val (_, cometPlan) = checkSparkAnswer(df)
       val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
       assert(infos.contains("Dynamic Partition Pruning is not supported"))
+
+      withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
+        val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
+        // scalastyle:off
+        println(extendedExplain)
+      }
     }
   }
 }
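Note on [PATCH 01/20]: while `generateTreeString` walks the plan tree for the verbose explain output, every node is classified as a pass-through wrapper, a row/columnar transition, a Comet operator, or a Spark operator, and a mutable `PlanStats` accumulates the counts. Only Comet and Spark operators count as "eligible" when the acceleration percentage is computed. A minimal, self-contained sketch of that classification, using hypothetical stand-in node types rather than the real `SparkPlan` hierarchy:

```scala
// Stand-ins for the real SparkPlan subclasses (CometPlan, ColumnarToRowExec,
// InputAdapter, ...); only the classification logic mirrors the patch.
sealed trait Node { def children: Seq[Node] }
case class CometNode(children: Seq[Node] = Nil) extends Node
case class SparkNode(children: Seq[Node] = Nil) extends Node
case class Transition(children: Seq[Node] = Nil) extends Node
case class Wrapper(children: Seq[Node] = Nil) extends Node

class PlanStats {
  var sparkOperators: Int = 0
  var cometOperators: Int = 0
  var wrappers: Int = 0
  var transitions: Int = 0
}

object CoverageDemo {
  def tally(node: Node, stats: PlanStats): Unit = {
    node match {
      case _: Wrapper    => stats.wrappers += 1    // AQE/codegen shells: not eligible
      case _: Transition => stats.transitions += 1 // row<->columnar borders: not eligible
      case _: CometNode  => stats.cometOperators += 1
      case _             => stats.sparkOperators += 1
    }
    node.children.foreach(tally(_, stats))
  }

  def main(args: Array[String]): Unit = {
    val stats = new PlanStats
    tally(Transition(Seq(CometNode(Seq(CometNode(Seq(SparkNode())))))), stats)
    val eligible = stats.sparkOperators + stats.cometOperators
    val pct = if (eligible == 0) 0.0 else stats.cometOperators.toDouble / eligible * 100.0
    println(s"accelerated ${pct.toInt}% of $eligible eligible operators") // 66% of 3
  }
}
```

Note also that the patch-01 summary appends `$summary.` after a string that already ends in a period, producing a doubled period; the next patch trims the result to `\n$summary`.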
From 5157f832d636f2b07a026e56c615e05c33a62467 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 08:42:15 -0600
Subject: [PATCH 02/20] save

---
 .../main/scala/org/apache/comet/ExtendedExplainInfo.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index e270b83c76..25f7e05945 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
 import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 import org.apache.comet.CometExplainInfo.getActualPlan
 
@@ -89,7 +90,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     val converted =
       if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
     val summary = s"Comet accelerated ${converted.toInt}% of eligible operators ($planStats)."
-    s"${outString.toString()}\n\n$summary."
+    s"${outString.toString()}\n$summary"
   }
 
   // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
@@ -101,10 +102,11 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       outString: StringBuilder,
       planStats: PlanStats): Unit = {
 
+    // TODO remove debug logging
     // scalastyle:off
     node match {
       case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
-          _: WholeStageCodegenExec =>
+          _: WholeStageCodegenExec | _: ReusedExchangeExec =>
         println(s"ZZZ WRAPPER: ${node.nodeName}")
         planStats.wrappers += 1
       case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |

From 4ef52938775e4fda294fd6e84612a9913d1d77fd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 08:55:23 -0600
Subject: [PATCH 03/20] save

---
 .../src/main/scala/org/apache/comet/ExtendedExplainInfo.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 25f7e05945..522e31ae50 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.ExtendedExplainGenerator
 import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
 import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 import org.apache.comet.CometExplainInfo.getActualPlan
@@ -106,7 +106,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     node match {
       case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
-          _: WholeStageCodegenExec | _: ReusedExchangeExec =>
+          _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
         println(s"ZZZ WRAPPER: ${node.nodeName}")
         planStats.wrappers += 1

From 8b6dcfc1015e0d3637536d177b3e66699b777d6c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 08:55:42 -0600
Subject: [PATCH 04/20] prep for review

---
 .../main/scala/org/apache/comet/ExtendedExplainInfo.scala | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 522e31ae50..0cc7da83f8 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -102,22 +102,16 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       outString: StringBuilder,
       planStats: PlanStats): Unit = {
 
-    // TODO remove debug logging
-    // scalastyle:off
     node match {
       case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
           _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
-        println(s"ZZZ WRAPPER: ${node.nodeName}")
         planStats.wrappers += 1
       case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
          _: CometSparkToColumnarExec =>
-        println(s"ZZZ TRANSITION: ${node.nodeName}")
        planStats.transitions += 1
      case _: CometPlan =>
-        println(s"ZZZ COMET: ${node.nodeName}")
        planStats.cometOperators += 1
      case _ =>
-        println(s"ZZZ SPARK: ${node.nodeName}")
        planStats.sparkOperators += 1
    }
 

From 8dca97da756f75d65dbf30c7c86f5e82ed324891 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 09:00:02 -0600
Subject: [PATCH 05/20] prep for review

---
 .../src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 601c9b645e..38fe0f6794 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -121,8 +121,7 @@ class CometExecSuite extends CometTestBase {
 
       withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
         val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
-        // scalastyle:off
-        println(extendedExplain)
+        assert(extendedExplain.contains("Comet accelerated 33% of eligible operators"))
       }
     }
   }
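Note on patches 04-05: the `ZZZ` debug logging comes out and the test's `println` becomes a real assertion. Hard-coding `33%` makes the test brittle, since the exact ratio depends on the plan Spark happens to produce for the suite's query, which is why patch 19 later relaxes the check. A more robust pattern is to parse the counts out of the summary line rather than pin the percentage; a hypothetical test helper, not part of the patch:

```scala
// Hypothetical helper: extract the percentage from the summary line instead of
// asserting an exact, plan-shape-dependent value such as "33%".
object CoverageAssert {
  private val Summary = """Comet accelerated (\d+)% of eligible operators""".r

  def acceleratedPct(extendedExplain: String): Option[Int] =
    Summary.findFirstMatchIn(extendedExplain).map(_.group(1).toInt)
}

// CoverageAssert.acceleratedPct("Comet accelerated 33% of eligible operators (...)")
// => Some(33)
```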
" + + "the plan was accelerated by Comet. When this setting is disabled, a list of fallback " + + "reasons will be provided instead.") .booleanConf .createWithDefault(false) @@ -495,6 +505,7 @@ object CometConf extends ShimCometConf { "When this setting is enabled, Comet will provide logging explaining the reason(s) " + "why a query stage cannot be executed natively. Set this to false to " + "reduce the amount of logging.") + .internal() .booleanConf .createWithDefault(false) diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md index 89a8624e3e..e451b4542c 100644 --- a/docs/source/user-guide/latest/installation.md +++ b/docs/source/user-guide/latest/installation.md @@ -90,9 +90,10 @@ $SPARK_HOME/bin/spark-shell \ --conf spark.executor.extraClassPath=$COMET_JAR \ --conf spark.plugins=org.apache.spark.CometPlugin \ --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.explainFallback.enabled=true \ + --conf spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo \ + --conf spark.comet.explain.enabled=true \ --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g + --conf spark.memory.offHeap.size=2g ``` ### Verify Comet enabled for Spark SQL query diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index a71eed6c80..77c564e414 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -638,13 +638,32 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { var newPlan = transform(normalizedPlan) - // if the plan cannot be run fully natively then explain why (when appropriate - // config is enabled) - if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { + if (CometConf.COMET_EXPLAIN_ENABLED.get()) { + // COMET_EXPLAIN_ENABLED shows an explain plan regardless of + // whether there were any fallback reasons or not val info = new ExtendedExplainInfo() - if (info.extensionInfo(newPlan).nonEmpty) { - logWarning( - "Comet cannot execute some parts of this plan natively " + + val fallbackReasons = info.extensionInfo(newPlan) + if (fallbackReasons.nonEmpty) { + logInfo( + "Comet cannot accelerate some parts of this plan " + + s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " + + "to disable this logging):\n" + + s"${info.generateVerboseExtendedInfo(newPlan)}") + } else if (CometConf.COMET_EXPLAIN_ENABLED.get()) { + logInfo( + "Comet fully accelerated this plan " + + s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " + + "to disable this logging):\n" + + s"${info.generateVerboseExtendedInfo(newPlan)}") + } + } else if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { + // COMET_EXPLAIN_FALLBACK_ENABLED only shows an explain plan if + // there were fallback reasons + val info = new ExtendedExplainInfo() + val fallbackReasons = info.extensionInfo(newPlan) + if (fallbackReasons.nonEmpty) { + logInfo( + "Comet cannot accelerate some parts of this plan " + s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " + "to disable this logging):\n" + s"${info.generateVerboseExtendedInfo(newPlan)}") From 2dc8f7027f50d4080e7499bbf35c0f1e3b66ea67 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 17 Sep 2025 11:23:04 -0600 Subject: [PATCH 07/20] docs --- docs/source/user-guide/latest/configs.md | 4 ++-- 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index c923c56687..015476d742 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -62,9 +62,9 @@ Comet provides the following configuration settings. | spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true | | spark.comet.exec.union.enabled | Whether to enable union by default. | true | | spark.comet.exec.window.enabled | Whether to enable window by default. | true | +| spark.comet.explain.enabled | When this setting is enabled, Comet will log an explain plan as part of the query planning process for each query stage. | false | | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false | -| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false | -| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false | +| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false | | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false | | spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 | From 22407fcc786cd713b9a29be882683be04f8e2855 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 17 Sep 2025 12:06:57 -0600 Subject: [PATCH 08/20] update docs --- docs/source/user-guide/latest/installation.md | 50 ++++++++++--------- .../apache/comet/rules/CometExecRule.scala | 6 +-- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md index e451b4542c..ae631f9c79 100644 --- a/docs/source/user-guide/latest/installation.md +++ b/docs/source/user-guide/latest/installation.md @@ -98,38 +98,40 @@ $SPARK_HOME/bin/spark-shell \ ### Verify Comet enabled for Spark SQL query -Create a test Parquet source +Create a test Parquet source. Note that Comet will not accelerate writing the Parquet file. -```scala -scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test") ``` +scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test") -Query the data from the test source and check: - -- INFO message shows the native Comet library has been initialized. 
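Note on patches 06-07: after these two patches there are two logging configs with different triggers. `spark.comet.explain.enabled` always logs a plan for each query stage, while `spark.comet.explainFallback.enabled` only logs when something fell back to Spark. A condensed sketch of that decision logic, simplified from `CometExecRule`, with `renderPlan` and `log` standing in for `ExtendedExplainInfo.generateVerboseExtendedInfo` and the real logger:

```scala
// Condensed from the patch-06 CometExecRule logic; parameter names are
// stand-ins for the two config lookups and for ExtendedExplainInfo.
def maybeExplain(
    explainAlways: Boolean,     // spark.comet.explain.enabled
    explainOnFallback: Boolean, // spark.comet.explainFallback.enabled
    fallbackReasons: Set[String],
    renderPlan: () => String,
    log: String => Unit): Unit = {
  if (explainAlways) {
    // logs every query stage, whether or not anything fell back
    val verdict =
      if (fallbackReasons.nonEmpty) "Comet cannot accelerate some parts of this plan"
      else "Comet fully accelerated this plan"
    log(s"$verdict:\n${renderPlan()}")
  } else if (explainOnFallback && fallbackReasons.nonEmpty) {
    // logs only when something fell back to Spark
    log(s"Comet cannot accelerate some parts of this plan:\n${renderPlan()}")
  }
}
```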
From 22407fcc786cd713b9a29be882683be04f8e2855 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 12:06:57 -0600
Subject: [PATCH 08/20] update docs

---
 docs/source/user-guide/latest/installation.md | 50 ++++++++++---------
 .../apache/comet/rules/CometExecRule.scala    |  6 +--
 2 files changed, 29 insertions(+), 27 deletions(-)

diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md
index e451b4542c..ae631f9c79 100644
--- a/docs/source/user-guide/latest/installation.md
+++ b/docs/source/user-guide/latest/installation.md
@@ -98,38 +98,40 @@ $SPARK_HOME/bin/spark-shell \
 
 ### Verify Comet enabled for Spark SQL query
 
-Create a test Parquet source
+Create a test Parquet source. Note that Comet will not accelerate writing the Parquet file.
 
-```scala
-scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test")
 ```
+scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test")
 
-Query the data from the test source and check:
-
-- INFO message shows the native Comet library has been initialized.
-- The query plan reflects Comet operators being used for this query instead of Spark ones
+WARN CometExecRule: Comet cannot accelerate some parts of this plan (set spark.comet.explain.enabled=false to disable this logging):
+Execute InsertIntoHadoopFsRelationCommand [COMET: Execute InsertIntoHadoopFsRelationCommand is not supported]
++- WriteFiles [COMET: WriteFiles is not supported]
+   +- LocalTableScan [COMET: LocalTableScan is not supported]
 
-```scala
 scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
-scala> spark.sql("select * from t1 where a > 5").explain
-INFO src/lib.rs: Comet native library initialized
-== Physical Plan ==
- *(1) ColumnarToRow
- +- CometFilter [a#14], (isnotnull(a#14) AND (a#14 > 5))
-     +- CometScan parquet [a#14] Batched: true, DataFilters: [isnotnull(a#14), (a#14 > 5)],
-     Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [],
-     PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
+
+WARN CometExecRule: Comet fully accelerated this plan (set spark.comet.explain.enabled=false to disable this logging):
+Execute CreateViewCommand
+ +- CreateViewCommand
+ +- LogicalRelation
+
+Comet accelerated 0% of eligible operators (sparkOperators=3, cometOperators=0, transitions=0, wrappers=0).
+```
+
+Executing a simple SELECT query should be fully accelerated by Comet:
+
+```
+scala> spark.sql("select * from t1 where a > 5").explain
+
+WARN CometExecRule: Comet fully accelerated this plan (set spark.comet.explain.enabled=false to disable this logging):
+CometFilter
++- CometScanWrapper
+
+Comet accelerated 100% of eligible operators (sparkOperators=0, cometOperators=2, transitions=0, wrappers=0).
+
+== Physical Plan ==
+ *(1) CometColumnarToRow
+ +- CometFilter [a#7], (isnotnull(a#7) AND (a#7 > 5))
++- CometScan [native_iceberg_compat] parquet [a#7] Batched: true, DataFilters: [isnotnull(a#7), (a#7 > 5)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
 ```
 
 ## Additional Configuration

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 77c564e414..6cce5e613e 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -644,13 +644,13 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       val info = new ExtendedExplainInfo()
       val fallbackReasons = info.extensionInfo(newPlan)
       if (fallbackReasons.nonEmpty) {
-        logInfo(
+        logWarning(
           "Comet cannot accelerate some parts of this plan " +
             s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +
             "to disable this logging):\n" +
             s"${info.generateVerboseExtendedInfo(newPlan)}")
       } else if (CometConf.COMET_EXPLAIN_ENABLED.get()) {
-        logInfo(
+        logWarning(
           "Comet fully accelerated this plan " +
             s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +
             "to disable this logging):\n" +
@@ -662,7 +662,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       val info = new ExtendedExplainInfo()
       val fallbackReasons = info.extensionInfo(newPlan)
       if (fallbackReasons.nonEmpty) {
-        logInfo(
+        logWarning(
           "Comet cannot accelerate some parts of this plan " +
             s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
             "to disable this logging):\n" +
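Note on patch 08: switching `logInfo` to `logWarning` makes the summary visible at Spark's default log level, which is why the doc examples show `WARN ...` lines. The patch also leaves a wrinkle for patch 10 to clean up: inside the `if (CometConf.COMET_EXPLAIN_ENABLED.get())` block, the `else if` re-tests a condition that is already known to be true on that path, and the sample output labels the view plan "fully accelerated" while its own summary reports 0% coverage. A condensed sketch of the guard, with `explainEnabled` and `fallbackReasons` standing in for the config lookup and `ExtendedExplainInfo.extensionInfo`:

```scala
// Stand-in for the CometExecRule branching; patch 10 collapses the redundant
// `else if (COMET_EXPLAIN_ENABLED.get())` into a plain `else`.
def verdict(explainEnabled: Boolean, fallbackReasons: Set[String]): Option[String] =
  if (!explainEnabled) None
  else if (fallbackReasons.nonEmpty) Some("Comet cannot accelerate some parts of this plan")
  else Some("Comet fully accelerated this plan") // was: else if (explainEnabled) ...
```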
From f85ffd53cbc96e4b322bc7252ec64794788f13b1 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 12:10:59 -0600
Subject: [PATCH 09/20] Revert

---
 common/src/main/scala/org/apache/comet/CometConf.scala | 1 -
 docs/source/user-guide/latest/configs.md               | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 113534ce3c..71e2bf6a76 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -505,7 +505,6 @@ object CometConf extends ShimCometConf {
         "When this setting is enabled, Comet will provide logging explaining the reason(s) " +
           "why a query stage cannot be executed natively. Set this to false to " +
           "reduce the amount of logging.")
-      .internal()
       .booleanConf
       .createWithDefault(false)

diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 015476d742..f307a92fa4 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -65,6 +65,7 @@ Comet provides the following configuration settings.
 | spark.comet.explain.enabled | When this setting is enabled, Comet will log an explain plan as part of the query planning process for each query stage. | false |
 | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
 | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
+| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
 | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional memory for Comet when running Spark in on-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 0.2 |

From 3552888b97e67fdca9658150f6d42febcd702dea Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 12:28:11 -0600
Subject: [PATCH 10/20] fix

---
 docs/source/user-guide/latest/installation.md   | 13 ++++++++++---
 .../org/apache/comet/rules/CometExecRule.scala  |  2 +-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md
index ae631f9c79..e090390280 100644
--- a/docs/source/user-guide/latest/installation.md
+++ b/docs/source/user-guide/latest/installation.md
@@ -108,14 +108,21 @@ Execute InsertIntoHadoopFsRelationCommand [COMET: Execute InsertIntoHadoopFsRela
 +- WriteFiles [COMET: WriteFiles is not supported]
    +- LocalTableScan [COMET: LocalTableScan is not supported]
 
+Comet accelerated 0% of eligible operators (sparkOperators=3, cometOperators=0, transitions=0, wrappers=0).
+```
+
+Create a view from the Parquet file. Again, Comet will not accelerate this part.
+
+```
 scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
 
-WARN CometExecRule: Comet fully accelerated this plan (set spark.comet.explain.enabled=false to disable this logging):
-Execute CreateViewCommand
- +- CreateViewCommand
- +- LogicalRelation
+WARN CometExecRule: Comet cannot accelerate some parts of this plan (set spark.comet.explain.enabled=false to disable this logging):
+Execute CreateViewCommand
+  +- CreateViewCommand
+  +- LogicalRelation
 
 Comet accelerated 0% of eligible operators (sparkOperators=3, cometOperators=0, transitions=0, wrappers=0).
+
 ```
 
 Executing a simple SELECT query should be fully accelerated by Comet:

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 6cce5e613e..ce5c96b46b 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -649,7 +649,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
           s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +
           "to disable this logging):\n" +
           s"${info.generateVerboseExtendedInfo(newPlan)}")
-      } else if (CometConf.COMET_EXPLAIN_ENABLED.get()) {
+      } else {
         logWarning(
           "Comet fully accelerated this plan " +
           s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +

From 8fc2108720a53a4a15feebf222f9144a6d39e9a5 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 12:33:52 -0600
Subject: [PATCH 11/20] revert some changes

---
 .../scala/org/apache/comet/CometConf.scala    |  8 -----
 docs/source/user-guide/latest/configs.md      |  1 -
 docs/source/user-guide/latest/installation.md |  8 -----
 .../apache/comet/rules/CometExecRule.scala    | 29 ++++---------------
 4 files changed, 5 insertions(+), 41 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 71e2bf6a76..8aeedbf10b 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -457,14 +457,6 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_EXPLAIN_ENABLED: ConfigEntry[Boolean] =
-    conf("spark.comet.explain.enabled")
-      .doc(
-        "When this setting is enabled, Comet will log an explain plan as part of the query " +
-          "planning process for each query stage.")
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.explain.verbose.enabled")
       .doc(

diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index f307a92fa4..a3a550f256 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -62,7 +62,6 @@ Comet provides the following configuration settings.
 | spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true |
 | spark.comet.exec.union.enabled | Whether to enable union by default. | true |
 | spark.comet.exec.window.enabled | Whether to enable window by default. | true |
-| spark.comet.explain.enabled | When this setting is enabled, Comet will log an explain plan as part of the query planning process for each query stage. | false |
 | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
 | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |

diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md
index e090390280..619a7ab369 100644
--- a/docs/source/user-guide/latest/installation.md
+++ b/docs/source/user-guide/latest/installation.md
@@ -115,14 +115,6 @@ Create a view from the Parquet file. Again, Comet will not accelerate this part.
 
 ```
 scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
-
-WARN CometExecRule: Comet cannot accelerate some parts of this plan (set spark.comet.explain.enabled=false to disable this logging):
-Execute CreateViewCommand
-  +- CreateViewCommand
-  +- LogicalRelation
-
-Comet accelerated 0% of eligible operators (sparkOperators=3, cometOperators=0, transitions=0, wrappers=0).
-
 ```
 
 Executing a simple SELECT query should be fully accelerated by Comet:

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index ce5c96b46b..a71eed6c80 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -638,32 +638,13 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
 
     var newPlan = transform(normalizedPlan)
 
-    if (CometConf.COMET_EXPLAIN_ENABLED.get()) {
-      // COMET_EXPLAIN_ENABLED shows an explain plan regardless of
-      // whether there were any fallback reasons or not
+    // if the plan cannot be run fully natively then explain why (when appropriate
+    // config is enabled)
+    if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
       val info = new ExtendedExplainInfo()
-      val fallbackReasons = info.extensionInfo(newPlan)
-      if (fallbackReasons.nonEmpty) {
+      if (info.extensionInfo(newPlan).nonEmpty) {
         logWarning(
-          "Comet cannot accelerate some parts of this plan " +
-            s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +
-            "to disable this logging):\n" +
-            s"${info.generateVerboseExtendedInfo(newPlan)}")
-      } else {
-        logWarning(
-          "Comet fully accelerated this plan " +
-            s"(set ${CometConf.COMET_EXPLAIN_ENABLED.key}=false " +
-            "to disable this logging):\n" +
-            s"${info.generateVerboseExtendedInfo(newPlan)}")
-      }
-    } else if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
-      // COMET_EXPLAIN_FALLBACK_ENABLED only shows an explain plan if
-      // there were fallback reasons
-      val info = new ExtendedExplainInfo()
-      val fallbackReasons = info.extensionInfo(newPlan)
-      if (fallbackReasons.nonEmpty) {
-        logWarning(
-          "Comet cannot accelerate some parts of this plan " +
+          "Comet cannot execute some parts of this plan natively " +
           s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
           "to disable this logging):\n" +
           s"${info.generateVerboseExtendedInfo(newPlan)}")
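Note on patches 09-11: the config experiment is walked back. `.internal()` comes off `spark.comet.explainFallback.enabled`, the new `spark.comet.explain.enabled` entry is deleted from `CometConf` and the docs, and `CometExecRule` returns to its original fallback-only logging. The verbose coverage output remains reachable through Spark's extended-explain hook (`spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo`, as the installation doc showed earlier in the series) or by invoking the generator directly. A hypothetical spark-shell session, assuming the Comet jar is on the classpath:

```scala
// Illustrative only: call the generator against a DataFrame's executed plan.
import org.apache.spark.sql.functions.col

val df = spark.read.parquet("/tmp/test").filter(col("a") > 5)
val info = new org.apache.comet.ExtendedExplainInfo()
// With spark.comet.explain.verbose.enabled=true this is what the extended
// explain output is built from: the annotated tree plus the coverage summary.
println(info.generateVerboseExtendedInfo(df.queryExecution.executedPlan))
```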
From 6f99dd939586da6e919ea803ebb6a7a06bb79b88 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 12:36:19 -0600
Subject: [PATCH 12/20] revert

---
 dev/benchmarks/comet-tpch.sh                  |  1 +
 docs/source/user-guide/latest/installation.md | 52 +++++++++----------
 2 files changed, 26 insertions(+), 27 deletions(-)

diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh
index 09f5893659..653c86e8a4 100755
--- a/dev/benchmarks/comet-tpch.sh
+++ b/dev/benchmarks/comet-tpch.sh
@@ -43,6 +43,7 @@ $SPARK_HOME/bin/spark-submit \
     --conf spark.comet.exec.replaceSortMergeJoin=true \
     --conf spark.comet.expression.allowIncompatible=true \
     --conf spark.comet.scan.impl=native_datafusion \
+    --conf spark.comet.explainFallback.enabled=true \
     tpcbench.py \
     --name comet \
     --benchmark tpch \

diff --git a/docs/source/user-guide/latest/installation.md b/docs/source/user-guide/latest/installation.md
index 619a7ab369..89a8624e3e 100644
--- a/docs/source/user-guide/latest/installation.md
+++ b/docs/source/user-guide/latest/installation.md
@@ -90,47 +90,45 @@ $SPARK_HOME/bin/spark-shell \
     --conf spark.executor.extraClassPath=$COMET_JAR \
     --conf spark.plugins=org.apache.spark.CometPlugin \
     --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
-    --conf spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo \
-    --conf spark.comet.explain.enabled=true \
+    --conf spark.comet.explainFallback.enabled=true \
     --conf spark.memory.offHeap.enabled=true \
-    --conf spark.memory.offHeap.size=2g
+    --conf spark.memory.offHeap.size=16g
 ```
 
 ### Verify Comet enabled for Spark SQL query
 
-Create a test Parquet source. Note that Comet will not accelerate writing the Parquet file.
+Create a test Parquet source
 
-```
+```scala
 scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test")
-
-WARN CometExecRule: Comet cannot accelerate some parts of this plan (set spark.comet.explain.enabled=false to disable this logging):
-Execute InsertIntoHadoopFsRelationCommand [COMET: Execute InsertIntoHadoopFsRelationCommand is not supported]
-+- WriteFiles [COMET: WriteFiles is not supported]
-   +- LocalTableScan [COMET: LocalTableScan is not supported]
-
-Comet accelerated 0% of eligible operators (sparkOperators=3, cometOperators=0, transitions=0, wrappers=0).
 ```
 
-Create a view from the Parquet file. Again, Comet will not accelerate this part.
+Query the data from the test source and check:
 
-```
-scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
-```
+- INFO message shows the native Comet library has been initialized.
+- The query plan reflects Comet operators being used for this query instead of Spark ones
 
-Executing a simple SELECT query should be fully accelerated by Comet:
-
-```
+```scala
+scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
 scala> spark.sql("select * from t1 where a > 5").explain
-
-WARN CometExecRule: Comet fully accelerated this plan (set spark.comet.explain.enabled=false to disable this logging):
-CometFilter
-+- CometScanWrapper
-
-Comet accelerated 100% of eligible operators (sparkOperators=0, cometOperators=2, transitions=0, wrappers=0).
-
-== Physical Plan ==
- *(1) CometColumnarToRow
- +- CometFilter [a#7], (isnotnull(a#7) AND (a#7 > 5))
-+- CometScan [native_iceberg_compat] parquet [a#7] Batched: true, DataFilters: [isnotnull(a#7), (a#7 > 5)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
+INFO src/lib.rs: Comet native library initialized
+== Physical Plan ==
+ *(1) ColumnarToRow
+ +- CometFilter [a#14], (isnotnull(a#14) AND (a#14 > 5))
+     +- CometScan parquet [a#14] Batched: true, DataFilters: [isnotnull(a#14), (a#14 > 5)],
+     Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [],
+     PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
+```
+
+With the configuration `spark.comet.explainFallback.enabled=true`, Comet will log any reasons that prevent a plan from
+being executed natively.
+
+```scala
+scala> Seq(1,2,3,4).toDF("a").write.parquet("/tmp/test.parquet")
+WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because:
+ - LocalTableScan is not supported
+ - WriteFiles is not supported
+ - Execute InsertIntoHadoopFsRelationCommand is not supported
 ```
 
 ## Additional Configuration

From 9cd439f94d12b5268c94f6809a5e584f6181b421 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 17 Sep 2025 12:42:56 -0600
Subject: [PATCH 13/20] revert

---
 dev/benchmarks/comet-tpch.sh | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh
index 653c86e8a4..09f5893659 100755
--- a/dev/benchmarks/comet-tpch.sh
+++ b/dev/benchmarks/comet-tpch.sh
@@ -43,7 +43,6 @@ $SPARK_HOME/bin/spark-submit \
     --conf spark.comet.exec.replaceSortMergeJoin=true \
     --conf spark.comet.expression.allowIncompatible=true \
     --conf spark.comet.scan.impl=native_datafusion \
-    --conf spark.comet.explainFallback.enabled=true \
     tpcbench.py \
     --name comet \
     --benchmark tpch \

From ac57ee96ac81d2904adbfc609e169fb166b59fdb Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 29 Sep 2025 13:46:08 -0600
Subject: [PATCH 14/20] rename

---
 .../main/scala/org/apache/comet/ExtendedExplainInfo.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 9599473802..858ee2fce6 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -83,7 +83,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
   // generates the extended info in a verbose manner, printing each node along with the
   // extended information in a tree display
   def generateVerboseExtendedInfo(plan: SparkPlan): String = {
-    val planStats = new PlanStats()
+    val planStats = new CometCoverageStats()
     val outString = new StringBuilder()
     generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
@@ -100,7 +100,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       lastChildren: Seq[Boolean],
       indent: Int,
       outString: StringBuilder,
-      planStats: PlanStats): Unit = {
+      planStats: CometCoverageStats): Unit = {
 
     node match {
       case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
@@ -180,7 +180,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
   }
 }
 
-class PlanStats {
+class CometCoverageStats {
   var sparkOperators: Int = 0
   var cometOperators: Int = 0
   var wrappers: Int = 0

From 36c04396290d26ea56be561a049cb3fbd6490272 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 29 Sep 2025 13:51:55 -0600
Subject: [PATCH 15/20] address feedback

---
 .../org/apache/comet/ExtendedExplainInfo.scala | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 858ee2fce6..627743b968 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -89,8 +89,9 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     val eligible = planStats.sparkOperators + planStats.cometOperators
     val converted =
       if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
-    val summary = s"Comet accelerated ${converted.toInt}% of eligible operators ($planStats)."
-    s"${outString.toString()}\n$summary"
+    s"Comet accelerated ${planStats.cometOperators} out of $eligible " +
+      s"eligible operators (${converted.toInt}%). " +
+      s"Final plan contains ${planStats.transitions} transitions."
   }
 
   // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
@@ -105,7 +106,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     node match {
       case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
           _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
-        planStats.wrappers += 1
+      // ignore
       case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
           _: CometSparkToColumnarExec =>
         planStats.transitions += 1
@@ -183,13 +184,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
 class CometCoverageStats {
   var sparkOperators: Int = 0
   var cometOperators: Int = 0
-  var wrappers: Int = 0
   var transitions: Int = 0
-
-  override def toString: String = {
-    s"sparkOperators=$sparkOperators, cometOperators=$cometOperators, " +
-      s"transitions=$transitions, wrappers=$wrappers"
-  }
 }
 
 object CometExplainInfo {

From bdc83006e555485da2d3f8c96d77060aa3cbf15f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 29 Sep 2025 13:54:41 -0600
Subject: [PATCH 16/20] address feedback

---
 .../org/apache/comet/ExtendedExplainInfo.scala | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 627743b968..bb1e8cd470 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -83,6 +83,20 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
   // generates the extended info in a verbose manner, printing each node along with the
   // extended information in a tree display
   def generateVerboseExtendedInfo(plan: SparkPlan): String = {
+    val planStats = new CometCoverageStats()
+    val outString = new StringBuilder()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    val eligible = planStats.sparkOperators + planStats.cometOperators
+    val converted =
+      if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
+    val summary = s"Comet accelerated ${planStats.cometOperators} out of $eligible " +
+      s"eligible operators (${converted.toInt}%). " +
+      s"Final plan contains ${planStats.transitions} transitions."
+    s"${outString.toString()}\n$summary"
+  }
+
+  /** Get the coverage statistics without the full plan */
+  def generateCoverageInfo(plan: SparkPlan): String = {
     val planStats = new CometCoverageStats()
     val outString = new StringBuilder()
     generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)

From 62473ad07b7c759cd9c7c128b4617874ce41626f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 30 Sep 2025 08:23:14 -0600
Subject: [PATCH 17/20] Update
 spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala

Co-authored-by: Parth Chandra
---
 spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index bb1e8cd470..8f890d5a7f 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -105,7 +105,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
     s"Comet accelerated ${planStats.cometOperators} out of $eligible " +
       s"eligible operators (${converted.toInt}%). " +
-      s"Final plan contains ${planStats.transitions} transitions."
+      s"Final plan contains ${planStats.transitions} transitions between Spark and Comet."
   }
From e483ffa0aa8aec78fa6476bd682522cb7c96787b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 30 Sep 2025 08:28:04 -0600
Subject: [PATCH 18/20] Refactor

---
 .../apache/comet/ExtendedExplainInfo.scala | 25 +++++++++----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 8f890d5a7f..1e514956d8 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -86,13 +86,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     val planStats = new CometCoverageStats()
     val outString = new StringBuilder()
     generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
-    val eligible = planStats.sparkOperators + planStats.cometOperators
-    val converted =
-      if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
-    val summary = s"Comet accelerated ${planStats.cometOperators} out of $eligible " +
-      s"eligible operators (${converted.toInt}%). " +
-      s"Final plan contains ${planStats.transitions} transitions."
-    s"${outString.toString()}\n$summary"
+    s"${outString.toString()}\n$planStats"
   }
 
   /** Get the coverage statistics without the full plan */
@@ -100,12 +94,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     val planStats = new CometCoverageStats()
     val outString = new StringBuilder()
     generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
-    val eligible = planStats.sparkOperators + planStats.cometOperators
-    val converted =
-      if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
-    s"Comet accelerated ${planStats.cometOperators} out of $eligible " +
-      s"eligible operators (${converted.toInt}%). " +
-      s"Final plan contains ${planStats.transitions} transitions between Spark and Comet."
+    planStats.toString()
   }
 
   // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
@@ -188,6 +188,15 @@ class CometCoverageStats {
 class CometCoverageStats {
   var sparkOperators: Int = 0
   var cometOperators: Int = 0
   var transitions: Int = 0
+
+  override def toString(): String = {
+    val eligible = sparkOperators + cometOperators
+    val converted =
+      if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0
+    s"Comet accelerated $cometOperators out of $eligible " +
+      s"eligible operators (${converted.toInt}%). " +
+      s"Final plan contains $transitions transitions between Spark and Comet."
+  }
 }
@@ -213,6 +211,7 @@ object CometExplainInfo {
       case p: ReusedExchangeExec => getActualPlan(p.child)
       case p => p
     }
+
   }
 }

From 6a0500dc2ef7a1edce1ca2af903abf840c18809c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 1 Oct 2025 09:39:01 -0600
Subject: [PATCH 19/20] make check more generic

---
 .../src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 9a8ba917d3..75d2f60b2d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -121,7 +121,8 @@ class CometExecSuite extends CometTestBase {
 
       withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
         val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
-        assert(extendedExplain.contains("Comet accelerated 33% of eligible operators"))
+        assert(extendedExplain.contains("Comet accelerated"))
+        assert(extendedExplain.contains("% of eligible operators"))
       }
     }
   }

From 64365748ecb9484bafd9cf79d5a9c513a490afdd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 1 Oct 2025 12:18:29 -0600
Subject: [PATCH 20/20] fix

---
 spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 75d2f60b2d..04203f7545 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -122,7 +122,6 @@ class CometExecSuite extends CometTestBase {
       withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
         val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
         assert(extendedExplain.contains("Comet accelerated"))
-        assert(extendedExplain.contains("% of eligible operators"))
       }
     }
   }
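Note on patches 19-20, where the series lands: with `spark.comet.explain.verbose.enabled=true`, the extended explain output is the annotated plan tree followed by the coverage summary, and the test pins only the stable prefix of that summary. Patch 20 then drops the `"% of eligible operators"` assertion as well, since the patch-18 message no longer contains that exact phrase. A condensed view of the final test pattern, mirroring `CometExecSuite` with setup details elided:

```scala
// Mirrors the final CometExecSuite check (df/cometPlan setup elided): only the
// stable "Comet accelerated" prefix is asserted, not a plan-dependent ratio.
withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
  val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
  assert(extendedExplain.contains("Comet accelerated"))
}
```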