diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 15c160ed6c..c2a5d05829 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -457,8 +457,10 @@ object CometConf extends ShimCometConf {
   val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.explain.verbose.enabled")
       .doc(
-        "When this setting is enabled, Comet will provide a verbose tree representation of " +
-          "the extended information.")
+        "When this setting is enabled, Comet's extended explain output will provide the full " +
+          "query plan annotated with fallback reasons as well as a summary of how much of " +
+          "the plan was accelerated by Comet. When this setting is disabled, a list of fallback " +
+          "reasons will be provided instead.")
       .booleanConf
       .createWithDefault(false)
 
diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 410386e958..bebca3c443 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.union.enabled | Whether to enable union by default. | true |
 | spark.comet.exec.window.enabled | Whether to enable window by default. | true |
 | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
-| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
+| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
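Note (illustrative, not part of the patch): the new doc text is easiest to see end to end. A minimal sketch, assuming a running SparkSession `spark`, a table `t`, and that Comet's ExtendedExplainInfo is registered through Spark's extended-explain hook (spark.sql.extendedExplainProviders; the class implements Spark's ExtendedExplainGenerator trait, per the diff below):

    // Illustrative only: the provider registration and the query are assumptions.
    spark.conf.set("spark.sql.extendedExplainProviders", "org.apache.comet.ExtendedExplainInfo")
    spark.conf.set("spark.comet.explain.verbose.enabled", "true")
    val df = spark.sql("SELECT a, count(*) FROM t GROUP BY a")
    // With the verbose setting enabled, the extended explain section should show
    // the annotated plan plus the coverage summary rather than a flat list of
    // fallback reasons.
    df.explain("formatted")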
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index b5b8a53029..1e514956d8 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.ExtendedExplainGenerator
 import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
-import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 import org.apache.comet.CometExplainInfo.getActualPlan
@@ -82,9 +83,18 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
   // generates the extended info in a verbose manner, printing each node along with the
   // extended information in a tree display
   def generateVerboseExtendedInfo(plan: SparkPlan): String = {
+    val planStats = new CometCoverageStats()
     val outString = new StringBuilder()
-    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString)
-    outString.toString()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    s"${outString.toString()}\n$planStats"
+  }
+
+  /** Get the coverage statistics without the full plan */
+  def generateCoverageInfo(plan: SparkPlan): String = {
+    val planStats = new CometCoverageStats()
+    val outString = new StringBuilder()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    planStats.toString()
   }
 
   // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
@@ -93,7 +103,22 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       depth: Int,
       lastChildren: Seq[Boolean],
       indent: Int,
-      outString: StringBuilder): Unit = {
+      outString: StringBuilder,
+      planStats: CometCoverageStats): Unit = {
+
+    node match {
+      case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
+          _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
+      // ignore
+      case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
+          _: CometSparkToColumnarExec =>
+        planStats.transitions += 1
+      case _: CometPlan =>
+        planStats.cometOperators += 1
+      case _ =>
+        planStats.sparkOperators += 1
+    }
+
     outString.append(" " * indent)
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
@@ -120,7 +145,8 @@
             depth + 2,
             lastChildren :+ node.children.isEmpty :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
         case _ =>
       }
       generateTreeString(
@@ -128,7 +154,8 @@
         depth + 2,
         lastChildren :+ node.children.isEmpty :+ true,
         indent,
-        outString)
+        outString,
+        planStats)
     }
     if (node.children.nonEmpty) {
       node.children.init.foreach {
@@ -138,18 +165,40 @@
             depth + 1,
             lastChildren :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
         case _ =>
       }
       node.children.last match {
         case c @ (_: TreeNode[_]) =>
-          generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString)
+          generateTreeString(
+            getActualPlan(c),
+            depth + 1,
+            lastChildren :+ true,
+            indent,
+            outString,
+            planStats)
         case _ =>
       }
     }
   }
 }
 
+class CometCoverageStats {
+  var sparkOperators: Int = 0
+  var cometOperators: Int = 0
+  var transitions: Int = 0
+
+  override def toString(): String = {
+    val eligible = sparkOperators + cometOperators
+    val converted =
+      if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0
+    s"Comet accelerated $cometOperators out of $eligible " +
+      s"eligible operators (${converted.toInt}%). " +
+      s"Final plan contains $transitions transitions between Spark and Comet."
+  }
+}
+
 object CometExplainInfo {
   val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
 
@@ -162,6 +211,7 @@ object CometExplainInfo {
       case p: ReusedExchangeExec => getActualPlan(p.child)
       case p => p
     }
+
   }
 }
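Note (illustrative, not part of the patch): for a concrete reading of the new summary line, take example values cometOperators = 9, sparkOperators = 3, and transitions = 2 (invented numbers, not from a real run). CometCoverageStats.toString() then renders:

    Comet accelerated 9 out of 12 eligible operators (75%). Final plan contains 2 transitions between Spark and Comet.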
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index ca76234c87..04203f7545 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -118,6 +118,11 @@ class CometExecSuite extends CometTestBase {
       val (_, cometPlan) = checkSparkAnswer(df)
       val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
       assert(infos.contains("Dynamic Partition Pruning is not supported"))
+
+      withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
+        val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
+        assert(extendedExplain.contains("Comet accelerated"))
+      }
     }
   }
 }
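Note (illustrative, not part of the patch): beyond the config-driven path exercised by the test above, the two new entry points can be driven directly against an executed plan, e.g. from a test or a notebook. A sketch, assuming a planned DataFrame `df`:

    import org.apache.comet.ExtendedExplainInfo

    val plan = df.queryExecution.executedPlan
    val info = new ExtendedExplainInfo()
    println(info.generateCoverageInfo(plan))        // coverage summary only
    println(info.generateVerboseExtendedInfo(plan)) // annotated plan tree plus summary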