Skip to content

Commit cfc3e1a

Browse files
xuanyuanking authored and gatorsmile committed
[SPARK-24339][SQL] Prunes the unused columns from child of ScriptTransformation
## What changes were proposed in this pull request?

Modify the strategy in ColumnPruning to add a Project between ScriptTransformation and its child. This strategy can reduce the scan time, especially in the scenario where the table has many columns.

## How was this patch tested?

Added unit tests in ColumnPruningSuite and ScriptTransformationSuite.

Author: Yuanjian Li <[email protected]>

Closes apache#21839 from xuanyuanking/SPARK-24339.
1 parent 61f0ca4 commit cfc3e1a

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,13 +501,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
501501
case d @ DeserializeToObject(_, _, child) if (child.outputSet -- d.references).nonEmpty =>
502502
d.copy(child = prunedChild(child, d.references))
503503

504-
// Prunes the unused columns from child of Aggregate/Expand/Generate
504+
// Prunes the unused columns from child of Aggregate/Expand/Generate/ScriptTransformation
505505
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
506506
a.copy(child = prunedChild(child, a.references))
507507
case f @ FlatMapGroupsInPandas(_, _, _, child) if (child.outputSet -- f.references).nonEmpty =>
508508
f.copy(child = prunedChild(child, f.references))
509509
case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
510510
e.copy(child = prunedChild(child, e.references))
511+
case s @ ScriptTransformation(_, _, _, child, _)
512+
if (child.outputSet -- s.references).nonEmpty =>
513+
s.copy(child = prunedChild(child, s.references))
511514

512515
// prune unrequired references
513516
case p @ Project(_, g: Generate) if p.references != g.outputSet =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,30 @@ class ColumnPruningSuite extends PlanTest {
140140
comparePlans(optimized, expected)
141141
}
142142

143+
test("Column pruning for ScriptTransformation") {
144+
val input = LocalRelation('a.int, 'b.string, 'c.double)
145+
val query =
146+
ScriptTransformation(
147+
Seq('a, 'b),
148+
"func",
149+
Seq.empty,
150+
input,
151+
null).analyze
152+
val optimized = Optimize.execute(query)
153+
154+
val expected =
155+
ScriptTransformation(
156+
Seq('a, 'b),
157+
"func",
158+
Seq.empty,
159+
Project(
160+
Seq('a, 'b),
161+
input),
162+
null).analyze
163+
164+
comparePlans(optimized, expected)
165+
}
166+
143167
test("Column pruning on Filter") {
144168
val input = LocalRelation('a.int, 'b.string, 'c.double)
145169
val plan1 = Filter('a > 1, input).analyze

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,25 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
136136
}
137137
assert(e.getMessage.contains("Subprocess exited with status"))
138138
}
139+
140+
test("SPARK-24339 verify the result after pruning the unused columns") {
141+
val rowsDf = Seq(
142+
("Bob", 16, 176),
143+
("Alice", 32, 164),
144+
("David", 60, 192),
145+
("Amy", 24, 180)).toDF("name", "age", "height")
146+
147+
checkAnswer(
148+
rowsDf,
149+
(child: SparkPlan) => new ScriptTransformationExec(
150+
input = Seq(rowsDf.col("name").expr),
151+
script = "cat",
152+
output = Seq(AttributeReference("name", StringType)()),
153+
child = child,
154+
ioschema = serdeIOSchema
155+
),
156+
rowsDf.select("name").collect())
157+
}
139158
}
140159

141160
private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {

0 commit comments

Comments (0)