Commit f70f46d
[SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the handling of Project and Filter over partitioned relation
## What changes were proposed in this pull request?

A followup of apache#20988.

`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in the `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over a partitioned relation.

## How was this patch tested?

existing tests

Author: Wenchen Fan <[email protected]>

Closes apache#21111 from cloud-fan/refactor.
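For readers unfamiliar with the extractor, here is a minimal sketch (not part of this commit; the object name is hypothetical) of how `PhysicalOperation` destructures a plan into the collected project list, the collected filter predicates (with aliases substituted by the attributes of the bottom plan), and the plan node underneath:

```scala
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper for illustration: summarize what PhysicalOperation
// collects when it walks down a stack of deterministic Project/Filter nodes.
object PhysicalOperationSketch {
  def describe(plan: LogicalPlan): String = plan match {
    case PhysicalOperation(projectList, filters, leaf) =>
      s"${projectList.size} project expressions, ${filters.size} filters over ${leaf.nodeName}"
    case _ =>
      "no match" // defensive default; PhysicalOperation matches any plan
  }
}
```

In the rule below, this is what lets a single `Aggregate(_, aggExprs, PhysicalOperation(projectList, filters, PartitionedRelation(partAttrs, rel)))` pattern replace the hand-written Project/Filter traversal that `PartitionedRelation` used to carry.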
1 parent c3a86fa commit f70f46d

File tree

4 files changed (+39, -44 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala

Lines changed: 6 additions & 0 deletions

@@ -43,6 +43,12 @@ object LocalRelation {
   }
 }

+/**
+ * Logical plan node for scanning data from a local collection.
+ *
+ * @param data The local collection holding the data. It doesn't need to be sent to executors
+ * and then doesn't need to be serializable.
+ */
 case class LocalRelation(
     output: Seq[Attribute],
     data: Seq[InternalRow] = Nil,

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala

Lines changed: 3 additions & 0 deletions

@@ -25,6 +25,9 @@ import org.apache.spark.sql.execution.metric.SQLMetrics

 /**
  * Physical plan node for scanning data from a local collection.
+ *
+ * `Seq` may not be serializable and ideally we should not send `rows` and `unsafeRows`
+ * to the executors. Thus marking them as transient.
  */
 case class LocalTableScanExec(
     output: Seq[Attribute],
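As context for the comment above, a minimal sketch (independent of this patch; class and field names are made up) of the `@transient` pattern it refers to: driver-side collections are excluded from serialization so they never travel to executors.

```scala
// Hypothetical example of the @transient pattern described above: `rows`
// lives only on the driver and is skipped during Java serialization, so a
// non-serializable Seq implementation cannot break task serialization.
class DriverSideRows(@transient val rows: Seq[Array[Any]]) extends Serializable {
  // Recomputed lazily on first access; also never serialized.
  @transient lazy val rowCount: Int = rows.size
}
```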

sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 17 additions & 41 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
@@ -49,9 +50,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
     }

     plan.transform {
-      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(_, attrs, filters, rel)) =>
+      case a @ Aggregate(_, aggExprs, child @ PhysicalOperation(
+          projectList, filters, PartitionedRelation(partAttrs, rel))) =>
         // We only apply this optimization when only partitioned attributes are scanned.
-        if (a.references.subsetOf(attrs)) {
+        if (AttributeSet((projectList ++ filters).flatMap(_.references)).subsetOf(partAttrs)) {
+          // The project list and filters all only refer to partition attributes, which means the
+          // the Aggregator operator can also only refer to partition attributes, and filters are
+          // all partition filters. This is a metadata only query we can optimize.
           val aggFunctions = aggExprs.flatMap(_.collect {
             case agg: AggregateExpression => agg
           })
@@ -102,7 +107,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
       partFilters: Seq[Expression]): LogicalPlan = {
     // this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
     // relation's schema. PartitionedRelation ensures that the filters only reference partition cols
-    val relFilters = partFilters.map { e =>
+    val normalizedFilters = partFilters.map { e =>
       e transform {
         case a: AttributeReference =>
           a.withName(relation.output.find(_.semanticEquals(a)).get.name)
@@ -114,11 +119,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
     relation match {
       case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        val partitionData = fsRelation.location.listFiles(relFilters, Nil)
-        // partition data may be a stream, which can cause serialization to hit stack level too
-        // deep exceptions because it is a recursive structure in memory. converting to array
-        // avoids the problem.
-        LocalRelation(partAttrs, partitionData.map(_.values).toArray, isStreaming)
+        val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil)
+        LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)

       case relation: HiveTableRelation =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
@@ -127,7 +129,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
           .getOrElse(SQLConf.get.sessionLocalTimeZone)
         val partitions = if (partFilters.nonEmpty) {
-          catalog.listPartitionsByFilter(relation.tableMeta.identifier, relFilters)
+          catalog.listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)
         } else {
           catalog.listPartitions(relation.tableMeta.identifier)
         }
@@ -137,10 +139,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
             Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
           })
         }
-        // partition data may be a stream, which can cause serialization to hit stack level too
-        // deep exceptions because it is a recursive structure in memory. converting to array
-        // avoids the problem.
-        LocalRelation(partAttrs, partitionData.toArray)
+        LocalRelation(partAttrs, partitionData)

       case _ =>
         throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
@@ -151,44 +150,21 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes, partition filters, and the table relation node.
-   *
-   * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
-   * deterministic expressions, and returns result after reaching the partitioned table relation
-   * node.
+   * pair of the partition attributes and the table relation node.
    */
   object PartitionedRelation extends PredicateHelper {

-    def unapply(
-        plan: LogicalPlan): Option[(AttributeSet, AttributeSet, Seq[Expression], LogicalPlan)] = {
+    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
       plan match {
         case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-          if fsRelation.partitionSchema.nonEmpty =>
+            if fsRelation.partitionSchema.nonEmpty =>
           val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
-          Some((partAttrs, partAttrs, Nil, l))
+          Some((partAttrs, l))

         case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
           val partAttrs = AttributeSet(
             getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
-          Some((partAttrs, partAttrs, Nil, relation))
-
-        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
-            if (p.references.subsetOf(attrs)) {
-              Some((partAttrs, p.outputSet, filters, relation))
-            } else {
-              None
-            }
-          }
-
-        case f @ Filter(condition, child) if condition.deterministic =>
-          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
-            if (f.references.subsetOf(partAttrs)) {
-              Some((partAttrs, attrs, splitConjunctivePredicates(condition) ++ filters, relation))
-            } else {
-              None
-            }
-          }
+          Some((partAttrs, relation))

         case _ => None
       }
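A standalone restatement of the guard introduced above may help; this sketch (a hypothetical helper, not in the patch) expresses when the rewrite is considered safe: every attribute referenced by the collected project list and filters must be a partition attribute.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, NamedExpression}

// Hypothetical helper mirroring the condition in the rule above: the query is
// metadata-only when the Project/Filter stack touches no data columns.
object MetadataOnlyGuard {
  def isMetadataOnly(
      projectList: Seq[NamedExpression],
      filters: Seq[Expression],
      partAttrs: AttributeSet): Boolean = {
    AttributeSet((projectList ++ filters).flatMap(_.references)).subsetOf(partAttrs)
  }
}
```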

sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala

Lines changed: 13 additions & 3 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

@@ -32,13 +33,22 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto

   import spark.implicits._

-  before {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
     sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
     (0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
   }

+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS metadata_only")
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
-    withTable("metadata_only") {
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
       val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount

       // verify the number of matching partitions
@@ -50,7 +60,7 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto
   }

   test("SPARK-23877: filter on projected expression") {
-    withTable("metadata_only") {
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
       val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount

       // verify the matching partitions
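Finally, a usage-level sketch of what these tests exercise, assuming a `SparkSession` named `spark` with Hive support and a table partitioned by `part` like the `metadata_only` table above; with the optimizer flag enabled, an aggregate that touches only the partition column can be answered from metastore metadata without scanning any files.

```scala
// Assumes an existing SparkSession `spark` and the partitioned table created
// above. The config key corresponds to SQLConf.OPTIMIZER_METADATA_ONLY used
// in the test suite.
spark.conf.set("spark.sql.optimizer.metadataOnly", "true")

// Only the partition column is referenced, so OptimizeMetadataOnlyQuery can
// rewrite the scan into a LocalRelation built from the partition metadata.
spark.sql("SELECT MAX(part) FROM metadata_only").show()
spark.sql("SELECT DISTINCT part FROM metadata_only WHERE part > 5").show()
```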
