Changes from 66 commits
70 commits
816d27d
Start adding a test for relationship pushdown where we don't push dow…
holdenk Apr 1, 2024
4bf7342
Break up the filter pushdown test into two parts with || and && since…
holdenk Apr 1, 2024
5c3e437
Add an expectedCost to keep track of if elements are expensive or not…
holdenk Apr 1, 2024
8f44a4b
Get the filter push down/not-push down working.
holdenk Apr 1, 2024
3408090
A few more "expensive" type operations we would want to use the proje…
holdenk Apr 1, 2024
0ab43c0
Fix push to do alias replace with a non-empty stay up and non-empty p…
holdenk Apr 4, 2024
fa486c9
Fill out how we do out filter push down to also push the parts of the…
holdenk Apr 16, 2024
b2f5d80
Fix up tests, add a bit more coverage, fix the case where there are n…
holdenk Apr 16, 2024
699f74d
Make projection column ordering stable w/ filter pushdown.
holdenk Apr 17, 2024
8f2e942
Simplify alias use tracking, simplify non-alias filter pushdown, simp…
holdenk Apr 19, 2024
83b0086
Remove some printlns & use the heuristic for pushing.
holdenk Apr 20, 2024
af86854
Another attempt at using Advisory filters, but looking at the code we…
holdenk Apr 25, 2024
a597a70
Revert "Another attempt at using Advisory filters, but looking at the…
holdenk Apr 25, 2024
8ec9cec
Avoid introducing new filters for \'cheap\' aliases that we're going …
holdenk Apr 25, 2024
3dcab7a
Handle expensive only filter
holdenk May 8, 2024
338045c
Add an extra test to catch the no cheap refs only expensive refs
holdenk May 8, 2024
f55337f
Make the no double filter eval logic configurable (default to true to…
sfc-gh-hkarau Nov 4, 2025
936c958
Switch from numeric cost to boolean expensive, add a quick heuristic …
sfc-gh-hkarau Nov 7, 2025
312c457
Quick bits of code review feedback from @cloud-fan.
sfc-gh-hkarau Nov 14, 2025
47dfdfa
hasExpensiveChild needs to visible to sub-classes that can be expensi…
sfc-gh-hkarau Nov 14, 2025
10942ff
Simplify the pushdown logic.
sfc-gh-hkarau Nov 14, 2025
ccb798a
Revert "Simplify the pushdown logic."
sfc-gh-hkarau Nov 14, 2025
942b552
add a new case where we don't have any expensive aliases left behind …
sfc-gh-hkarau Nov 14, 2025
bb28e5d
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Nov 27, 2025
6e8dd38
Bump version to 4.2
sfc-gh-hkarau Nov 27, 2025
63c93cf
Use exist instead of map + exists
holdenk Dec 10, 2025
cbce842
Make _expensiveRegex private
holdenk Dec 10, 2025
a57acfa
Start splitting the rule
holdenk Dec 12, 2025
a195c33
Maybe match match
holdenk Dec 12, 2025
c654243
Fix typo in Evail -> Eval
sfc-gh-hkarau Dec 12, 2025
90b61a5
Try and rewrite the logic to be more readable and clearly broken down…
sfc-gh-hkarau Dec 12, 2025
fe9787f
eh mini refactor attempt
sfc-gh-hkarau Dec 12, 2025
e409def
A bit more hacking on cleaning up the refs (still needs more TLC)
sfc-gh-hkarau Dec 17, 2025
faa5d03
Outer no 1,2,or 3 check doesn't make sense, fix up some types
sfc-gh-hkarau Dec 18, 2025
944bea6
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Dec 18, 2025
071f2f3
Fix the final projection to only pull what we need.
sfc-gh-hkarau Dec 18, 2025
195bc90
Use the basechild correctly
sfc-gh-hkarau Dec 18, 2025
5c83ce1
Add test cases for filter pushdown projection splitting (Cases 1, 2, …
sfc-gh-hkarau Dec 18, 2025
f8e0663
Fix claude's auto generated test, we only split the projection parts …
sfc-gh-hkarau Dec 18, 2025
4b57e4d
Add a note about how it's deterministic so the lazy check is _ok_ but…
sfc-gh-hkarau Dec 18, 2025
b1b42b8
Oh it was not actually deterministic, make it so for testing.
sfc-gh-hkarau Dec 18, 2025
be63017
That auto generated test was incorrect, we can split on && so swap to…
sfc-gh-hkarau Dec 18, 2025
1eb9e42
Drop the printlns
sfc-gh-hkarau Dec 18, 2025
8a80e98
Oh wait nvm, since we're working at the f#0 not 'f' level we're fine.
sfc-gh-hkarau Dec 18, 2025
e0275ff
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Dec 22, 2025
e0e233a
Work on wording of the comments.
sfc-gh-hkarau Dec 22, 2025
9c83e99
A few more comments.
sfc-gh-hkarau Dec 22, 2025
18c60c5
Apply simplification in CR for checking is a filter uses only cheap e…
holdenk Dec 23, 2025
23f698a
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Dec 23, 2025
89d32b7
The cond == replaced check should be un-needed.
sfc-gh-hkarau Dec 23, 2025
4e0d9dc
Remove un-needed check for impossible case.
sfc-gh-hkarau Dec 23, 2025
8a88926
Since the used is a subset of aliasMap we can more inexpensively chec…
sfc-gh-hkarau Dec 23, 2025
6ed0ab9
Add a bit more comment explaining where the final project is added ba…
sfc-gh-hkarau Dec 23, 2025
1d522dd
Rename expensiveFiltersDone to splittableFiltersDone.
sfc-gh-hkarau Dec 23, 2025
a102db8
Make sure leaveAsIs code path is exercised
sfc-gh-hkarau Dec 29, 2025
646bc8a
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Dec 29, 2025
073b5af
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Jan 12, 2026
74f76ae
Back out the case 3 optimization (splitting expensive proj + filter c…
sfc-gh-hkarau Jan 12, 2026
afbc2ea
Update comment, we don't do the filter/proj/filter/proj split anymore…
sfc-gh-hkarau Jan 12, 2026
f6bd41c
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Feb 18, 2026
6e2fd3b
Simplify the intermediate projection because we do not do any splits.
sfc-gh-hkarau Feb 18, 2026
fb65283
Clarify comment.
sfc-gh-hkarau Feb 18, 2026
93a1b61
Update the regexp matcher to look for expensive regexes a bit better …
sfc-gh-hkarau Feb 18, 2026
511068e
Let's just go ahead and use the replaced value to avoid double callin…
sfc-gh-hkarau Feb 18, 2026
07a9556
Fix replaced ref when avoidDoubleFilterEval is true.
sfc-gh-hkarau Feb 18, 2026
1a44511
Improve heuristic for expensive/cheap regexes but it is just a heuris…
sfc-gh-hkarau Feb 18, 2026
8df91b0
CR feedback: short circuit on legacy behavior, rather than always add…
sfc-gh-hkarau Feb 20, 2026
eab7c8c
Make a combined SPARK-47672: Case 1, 2, and 3 make sure we leave up a…
sfc-gh-hkarau Feb 20, 2026
7cb8b59
Merge branch 'master' into SPARK-47672-avoid-double-eval-from-filter-…
sfc-gh-hkarau Feb 21, 2026
79fbfc9
Fix case 1 test.
sfc-gh-hkarau Feb 21, 2026
Original file line number Diff line number Diff line change
@@ -65,6 +65,30 @@ trait AliasHelper {
})
}

/**
* Replace all attributes, that reference an alias, with the aliased expression.
* Tracks which aliases were replaced and returns them.
*/
protected def replaceAliasWhileTracking(
expr: Expression,
aliasMap: AttributeMap[Alias]): (Expression, AttributeMap[Alias]) = {
// Use transformUp to prevent infinite recursion when the replacement expression
// redefines the same ExprId.
var replaced = AttributeMap.empty[Alias]
val newExpr = trimAliases(expr.transformUp {
case a: Attribute =>
// If we replace an alias add it to replaced
val newElem = aliasMap.get(a)
newElem match {
case None => a
case Some(b) =>
replaced += (a, b)
b
}
})
(newExpr, replaced)
}

/**
* Replace all attributes, that reference an alias, with the aliased expression,
* but keep the name of the outermost attribute.
@@ -67,6 +67,9 @@ case class CallMethodViaReflection(
with CodegenFallback
with QueryErrorsBase {

// This could be pretty much anything.
override def expensive: Boolean = true

def this(children: Seq[Expression]) =
this(children, true)

@@ -404,6 +404,15 @@ abstract class Expression extends TreeNode[Expression] {
} else {
""
}

/**
* Marks whether an expression is likely to be expensive.
* The only current consumer of this is the pushdown optimizer.
* By default an expression is expensive if any of its children are expensive.
*/
def expensive: Boolean = hasExpensiveChild

protected lazy val hasExpensiveChild: Boolean = children.exists(_.expensive)
}
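The default upward propagation can be illustrated with a minimal stand-alone model (toy types, not the real `Expression` hierarchy; `Udf`, `Plus`, and `Col` are hypothetical):

```scala
// An expression is expensive if it is intrinsically expensive (e.g. a UDF)
// or if any child is expensive -- mirroring the default in Expression.
sealed trait Node {
  def children: Seq[Node]
  def expensive: Boolean = children.exists(_.expensive)
}
case class Udf(children: Seq[Node]) extends Node {
  override def expensive: Boolean = true // always expensive, like ScalaUDF
}
case class Plus(a: Node, b: Node) extends Node { def children = Seq(a, b) }
case class Col(name: String) extends Node { def children = Nil }

assert(!Plus(Col("a"), Col("b")).expensive)          // pure arithmetic is cheap
assert(Plus(Udf(Seq(Col("a"))), Col("b")).expensive) // cost propagates up from the UDF
```

This is why marking only the leaf operators (`ScalaUDF`, `CallMethodViaReflection`, `ArrayJoin`, regex expressions) is enough: everything built on top of them inherits the flag.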

object ExpressionPatternBitMask {
@@ -1210,4 +1210,6 @@ case class ScalaUDF(

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): ScalaUDF =
copy(children = newChildren)

override def expensive: Boolean = true
}
@@ -2293,6 +2293,8 @@ case class ArrayJoin(
override def dataType: DataType = array.dataType.asInstanceOf[ArrayType].elementType

override def prettyName: String = "array_join"

override def expensive: Boolean = true
}

/**
@@ -83,6 +83,31 @@ abstract class StringRegexExpression extends BinaryExpression
matches(regex, input1.asInstanceOf[UTF8String].toString)
}
}

override def expensive: Boolean = hasExpensiveChild || _expensiveRegex

// Heuristic, not designed to be perfect. Look for constructs likely to
// require backtracking (quantifiers and bounded repetition).
private val detectExpensiveRegexPattern = Pattern.compile("[+*{]")

private lazy val _expensiveRegex = {
// A quick heuristic for how expensive a pattern is.
left match {
case StringLiteral(str) =>
// A clearly anchored start limits the backtracking required.
if (str.startsWith("^") || str.startsWith("\\b")) {
false
} else if (detectExpensiveRegexPattern.matcher(str).find()) {
// Greedy matching can be tricky.
true
} else {
// Default to pushdown for now.
false
}
case _ =>
true // Non-literal pattern: per-row regex compilation.
}
}
}
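The intent of the heuristic can be sketched in isolation. The snippet below is illustrative only (`likelyExpensiveRegex` is a hypothetical name, and the character class is the same rough proxy for backtracking used above, not a precise classifier):

```scala
import java.util.regex.Pattern

// Anchored patterns are treated as cheap; patterns containing quantifiers
// or bounded repetition (+, *, {) are flagged as potentially expensive.
val suspect = Pattern.compile("[+*{]")

def likelyExpensiveRegex(pattern: String): Boolean =
  if (pattern.startsWith("^") || pattern.startsWith("\\b")) false
  else suspect.matcher(pattern).find()

assert(!likelyExpensiveRegex("^magic"))  // anchored: limited backtracking
assert(likelyExpensiveRegex("(a+)+b"))   // nested greedy quantifiers
assert(!likelyExpensiveRegex("magic"))   // no quantifiers: push it down
```

Note that `find()` (substring search) is the right call here; `matches()` would require the whole pattern string to equal the suspect characters and the heuristic would essentially never fire.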

private[catalyst] object StringRegexExpression {
@@ -2059,23 +2059,100 @@ object PushDownPredicates extends Rule[LogicalPlan] {
* Pushes [[Filter]] operators through many operators iff:
* 1) the operator is deterministic
* 2) the predicate is deterministic and the operator will not change any of rows.
* 3) we don't introduce double evaluation, OR double evaluation would be cheap, OR we're
*    configured to allow it.
*
* This heuristic is valid assuming the expression evaluation cost is minimal.
*/
object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally

val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
// Projections are a special case because the filter _may_ contain references to fields added
// in the projection that we wish to copy. We shouldn't blindly copy everything, since double
// evaluation of arbitrary operations can be expensive (unless the legacy behavior is enabled
// by the user). The double filter eval regression was introduced in Spark 3 and fixed in 4.2.
// The _new_ default algorithm works as follows:
// Provided filters are broken up on their &&s for separate evaluation.
// We track which components of the projection are used in the filters.
//
// 1) A filter which does not reference anything in the projection: pushed.
// 2) A filter which references _inexpensive_ items in the projection: pushed with references
// resolved, resulting in double evaluation, but only of inexpensive items -- worth it to
// filter records sooner.
// (Cases 1 & 2 are treated as "cheap" predicates.)
// 3) When a filter references expensive-to-compute aliases we do not push it.
// Note that a given filter may contain parts (separated by logical ANDs) from all cases.
// We handle each part separately according to the logic above.
// Additional restriction:
// SPARK-13473: We can't push the predicate down when the underlying projection outputs non-
// deterministic field(s). Non-deterministic expressions are essentially stateful. This
// implies that, for a given input row, the output is determined by the expression's initial
// state and all the input rows processed before. In other words, the order of input rows
// matters for non-deterministic expressions, while pushing down predicates changes the order.
// This also applies to Aggregate.
case Filter(condition, project @ Project(fields, grandChild))
case f @ Filter(condition, project @ Project(fields, grandChild))
if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
// All of the aliases in the projection
val aliasMap = getAliasMap(project)
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
// Break up the filter into its respective components by &&s.
val splitCondition = splitConjunctivePredicates(condition)
// Find the different aliases each component of the filter uses.
val usedAliasesForCondition = splitCondition.map { cond =>
// If the legacy double evaluation behavior is enabled we just say
// every filter is "free."
if (!SQLConf.get.avoidDoubleFilterEval) {
val replaced = replaceAlias(cond, aliasMap)
(cond, AttributeMap.empty[Alias], replaced)
} else {
// Here we get which aliases were used in a given filter so we can see if the filter
// referenced an expensive alias, vs. just checking if the filter is expensive.
val (replaced, usedAliases) = replaceAliasWhileTracking(cond, aliasMap)
(cond, usedAliases, replaced)
}
}
// Split the filter's components into cheap and expensive while keeping track of
// what each references from the projection.
val (cheapWithUsed, expensiveWithUsed) = usedAliasesForCondition
.partition { case (cond, used, replaced) =>
if (!SQLConf.get.avoidDoubleFilterEval) {
// If we are always pushing through short circuit the check.
true
} else {
// Didn't use anything? We're good
if (used.isEmpty) {
true
} else if (!used.exists(_._2.child.expensive)) {
// If it's cheap we can push it because it might eliminate more data quickly and
// it may also be something which could be evaluated at the storage layer.
// We may wish to improve this heuristic in the future.
true
} else {
false
}
}
}
// Short circuit: if we do not have any cheap filters, return the original filter as-is.
if (cheapWithUsed.isEmpty) {
f
} else {
val cheap: Seq[Expression] = cheapWithUsed.map(_._3)
// Make a base plan which has all of the cheap filters pushed down.
// For all filters which do not reference any expensive aliases, push the
// filter while resolving the inexpensive aliases.
val combinedCheapFilter = cheap.reduce(And)
val baseChild: LogicalPlan = Filter(combinedCheapFilter, child = grandChild)
// Take our projection and place it on top of the pushed filters.
val topProjection = project.copy(child = baseChild)

// If we pushed all the filters we can return the projection
val result = if (expensiveWithUsed.isEmpty) {
topProjection
} else {
// Finally add any filters which could not be pushed
val remainingConditions = expensiveWithUsed.map(_._1).toSeq
Filter(remainingConditions.reduce(And), topProjection)
}
result
}
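Stripped of the Catalyst plumbing, the decision made by this case reduces to a partition over conjuncts. The sketch below is a toy model of that decision only (`Conjunct` and `planFilter` are hypothetical names; the real code also rewrites aliases and rebuilds `Filter`/`Project` nodes):

```scala
// Each conjunct of the original filter either references an expensive
// projection alias or it does not.
case class Conjunct(sql: String, usesExpensiveAlias: Boolean)

// Cheap conjuncts go below the Project; expensive ones stay above it.
def planFilter(conjuncts: Seq[Conjunct]): (Seq[Conjunct], Seq[Conjunct]) =
  conjuncts.partition(!_.usesExpensiveAlias)

val (pushed, keptAbove) = planFilter(Seq(
  Conjunct("a > 5", usesExpensiveAlias = false), // cases 1/2: pushed below the Project
  Conjunct("f", usesExpensiveAlias = true)))     // case 3: stays above the Project

assert(pushed.map(_.sql) == Seq("a > 5"))
assert(keptAbove.map(_.sql) == Seq("f"))
```

When `keptAbove` is empty the projection itself becomes the top of the rewritten subtree; otherwise the remaining conjuncts are recombined with `And` and left as a `Filter` on top of it, exactly as the `result` branch above does.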

// We can push down deterministic predicate through Aggregate, including throwable predicate.
// If we can push down a filter through Aggregate, it means the filter only references the
@@ -482,6 +482,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val AVOID_DOUBLE_FILTER_EVAL =
buildConf("spark.sql.optimizer.avoidDoubleFilterEval")
.doc("When true, avoid pushing expensive (UDF, etc.) filters down if it could result in " +
"double evaluation. This was the behaviour prior to 3.X.")
.version("4.2.0")
.booleanConf
.createWithDefault(true)
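For completeness, the flag can be toggled per session like any other SQL conf. This is an illustrative snippet only (`spark` is assumed to be an existing `SparkSession`):

```scala
// Opt back into the pre-4.2 behavior: always push filters through
// projections, even when that double-evaluates expensive expressions.
spark.conf.set("spark.sql.optimizer.avoidDoubleFilterEval", "false")

// The 4.2.0 default: keep filters over expensive aliases above the Project.
spark.conf.set("spark.sql.optimizer.avoidDoubleFilterEval", "true")
```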

val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules")
.doc("Configures a list of rules to be disabled in the optimizer, in which the rules are " +
"specified by their rule names and separated by comma. It is not guaranteed that all the " +
@@ -7996,6 +8004,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def preserveCharVarcharTypeInfo: Boolean = getConf(SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO)

def avoidDoubleFilterEval: Boolean = getConf(AVOID_DOUBLE_FILTER_EVAL)

def readSideCharPadding: Boolean = getConf(SQLConf.READ_SIDE_CHAR_PADDING)

def cliPrintHeader: Boolean = getConf(SQLConf.CLI_PRINT_HEADER)
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

@@ -52,11 +53,14 @@ class FilterPushdownSuite extends PlanTest {
val attrB = $"b".int
val attrC = $"c".int
val attrD = $"d".int
val attrE = $"e".string

val testRelation = LocalRelation(attrA, attrB, attrC)

val testRelation1 = LocalRelation(attrD)

val testStringRelation = LocalRelation(attrA, attrB, attrE)

val simpleDisjunctivePredicate =
("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11)
val expectedPredicatePushDownResult = {
@@ -152,17 +156,137 @@
test("can't push without rewrite") {
val originalQuery =
testRelation
.select($"a" + $"b" as "e")
.select($"a" + $"b" as "e", $"a" - $"b" as "f")
.where($"e" === 1)
.analyze

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.where($"a" + $"b" === 1)
.select($"a" + $"b" as "e")
.select($"a" + $"b" as "e", $"a" - $"b" as "f")
.analyze

comparePlans(optimized, correctAnswer)
}

test("SPARK-47672: Do double evaluation when configured") {
withSQLConf(SQLConf.AVOID_DOUBLE_FILTER_EVAL.key -> "false") {
val originalQuery = testStringRelation
.select($"a", $"e".rlike("magic") as "f", $"e".rlike("notmagic") as "j", $"b")
.where($"a" > 5 && $"f")
.analyze

val optimized = Optimize.execute(originalQuery)

val correctAnswer = testStringRelation
.where($"a" > 5 && $"e".rlike("magic"))
.select($"a", $"e".rlike("magic") as "f", $"e".rlike("notmagic") as "j", $"b")
.analyze

comparePlans(optimized, correctAnswer)
}
}

test("SPARK-47672: Make sure that we handle the case where everything is expensive") {
val originalQuery = testStringRelation
.select($"e".rlike("magic") as "f")
.where($"f")
.analyze

val optimized = Optimize.execute(originalQuery)

val correctAnswer = testStringRelation
.select($"e".rlike("magic") as "f")
.where($"f")
.analyze

comparePlans(optimized, correctAnswer)
}

test("SPARK-47672: Ensure filter pushdown without alias reference does not move a projection.") {
val originalQuery = testStringRelation
.select($"a", $"e".rlike("magic") as "f", $"b" + $"a")
.where($"a" > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

val correctAnswer = testStringRelation
.where($"a" > 5)
.select($"a", $"e".rlike("magic") as "f", $"b" + $"a")
.analyze

comparePlans(optimized, correctAnswer)
}


test("SPARK-47672: Inexpensive filter pushdown should not move projections") {
val originalQuery = testStringRelation
.select($"a" as "c", $"b" + $"a")
.where($"c" > 5)
.analyze

val optimized = Optimize.execute(originalQuery)

val correctAnswer = testStringRelation
.where($"a" > 5)
.select($"a" as "c", $"b" + $"a")
.analyze

comparePlans(optimized, correctAnswer)
}

test("SPARK-47672: Avoid double evaluation with projections can't push past certain items") {
Review comment (Contributor): Is this just a more complicated case of the test "SPARK-47672: Make sure that we handle the case where everything is expensive"?

Reply (Contributor Author): Pretty much, makes sure we handle the split correctly though.
val originalQuery = testStringRelation
.select($"a", $"e".rlike("magic") as "f")
.where($"a" > 5 || $"f")
.analyze

val optimized = Optimize.execute(originalQuery)

val correctAnswer = testStringRelation
.select($"a", $"e".rlike("magic") as "f")
.where($"a" > 5 || $"f")
.analyze

comparePlans(optimized, correctAnswer)
}

// Case 1: Multiple filters that don't reference any projection aliases - all should be pushed
test("SPARK-47672: Case 1 - multiple filters not referencing projection aliases") {
val originalQuery = testStringRelation
.select($"a" as "c", $"e".rlike("magic") as "f", $"b" as "d")
.where($"c" > 5 && $"d" < 10)
.analyze

val optimized = Optimize.execute(originalQuery)

// Both filters on c and d should be pushed down since they just reference
// simple aliases (c->a, d->b) which are inexpensive
val correctAnswer = testStringRelation
.where($"a" > 5 && $"b" < 10)
.select($"a" as "c", $"e".rlike("magic") as "f", $"b" as "d")
.analyze

comparePlans(optimized, correctAnswer)
}

// Case 2: Multiple filters with inexpensive references - all should be pushed
test("SPARK-47672: Case 2 - multiple filters with inexpensive alias references") {
val originalQuery = testStringRelation
.select($"a" + $"b" as "sum", $"a" - $"b" as "diff", $"e".rlike("magic") as "f")
.where($"sum" > 10 && $"diff" < 5)
.analyze

val optimized = Optimize.execute(originalQuery)

// Both sum and diff are inexpensive (arithmetic), so both filters should be pushed
val correctAnswer = testStringRelation
.where($"a" + $"b" > 10 && $"a" - $"b" < 5)
.select($"a" + $"b" as "sum", $"a" - $"b" as "diff", $"e".rlike("magic") as "f")
.analyze

comparePlans(optimized, correctAnswer)
}
