Skip to content

Commit a54a803

Browse files
authored
[GLUTEN-11088][VL] fix bloomFilter in GlutenDataFrameStatSuite (#11211)
1 parent e85bb59 commit a54a803

File tree

4 files changed

+33
-11
lines changed

4 files changed

+33
-11
lines changed

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,11 @@ object VeloxRuleApi {
8484
injector.injectPreTransform(c => FallbackMultiCodegens.apply(c.session))
8585
injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate(c.session))
8686
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
87-
injector.injectPreTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
87+
injector.injectPreTransform(
88+
c =>
89+
BloomFilterMightContainJointRewriteRule.apply(
90+
c.session,
91+
c.caller.isBloomFilterStatFunction()))
8892
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
8993
injector.injectPreTransform(_ => EliminateRedundantGetTimestamp)
9094

@@ -162,7 +166,11 @@ object VeloxRuleApi {
162166
injector.injectPreTransform(c => FallbackOnANSIMode.apply(c.session))
163167
injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate(c.session))
164168
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
165-
injector.injectPreTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
169+
injector.injectPreTransform(
170+
c =>
171+
BloomFilterMightContainJointRewriteRule.apply(
172+
c.session,
173+
c.caller.isBloomFilterStatFunction()))
166174
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
167175
injector.injectPreTransform(_ => EliminateRedundantGetTimestamp)
168176

backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@ import org.apache.spark.sql.SparkSession
2525
import org.apache.spark.sql.catalyst.rules.Rule
2626
import org.apache.spark.sql.execution.SparkPlan
2727

28-
case class BloomFilterMightContainJointRewriteRule(spark: SparkSession) extends Rule[SparkPlan] {
28+
case class BloomFilterMightContainJointRewriteRule(
29+
spark: SparkSession,
30+
isBloomFilterStatFunction: Boolean)
31+
extends Rule[SparkPlan] {
2932
override def apply(plan: SparkPlan): SparkPlan = {
30-
if (!GlutenConfig.get.enableNativeBloomFilter) {
33+
if (isBloomFilterStatFunction || !GlutenConfig.get.enableNativeBloomFilter) {
3134
return plan
3235
}
3336
val out = plan.transformWithSubqueries {

gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ trait CallerInfo {
3030
def isAqe(): Boolean
3131
def isCache(): Boolean
3232
def isStreaming(): Boolean
33+
def isBloomFilterStatFunction(): Boolean
3334
}
3435

3536
object CallerInfo {
@@ -41,7 +42,8 @@ object CallerInfo {
4142
private class Impl(
4243
override val isAqe: Boolean,
4344
override val isCache: Boolean,
44-
override val isStreaming: Boolean
45+
override val isStreaming: Boolean,
46+
override val isBloomFilterStatFunction: Boolean
4547
) extends CallerInfo
4648

4749
/*
@@ -55,7 +57,8 @@ object CallerInfo {
5557
new Impl(
5658
isAqe = inAqeCall(stack),
5759
isCache = inCacheCall(stack),
58-
isStreaming = inStreamingCall(stack))
60+
isStreaming = inStreamingCall(stack),
61+
isBloomFilterStatFunction = inBloomFilterStatFunctionCall(stack))
5962
}
6063

6164
private def inAqeCall(stack: Seq[StackTraceElement]): Boolean = {
@@ -70,11 +73,21 @@ object CallerInfo {
7073
stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head))
7174
}
7275

76+
private def inBloomFilterStatFunctionCall(stack: Seq[StackTraceElement]): Boolean = {
77+
val res = stack.exists(
78+
_.getClassName.equals("org.apache.spark.sql.DataFrameStatFunctions")
79+
&& stack.exists(_.getMethodName.equals("bloomFilter")))
80+
res
81+
}
82+
7383
/** For testing only. */
74-
def withLocalValue[T](isAqe: Boolean, isCache: Boolean, isStreaming: Boolean = false)(
75-
body: => T): T = {
84+
def withLocalValue[T](
85+
isAqe: Boolean,
86+
isCache: Boolean,
87+
isStreaming: Boolean = false,
88+
isBloomFilterStatFunction: Boolean = false)(body: => T): T = {
7689
val prevValue = localStorage.get()
77-
val newValue = new Impl(isAqe, isCache, isStreaming)
90+
val newValue = new Impl(isAqe, isCache, isStreaming, isBloomFilterStatFunction)
7891
localStorage.set(Some(newValue))
7992
try {
8093
body

gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,8 +787,6 @@ class VeloxTestSettings extends BackendTestSettings {
787787
// Not really an issue.
788788
.exclude("SPARK-10740: handle nondeterministic expressions correctly for set operations")
789789
enableSuite[GlutenDataFrameStatSuite]
790-
// TODO: fix in Spark-4.0
791-
.exclude("Bloom filter")
792790
enableSuite[GlutenDataFrameSuite]
793791
// Rewrite these tests because it checks Spark's physical operators.
794792
.excludeByPrefix("SPARK-22520", "reuse exchange")

0 commit comments

Comments
 (0)