Skip to content

Commit 127bc89

Browse files
viiryadongjoon-hyun
andcommitted
[SPARK-27707][SQL] Prune unnecessary nested fields from Generate
## What changes were proposed in this pull request? Performance issue using explode was found when a complex field contains huge array is to get duplicated as the number of exploded array elements. Given example: ```scala val df = spark.sparkContext.parallelize(Seq(("1", Array.fill(M)({ val i = math.random (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString) })))).toDF("col", "arr") .selectExpr("col", "struct(col, arr) as st") .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col") ``` The explode causes `st` to be duplicated as many as the exploded elements. Benchmarks it: ``` [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4 [info] Intel(R) Core(TM) i7-8750H CPU 2.20GHz [info] generate big nested struct array: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] generate big nested struct array wholestage off 52668 53162 699 0.0 877803.4 1.0X [info] generate big nested struct array wholestage on 47261 49093 1125 0.0 787690.2 1.1X [info] ``` The query plan: ``` == Physical Plan == Project [col#508, st#512.col AS col1#515, arr_col#519] +- Generate explode(st#512.arr), [col#508, st#512], false, [arr_col#519] +- Project [_1#503 AS col#508, named_struct(col, _1#503, arr, _2#504) AS st#512] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#503, mapobjects(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#504] +- Scan[obj#534] ``` This patch takes nested column pruning approach to prune unnecessary nested fields. It adds a projection of the needed nested fields as aliases on the child of `Generate`, and substitutes them by alias attributes on the projection on top of `Generate`. Benchmarks it after the change: ``` [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4 [info] Intel(R) Core(TM) i7-8750H CPU 2.20GHz [info] generate big nested struct array: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] generate big nested struct array wholestage off 311 331 28 0.2 5188.6 1.0X [info] generate big nested struct array wholestage on 297 312 15 0.2 4947.3 1.0X [info] ``` The query plan: ``` == Physical Plan == Project [col#592, _gen_alias_608#608 AS col1#599, arr_col#603] +- Generate explode(st#596.arr), [col#592, _gen_alias_608#608], false, [arr_col#603] +- Project [_1#587 AS col#592, named_struct(col, _1#587, arr, _2#588) AS st#596, _1#587 AS _gen_alias_608#608] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(in put[0, scala.Tuple2, true]))._1, true, false) AS _1#587, mapobjects(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#588] +- Scan[obj#586] ``` This behavior is controlled by a SQL config `spark.sql.optimizer.expression.nestedPruning.enabled`. ## How was this patch tested? Added benchmark. Closes apache#24637 from viirya/SPARK-27707. Lead-authored-by: Liang-Chi Hsieh <[email protected]> Co-authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 0512af1 commit 127bc89

File tree

7 files changed

+150
-7
lines changed

7 files changed

+150
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ object NestedColumnAliasing {
5454
/**
5555
* Return a replaced project list.
5656
*/
57-
private def getNewProjectList(
57+
def getNewProjectList(
5858
projectList: Seq[NamedExpression],
5959
nestedFieldToAlias: Map[ExtractValue, Alias]): Seq[NamedExpression] = {
6060
projectList.map(_.transform {
@@ -66,7 +66,7 @@ object NestedColumnAliasing {
6666
/**
6767
* Return a plan with new children replaced with aliases.
6868
*/
69-
private def replaceChildrenWithAliases(
69+
def replaceChildrenWithAliases(
7070
plan: LogicalPlan,
7171
attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
7272
plan.withNewChildren(plan.children.map { plan =>
@@ -107,10 +107,10 @@ object NestedColumnAliasing {
107107
* 1. ExtractValue -> Alias: A new alias is created for each nested field.
108108
* 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases pointing it.
109109
*/
110-
private def getAliasSubMap(projectList: Seq[NamedExpression])
110+
def getAliasSubMap(exprList: Seq[Expression])
111111
: Option[(Map[ExtractValue, Alias], Map[ExprId, Seq[Alias]])] = {
112112
val (nestedFieldReferences, otherRootReferences) =
113-
projectList.flatMap(collectRootReferenceAndExtractValue).partition {
113+
exprList.flatMap(collectRootReferenceAndExtractValue).partition {
114114
case _: ExtractValue => true
115115
case _ => false
116116
}
@@ -155,4 +155,15 @@ object NestedColumnAliasing {
155155
case MapType(keyType, valueType, _) => totalFieldNum(keyType) + totalFieldNum(valueType)
156156
case _ => 1 // UDT and others
157157
}
158+
159+
/**
160+
* This is a while-list for pruning nested fields at `Generator`.
161+
*/
162+
def canPruneGenerator(g: Generator): Boolean = g match {
163+
case _: Explode => true
164+
case _: Stack => true
165+
case _: PosExplode => true
166+
case _: Inline => true
167+
case _ => false
168+
}
158169
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
588588
.map(_._2)
589589
p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
590590

591+
// prune unrequired nested fields
592+
case p @ Project(projectList, g: Generate) if SQLConf.get.nestedPruningOnExpressions &&
593+
NestedColumnAliasing.canPruneGenerator(g.generator) =>
594+
NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map {
595+
case (nestedFieldToAlias, attrToAliases) =>
596+
val newGenerator = g.generator.transform {
597+
case f: ExtractValue if nestedFieldToAlias.contains(f) =>
598+
nestedFieldToAlias(f).toAttribute
599+
}.asInstanceOf[Generator]
600+
601+
// Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`.
602+
val newGenerate = g.copy(generator = newGenerator)
603+
604+
val newChild = NestedColumnAliasing.replaceChildrenWithAliases(newGenerate, attrToAliases)
605+
606+
Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild)
607+
}.getOrElse(p)
608+
591609
// Eliminate unneeded attributes from right side of a Left Existence Join.
592610
case j @ Join(_, right, LeftExistence(_), _, _) =>
593611
j.copy(right = prunedChild(right, j.references))

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,6 +1656,16 @@ object SQLConf {
16561656
.booleanConf
16571657
.createWithDefault(false)
16581658

1659+
val NESTED_PRUNING_ON_EXPRESSIONS =
1660+
buildConf("spark.sql.optimizer.expression.nestedPruning.enabled")
1661+
.internal()
1662+
.doc("Prune nested fields from expressions in an operator which are unnecessary in " +
1663+
"satisfying a query. Note that this optimization doesn't prune nested fields from " +
1664+
"physical data source scanning. For pruning nested fields from scanning, please use " +
1665+
"`spark.sql.optimizer.nestedSchemaPruning.enabled` config.")
1666+
.booleanConf
1667+
.createWithDefault(false)
1668+
16591669
val TOP_K_SORT_FALLBACK_THRESHOLD =
16601670
buildConf("spark.sql.execution.topKSortFallbackThreshold")
16611671
.internal()
@@ -2315,6 +2325,8 @@ class SQLConf extends Serializable with Logging {
23152325
def serializerNestedSchemaPruningEnabled: Boolean =
23162326
getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)
23172327

2328+
def nestedPruningOnExpressions: Boolean = getConf(NESTED_PRUNING_ON_EXPRESSIONS)
2329+
23182330
def csvColumnPruning: Boolean = getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
23192331

23202332
def legacySizeOfNull: Boolean = getConf(SQLConf.LEGACY_SIZE_OF_NULL)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ package org.apache.spark.sql.catalyst.optimizer
1919

2020
import scala.reflect.runtime.universe.TypeTag
2121

22+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
2223
import org.apache.spark.sql.catalyst.dsl.expressions._
2324
import org.apache.spark.sql.catalyst.dsl.plans._
2425
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
2526
import org.apache.spark.sql.catalyst.expressions._
2627
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
2728
import org.apache.spark.sql.catalyst.plans.logical._
2829
import org.apache.spark.sql.catalyst.rules.RuleExecutor
29-
import org.apache.spark.sql.types.StringType
30+
import org.apache.spark.sql.internal.SQLConf
31+
import org.apache.spark.sql.types.{StringType, StructType}
3032

3133
class ColumnPruningSuite extends PlanTest {
3234

@@ -101,6 +103,81 @@ class ColumnPruningSuite extends PlanTest {
101103
comparePlans(optimized, correctAnswer)
102104
}
103105

106+
test("Nested column pruning for Generate") {
107+
def runTest(
108+
origGenerator: Generator,
109+
replacedGenerator: Seq[String] => Generator,
110+
aliasedExprs: Seq[String] => Seq[Expression],
111+
unrequiredChildIndex: Seq[Int],
112+
generatorOutputNames: Seq[String]) {
113+
withSQLConf(SQLConf.NESTED_PRUNING_ON_EXPRESSIONS.key -> "true") {
114+
val structType = StructType.fromDDL("d double, e array<string>, f double, g double, " +
115+
"h array<struct<h1: int, h2: double>>")
116+
val input = LocalRelation('a.int, 'b.int, 'c.struct(structType))
117+
val generatorOutputs = generatorOutputNames.map(UnresolvedAttribute(_))
118+
119+
val selectedExprs = Seq(UnresolvedAttribute("a"), 'c.getField("d")) ++
120+
generatorOutputs
121+
122+
val query =
123+
input
124+
.generate(origGenerator, outputNames = generatorOutputNames)
125+
.select(selectedExprs: _*)
126+
.analyze
127+
128+
val optimized = Optimize.execute(query)
129+
130+
val aliases = NestedColumnAliasingSuite.collectGeneratedAliases(optimized)
131+
132+
val selectedFields = UnresolvedAttribute("a") +: aliasedExprs(aliases)
133+
val finalSelectedExprs = Seq(UnresolvedAttribute("a"), $"${aliases(0)}".as("c.d")) ++
134+
generatorOutputs
135+
136+
val correctAnswer =
137+
input
138+
.select(selectedFields: _*)
139+
.generate(replacedGenerator(aliases),
140+
unrequiredChildIndex = unrequiredChildIndex,
141+
outputNames = generatorOutputNames)
142+
.select(finalSelectedExprs: _*)
143+
.analyze
144+
145+
comparePlans(optimized, correctAnswer)
146+
}
147+
}
148+
149+
runTest(
150+
Explode('c.getField("e")),
151+
aliases => Explode($"${aliases(1)}".as("c.e")),
152+
aliases => Seq('c.getField("d").as(aliases(0)), 'c.getField("e").as(aliases(1))),
153+
Seq(2),
154+
Seq("explode")
155+
)
156+
runTest(Stack(2 :: 'c.getField("f") :: 'c.getField("g") :: Nil),
157+
aliases => Stack(2 :: $"${aliases(1)}".as("c.f") :: $"${aliases(2)}".as("c.g") :: Nil),
158+
aliases => Seq(
159+
'c.getField("d").as(aliases(0)),
160+
'c.getField("f").as(aliases(1)),
161+
'c.getField("g").as(aliases(2))),
162+
Seq(2, 3),
163+
Seq("stack")
164+
)
165+
runTest(
166+
PosExplode('c.getField("e")),
167+
aliases => PosExplode($"${aliases(1)}".as("c.e")),
168+
aliases => Seq('c.getField("d").as(aliases(0)), 'c.getField("e").as(aliases(1))),
169+
Seq(2),
170+
Seq("pos", "explode")
171+
)
172+
runTest(
173+
Inline('c.getField("h")),
174+
aliases => Inline($"${aliases(1)}".as("c.h")),
175+
aliases => Seq('c.getField("d").as(aliases(0)), 'c.getField("h").as(aliases(1))),
176+
Seq(2),
177+
Seq("h1", "h2")
178+
)
179+
}
180+
104181
test("Column pruning for Project on Sort") {
105182
val input = LocalRelation('a.int, 'b.string, 'c.double)
106183

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
2929

3030
class NestedColumnAliasingSuite extends SchemaPruningTest {
3131

32+
import NestedColumnAliasingSuite._
33+
3234
object Optimize extends RuleExecutor[LogicalPlan] {
3335
val batches = Batch("Nested column pruning", FixedPoint(100),
3436
ColumnPruning,
@@ -264,9 +266,10 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
264266
.analyze
265267
comparePlans(optimized, expected)
266268
}
269+
}
267270

268-
269-
private def collectGeneratedAliases(query: LogicalPlan): ArrayBuffer[String] = {
271+
object NestedColumnAliasingSuite {
272+
def collectGeneratedAliases(query: LogicalPlan): ArrayBuffer[String] = {
270273
val aliases = ArrayBuffer[String]()
271274
query.transformAllExpressions {
272275
case a @ Alias(_, name) if name.startsWith("_gen_alias_") =>

sql/core/benchmarks/MiscBenchmark-results.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ generate big struct array: Best/Avg Time(ms) Rate(M/s) Per Ro
105105
generate big struct array wholestage off 708 / 776 0.1 11803.5 1.0X
106106
generate big struct array wholestage on 535 / 589 0.1 8913.9 1.3X
107107

108+
OpenJDK 64-Bit Server VM 1.8.0_212-b04 on Linux 3.10.0-862.3.2.el7.x86_64
109+
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
110+
generate big nested struct array: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
111+
------------------------------------------------------------------------------------------------------------------------
112+
generate big nested struct array wholestage off 540 553 19 0.1 8997.4 1.0X
113+
generate big nested struct array wholestage on 523 554 31 0.1 8725.0 1.0X
114+
108115

109116
================================================================================================
110117
generate regular generator

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.execution.benchmark
1919

2020
import org.apache.spark.benchmark.Benchmark
21+
import org.apache.spark.sql.internal.SQLConf
2122

2223
/**
2324
* Benchmark to measure whole stage codegen performance.
@@ -130,6 +131,20 @@ object MiscBenchmark extends SqlBasedBenchmark {
130131
df.selectExpr("*", "explode(arr) as arr_col")
131132
.select("col", "arr_col.*").count
132133
}
134+
135+
withSQLConf(SQLConf.NESTED_PRUNING_ON_EXPRESSIONS.key -> "true") {
136+
codegenBenchmark("generate big nested struct array", M) {
137+
import spark.implicits._
138+
val df = spark.sparkContext.parallelize(Seq(("1",
139+
Array.fill(M)({
140+
val i = math.random
141+
(i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
142+
})))).toDF("col", "arr")
143+
.selectExpr("col", "struct(col, arr) as st")
144+
.selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
145+
df.collect()
146+
}
147+
}
133148
}
134149
}
135150

0 commit comments

Comments
 (0)