
Commit c0e1190

v1
1 parent f5e5dda commit c0e1190

17 files changed, +136 -129 lines changed

backends-velox/src/test/scala/org/apache/gluten/execution/VeloxScanSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -197,6 +197,19 @@ class VeloxScanSuite extends VeloxWholeStageTransformerSuite {
     }
   }
 
+  test("remove pushed down filter in filter node") {
+    createTPCHNotNullTables()
+    val query = "select l_partkey from lineitem where l_partkey = 1"
+    runQueryAndCompare(query) {
+      df =>
+        {
+          val executedPlan = getExecutedPlan(df)
+          val filter = executedPlan.collect { case f: FilterExecTransformerBase => f }
+          assert(filter.isEmpty)
+        }
+    }
+  }
+
   test("test binary as string") {
     withTempDir {
       dir =>
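
Note: runQueryAndCompare and getExecutedPlan are helpers of the Gluten test suites. A rough equivalent outside the suite, using plain Spark APIs, might look like the following sketch; it is an illustration of what the test asserts, not code from this commit, and it assumes AQE is disabled (with spark.sql.adaptive.enabled=true the final plan would first have to be unwrapped from AdaptiveSparkPlanExec).

// Sketch only (not part of this commit): a predicate that is fully pushed down to the
// native scan should leave no FilterExecTransformerBase node in the executed plan.
import org.apache.gluten.execution.FilterExecTransformerBase

val df = spark.sql("select l_partkey from lineitem where l_partkey = 1")
val remainingFilters = df.queryExecution.executedPlan.collect {
  case f: FilterExecTransformerBase => f
}
assert(remainingFilters.isEmpty, "expected the pushed-down filter to be removed")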

gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala

Lines changed: 3 additions & 0 deletions
@@ -77,6 +77,9 @@ case class DeltaScanTransformer(
       disableBucketedScan
     )
   }
+
+  override def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer =
+    copy(output = newOutput)
 }
 
 object DeltaScanTransformer {

gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala

Lines changed: 3 additions & 0 deletions
@@ -69,6 +69,9 @@ case class HudiScanTransformer(
       disableBucketedScan
     )
   }
+
+  override def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer =
+    copy(output = newOutput)
 }
 
 object HudiScanTransformer {

gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala

Lines changed: 4 additions & 1 deletion
@@ -25,7 +25,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, DynamicPruningExpression, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.Scan
@@ -238,6 +238,9 @@ case class IcebergScanTransformer(
       case _ => false
     }
   }
+
+  override def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer =
+    copy(output = newOutput.map(_.asInstanceOf[AttributeReference]))
 }
 
 object IcebergScanTransformer {

gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala

Lines changed: 4 additions & 1 deletion
@@ -22,7 +22,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
@@ -96,6 +96,9 @@ case class MicroBatchScanExecTransformer(
     ctx.root.asInstanceOf[ReadRelNode].setStreamKafka(true);
     ctx
   }
+
+  override def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer =
+    copy(output = newOutput.map(_.asInstanceOf[AttributeReference]))
 }
 
 object MicroBatchScanExecTransformer {

gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala

Lines changed: 4 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.softaffinity.SoftAffinity
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, DynamicPruningExpression, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.Scan
@@ -74,8 +74,6 @@ case class PaimonScanTransformer(
       throw new GlutenNotSupportException("Only support PaimonScan.")
   }
 
-  override def filterExprs(): Seq[Expression] = pushdownFilters
-
   override def getPartitionSchema: StructType = scan match {
     case paimonScan: PaimonScan =>
       val partitionKeys = paimonScan.table.partitionKeys()
@@ -179,6 +177,9 @@ case class PaimonScanTransformer(
 
   override protected[this] def supportsBatchScan(scan: Scan): Boolean =
     PaimonScanTransformer.supportsBatchScan(scan)
+
+  override def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer =
+    copy(output = newOutput.map(_.asInstanceOf[AttributeReference]))
 }
 
 object PaimonScanTransformer {

gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala

Lines changed: 38 additions & 107 deletions
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.expression.{ExpressionConverter, ExpressionTransformer}
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.MetricsUpdater
@@ -26,9 +25,9 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, _}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.StructTypeFWD
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -42,26 +41,11 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
   with Logging {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
-  @transient override lazy val metrics =
+  @transient override lazy val metrics: Map[String, SQLMetric] =
     BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext)
 
-  // Split out all the IsNotNulls from condition.
-  protected val (notNullPreds, _) = splitConjunctivePredicates(cond).partition {
-    case IsNotNull(a) => isNullIntolerant(a) && a.references.subsetOf(child.outputSet)
-    case _ => false
-  }
-
-  // The columns that will filtered out by `IsNotNull` could be considered as not nullable.
-  protected val notNullAttributes: Seq[ExprId] =
-    notNullPreds.flatMap(_.references).distinct.map(_.exprId)
-
-  override def isNoop: Boolean = getRemainingCondition == null
-
-  override def metricsUpdater(): MetricsUpdater = if (isNoop) {
-    MetricsUpdater.None
-  } else {
+  override def metricsUpdater(): MetricsUpdater =
     BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(metrics)
-  }
 
   def getRelNode(
       context: SubstraitContext,
@@ -84,85 +68,58 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
     )
   }
 
-  override def output: Seq[Attribute] = {
-    child.output.map {
-      a =>
-        if (a.nullable && notNullAttributes.contains(a.exprId)) {
-          a.withNullability(false)
-        } else {
-          a
-        }
-    }
-  }
+  override def output: Seq[Attribute] = FilterExecTransformerBase.buildNewOutput(child.output, cond)
 
   override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
 
   override protected def outputExpressions: Seq[NamedExpression] = child.output
 
-  // FIXME: Should use field "condition" to store the actual executed filter expressions.
-  // To make optimization easier (like to remove filter when it actually does nothing)
-  protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      cond
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(cond))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
-
   override protected def doValidateInternal(): ValidationResult = {
-    val remainingCondition = getRemainingCondition
-    if (remainingCondition == null) {
-      // All the filters can be pushed down and the computing of this Filter
-      // is not needed.
-      return ValidationResult.succeeded
-    }
     val substraitContext = new SubstraitContext
     val operatorId = substraitContext.nextOperatorId(this.nodeName)
     // Firstly, need to check if the Substrait plan for this operator can be successfully generated.
-    val relNode = getRelNode(
-      substraitContext,
-      remainingCondition,
-      child.output,
-      operatorId,
-      null,
-      validation = true)
+    val relNode =
+      getRelNode(substraitContext, cond, child.output, operatorId, null, validation = true)
     // Then, validate the generated plan in native engine.
     doNativeValidation(substraitContext, relNode)
   }
 
   override protected def doTransform(context: SubstraitContext): TransformContext = {
     val childCtx = child.asInstanceOf[TransformSupport].transform(context)
-    if (isNoop) {
-      // The computing for this filter is not needed.
-      // Since some columns' nullability will be removed after this filter, we need to update the
-      // outputAttributes of child context.
-      return TransformContext(output, childCtx.root)
-    }
-
     val operatorId = context.nextOperatorId(this.nodeName)
-    val remainingCondition = getRemainingCondition
-    val currRel = getRelNode(
-      context,
-      remainingCondition,
-      child.output,
-      operatorId,
-      childCtx.root,
-      validation = false)
+    val currRel =
+      getRelNode(context, cond, child.output, operatorId, childCtx.root, validation = false)
     assert(currRel != null, "Filter rel should be valid.")
     TransformContext(output, currRel)
   }
 }
 
+object FilterExecTransformerBase extends PredicateHelper {
+
+  def buildNewOutput(output: Seq[Attribute], cond: Expression): Seq[Attribute] = {
+    buildNewOutput(output, splitConjunctivePredicates(cond))
+  }
+
+  def buildNewOutput(output: Seq[Attribute], conds: Seq[Expression]): Seq[Attribute] = {
+    // Split out all the IsNotNulls from condition.
+    val (notNullPreds, _) = conds.partition {
+      case IsNotNull(a) => isNullIntolerant(a) && a.references.subsetOf(AttributeSet(output))
+      case _ => false
+    }
+
+    // The columns that will filter out by `IsNotNull` could be considered as not nullable.
+    val notNullAttributes: Seq[ExprId] = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
+    output.map {
+      a =>
+        if (a.nullable && notNullAttributes.contains(a.exprId)) {
+          a.withNullability(false)
+        } else {
+          a
+        }
+    }
+  }
+}
+
 abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val input: SparkPlan)
   extends UnaryTransformSupport
   with OrderPreservingNodeShim
@@ -171,7 +128,7 @@ abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val in
   with Logging {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
-  @transient override lazy val metrics =
+  @transient override lazy val metrics: Map[String, SQLMetric] =
     BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetrics(sparkContext)
 
   override protected def doValidateInternal(): ValidationResult = {
@@ -281,37 +238,11 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan {
 }
 
 /**
- * Contains functions for the comparision and separation of the filter conditions in Scan and
- * Filter. Contains the function to manually push down the conditions into Scan.
+ * Contains functions for the comparison and separation of the filter conditions in Scan and Filter.
+ * Contains the function to manually push down the conditions into Scan.
  */
 object FilterHandler extends PredicateHelper {
 
-  /**
-   * Get the original filter conditions in Scan for the comparison with those in Filter.
-   *
-   * @param plan
-   *   : the Spark plan
-   * @return
-   *   If the plan is FileSourceScanExec or BatchScanExec, return the filter conditions in it.
-   *   Otherwise, return empty sequence.
-   */
-  def getScanFilters(plan: SparkPlan): Seq[Expression] = {
-    plan match {
-      case fileSourceScan: FileSourceScanExec =>
-        fileSourceScan.dataFilters
-      case batchScan: BatchScanExec =>
-        batchScan.scan match {
-          case scan: FileScan =>
-            scan.dataFilters
-          case _ =>
-            throw new GlutenNotSupportException(
-              s"${batchScan.scan.getClass.toString} is not supported")
-        }
-      case _ =>
-        Seq()
-    }
-  }
-
   /**
    * Compare the semantics of the filter conditions pushed down to Scan and in the Filter.
    *
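
The nullability tightening that used to live inside FilterExecTransformerBase.output is now exposed as the standalone FilterExecTransformerBase.buildNewOutput helper added above. A minimal sketch of its effect follows; the attribute names and types are illustrative, not taken from the commit.

import org.apache.gluten.execution.FilterExecTransformerBase
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Two nullable columns; the condition proves only `a` to be non-null.
val a = AttributeReference("a", IntegerType, nullable = true)()
val b = AttributeReference("b", IntegerType, nullable = true)()
val cond = And(IsNotNull(a), GreaterThan(a, Literal(1)))

// IsNotNull(a) is split out of the conjunction, so `a` is reported as non-nullable,
// while `b` keeps its original nullability.
val newOutput = FilterExecTransformerBase.buildNewOutput(Seq(a, b), cond)
// expected: newOutput.map(_.nullable) == Seq(false, true)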

gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   /** Returns the filters that can be pushed down to native file scan */
   def filterExprs(): Seq[Expression]
 
+  def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer
+
   def getMetadataColumns(): Seq[AttributeReference]
 
   /** This can be used to report FileFormat for a file based scan operator. */
gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala

Lines changed: 4 additions & 0 deletions
@@ -68,6 +68,10 @@ case class BatchScanExecTransformer(
         output)
     )
   }
+
+  override def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer = {
+    copy(output = newOutput.map(_.asInstanceOf[AttributeReference]))
+  }
 }
 
 abstract class BatchScanExecTransformerBase(

gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala

Lines changed: 3 additions & 0 deletions
@@ -74,6 +74,9 @@ case class FileSourceScanExecTransformer(
       disableBucketedScan
     )
   }
+
+  override def withNewOutput(newOutput: Seq[Attribute]): BasicScanExecTransformer =
+    copy(output = newOutput)
 }
 
 abstract class FileSourceScanExecTransformerBase(
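
Taken together, the changes suggest a rule that drops a FilterExecTransformerBase whose entire condition has already been pushed into the child scan, with the scan taking over the tightened (non-nullable) output via the new withNewOutput API. The rule itself is in one of the changed files not shown in this excerpt; the sketch below is only an assumption about how the two new APIs fit together, and the rule name and the fullyPushedDown helper are hypothetical.

import org.apache.gluten.execution.{BasicScanExecTransformer, FilterExecTransformerBase}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical sketch, not the commit's actual rule.
object RemovePushedDownFilterSketch extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case filter: FilterExecTransformerBase =>
      filter.child match {
        case scan: BasicScanExecTransformer if fullyPushedDown(filter, scan) =>
          // Replace Filter(Scan) with a scan whose output carries the
          // filter-derived non-null attributes.
          scan.withNewOutput(
            FilterExecTransformerBase.buildNewOutput(scan.output, filter.cond))
        case _ => filter
      }
  }

  // Placeholder: a real implementation must prove that every conjunct of filter.cond
  // is covered by the filters pushed down into the scan (e.g. via scan.filterExprs()).
  private def fullyPushedDown(
      filter: FilterExecTransformerBase,
      scan: BasicScanExecTransformer): Boolean = false
}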
