Commit ca2e611

feat: Add plan conversion statistics to extended explain info (#2412)
1 parent 828642a commit ca2e611

File tree

4 files changed: +69 additions, -12 deletions

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 4 additions & 2 deletions

@@ -457,8 +457,10 @@ object CometConf extends ShimCometConf {
   val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.explain.verbose.enabled")
       .doc(
-        "When this setting is enabled, Comet will provide a verbose tree representation of " +
-          "the extended information.")
+        "When this setting is enabled, Comet's extended explain output will provide the full " +
+          "query plan annotated with fallback reasons as well as a summary of how much of " +
+          "the plan was accelerated by Comet. When this setting is disabled, a list of fallback " +
+          "reasons will be provided instead.")
       .booleanConf
       .createWithDefault(false)
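As a usage sketch (not part of this commit): with the flag enabled in a Comet-enabled session, the extended explain output carries the annotated plan plus the new acceleration summary. The session wiring (plugin class) and the table name below are assumptions drawn from the Comet install docs, not from this diff.

    // Illustrative only: enable the verbose extended explain output in a
    // Comet-enabled session. Plugin config and table name are assumed.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("comet-explain-demo")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.explain.verbose.enabled", "true")
      .getOrCreate()

    // With the flag on, Comet's extended explain info includes the full annotated
    // plan and the acceleration summary; with it off, only fallback reasons appear.
    spark.sql("EXPLAIN EXTENDED SELECT c1, COUNT(*) FROM t GROUP BY c1").show(truncate = false)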

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 1 deletion

@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.union.enabled | Whether to enable union by default. | true |
 | spark.comet.exec.window.enabled | Whether to enable window by default. | true |
 | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
-| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
+| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
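Not part of this doc change, but for context: since this is an ordinary SQL conf, it can also be flipped per session at runtime. A minimal sketch, assuming an existing Comet-enabled SparkSession named spark:

    // Hedged sketch: toggle the flag for the current session only.
    // Assumes `spark` is an existing Comet-enabled SparkSession.
    spark.conf.set("spark.comet.explain.verbose.enabled", "true")
    // ... run EXPLAIN / inspect the extended explain info ...
    spark.conf.set("spark.comet.explain.verbose.enabled", "false")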

spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala

Lines changed: 59 additions & 9 deletions

@@ -23,8 +23,9 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.ExtendedExplainGenerator
 import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
-import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 import org.apache.comet.CometExplainInfo.getActualPlan
@@ -82,9 +83,18 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
   // generates the extended info in a verbose manner, printing each node along with the
   // extended information in a tree display
   def generateVerboseExtendedInfo(plan: SparkPlan): String = {
+    val planStats = new CometCoverageStats()
     val outString = new StringBuilder()
-    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString)
-    outString.toString()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    s"${outString.toString()}\n$planStats"
+  }
+
+  /** Get the coverage statistics without the full plan */
+  def generateCoverageInfo(plan: SparkPlan): String = {
+    val planStats = new CometCoverageStats()
+    val outString = new StringBuilder()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    planStats.toString()
   }
 
   // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
@@ -93,7 +103,22 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       depth: Int,
       lastChildren: Seq[Boolean],
       indent: Int,
-      outString: StringBuilder): Unit = {
+      outString: StringBuilder,
+      planStats: CometCoverageStats): Unit = {
+
+    node match {
+      case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
+          _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
+      // ignore
+      case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
+          _: CometSparkToColumnarExec =>
+        planStats.transitions += 1
+      case _: CometPlan =>
+        planStats.cometOperators += 1
+      case _ =>
+        planStats.sparkOperators += 1
+    }
+
     outString.append(" " * indent)
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
@@ -120,15 +145,17 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
             depth + 2,
             lastChildren :+ node.children.isEmpty :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
         case _ =>
       }
       generateTreeString(
         getActualPlan(innerChildrenLocal.last),
         depth + 2,
         lastChildren :+ node.children.isEmpty :+ true,
         indent,
-        outString)
+        outString,
+        planStats)
     }
     if (node.children.nonEmpty) {
       node.children.init.foreach {
@@ -138,18 +165,40 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
             depth + 1,
             lastChildren :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
         case _ =>
       }
       node.children.last match {
         case c @ (_: TreeNode[_]) =>
-          generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString)
+          generateTreeString(
+            getActualPlan(c),
+            depth + 1,
+            lastChildren :+ true,
+            indent,
+            outString,
+            planStats)
         case _ =>
       }
     }
   }
 }
 
+class CometCoverageStats {
+  var sparkOperators: Int = 0
+  var cometOperators: Int = 0
+  var transitions: Int = 0
+
+  override def toString(): String = {
+    val eligible = sparkOperators + cometOperators
+    val converted =
+      if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0
+    s"Comet accelerated $cometOperators out of $eligible " +
+      s"eligible operators (${converted.toInt}%). " +
+      s"Final plan contains $transitions transitions between Spark and Comet."
+  }
+}
+
 object CometExplainInfo {
   val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
 
@@ -162,6 +211,7 @@ object CometExplainInfo {
     case p: ReusedExchangeExec => getActualPlan(p.child)
     case p => p
   }
+
  }
 
 }
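To make the new summary concrete, here is a minimal standalone sketch of the arithmetic in CometCoverageStats.toString; the counter values are invented for illustration only:

    // Minimal sketch of the coverage summary arithmetic; values are invented.
    val stats = new CometCoverageStats()
    stats.cometOperators = 7 // operators converted to Comet operators
    stats.sparkOperators = 3 // operators left as Spark operators
    stats.transitions = 2    // row<->columnar transition nodes in the final plan

    // eligible = 7 + 3 = 10; converted = 7.0 / 10 * 100.0 = 70.0; toInt truncates to 70
    println(stats)
    // Comet accelerated 7 out of 10 eligible operators (70%). Final plan contains 2 transitions between Spark and Comet.

Note that transition operators are counted separately: they contribute to the transition count but are not treated as eligible operators, so they do not affect the acceleration percentage. AQE wrappers and whole-stage-codegen nodes are ignored entirely.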

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 5 additions & 0 deletions

@@ -118,6 +118,11 @@ class CometExecSuite extends CometTestBase {
       val (_, cometPlan) = checkSparkAnswer(df)
       val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
       assert(infos.contains("Dynamic Partition Pruning is not supported"))
+
+      withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
+        val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
+        assert(extendedExplain.contains("Comet accelerated"))
+      }
     }
   }
 }
