Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 280c35a

Browse files
committed
[SPARK-18854][SQL] numberedTreeString and apply(i) inconsistent for subqueries
## What changes were proposed in this pull request? This is a bug introduced by subquery handling. numberedTreeString (which uses generateTreeString under the hood) numbers trees including innerChildren (used to print subqueries), but apply (which uses getNodeNumbered) ignores innerChildren. As a result, apply(i) would return the wrong plan node if there are subqueries. This patch fixes the bug. ## How was this patch tested? Added a test case in SubquerySuite.scala to test both the depth-first traversal of numbering as well as making sure the two methods are consistent. Author: Reynold Xin <[email protected]> Closes apache#16277 from rxin/SPARK-18854. (cherry picked from commit ffdd1fc) Signed-off-by: Reynold Xin <[email protected]>
1 parent d0d9c57 commit 280c35a

File tree

5 files changed

+55
-23
lines changed

5 files changed

+55
-23
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ import org.apache.spark.sql.types.{DataType, StructType}
2424
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
2525
self: PlanType =>
2626

27+
/**
28+
* Override [[TreeNode.apply]] so that we can return a narrower type.
29+
*
30+
* Note that this cannot return BaseType because logical plan's plan node might return
31+
* physical plan for innerChildren, e.g. in-memory relation logical plan node has a reference
32+
* to the physical plan node it is referencing.
33+
*/
34+
override def apply(number: Int): QueryPlan[_] = super.apply(number).asInstanceOf[QueryPlan[_]]
35+
2736
def output: Seq[Attribute]
2837

2938
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)])
412412
s"CTE $cteAliases"
413413
}
414414

415-
override def innerChildren: Seq[QueryPlan[_]] = cteRelations.map(_._2)
415+
override def innerChildren: Seq[LogicalPlan] = cteRelations.map(_._2)
416416
}
417417

418418
case class WithWindowDefinition(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,16 @@ package org.apache.spark.sql.catalyst.trees
2020
import java.util.UUID
2121

2222
import scala.collection.Map
23-
import scala.collection.mutable.Stack
2423
import scala.reflect.ClassTag
2524

2625
import org.apache.commons.lang3.ClassUtils
2726
import org.json4s.JsonAST._
2827
import org.json4s.JsonDSL._
2928
import org.json4s.jackson.JsonMethods._
3029

31-
import org.apache.spark.SparkContext
32-
import org.apache.spark.rdd.{EmptyRDD, RDD}
3330
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
3431
import org.apache.spark.sql.catalyst.FunctionIdentifier
3532
import org.apache.spark.sql.catalyst.ScalaReflection._
36-
import org.apache.spark.sql.catalyst.ScalaReflectionLock
3733
import org.apache.spark.sql.catalyst.TableIdentifier
3834
import org.apache.spark.sql.catalyst.errors._
3935
import org.apache.spark.sql.catalyst.expressions._
@@ -493,25 +489,35 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
493489

494490
/**
495491
* Returns a string representation of the nodes in this tree, where each operator is numbered.
496-
* The numbers can be used with [[trees.TreeNode.apply apply]] to easily access specific subtrees.
492+
* The numbers can be used with [[TreeNode.apply]] to easily access specific subtrees.
493+
*
494+
* The numbers are based on a depth-first traversal of the tree (with innerChildren traversed
495+
* before children).
497496
*/
498497
def numberedTreeString: String =
499498
treeString.split("\n").zipWithIndex.map { case (line, i) => f"$i%02d $line" }.mkString("\n")
500499

501500
/**
502501
* Returns the tree node at the specified number.
503502
* Numbers for each node can be found in the [[numberedTreeString]].
503+
*
504+
* Note that this cannot return BaseType because logical plan's plan node might return
505+
* physical plan for innerChildren, e.g. in-memory relation logical plan node has a reference
506+
* to the physical plan node it is referencing.
504507
*/
505-
def apply(number: Int): BaseType = getNodeNumbered(new MutableInt(number))
508+
def apply(number: Int): TreeNode[_] = getNodeNumbered(new MutableInt(number)).orNull
506509

507-
protected def getNodeNumbered(number: MutableInt): BaseType = {
510+
private def getNodeNumbered(number: MutableInt): Option[TreeNode[_]] = {
508511
if (number.i < 0) {
509-
null.asInstanceOf[BaseType]
512+
None
510513
} else if (number.i == 0) {
511-
this
514+
Some(this)
512515
} else {
513516
number.i -= 1
514-
children.map(_.getNodeNumbered(number)).find(_ != null).getOrElse(null.asInstanceOf[BaseType])
517+
// Note that this traversal order must be the same as numberedTreeString.
518+
innerChildren.map(_.getNodeNumbered(number)).find(_ != None).getOrElse {
519+
children.map(_.getNodeNumbered(number)).find(_ != None).flatten
520+
}
515521
}
516522
}
517523

@@ -527,26 +533,25 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
527533
* The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
528534
* depth `i + 1` is the last child of its own parent node. The depth of the root node is 0, and
529535
* `lastChildren` for the root node should be empty.
536+
*
537+
* Note that this traversal (numbering) order must be the same as [[getNodeNumbered]].
530538
*/
531539
def generateTreeString(
532540
depth: Int,
533541
lastChildren: Seq[Boolean],
534542
builder: StringBuilder,
535543
verbose: Boolean,
536544
prefix: String = ""): StringBuilder = {
545+
537546
if (depth > 0) {
538547
lastChildren.init.foreach { isLast =>
539-
val prefixFragment = if (isLast) " " else ": "
540-
builder.append(prefixFragment)
548+
builder.append(if (isLast) " " else ": ")
541549
}
542-
543-
val branch = if (lastChildren.last) "+- " else ":- "
544-
builder.append(branch)
550+
builder.append(if (lastChildren.last) "+- " else ":- ")
545551
}
546552

547553
builder.append(prefix)
548-
val headline = if (verbose) verboseString else simpleString
549-
builder.append(headline)
554+
builder.append(if (verbose) verboseString else simpleString)
550555
builder.append("\n")
551556

552557
if (innerChildren.nonEmpty) {
@@ -557,9 +562,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
557562
}
558563

559564
if (children.nonEmpty) {
560-
children.init.foreach(
561-
_.generateTreeString(depth + 1, lastChildren :+ false, builder, verbose, prefix))
562-
children.last.generateTreeString(depth + 1, lastChildren :+ true, builder, verbose, prefix)
565+
children.init.foreach(_.generateTreeString(
566+
depth + 1, lastChildren :+ false, builder, verbose, prefix))
567+
children.last.generateTreeString(
568+
depth + 1, lastChildren :+ true, builder, verbose, prefix)
563569
}
564570

565571
builder

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
2424
import org.apache.spark.sql.catalyst.InternalRow
2525
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2626
import org.apache.spark.sql.catalyst.expressions._
27-
import org.apache.spark.sql.catalyst.plans.QueryPlan
2827
import org.apache.spark.sql.catalyst.plans.logical
2928
import org.apache.spark.sql.catalyst.plans.logical.Statistics
3029
import org.apache.spark.sql.execution.SparkPlan
@@ -64,7 +63,7 @@ case class InMemoryRelation(
6463
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
6564
extends logical.LeafNode with MultiInstanceRelation {
6665

67-
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
66+
override protected def innerChildren: Seq[SparkPlan] = Seq(child)
6867

6968
override def producedAttributes: AttributeSet = outputSet
7069

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,24 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
5454
t.createOrReplaceTempView("t")
5555
}
5656

57+
test("SPARK-18854 numberedTreeString for subquery") {
58+
val df = sql("select * from range(10) where id not in " +
59+
"(select id from range(2) union all select id from range(2))")
60+
61+
// The expected depth-first traversal order of the plan tree
62+
val dfs = Seq("Project", "Filter", "Union", "Project", "Range", "Project", "Range", "Range")
63+
val numbered = df.queryExecution.analyzed.numberedTreeString.split("\n")
64+
65+
// There should be 8 plan nodes in total
66+
assert(numbered.size == dfs.size)
67+
68+
for (i <- dfs.indices) {
69+
val node = df.queryExecution.analyzed(i)
70+
assert(node.nodeName == dfs(i))
71+
assert(numbered(i).contains(node.nodeName))
72+
}
73+
}
74+
5775
test("rdd deserialization does not crash [SPARK-15791]") {
5876
sql("select (select 1 as b) as b").rdd.count()
5977
}

0 commit comments

Comments
 (0)