Skip to content

Commit d0ab038

Browse files
authored
[FLINK-38752][table] Drop EquivalentExprShuttle
1 parent 23f2e39 commit d0ab038

File tree

5 files changed

+13
-151
lines changed

5 files changed

+13
-151
lines changed

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala

Lines changed: 2 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import org.apache.calcite.rex._
3737
import org.apache.calcite.sql.`type`.SqlTypeName
3838
import org.apache.calcite.sql.{SqlAsOperator, SqlKind, SqlOperator}
3939
import org.apache.calcite.sql.fun.{SqlCastFunction, SqlStdOperatorTable}
40-
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
4140
import org.apache.calcite.util._
4241

4342
import java.lang.{Iterable => JIterable}
@@ -47,7 +46,6 @@ import java.util.Optional
4746
import java.util.function.Predicate
4847

4948
import scala.collection.JavaConversions._
50-
import scala.collection.mutable
5149

5250
/** Utility methods concerning [[RexNode]]. */
5351
object FlinkRexUtil {
@@ -201,119 +199,14 @@ object FlinkRexUtil {
201199
RexUtil.composeDisjunction(rexBuilder, nodes)
202200
}
203201

204-
/**
205-
* Merges same expressions and then simplifies the result expression by [[RexSimplify]].
206-
*
207-
* Examples for merging same expressions:
208-
* 1. a = b AND b = a -> a = b 2. a = b OR b = a -> a = b 3. (a > b AND c < 10) AND b < a -> a >
209-
* b AND c < 10 4. (a > b OR c < 10) OR b < a -> a > b OR c < 10 5. a = a, a >= a, a <= a ->
210-
* true 6. a <> a, a > a, a < a -> false
211-
*/
202+
/** Simplifies the result expression by [[RexSimplify]]. */
212203
def simplify(rexBuilder: RexBuilder, expr: RexNode, executor: RexExecutor): RexNode = {
213204
if (expr.isAlwaysTrue || expr.isAlwaysFalse) {
214205
return expr
215206
}
216207

217-
val exprShuttle = new EquivalentExprShuttle(rexBuilder)
218-
val equiExpr = expr.accept(exprShuttle)
219-
val exprMerger = new SameExprMerger(rexBuilder)
220-
val sameExprMerged = exprMerger.mergeSameExpr(equiExpr)
221-
val binaryComparisonExprReduced =
222-
sameExprMerged.accept(new BinaryComparisonExprReducer(rexBuilder))
223-
224208
val rexSimplify = new RexSimplify(rexBuilder, RelOptPredicateList.EMPTY, executor)
225-
rexSimplify.simplifyUnknownAs(binaryComparisonExprReduced, RexUnknownAs.falseIf(true))
226-
}
227-
228-
val BINARY_COMPARISON: util.Set[SqlKind] = util.EnumSet.of(
229-
SqlKind.EQUALS,
230-
SqlKind.NOT_EQUALS,
231-
SqlKind.GREATER_THAN,
232-
SqlKind.GREATER_THAN_OR_EQUAL,
233-
SqlKind.LESS_THAN,
234-
SqlKind.LESS_THAN_OR_EQUAL)
235-
236-
private class BinaryComparisonExprReducer(rexBuilder: RexBuilder) extends RexShuttle {
237-
override def visitCall(call: RexCall): RexNode = {
238-
val kind = call.getOperator.getKind
239-
if (!kind.belongsTo(BINARY_COMPARISON)) {
240-
super.visitCall(call)
241-
} else {
242-
val operand0 = call.getOperands.get(0)
243-
val operand1 = call.getOperands.get(1)
244-
(operand0, operand1) match {
245-
case (op0: RexInputRef, op1: RexInputRef) if op0.getIndex == op1.getIndex =>
246-
kind match {
247-
case SqlKind.EQUALS | SqlKind.LESS_THAN_OR_EQUAL | SqlKind.GREATER_THAN_OR_EQUAL =>
248-
rexBuilder.makeLiteral(true)
249-
case SqlKind.NOT_EQUALS | SqlKind.LESS_THAN | SqlKind.GREATER_THAN =>
250-
rexBuilder.makeLiteral(false)
251-
case _ => super.visitCall(call)
252-
}
253-
case _ => super.visitCall(call)
254-
}
255-
}
256-
}
257-
}
258-
259-
private class SameExprMerger(rexBuilder: RexBuilder) extends RexShuttle {
260-
private val sameExprMap = mutable.HashMap[String, RexNode]()
261-
262-
private def mergeSameExpr(expr: RexNode, equiExpr: RexLiteral): RexNode = {
263-
if (sameExprMap.contains(expr.toString)) {
264-
equiExpr
265-
} else {
266-
sameExprMap.put(expr.toString, expr)
267-
expr
268-
}
269-
}
270-
271-
def mergeSameExpr(expr: RexNode): RexNode = {
272-
// merges same expressions in the operands of AND and OR
273-
// e.g. a = b AND a = b -> a = b AND true
274-
// a = b OR a = b -> a = b OR false
275-
val newExpr1 = expr.accept(this)
276-
277-
// merges same expressions in conjunctions
278-
// e.g. (a > b AND c < 10) AND a > b -> a > b AND c < 10 AND true
279-
sameExprMap.clear()
280-
val newConjunctions = RelOptUtil.conjunctions(newExpr1).map {
281-
ex => mergeSameExpr(ex, rexBuilder.makeLiteral(true))
282-
}
283-
val newExpr2 = newConjunctions.size match {
284-
case 0 => newExpr1 // true AND true
285-
case 1 => newConjunctions.head
286-
case _ => rexBuilder.makeCall(AND, newConjunctions: _*)
287-
}
288-
289-
// merges same expressions in disjunctions
290-
// e.g. (a > b OR c < 10) OR a > b -> a > b OR c < 10 OR false
291-
sameExprMap.clear()
292-
val newDisjunctions = RelOptUtil.disjunctions(newExpr2).map {
293-
ex => mergeSameExpr(ex, rexBuilder.makeLiteral(false))
294-
}
295-
val newExpr3 = newDisjunctions.size match {
296-
case 0 => newExpr2 // false OR false
297-
case 1 => newDisjunctions.head
298-
case _ => rexBuilder.makeCall(OR, newDisjunctions: _*)
299-
}
300-
newExpr3
301-
}
302-
303-
override def visitCall(call: RexCall): RexNode = {
304-
val newCall = call.getOperator match {
305-
case AND | OR =>
306-
sameExprMap.clear()
307-
val newOperands = call.getOperands.map {
308-
op =>
309-
val value = if (call.getOperator == AND) true else false
310-
mergeSameExpr(op, rexBuilder.makeLiteral(value))
311-
}
312-
call.clone(call.getType, newOperands)
313-
case _ => call
314-
}
315-
super.visitCall(newCall)
316-
}
209+
rexSimplify.simplifyUnknownAs(expr, RexUnknownAs.falseIf(true))
317210
}
318211

319212
/**
@@ -412,37 +305,6 @@ object FlinkRexUtil {
412305
}
413306
})
414307

415-
private class EquivalentExprShuttle(rexBuilder: RexBuilder) extends RexShuttle {
416-
private val equiExprSet = mutable.HashSet[RexNode]()
417-
418-
override def visitCall(call: RexCall): RexNode = {
419-
call.getOperator match {
420-
case EQUALS | NOT_EQUALS | GREATER_THAN | LESS_THAN | GREATER_THAN_OR_EQUAL |
421-
LESS_THAN_OR_EQUAL =>
422-
if (equiExprSet.contains(call)) {
423-
swapOperands(call)
424-
} else {
425-
equiExprSet.add(call)
426-
call
427-
}
428-
case _ => super.visitCall(call)
429-
}
430-
}
431-
432-
private def swapOperands(call: RexCall): RexCall = {
433-
val newOp = call.getOperator match {
434-
case EQUALS | NOT_EQUALS => call.getOperator
435-
case GREATER_THAN => LESS_THAN
436-
case GREATER_THAN_OR_EQUAL => LESS_THAN_OR_EQUAL
437-
case LESS_THAN => GREATER_THAN
438-
case LESS_THAN_OR_EQUAL => GREATER_THAN_OR_EQUAL
439-
case _ => throw new IllegalArgumentException(s"Unsupported operator: ${call.getOperator}")
440-
}
441-
val operands = call.getOperands
442-
rexBuilder.makeCall(newOp, operands.last, operands.head).asInstanceOf[RexCall]
443-
}
444-
}
445-
446308
def getExpressionString(expr: RexNode, inFields: Seq[String]): String = {
447309
getExpressionString(expr, inFields, ExpressionDetail.Digest)
448310
}

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ LogicalProject(a=[$0], d=[$3])
9898
<Resource name="optimized rel plan">
9999
<![CDATA[
100100
LogicalProject(a=[$0], d=[$3])
101-
+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=(1, $0), =(1, $1)), AND(=(2, $0), =(2, $1))), OR(AND(=(2, $3), =(2, $4)), AND(=(1, $3), =(1, $4))))], joinType=[inner])
101+
+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=($0, 1), =($1, 1)), AND(=($0, 2), =($1, 2))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner])
102102
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
103103
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
104104
]]>
@@ -130,7 +130,7 @@ LogicalProject(a=[$0], d=[$6])
130130
<Resource name="optimized rel plan">
131131
<![CDATA[
132132
LogicalProject(a=[$0], d=[$6])
133-
+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=(1, $0), =(1, $1)), =(2, $0)), OR(AND(=(2, $3), =(2, $7)), AND(=(2, $4), =(1, $6), =(1, $7))))], joinType=[inner])
133+
+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=($0, 1), =($1, 1)), =($0, 2)), OR(AND(=($3, 2), =($7, 2)), AND(=($4, 2), =($6, 1), =($7, 1))))], joinType=[inner])
134134
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
135135
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7])
136136
+- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
@@ -155,7 +155,7 @@ LogicalProject(a=[$0], d=[$3])
155155
<Resource name="optimized rel plan">
156156
<![CDATA[
157157
LogicalProject(a=[$0], d=[$3])
158-
+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))), OR(AND(=(2, $3), =(2, $4)), AND(=(1, $3), =(1, $4))))], joinType=[inner])
158+
+- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner])
159159
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
160160
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
161161
]]>

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ LogicalProject(a1=[$0], b1=[$3])
3333
Calc(select=[a1, b1])
3434
+- Join(joinType=[InnerJoin], where=[(((a1 = 1) AND (b1 = 1)) OR ((a2 = 2) AND (b2 = 2)))], select=[a1, a2, b1, b2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
3535
:- Exchange(distribution=[single])
36-
: +- Calc(select=[a1, a2], where=[((1 = a1) OR (2 = a2))])
36+
: +- Calc(select=[a1, a2], where=[((a1 = 1) OR (a2 = 2))])
3737
: +- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3])
3838
+- Exchange(distribution=[single])
39-
+- Calc(select=[b1, b2], where=[((1 = b1) OR (2 = b2))])
39+
+- Calc(select=[b1, b2], where=[((b1 = 1) OR (b2 = 2))])
4040
+- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[b1, b2, b3])
4141
]]>
4242
</Resource>
@@ -60,7 +60,7 @@ Join(joinType=[InnerJoin], where=[(((a = 1) AND (x = 1)) OR ((a = 2) AND y IS NU
6060
: +- Calc(select=[a, b, c], where=[SEARCH(a, Sarg[1, 2])])
6161
: +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
6262
+- Exchange(distribution=[single])
63-
+- Calc(select=[x, y, z], where=[(y IS NULL OR (1 = x))])
63+
+- Calc(select=[x, y, z], where=[(y IS NULL OR (x = 1))])
6464
+- TableSourceScan(table=[[default_catalog, default_database, s]], fields=[x, y, z])
6565
]]>
6666
</Resource>
@@ -82,7 +82,7 @@ LogicalProject(a1=[$0], b1=[$3])
8282
Calc(select=[a1, b1])
8383
+- Join(joinType=[InnerJoin], where=[(((a1 = 1) AND (b1 = 1)) OR (a2 = 2))], select=[a1, a2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
8484
:- Exchange(distribution=[single])
85-
: +- Calc(select=[a1, a2], where=[((1 = a1) OR (2 = a2))])
85+
: +- Calc(select=[a1, a2], where=[((a1 = 1) OR (a2 = 2))])
8686
: +- TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a1, a2, a3])
8787
+- Exchange(distribution=[single])
8888
+- Calc(select=[b1])

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/JoinTest.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ LogicalProject(a=[$0], b=[$1], d=[$3], e=[$4])
3030
<![CDATA[
3131
Join(joinType=[InnerJoin], where=[(((a = 1) AND (d = 1)) OR ((b = 2) AND (d = 5)))], select=[a, b, d, e], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
3232
:- Exchange(distribution=[single])
33-
: +- Calc(select=[a, b], where=[((1 = a) OR (2 = b))])
33+
: +- Calc(select=[a, b], where=[((a = 1) OR (b = 2))])
3434
: +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c])
3535
+- Exchange(distribution=[single])
3636
+- Calc(select=[d, e], where=[SEARCH(d, Sarg[1, 5])])
@@ -56,7 +56,7 @@ Calc(select=[a, b, d, e])
5656
: +- Calc(select=[a, b], where=[SEARCH(a, Sarg[0, 1])])
5757
: +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c])
5858
+- Exchange(distribution=[single])
59-
+- Calc(select=[d, e, f], where=[(f IS NULL OR (3 = d))])
59+
+- Calc(select=[d, e, f], where=[(f IS NULL OR (d = 3))])
6060
+- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[d, e, f])
6161
]]>
6262
</Resource>

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,8 @@ class FlinkRexUtilTest {
353353
rexBuilder.makeCall(GREATER_THAN, a, rexBuilder.makeLiteral("l"))),
354354
rexBuilder.makeCall(
355355
OR,
356-
rexBuilder.makeCall(EQUALS, a, b),
357-
rexBuilder.makeCall(LESS_THAN, c, d),
356+
rexBuilder.makeCall(EQUALS, b, a),
357+
rexBuilder.makeCall(GREATER_THAN, d, c),
358358
rexBuilder.makeCall(LESS_THAN, b, rexBuilder.makeLiteral("k")))
359359
)
360360
.toString,

0 commit comments

Comments
 (0)