feat: Add plan conversion statistics to extended explain info #2412
ExtendedExplainInfo.scala

@@ -23,8 +23,10 @@ import scala.collection.mutable

 import org.apache.spark.sql.ExtendedExplainGenerator
 import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
-import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometPlan, CometSparkToColumnarExec}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, RowToColumnarExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, QueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

 import org.apache.comet.CometExplainInfo.getActualPlan
@@ -81,9 +83,14 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
   // generates the extended info in a verbose manner, printing each node along with the
   // extended information in a tree display
   def generateVerboseExtendedInfo(plan: SparkPlan): String = {
+    val planStats = new PlanStats()
     val outString = new StringBuilder()
-    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString)
-    outString.toString()
+    generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
+    val eligible = planStats.sparkOperators + planStats.cometOperators
+    val converted =
+      if (eligible == 0) 0.0 else planStats.cometOperators.toDouble / eligible * 100.0
+    val summary = s"Comet accelerated ${converted.toInt}% of eligible operators ($planStats)."
+    s"${outString.toString()}\n$summary"
   }

   // Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
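A note on the summary arithmetic: only Spark and Comet operators count as "eligible"; wrappers and transitions are deliberately excluded, and `toInt` truncates rather than rounds. A minimal standalone sketch with assumed counts (not taken from a real plan), just to illustrate the computation:

```scala
object SummarySketch extends App {
  // Assumed counts for illustration only.
  val cometOperators = 2
  val sparkOperators = 4

  // Same computation as generateVerboseExtendedInfo above:
  // eligible = Spark + Comet operators; wrappers/transitions don't count.
  val eligible = sparkOperators + cometOperators
  val converted =
    if (eligible == 0) 0.0 else cometOperators.toDouble / eligible * 100.0

  // toInt truncates, so 33.33...% is reported as 33%.
  println(s"Comet accelerated ${converted.toInt}% of eligible operators")
}
```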

@@ -92,7 +99,22 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
       depth: Int,
       lastChildren: Seq[Boolean],
       indent: Int,
-      outString: StringBuilder): Unit = {
+      outString: StringBuilder,
+      planStats: PlanStats): Unit = {
+
+    node match {
+      case _: AdaptiveSparkPlanExec | _: InputAdapter | _: QueryStageExec |
+          _: WholeStageCodegenExec | _: ReusedExchangeExec | _: AQEShuffleReadExec =>
+        planStats.wrappers += 1
+      case _: RowToColumnarExec | _: ColumnarToRowExec | _: CometColumnarToRowExec |
+          _: CometSparkToColumnarExec =>
+        planStats.transitions += 1
+      case _: CometPlan =>
+        planStats.cometOperators += 1
+      case _ =>
+        planStats.sparkOperators += 1
+    }
+
     outString.append(" " * indent)
     if (depth > 0) {
       lastChildren.init.foreach { isLast =>
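One subtlety worth calling out: Scala matches cases top to bottom, so if a transition node also happens to be a `CometPlan` (as the Comet transition operators may be), the transition case wins and the node is never counted as a Comet operator. A toy sketch with stand-in types (not the real Spark/Comet classes) showing that ordering:

```scala
object MatchOrderSketch extends App {
  // Stand-in hierarchy, purely illustrative:
  trait Plan
  trait CometPlan extends Plan
  case class Transition() extends CometPlan // a transition that is also a CometPlan
  case class CometScan() extends CometPlan
  case class SparkSort() extends Plan

  def classify(p: Plan): String = p match {
    case _: Transition => "transition" // matched first, so never counted as "comet"
    case _: CometPlan => "comet"
    case _ => "spark"
  }

  println(classify(Transition())) // transition
  println(classify(CometScan())) // comet
  println(classify(SparkSort())) // spark
}
```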

@@ -119,15 +141,17 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
             depth + 2,
             lastChildren :+ node.children.isEmpty :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
         case _ =>
       }
       generateTreeString(
         getActualPlan(innerChildrenLocal.last),
         depth + 2,
         lastChildren :+ node.children.isEmpty :+ true,
         indent,
-        outString)
+        outString,
+        planStats)
     }
     if (node.children.nonEmpty) {
       node.children.init.foreach {

@@ -137,18 +161,37 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
             depth + 1,
             lastChildren :+ false,
             indent,
-            outString)
+            outString,
+            planStats)
         case _ =>
       }
       node.children.last match {
         case c @ (_: TreeNode[_]) =>
-          generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString)
+          generateTreeString(
+            getActualPlan(c),
+            depth + 1,
+            lastChildren :+ true,
+            indent,
+            outString,
+            planStats)
         case _ =>
       }
     }
   }
 }

+class PlanStats {
+  var sparkOperators: Int = 0
+  var cometOperators: Int = 0
+  var wrappers: Int = 0
+  var transitions: Int = 0
+
+  override def toString: String = {
+    s"sparkOperators=$sparkOperators, cometOperators=$cometOperators, " +
+      s"transitions=$transitions, wrappers=$wrappers"
+  }
+}
+
 object CometExplainInfo {
   val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
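PlanStats is a plain mutable accumulator threaded through the recursion, and its toString feeds directly into the summary line. A quick REPL-style check of the rendering, using the class exactly as added above:

```scala
val stats = new PlanStats()
stats.cometOperators = 2
stats.sparkOperators = 4
stats.transitions = 1
stats.wrappers = 3
println(stats)
// sparkOperators=4, cometOperators=2, transitions=1, wrappers=3
```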
CometExecSuite.scala

@@ -118,6 +118,11 @@ class CometExecSuite extends CometTestBase {
       val (_, cometPlan) = checkSparkAnswer(df)
       val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
       assert(infos.contains("Dynamic Partition Pruning is not supported"))
+
+      withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
+        val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
+        assert(extendedExplain.contains("Comet accelerated 33% of eligible operators"))
+      }
     }
   }
 }
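For anyone trying this out, a sketch of end-to-end usage. The string config key and the exact query are assumptions (check CometConf.COMET_EXPLAIN_VERBOSE_ENABLED for the real key), and the extended-explain provider must already be registered for Comet:

```scala
// Assumed key; verify against CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.
spark.conf.set("spark.comet.explain.verbose.enabled", "true")

val df = spark.sql("SELECT c1, COUNT(*) FROM t GROUP BY c1") // hypothetical query
df.explain("extended")
// With this PR, the verbose extended info ends with a summary line like:
//   Comet accelerated 33% of eligible operators
//   (sparkOperators=4, cometOperators=2, transitions=1, wrappers=3).
```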