Skip to content

Commit 2b657fe

Browse files
mihailotim-db authored and cloud-fan committed
[SPARK-50798][SQL][FOLLOWUP] Normalize expressions with random seed
### What changes were proposed in this pull request?
This PR introduces normalization of expressions with a random seed.

### Why are the changes needed?
Single-pass and fixed-point need to produce the same plans after normalization, so we need to normalize non-deterministic expressions.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added a test case to `NormalizePlanSuite`.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #49623 from mihailotim-db/mihailotim-db/normalization_followup_nondeterministic.

Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent be77e1e commit 2b657fe

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -41,10 +41,12 @@ object NormalizePlan extends PredicateHelper {
4141
def normalizeExpressions(plan: LogicalPlan): LogicalPlan = {
4242
val withNormalizedRuntimeReplaceable = normalizeRuntimeReplaceable(plan)
4343
withNormalizedRuntimeReplaceable transformAllExpressions {
44-
case c: CommonExpressionDef =>
45-
c.copy(id = new CommonExpressionId(id = 0))
46-
case c: CommonExpressionRef =>
47-
c.copy(id = new CommonExpressionId(id = 0))
44+
case commonExpressionDef: CommonExpressionDef =>
45+
commonExpressionDef.copy(id = new CommonExpressionId(id = 0))
46+
case commonExpressionRef: CommonExpressionRef =>
47+
commonExpressionRef.copy(id = new CommonExpressionId(id = 0))
48+
case expressionWithRandomSeed: ExpressionWithRandomSeed =>
49+
expressionWithRandomSeed.withNewSeed(0)
4850
}
4951
}
5052

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/NormalizePlanSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,11 @@
1717

1818
package org.apache.spark.sql.catalyst.plans
1919

20+
import scala.util.Random
21+
2022
import org.apache.spark.SparkFunSuite
2123
import org.apache.spark.sql.catalyst.SQLConfHelper
24+
import org.apache.spark.sql.catalyst.dsl.expressions._
2225
import org.apache.spark.sql.catalyst.dsl.plans._
2326
import org.apache.spark.sql.catalyst.expressions.{
2427
AssertTrue,
@@ -100,6 +103,18 @@ class NormalizePlanSuite extends SparkFunSuite with SQLConfHelper {
100103
assert(NormalizePlan(baselinePlanDef) == NormalizePlan(testPlanDef))
101104
}
102105

106+
test("Normalize non-deterministic expressions") {
107+
val random = new Random()
108+
val baselineExpression = rand(random.nextLong())
109+
val testExpression = rand(random.nextLong())
110+
111+
val baselinePlan = LocalRelation().select(baselineExpression)
112+
val testPlan = LocalRelation().select(testExpression)
113+
114+
assert(baselinePlan != testPlan)
115+
assert(NormalizePlan(baselinePlan) == NormalizePlan(testPlan))
116+
}
117+
103118
private def setTimezoneForAllExpression(plan: LogicalPlan): LogicalPlan = {
104119
plan.transformAllExpressions {
105120
case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>

0 commit comments

Comments
 (0)