Commit 1737d45

[SPARK-24478][SQL][FOLLOWUP] Move projection and filter push down to physical conversion
## What changes were proposed in this pull request?

This is a followup of apache#21503, to completely move operator pushdown to the planner rule. The code is mostly from apache#21319.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes apache#21574 from cloud-fan/followup.
1 parent 8f225e0 commit 1737d45

File tree

3 files changed: +123 -117 lines


sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java

Lines changed: 3 additions & 4 deletions
@@ -23,10 +23,9 @@
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report statistics to Spark.
  *
- * Statistics are reported to the optimizer before a projection or any filters are pushed to the
- * DataSourceReader. Implementations that return more accurate statistics based on projection and
- * filters will not improve query performance until the planner can push operators before getting
- * stats.
+ * Statistics are reported to the optimizer before any operator is pushed to the DataSourceReader.
+ * Implementations that return more accurate statistics based on pushed operators will not improve
+ * query performance until the planner can push operators before getting stats.
  */
 @InterfaceStability.Evolving
 public interface SupportsReportStatistics extends DataSourceReader {
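
The contract described by the updated comment can be illustrated with a hedged Scala sketch. It is not part of the patch: the name ExampleStatsReader and the fixed size and row-count values are made up, and it assumes the getStatistics()/Statistics shape of the v2 reader API as it existed around this commit. Because the planner still gathers statistics from a fresh reader before any operator is pushed, filter-aware estimates returned here would not yet change planning, which is exactly what the new wording says.

import java.util.OptionalLong

import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, Statistics, SupportsReportStatistics}

// Illustrative only: a reader reporting fixed statistics. Left abstract so the
// scan-planning methods of DataSourceReader, which are unrelated to statistics,
// can be omitted from the sketch.
abstract class ExampleStatsReader(knownSizeInBytes: Long, knownRowCount: Long)
  extends DataSourceReader with SupportsReportStatistics {

  // Assumed API at the time of this commit: the optimizer calls this before any
  // projection or filter has been pushed into the reader.
  override def getStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(knownSizeInBytes)
    override def numRows(): OptionalLong = OptionalLong.of(knownRowCount)
  }
}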

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 26 additions & 83 deletions
@@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
-import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A logical plan representing a data source v2 scan.
+ *
+ * @param source An instance of a [[DataSourceV2]] implementation.
+ * @param options The options for this scan. Used to create fresh [[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. Used to create fresh
+ *                            [[DataSourceReader]].
+ */
 case class DataSourceV2Relation(
     source: DataSourceV2,
     output: Seq[AttributeReference],
     options: Map[String, String],
-    userSpecifiedSchema: Option[StructType] = None)
+    userSpecifiedSchema: Option[StructType])
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   import DataSourceV2Relation._

@@ -42,14 +49,7 @@ case class DataSourceV2Relation(
 
   override def simpleString: String = "RelationV2 " + metadataString
 
-  lazy val v2Options: DataSourceOptions = makeV2Options(options)
-
-  def newReader: DataSourceReader = userSpecifiedSchema match {
-    case Some(userSchema) =>
-      source.asReadSupportWithSchema.createReader(userSchema, v2Options)
-    case None =>
-      source.asReadSupport.createReader(v2Options)
-  }
+  def newReader(): DataSourceReader = source.createReader(options, userSpecifiedSchema)
 
   override def computeStats(): Statistics = newReader match {
     case r: SupportsReportStatistics =>

@@ -139,83 +139,26 @@ object DataSourceV2Relation {
           source.getClass.getSimpleName
       }
     }
-  }
-
-  private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
-    new DataSourceOptions(options.asJava)
-  }
 
-  private def schema(
-      source: DataSourceV2,
-      v2Options: DataSourceOptions,
-      userSchema: Option[StructType]): StructType = {
-    val reader = userSchema match {
-      case Some(s) =>
-        source.asReadSupportWithSchema.createReader(s, v2Options)
-      case _ =>
-        source.asReadSupport.createReader(v2Options)
+    def createReader(
+        options: Map[String, String],
+        userSpecifiedSchema: Option[StructType]): DataSourceReader = {
+      val v2Options = new DataSourceOptions(options.asJava)
+      userSpecifiedSchema match {
+        case Some(s) =>
+          asReadSupportWithSchema.createReader(s, v2Options)
+        case _ =>
+          asReadSupport.createReader(v2Options)
+      }
     }
-    reader.readSchema()
   }
 
   def create(
       source: DataSourceV2,
       options: Map[String, String],
-      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
-    val output = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
-    DataSourceV2Relation(source, output, options, userSpecifiedSchema)
-  }
-
-  def pushRequiredColumns(
-      relation: DataSourceV2Relation,
-      reader: DataSourceReader,
-      struct: StructType): Seq[AttributeReference] = {
-    reader match {
-      case projectionSupport: SupportsPushDownRequiredColumns =>
-        projectionSupport.pruneColumns(struct)
-        // return the output columns from the relation that were projected
-        val attrMap = relation.output.map(a => a.name -> a).toMap
-        projectionSupport.readSchema().map(f => attrMap(f.name))
-      case _ =>
-        relation.output
-    }
-  }
-
-  def pushFilters(
-      reader: DataSourceReader,
-      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
-    reader match {
-      case r: SupportsPushDownCatalystFilters =>
-        val postScanFilters = r.pushCatalystFilters(filters.toArray)
-        val pushedFilters = r.pushedCatalystFilters()
-        (postScanFilters, pushedFilters)
-
-      case r: SupportsPushDownFilters =>
-        // A map from translated data source filters to original catalyst filter expressions.
-        val translatedFilterToExpr = scala.collection.mutable.HashMap.empty[Filter, Expression]
-        // Catalyst filter expression that can't be translated to data source filters.
-        val untranslatableExprs = scala.collection.mutable.ArrayBuffer.empty[Expression]
-
-        for (filterExpr <- filters) {
-          val translated = DataSourceStrategy.translateFilter(filterExpr)
-          if (translated.isDefined) {
-            translatedFilterToExpr(translated.get) = filterExpr
-          } else {
-            untranslatableExprs += filterExpr
-          }
-        }
-
-        // Data source filters that need to be evaluated again after scanning. which means
-        // the data source cannot guarantee the rows returned can pass these filters.
-        // As a result we must return it so Spark can plan an extra filter operator.
-        val postScanFilters =
-          r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
-        // The filters which are marked as pushed to this data source
-        val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
-
-        (untranslatableExprs ++ postScanFilters, pushedFilters)
-
-      case _ => (filters, Nil)
-    }
+      userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
+    val reader = source.createReader(options, userSpecifiedSchema)
+    DataSourceV2Relation(
+      source, reader.readSchema().toAttributes, options, userSpecifiedSchema)
   }
 }
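
The net effect of this file's change is that the logical relation no longer caches a reader or performs pushdown itself. A minimal usage sketch follows, assuming a placeholder DataSourceV2 instance (mySource) and an example options map, neither of which is from the patch:

// `mySource` stands in for any DataSourceV2 implementation; the Map becomes a
// DataSourceOptions inside createReader.
val relation = DataSourceV2Relation.create(
  source = mySource,
  options = Map("path" -> "/tmp/example"),
  userSpecifiedSchema = None)

// `create` uses a throw-away reader only to resolve the schema. Each planning attempt
// then calls newReader() to obtain a fresh reader to push operators into, so no pushdown
// state survives in the logical plan.
val reader = relation.newReader()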

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 94 additions & 30 deletions
@@ -17,51 +17,115 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.{execution, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import scala.collection.mutable
+
+import org.apache.spark.sql.{sources, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
 
 object DataSourceV2Strategy extends Strategy {
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      val projectSet = AttributeSet(project.flatMap(_.references))
-      val filterSet = AttributeSet(filters.flatMap(_.references))
-
-      val projection = if (filterSet.subsetOf(projectSet) &&
-          AttributeSet(relation.output) == projectSet) {
-        // When the required projection contains all of the filter columns and column pruning alone
-        // can produce the required projection, push the required projection.
-        // A final projection may still be needed if the data source produces a different column
-        // order or if it cannot prune all of the nested columns.
-        relation.output
-      } else {
-        // When there are filter columns not already in the required projection or when the required
-        // projection is more complicated than column pruning, base column pruning on the set of
-        // all columns needed by both.
-        (projectSet ++ filterSet).toSeq
-      }
 
-      val reader = relation.newReader
+  /**
+   * Pushes down filters to the data source reader
+   *
+   * @return pushed filter and post-scan filters.
+   */
+  private def pushFilters(
+      reader: DataSourceReader,
+      filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    reader match {
+      case r: SupportsPushDownCatalystFilters =>
+        val postScanFilters = r.pushCatalystFilters(filters.toArray)
+        val pushedFilters = r.pushedCatalystFilters()
+        (pushedFilters, postScanFilters)
+
+      case r: SupportsPushDownFilters =>
+        // A map from translated data source filters to original catalyst filter expressions.
+        val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
+        // Catalyst filter expression that can't be translated to data source filters.
+        val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
+
+        for (filterExpr <- filters) {
+          val translated = DataSourceStrategy.translateFilter(filterExpr)
+          if (translated.isDefined) {
+            translatedFilterToExpr(translated.get) = filterExpr
+          } else {
+            untranslatableExprs += filterExpr
+          }
+        }
+
+        // Data source filters that need to be evaluated again after scanning. which means
+        // the data source cannot guarantee the rows returned can pass these filters.
+        // As a result we must return it so Spark can plan an extra filter operator.
+        val postScanFilters = r.pushFilters(translatedFilterToExpr.keys.toArray)
+          .map(translatedFilterToExpr)
+        // The filters which are marked as pushed to this data source
+        val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
+        (pushedFilters, untranslatableExprs ++ postScanFilters)
+
+      case _ => (Nil, filters)
+    }
+  }
 
-      val output = DataSourceV2Relation.pushRequiredColumns(relation, reader,
-        projection.asInstanceOf[Seq[AttributeReference]].toStructType)
+  /**
+   * Applies column pruning to the data source, w.r.t. the references of the given expressions.
+   *
+   * @return new output attributes after column pruning.
+   */
+  // TODO: nested column pruning.
+  private def pruneColumns(
+      reader: DataSourceReader,
+      relation: DataSourceV2Relation,
+      exprs: Seq[Expression]): Seq[AttributeReference] = {
+    reader match {
+      case r: SupportsPushDownRequiredColumns =>
+        val requiredColumns = AttributeSet(exprs.flatMap(_.references))
+        val neededOutput = relation.output.filter(requiredColumns.contains)
+        if (neededOutput != relation.output) {
+          r.pruneColumns(neededOutput.toStructType)
+          val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
+          r.readSchema().toAttributes.map {
+            // We have to keep the attribute id during transformation.
+            a => a.withExprId(nameToAttr(a.name).exprId)
+          }
+        } else {
+          relation.output
+        }
+
+      case _ => relation.output
+    }
+  }
 
-      val (postScanFilters, pushedFilters) = DataSourceV2Relation.pushFilters(reader, filters)
 
-      logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
-      logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val reader = relation.newReader()
+      // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
+      // `postScanFilters` need to be evaluated after the scan.
+      // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
+      val (pushedFilters, postScanFilters) = pushFilters(reader, filters)
+      val output = pruneColumns(reader, relation, project ++ postScanFilters)
+      logInfo(
+        s"""
+           |Pushing operators to ${relation.source.getClass}
+           |Pushed Filters: ${pushedFilters.mkString(", ")}
+           |Post-Scan Filters: ${postScanFilters.mkString(",")}
+           |Output: ${output.mkString(", ")}
+         """.stripMargin)
 
       val scan = DataSourceV2ScanExec(
         output, relation.source, relation.options, pushedFilters, reader)
 
-      val filter = postScanFilters.reduceLeftOption(And)
-      val withFilter = filter.map(execution.FilterExec(_, scan)).getOrElse(scan)
+      val filterCondition = postScanFilters.reduceLeftOption(And)
+      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
 
       val withProjection = if (withFilter.output != project) {
-        execution.ProjectExec(project, withFilter)
+        ProjectExec(project, withFilter)
       } else {
         withFilter
       }
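
To see how a source cooperates with the new pushFilters and pruneColumns helpers, here is a hedged Scala sketch of a reader implementing the standard pushdown mix-ins. It is not part of the patch: the name ExamplePushDownReader, the fullSchema parameter, and the toy "only IsNotNull is supported" rule are illustrative, and the class is left abstract so the scan-planning methods of DataSourceReader can be omitted. Only the methods shown are the ones the strategy above actually invokes.

import org.apache.spark.sql.sources.{Filter, IsNotNull}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

abstract class ExamplePushDownReader(fullSchema: StructType)
  extends DataSourceReader
  with SupportsPushDownFilters
  with SupportsPushDownRequiredColumns {

  private var pushed: Array[Filter] = Array.empty
  private var requiredSchema: StructType = fullSchema

  // Called by DataSourceV2Strategy.pushFilters with the translated source filters.
  // The source keeps what it can evaluate and returns the rest, which the strategy
  // turns into post-scan filters (a FilterExec above the scan).
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition(_.isInstanceOf[IsNotNull]) // toy rule
    pushed = supported
    unsupported
  }

  // Reported back so the strategy can record them in DataSourceV2ScanExec.
  override def pushedFilters(): Array[Filter] = pushed

  // Called by DataSourceV2Strategy.pruneColumns only when fewer columns are needed
  // than the relation's full output.
  override def pruneColumns(required: StructType): Unit = {
    requiredSchema = required
  }

  // The strategy reads this back and re-attaches the original attribute ids (exprId).
  override def readSchema(): StructType = requiredSchema
}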
