Skip to content

Commit f79e63e

Browse files
authored
[FLINK-38771][table] Allow scalar args for multi-table PTFs
This closes #27320.
1 parent 4072808 commit f79e63e

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,14 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
182182
}
183183

184184
private static void checkMultipleTableArgs(List<StaticArgument> staticArgs) {
185-
if (staticArgs.stream().filter(arg -> arg.is(StaticArgumentTrait.TABLE)).count() <= 1) {
185+
final List<StaticArgument> tableArgs =
186+
staticArgs.stream()
187+
.filter(arg -> arg.is(StaticArgumentTrait.TABLE))
188+
.collect(Collectors.toList());
189+
if (tableArgs.size() <= 1) {
186190
return;
187191
}
188-
if (staticArgs.stream().anyMatch(arg -> !arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
192+
if (tableArgs.stream().anyMatch(arg -> !arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE))) {
189193
throw new ValidationException(
190194
"All table arguments must use set semantics if multiple table arguments are declared.");
191195
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public List<TableTestProgram> programs() {
9898
ProcessTableFunctionTestPrograms.PROCESS_LIST_STATE,
9999
ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE,
100100
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT,
101+
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_WITH_SCALAR_ARGS,
101102
ProcessTableFunctionTestPrograms.PROCESS_STATEFUL_MULTI_INPUT_WITH_TIMEOUT,
102103
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_MULTI_INPUT);
103104
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ListStateFunction;
3838
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MapStateFunction;
3939
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
40+
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputWithScalarArgsFunction;
4041
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiStateFunction;
4142
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NamedTimersFunction;
4243
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NonNullMapStateFunction;
@@ -1417,6 +1418,36 @@ public class ProcessTableFunctionTestPrograms {
14171418
"INSERT INTO sink SELECT * FROM f(in1 => TABLE t PARTITION BY name, in2 => TABLE city PARTITION BY name)")
14181419
.build();
14191420

1421+
public static final TableTestProgram PROCESS_MULTI_INPUT_WITH_SCALAR_ARGS =
1422+
TableTestProgram.of(
1423+
"process-multi-input-scalar-args",
1424+
"takes multiple tables and some scalar arguments")
1425+
.setupTemporarySystemFunction("f", MultiInputWithScalarArgsFunction.class)
1426+
.setupSql(MULTI_VALUES)
1427+
.setupSql(CITY_VALUES)
1428+
.setupTableSink(
1429+
SinkTestStep.newBuilder("sink")
1430+
.addSchema(MULTI_BASE_SINK_SCHEMA)
1431+
.consumedValues(
1432+
"+I[Bob, Bob, {null, +I[Bob, London], {A=1, B=2}, 12, +I[true, Hello]}]",
1433+
"+I[Bob, Bob, {+I[Bob, 12], null, {A=1, B=2}, 12, +I[true, Hello]}]",
1434+
"+I[Alice, Alice, {null, +I[Alice, Berlin], {A=1, B=2}, 12, +I[true, Hello]}]",
1435+
"+I[Alice, Alice, {+I[Alice, 42], null, {A=1, B=2}, 12, +I[true, Hello]}]",
1436+
"+I[Charly, Charly, {null, +I[Charly, Paris], {A=1, B=2}, 12, +I[true, Hello]}]",
1437+
"+I[Bob, Bob, {+I[Bob, 99], null, {A=1, B=2}, 12, +I[true, Hello]}]",
1438+
"+I[Bob, Bob, {+I[Bob, 100], null, {A=1, B=2}, 12, +I[true, Hello]}]",
1439+
"+I[Alice, Alice, {+I[Alice, 400], null, {A=1, B=2}, 12, +I[true, Hello]}]")
1440+
.build())
1441+
.runSql(
1442+
"INSERT INTO sink SELECT * FROM f("
1443+
+ "m => MAP['A', '1', 'B', '2'],"
1444+
+ "in1 => TABLE t PARTITION BY name,"
1445+
+ "i => 12,"
1446+
+ "in2 => TABLE city PARTITION BY name,"
1447+
+ "r => ROW(TRUE, 'Hello')"
1448+
+ ")")
1449+
.build();
1450+
14201451
public static final TableTestProgram PROCESS_MULTI_INPUT_RESTORE =
14211452
TableTestProgram.of("process-multi-input-restore", "takes multiple tables")
14221453
.setupTemporarySystemFunction("f", MultiInputFunction.class)

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,20 @@ public void eval(
942942
}
943943
}
944944

945+
/** Testing function. */
946+
public static class MultiInputWithScalarArgsFunction extends AppendProcessTableFunctionBase {
947+
public void eval(
948+
Context ctx,
949+
Map<String, String> m,
950+
@ArgumentHint(SET_SEMANTIC_TABLE) Row in1,
951+
Integer i,
952+
@ArgumentHint({SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY}) Row in2,
953+
@DataTypeHint("ROW<b BOOLEAN, s STRING>") Row r)
954+
throws Exception {
955+
collectObjects(in1, in2, m, i, r);
956+
}
957+
}
958+
945959
/** Testing function. */
946960
public static class TimedJoinFunction extends AppendProcessTableFunctionBase {
947961
public void eval(

0 commit comments

Comments
 (0)