[SPARK-47672][SQL] Avoid double eval from filter pushDown w/ projection pushdown #46143
Conversation
a97d56f to f1d1ddd (force-push)
|
CC @cloud-fan do you have thoughts / cycles? |
|
+CC @shardulm94 |
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
ac85ead to a5d8400 (force-push)
|
Hi @cloud-fan, it looks like the "with" suggestion ended up being more complicated than originally anticipated (see #46499 (comment)). In the interest of progress, and to avoid double evaluation of a lot of really expensive work we don't need, I intend to update this PR and merge it. We can still circle back to the with approach eventually. |
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
|
Just wondering if we have a consensus on the best way to go about this @zml1206 / @cloud-fan? I'm thinking, given it's been over a year since the "with" change, it might be more complicated than we originally thought. I can re-explore as well if @zml1206 is busy, but we could also go for the simpler solution in the meantime since double UDF evaluation is bad. |
|
Hi @holdenk, we tried very hard to solve this issue efficiently but failed. The idea was to let Filter carry a project list and push them down together, but when we push through a Project/Aggregate which also contains a project list, we may still hit expression duplication and need to make a decision based on cost. Sorry, I should have come back to this PR earlier. I think we can simplify it a bit, as we will likely never have a practical cost model for Spark expressions. Let's just avoid UDF expression (extends marker expression |
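A minimal sketch of the direction described above, as an editorial illustration rather than the PR's actual code: it assumes the marker trait being referred to is Catalyst's UserDefinedExpression, and safeToPush is a hypothetical helper name; the alias substitution mirrors what the existing replaceAlias helper does.

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression, UserDefinedExpression}

// Substitute the Project's aliases into the predicate, then check whether the
// result contains any user-defined expression. If it does, pushing the filter
// below the Project would duplicate the UDF's evaluation, so keep it above.
def safeToPush(predicate: Expression, aliasMap: AttributeMap[Alias]): Boolean = {
  val substituted = predicate.transformUp {
    case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
  }
  substituted.find(_.isInstanceOf[UserDefinedExpression]).isEmpty
}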
|
Sounds like a plan, I'll work on simplifying this code. |
…n an 'expensive' projected operation (rlike) Co-authored-by: Holden Karau <holden@pigscanfly.ca>
… we can't do partial on the || Co-authored-by: Holden Karau <holden@pigscanfly.ca>
… (idk seems better than the hack of determining if something is eligible for pushdown, but we could also use that maybe? idk) & start updating optimizer filter pushdown past a project for _partial_ pushdowns. Co-authored-by: Holden Karau <holden@pigscanfly.ca>
Co-authored-by: Holden Karau <holden@pigscanfly.ca>
…ct to 'save' Co-authored-by: Holden Karau <holden@pigscanfly.ca>
…ushDown Co-authored-by: Holden Karau <holden@pigscanfly.ca>
… projection that we are using Co-authored-by: Holden Karau <holden@pigscanfly.ca>
…o aliases Co-authored-by: Holden Karau <holden@pigscanfly.ca>
|
To discuss #46143 (comment) further:
That's why my initial suggestion was to not do this optimization at all. We just keep the |
So just always leave up complex filters and don't attempt to split them if needed? I think that's sub-optimal for fairly self-evident reasons, but if you still find the current implementation too complex I could move it into a follow-on PR so there's less to review here and we just fix the perf regression introduced in 3.0. |
|
A followup SGTM, at least we can fix the perf regression first. |
|
Awesome, I'll rework this then :) |
…pushdown-split-projection Co-authored-by: Holden Karau <holden@pigscanfly.ca>
…ombos) Co-authored-by: Holden Karau <holden@pigscanfly.ca>
… so need to ref it. Co-authored-by: Holden Karau <holden@pigscanfly.ca>
814405a to afbc2ea (force-push)
|
@cloud-fan updated to just fix the regression. |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
val resolvedCheapFilter = replaceAlias(combinedCheapFilter, aliasMap)
val baseChild: LogicalPlan = Filter(resolvedCheapFilter, child = grandChild)
// Insert a last projection to match the desired column ordering and
// evaluate any stragglers and select the already computed columns.
hmm why is this needed if we only push a Filter through a Project?
  (cond, AttributeMap.empty[Alias])
} else {
  val (replaced, usedAliases) = replaceAliasWhileTracking(cond, aliasMap)
  (cond, usedAliases)
We return cond instead of replaced, which indicates that the new function replaceAliasWhileTracking is not necessary for this new simplified version.
I think we can make it very simple:
val (stayUp, pushDown) = splitConjunctivePredicates(condition).partition { predicate =>
  replaceAlias(predicate, aliasMap).expensive
}
(stayUp, pushDown) match {
  case (_, Nil) =>
    // Nothing to push down, keep the same filter.
    filter
  case (Nil, _) =>
    // Push all
    project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
  case _ =>
    Filter(
      stayUp.reduce(And),
      project.copy(
        child = Filter(replaceAlias(pushDown.reduce(And), aliasMap), grandChild)
      )
    )
}
So the replaceAliasWhileTracking API gives us back the usedAliases (which we return) and replaceAlias does not. If we just look at whether the substituted expression is expensive, rather than the aliases it uses, we'll incorrectly leave up a filter which does an expensive regex over a projected inexpensive column. I'll write this in the comments though.
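To illustrate the distinction with a sketch (not code from the PR; testStringRelation and the Catalyst test DSL are assumed from the suite's tests quoted later in this thread):

// The condition is expensive looking (an rlike), but the only alias it
// references is `a AS c`, which is cheap to duplicate. Deciding based on the
// whole substituted condition would wrongly keep this filter above the
// Project; deciding based on the aliases it actually uses lets it be pushed.
val query = testStringRelation
  .select($"a" as "c", $"e".rlike("magic") as "f", $"b" as "d")
  .where($"c".rlike("^expensive.*pattern$"))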
…pushdown-split-projection
…but still remember this is a best effort heuristic
…g alias replace and discussions around why not to use replaceAliasWhileTracking
val usedAliasesForCondition = splitCondition.map { cond =>
  // If the legacy double evaluation behavior is enabled we just say
  // every filter is "free."
  if (!SQLConf.get.avoidDoubleFilterEval) {
If avoidDoubleFilterEval is false we can just return project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)), to make sure the legacy code path is exactly the same as before.
newElem match {
  case None => a
  case Some(b) =>
    replaced += (a, b)
we should skip this operation if the AttributeMap already contains a. It does not change the result but is more efficient.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
// For all filter which do not reference any expensive aliases then
// just push the filter while resolving the non-expensive aliases.
val combinedCheapFilter = cheap.reduce(And)
val baseChild: LogicalPlan = Filter(combinedCheapFilter, child = grandChild)
Suggested change:
- val baseChild: LogicalPlan = Filter(combinedCheapFilter, child = grandChild)
+ val baseChild = Filter(combinedCheapFilter, child = grandChild)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
comparePlans(optimized, correctAnswer)
}

test("SPARK-47672: Avoid double evaluation with projections can't push past certain items")
Is this just a more complicated case of the test "SPARK-47672: Make sure that we handle the case where everything is expensive"?
Pretty much, makes sure we handle the split correctly though.
| test("SPARK-47672: Case 1 - multiple filters not referencing projection aliases") { | ||
| val originalQuery = testStringRelation | ||
| .select($"a" as "c", $"e".rlike("magic") as "f", $"b" as "d") | ||
| .where($"c" > 5 && $"d" < 10) |
c and d do reference projection aliases, shall we use .select($"a", $"b", ...)?
comparePlans(optimized, correctAnswer)
}

// Case 2: Multiple filters with inexpensive references - all should be pushed
do we have a test case for the mixed cases?
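For reference, a mixed case might look like the following sketch (not from the PR; it reuses testStringRelation and the DSL from the quoted test above). The expected outcome would be that the conjunct on the cheap alias d is pushed below the Project while the conjunct on the rlike alias f stays above it.

val mixedQuery = testStringRelation
  .select($"a" as "c", $"e".rlike("magic") as "f", $"b" as "d")
  .where($"d" < 10 && $"f")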
…ing into the tracking map only add if not present (same result, possibly faster, since the value is always the same), remove val result = ... (was useful for debugging).
…nd push down correctly. test
What changes were proposed in this pull request?
Changes the filter pushdown optimizer so that it does not push a filter down past a projection when we reasonably expect that the projected element the filter references is expensive to compute.
This is a slightly more complex alternative to #45802 which also moves parts of projections down so that the filters can move further down.
An expression can indicate that it is expensive enough that double evaluation as a result of pushdown is not worth the potential savings (by default we treat all UDFs this way).
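As a concrete illustration of the double evaluation being avoided (a sketch only; spark is assumed to be an active SparkSession and slowUdf stands in for any expensive UDF):

import org.apache.spark.sql.functions.{col, udf}

// An expensive UDF behind a projection, filtered on its output.
val slowUdf = udf((s: String) => { Thread.sleep(10); s.length })
val df = spark.range(100)
  .selectExpr("cast(id as string) AS s")
  .select(slowUdf(col("s")).as("len"))
  .filter(col("len") > 1)

// Before this change, pushing the Filter below the Project rewrites the
// condition in terms of the UDF, so the optimized plan can evaluate slowUdf
// twice per row: once in the pushed-down Filter and once in the Project.
df.explain(true)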
Future Work / What else remains to do?
Right now, if a condition is expensive and it references something in the projection, we don't push down. We could probably do better and gate this on whether the thing we reference is expensive rather than the condition itself. We could do this as a follow-up item or as part of this PR.
Why are the changes needed?
Currently Spark may double-compute expensive operations (like JSON parsing, UDF evaluation, etc.) as a result of filter pushdown past projections.
Does this PR introduce any user-facing change?
This SQL optimizer change may impact some user queries; results should be the same and hopefully a little faster.
How was this patch tested?
New tests were added to FilterPushdownSuite, and the initial problem of double evaluation was confirmed with a GitHub gist.
Was this patch authored or co-authored using generative AI tooling?
Used Claude to generate more test coverage.