6 changes: 4 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -457,8 +457,10 @@ object CometConf extends ShimCometConf {
  val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
    conf("spark.comet.explain.verbose.enabled")
      .doc(
-        "When this setting is enabled, Comet will provide a verbose tree representation of " +
-          "the extended information.")
+        "When this setting is enabled, Comet's extended explain output will provide the full " +
+          "query plan annotated with fallback reasons as well as a summary of how much of " +
+          "the plan was accelerated by Comet. When this setting is disabled, a list of fallback " +
+          "reasons will be provided instead.")
      .booleanConf
      .createWithDefault(false)

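For context, a hedged sketch of how this flag might be exercised end to end. The session wiring below (plugin class, the extendedExplainProviders hook, local master, and the demo query) is an assumption for illustration, not something this diff adds:

```scala
// Hypothetical usage sketch (not part of this PR): enable Comet and its
// verbose extended explain, then ask Spark for a formatted plan.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("comet-verbose-explain-demo")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  // Assumed setup: register Comet's generator with Spark's extended-explain hook.
  .config("spark.sql.extendedExplainProviders", "org.apache.comet.ExtendedExplainInfo")
  .config("spark.comet.explain.verbose.enabled", "true")
  .getOrCreate()

spark.range(1000).toDF("id").createOrReplaceTempView("t")

// With the flag on, the extended explain info includes the annotated plan plus
// the coverage summary; with it off, only a list of fallback reasons appears.
spark.sql("SELECT id % 10 AS k, COUNT(*) AS c FROM t GROUP BY k").explain("formatted")
```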
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/configs.md
@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |
| spark.comet.exec.window.enabled | Whether to enable window by default. | true |
| spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
-| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
+| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
68 changes: 59 additions & 9 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable

import org.apache.spark.sql.ExtendedExplainGenerator
import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
-import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

import org.apache.comet.CometExplainInfo.getActualPlan
@@ -82,9 +83,18 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
  // generates the extended info in a verbose manner, printing each node along with the
  // extended information in a tree display
  def generateVerboseExtendedInfo(plan: SparkPlan): String = {
+    val planStats = new CometCoverageStats()
    val outString = new StringBuilder()
-    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString)
-    outString.toString()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    s"${outString.toString()}\n$planStats"
  }
+
+  /** Get the coverage statistics without the full plan */
+  def generateCoverageInfo(plan: SparkPlan): String = {
+    val planStats = new CometCoverageStats()
+    val outString = new StringBuilder()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    planStats.toString()
+  }

// Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
@@ -93,7 +103,22 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
      depth: Int,
      lastChildren: Seq[Boolean],
      indent: Int,
-      outString: StringBuilder): Unit = {
+      outString: StringBuilder,
+      planStats: CometCoverageStats): Unit = {
+
+    node match {
+      case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
+          _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
+      // ignore
+      case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
+          _: CometSparkToColumnarExec =>
+        planStats.transitions += 1
+      case _: CometPlan =>
+        planStats.cometOperators += 1
+      case _ =>
+        planStats.sparkOperators += 1
+    }
+
outString.append(" " * indent)
if (depth > 0) {
lastChildren.init.foreach { isLast =>
@@ -120,15 +145,17 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
            depth + 2,
            lastChildren :+ node.children.isEmpty :+ false,
            indent,
-            outString)
+            outString,
+            planStats)
        case _ =>
      }
      generateTreeString(
        getActualPlan(innerChildrenLocal.last),
        depth + 2,
        lastChildren :+ node.children.isEmpty :+ true,
        indent,
-        outString)
+        outString,
+        planStats)
    }
if (node.children.nonEmpty) {
node.children.init.foreach {
@@ -138,18 +165,40 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
            depth + 1,
            lastChildren :+ false,
            indent,
-            outString)
+            outString,
+            planStats)
        case _ =>
      }
      node.children.last match {
        case c @ (_: TreeNode[_]) =>
-          generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString)
+          generateTreeString(
+            getActualPlan(c),
+            depth + 1,
+            lastChildren :+ true,
+            indent,
+            outString,
+            planStats)
        case _ =>
      }
    }
  }
}

+class CometCoverageStats {
+  var sparkOperators: Int = 0
+  var cometOperators: Int = 0
+  var transitions: Int = 0
+
+  override def toString(): String = {
+    val eligible = sparkOperators + cometOperators
+    val converted =
+      if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0
+    s"Comet accelerated $cometOperators out of $eligible " +
+      s"eligible operators (${converted.toInt}%). " +
+      s"Final plan contains $transitions transitions between Spark and Comet."
+  }
+}
+
object CometExplainInfo {
  val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

@@ -162,6 +211,7 @@ object CometExplainInfo {
      case p: ReusedExchangeExec => getActualPlan(p.child)
      case p => p
    }
+
  }

}
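The summary arithmetic in CometCoverageStats deserves a worked example: wrapper nodes (AdaptiveSparkPlanExec, query stages, reused exchanges, AQE shuffle reads) are not counted at all, row/columnar transition nodes are tallied separately, and the percentage is Comet operators over all counted Spark-plus-Comet operators. A minimal standalone sketch of that math, with hypothetical counts:

```scala
// Standalone sketch of the coverage math above; the operator counts are
// hypothetical and would normally be accumulated while walking the plan.
object CoverageMathDemo extends App {
  val sparkOperators = 3 // operators that stayed on the Spark engine
  val cometOperators = 9 // operators Comet accelerated
  val transitions = 2    // row <-> columnar boundary nodes, counted separately

  val eligible = sparkOperators + cometOperators
  val converted =
    if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0

  // Prints: Comet accelerated 9 out of 12 eligible operators (75%).
  //         Final plan contains 2 transitions between Spark and Comet.
  println(
    s"Comet accelerated $cometOperators out of $eligible " +
      s"eligible operators (${converted.toInt}%). " +
      s"Final plan contains $transitions transitions between Spark and Comet.")
}
```

Note the guard for `eligible == 0`, which keeps the summary well defined for plans where every node is a wrapper or transition.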
@@ -118,6 +118,11 @@ class CometExecSuite extends CometTestBase {
      val (_, cometPlan) = checkSparkAnswer(df)
      val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
      assert(infos.contains("Dynamic Partition Pruning is not supported"))
+
+      withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
+        val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
+        assert(extendedExplain.contains("Comet accelerated"))
+      }
}
}
}
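For callers that want these outputs directly rather than through the explain hook, a hedged sketch of the two new entry points; `cometPlan` below is a stand-in for a SparkPlan captured from an executed query, as in the test above:

```scala
// Hypothetical direct use of the two output flavors added in this PR.
import org.apache.spark.sql.execution.SparkPlan

import org.apache.comet.ExtendedExplainInfo

// Stand-in for a plan obtained from a real query (assumption for illustration).
val cometPlan: SparkPlan = ???

val info = new ExtendedExplainInfo()

// Full annotated tree followed by the coverage summary:
println(info.generateVerboseExtendedInfo(cometPlan))

// Just the coverage summary, without the plan:
println(info.generateCoverageInfo(cometPlan))
// e.g. "Comet accelerated 9 out of 12 eligible operators (75%). ..."
```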