
Commit b3fde5a

rdblue authored and cloud-fan committed
[SPARK-23877][SQL] Use filter predicates to prune partitions in metadata-only queries
## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.

## How was this patch tested?

Existing tests for metadata-only queries.

Author: Ryan Blue <[email protected]>

Closes apache#20988 from rdblue/SPARK-23877-metadata-only-push-filters.
1 parent e55953b commit b3fde5a
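A minimal usage sketch (not part of the commit; the table name, data, and explicit config toggle are illustrative assumptions) of the kind of query this rule rewrites. With a Parquet table partitioned by `part` and the metadata-only optimization enabled, the DISTINCT aggregate below needs only partition values, so after this change the WHERE predicate can be used when listing partitions instead of fetching every partition on the driver:

// hypothetical example: "logs" and its layout are assumptions, not from the commit
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")

spark.range(100)
  .selectExpr("id", "CAST(id % 10 AS INT) AS part")
  .write.partitionBy("part").format("parquet").saveAsTable("logs")

// only partition metadata is needed here, so the filter on `part` can prune the
// partition listing rather than listing all partitions first
spark.sql("SELECT DISTINCT part FROM logs WHERE part < 5").show()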

File tree

2 files changed: +132 -30 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 64 additions & 30 deletions
@@ -49,9 +49,9 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
     }

     plan.transform {
-      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(_, attrs, filters, rel)) =>
         // We only apply this optimization when only partitioned attributes are scanned.
-        if (a.references.subsetOf(partAttrs)) {
+        if (a.references.subsetOf(attrs)) {
           val aggFunctions = aggExprs.flatMap(_.collect {
             case agg: AggregateExpression => agg
           })
@@ -67,7 +67,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
             })
           }
           if (isAllDistinctAgg) {
-            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
           } else {
             a
           }
@@ -98,27 +98,49 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
    */
   private def replaceTableScanWithPartitionMetadata(
       child: LogicalPlan,
-      relation: LogicalPlan): LogicalPlan = {
+      relation: LogicalPlan,
+      partFilters: Seq[Expression]): LogicalPlan = {
+    // this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
+    // relation's schema. PartitionedRelation ensures that the filters only reference partition cols
+    val relFilters = partFilters.map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+      }
+    }
+
     child transform {
       case plan if plan eq relation =>
         relation match {
           case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
             val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-            val partitionData = fsRelation.location.listFiles(Nil, Nil)
-            LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)
+            val partitionData = fsRelation.location.listFiles(relFilters, Nil)
+            // partition data may be a stream, which can cause serialization to hit stack level too
+            // deep exceptions because it is a recursive structure in memory. converting to array
+            // avoids the problem.
+            LocalRelation(partAttrs, partitionData.map(_.values).toArray, isStreaming)

           case relation: HiveTableRelation =>
             val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
             val caseInsensitiveProperties =
               CaseInsensitiveMap(relation.tableMeta.storage.properties)
             val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
               .getOrElse(SQLConf.get.sessionLocalTimeZone)
-            val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
+            val partitions = if (partFilters.nonEmpty) {
+              catalog.listPartitionsByFilter(relation.tableMeta.identifier, relFilters)
+            } else {
+              catalog.listPartitions(relation.tableMeta.identifier)
+            }
+
+            val partitionData = partitions.map { p =>
               InternalRow.fromSeq(partAttrs.map { attr =>
                 Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
               })
             }
-            LocalRelation(partAttrs, partitionData)
+            // partition data may be a stream, which can cause serialization to hit stack level too
+            // deep exceptions because it is a recursive structure in memory. converting to array
+            // avoids the problem.
+            LocalRelation(partAttrs, partitionData.toArray)

           case _ =>
             throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
@@ -129,35 +151,47 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table relation node.
    *
    * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
    * deterministic expressions, and returns result after reaching the partitioned table relation
    * node.
    */
-  object PartitionedRelation {
-
-    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-        if fsRelation.partitionSchema.nonEmpty =>
-        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        Some((AttributeSet(partAttrs), l))
-
-      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-        Some((AttributeSet(partAttrs), relation))
-
-      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
-        }
+  object PartitionedRelation extends PredicateHelper {
+
+    def unapply(
+        plan: LogicalPlan): Option[(AttributeSet, AttributeSet, Seq[Expression], LogicalPlan)] = {
+      plan match {
+        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+            if fsRelation.partitionSchema.nonEmpty =>
+          val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
+          Some((partAttrs, partAttrs, Nil, l))
+
+        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+          val partAttrs = AttributeSet(
+            getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
+          Some((partAttrs, partAttrs, Nil, relation))
+
+        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
+            if (p.references.subsetOf(attrs)) {
+              Some((partAttrs, p.outputSet, filters, relation))
+            } else {
+              None
+            }
+          }

-      case f @ Filter(condition, child) if condition.deterministic =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (f.references.subsetOf(partAttrs)) Some((partAttrs, relation)) else None
-        }
+        case f @ Filter(condition, child) if condition.deterministic =>
+          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
+            if (f.references.subsetOf(partAttrs)) {
+              Some((partAttrs, attrs, splitConjunctivePredicates(condition) ++ filters, relation))
+            } else {
+              None
+            }
+          }

-      case _ => None
+        case _ => None
+      }
     }
   }
 }
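The two "partition data may be a stream" comments in the hunks above describe the same pitfall. A minimal, self-contained sketch (plain Scala outside Spark; the object name, element type, and length are illustrative assumptions) of why serializing a long, fully materialized Stream can overflow the stack while an Array does not:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StreamSerializationSketch {
  def main(args: Array[String]): Unit = {
    // A forced Stream is a linked chain of cons cells; default Java serialization
    // visits each cell by recursing into its tail, so recursion depth grows with length.
    val partitionValues: Stream[Int] = Stream.from(0).take(1000000)
    partitionValues.force // materialize every cell so the recursive structure exists in memory

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    // out.writeObject(partitionValues) // for a long enough stream this can throw StackOverflowError

    // Flattening to an Array removes the recursive structure, so writing it is iterative.
    out.writeObject(partitionValues.toArray)
    out.close()
  }
}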
sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleton
+    with BeforeAndAfter with SQLTestUtils {
+
+  import spark.implicits._
+
+  before {
+    sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
+    (0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
+  }
+
+  test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
+    withTable("metadata_only") {
+      val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
+
+      // verify the number of matching partitions
+      assert(sql("SELECT DISTINCT part FROM metadata_only WHERE part < 5").collect().length === 5)
+
+      // verify that the partition predicate was pushed down to the metastore
+      assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount === 5)
+    }
+  }
+
+  test("SPARK-23877: filter on projected expression") {
+    withTable("metadata_only") {
+      val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
+
+      // verify the matching partitions
+      val partitions = spark.internalCreateDataFrame(Distinct(Filter(($"x" < 5).expr,
+        Project(Seq(($"part" + 1).as("x").expr.asInstanceOf[NamedExpression]),
+          spark.table("metadata_only").logicalPlan.asInstanceOf[SubqueryAlias].child)))
+        .queryExecution.toRdd, StructType(Seq(StructField("x", IntegerType))))
+
+      checkAnswer(partitions, Seq(1, 2, 3, 4).toDF("x"))
+
+      // verify that the partition predicate was not pushed down to the metastore
+      assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount == 11)
+    }
+  }
+}
