Skip to content

Commit 5440238

Browse files
[FLINK-38916][table-planner] MultiJoin produces incorrect results for OR join conditions when parallelism is greater than 1
This closes #27498.
1 parent: db3777d · commit: 5440238

File tree

2 files changed

21 additions and 12 deletions (+21 / -12 lines changed)

2 files changed

21 additions and 12 deletions (+21 / -12 lines changed)

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -71,9 +71,13 @@ private static void extractEqualityConditions(
7171
final SqlKind kind = call.getOperator().getKind();
7272

7373
if (kind != SqlKind.EQUALS) {
74-
for (final RexNode operand : call.getOperands()) {
75-
extractEqualityConditions(
76-
operand, inputOffsets, inputFieldCounts, joinAttributeMap);
74+
// Only conjunctions (AND) can contain equality conditions that are valid for multijoin.
75+
// All other condition types are deferred to the postJoinFilter.
76+
if (kind == SqlKind.AND) {
77+
for (final RexNode operand : call.getOperands()) {
78+
extractEqualityConditions(
79+
operand, inputOffsets, inputFieldCounts, joinAttributeMap);
80+
}
7781
}
7882
return;
7983
}

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -2794,15 +2794,20 @@ LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6], location
27942794
<Resource name="optimized rel plan">
27952795
<![CDATA[
27962796
Calc(select=[user_id, name, order_id, payment_id, location])
2797-
+- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, LEFT, LEFT], inputUniqueKeys=[(user_id), (order_id), (payment_id), noUniqueKey], joinConditions=[=(user_id0, user_id), OR(=(user_id, user_id1), =(name, payment_id)), =(user_id1, user_id2)], select=[user_id,name,order_id,user_id0,payment_id,user_id1,location,user_id2], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id2)])
2798-
:- Exchange(distribution=[hash[user_id]])
2799-
: +- ChangelogNormalize(key=[user_id])
2800-
: +- Exchange(distribution=[hash[user_id]])
2801-
: +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, name], metadata=[]]], fields=[user_id, name])
2802-
:- Exchange(distribution=[hash[user_id]])
2803-
: +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
2804-
:- Exchange(distribution=[hash[user_id]])
2805-
: +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
2797+
+- MultiJoin(commonJoinKey=[user_id1], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(user_id1, user_id2)], select=[user_id,name,order_id,payment_id,user_id1,location,user_id2], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id2)])
2798+
:- Exchange(distribution=[hash[user_id1]])
2799+
: +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[OR(=(user_id, user_id1), =(name, payment_id))], select=[user_id,name,order_id,payment_id,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1)])
2800+
: :- Exchange(distribution=[single])
2801+
: : +- Calc(select=[user_id, name, order_id])
2802+
: : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id0, user_id)], select=[user_id,name,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
2803+
: : :- Exchange(distribution=[hash[user_id]])
2804+
: : : +- ChangelogNormalize(key=[user_id])
2805+
: : : +- Exchange(distribution=[hash[user_id]])
2806+
: : : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, name], metadata=[]]], fields=[user_id, name])
2807+
: : +- Exchange(distribution=[hash[user_id]])
2808+
: : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
2809+
: +- Exchange(distribution=[single])
2810+
: +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
28062811
+- Exchange(distribution=[hash[user_id]])
28072812
+- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id])
28082813
]]>

0 commit comments

Comments
 (0)