Skip to content

Commit 19f27ea

Browse files
[FLINK-38220][table-planner] Add Values and TableFunctionScan as sources (#26887)
1 parent 4a98ded commit 19f27ea

File tree

4 files changed

+84
-5
lines changed

4 files changed

+84
-5
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.ArrayList;
5959
import java.util.List;
6060
import java.util.Map;
61+
import java.util.Objects;
6162
import java.util.stream.Collectors;
6263
import java.util.stream.IntStream;
6364

@@ -159,9 +160,9 @@ public StreamExecMultiJoin(
159160
validateInputs(inputProperties, joinTypes, joinConditions, inputUniqueKeys);
160161
this.joinTypes = checkNotNull(joinTypes);
161162
this.joinConditions = checkNotNull(joinConditions);
162-
this.multiJoinCondition = multiJoinCondition;
163-
this.joinAttributeMap = checkNotNull(joinAttributeMap);
164163
this.inputUniqueKeys = checkNotNull(inputUniqueKeys);
164+
this.multiJoinCondition = multiJoinCondition;
165+
this.joinAttributeMap = Objects.requireNonNullElseGet(joinAttributeMap, Map::of);
165166
this.stateMetadataList = stateMetadataList;
166167
}
167168

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434
import org.apache.calcite.rel.core.Join;
3535
import org.apache.calcite.rel.core.JoinInfo;
3636
import org.apache.calcite.rel.core.JoinRelType;
37+
import org.apache.calcite.rel.core.TableFunctionScan;
38+
import org.apache.calcite.rel.core.TableScan;
39+
import org.apache.calcite.rel.core.Values;
3740
import org.apache.calcite.rel.logical.LogicalJoin;
3841
import org.apache.calcite.rel.logical.LogicalSnapshot;
39-
import org.apache.calcite.rel.logical.LogicalTableScan;
4042
import org.apache.calcite.rel.metadata.RelColumnOrigin;
4143
import org.apache.calcite.rel.metadata.RelMetadataQuery;
4244
import org.apache.calcite.rel.rules.CoreRules;
@@ -546,7 +548,10 @@ private Tuple2<RelNode, Integer> getTargetInputAndIdx(int inputRefIndex, List<Re
546548

547549
assert targetInput != null;
548550

549-
if (targetInput instanceof LogicalTableScan) {
551+
if (targetInput instanceof TableScan
552+
|| targetInput instanceof Values
553+
|| targetInput instanceof TableFunctionScan
554+
|| targetInput.getInputs().isEmpty()) {
550555
return new Tuple2<>(targetInput, idxInTargetInput);
551556
} else {
552557
return getTargetInputAndIdx(idxInTargetInput, targetInput.getInputs());

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ public List<TableTestProgram> programs() {
3737
MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_MATERIALIZATION,
3838
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_NO_JOIN_KEY,
3939
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY,
40+
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE,
4041
MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES,
41-
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE);
42+
MultiJoinTestPrograms.MULTI_JOIN_LEFT_OUTER_WITH_NULL_KEYS,
43+
MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS);
4244
}
4345
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,4 +1031,75 @@ public class MultiJoinTestPrograms {
10311031
+ "LEFT JOIN Payments p ON ue.user_id = p.user_id "
10321032
+ "GROUP BY ue.user_id, us.name")
10331033
.build();
1034+
1035+
public static final TableTestProgram MULTI_JOIN_LEFT_OUTER_WITH_NULL_KEYS =
1036+
TableTestProgram.of(
1037+
"three-way-left-outer-with-null-keys",
1038+
"left outer join with NULL keys on multiple inputs")
1039+
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
1040+
.setupSql(
1041+
"CREATE VIEW UsersNulls AS SELECT * FROM (VALUES "
1042+
+ "('1','Gus'),"
1043+
+ "(CAST(NULL AS STRING), 'NullUser')"
1044+
+ ") AS T(user_id, name)")
1045+
.setupSql(
1046+
"CREATE VIEW OrdersNulls AS SELECT * FROM (VALUES "
1047+
+ "('1','order1'),"
1048+
+ "(CAST(NULL AS STRING), 'nullOrder')"
1049+
+ ") AS T(user_id, order_id)")
1050+
.setupSql(
1051+
"CREATE VIEW PaymentsNulls AS SELECT * FROM (VALUES "
1052+
+ "('1','payment1'),"
1053+
+ "('1','payment3'),"
1054+
+ "(CAST(NULL AS STRING), 'paymentNull')"
1055+
+ ") AS T(user_id, payment_id)")
1056+
.setupTableSink(
1057+
SinkTestStep.newBuilder("sink")
1058+
.addSchema(
1059+
"user_id STRING",
1060+
"name STRING",
1061+
"order_id STRING",
1062+
"payment_id STRING")
1063+
.consumedValues(
1064+
"+I[1, Gus, order1, payment1]",
1065+
"+I[1, Gus, order1, payment3]",
1066+
"+I[null, NullUser, null, null]")
1067+
.testMaterializedData()
1068+
.build())
1069+
.runSql(
1070+
"INSERT INTO sink "
1071+
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id "
1072+
+ "FROM UsersNulls u "
1073+
+ "LEFT JOIN OrdersNulls o ON u.user_id = o.user_id "
1074+
+ "LEFT JOIN PaymentsNulls p ON u.user_id = p.user_id")
1075+
.build();
1076+
1077+
public static final TableTestProgram MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS =
1078+
TableTestProgram.of(
1079+
"null-safe-join-with-null-keys",
1080+
"join with IS NOT DISTINCT FROM to match NULL keys")
1081+
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
1082+
.setupSql(
1083+
"CREATE VIEW UsersNullSafe AS SELECT * FROM (VALUES "
1084+
+ "('1','Gus'),"
1085+
+ "(CAST(NULL AS STRING), 'NullUser')"
1086+
+ ") AS T(user_id, name)")
1087+
.setupSql(
1088+
"CREATE VIEW OrdersNullSafe AS SELECT * FROM (VALUES "
1089+
+ "('1','order1'),"
1090+
+ "(CAST(NULL AS STRING), 'nullOrder')"
1091+
+ ") AS T(user_id, order_id)")
1092+
.setupTableSink(
1093+
SinkTestStep.newBuilder("sink")
1094+
.addSchema("user_id STRING", "name STRING", "order_id STRING")
1095+
.consumedValues(
1096+
"+I[1, Gus, order1]", "+I[null, NullUser, nullOrder]")
1097+
.testMaterializedData()
1098+
.build())
1099+
.runSql(
1100+
"INSERT INTO sink "
1101+
+ "SELECT u.user_id, u.name, o.order_id "
1102+
+ "FROM UsersNullSafe u "
1103+
+ "INNER JOIN OrdersNullSafe o ON u.user_id IS NOT DISTINCT FROM o.user_id")
1104+
.build();
10341105
}

0 commit comments

Comments
 (0)