
Commit 2e44db6

gustavodemorais authored and dawidwys committed
[FLINK-38211][table] Move testMultiSinkOnMultiJoinedView to MJ tests
1 parent 46d8250 · commit 2e44db6

File tree

4 files changed: +161 additions, -151 deletions


flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java

Lines changed: 0 additions & 82 deletions
@@ -35,7 +35,6 @@
 
 import scala.Enumeration;
 
-import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static scala.runtime.BoxedUnit.UNIT;

@@ -346,87 +345,6 @@ void testMultiSinkOnJoinedView() {
                 false);
     }
 
-    @Test
-    void testMultiSinkOnMultiJoinedView() {
-        tEnv.getConfig().set(TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true);
-        tEnv.executeSql(
-                "create temporary table src1 (\n"
-                        + " a int,\n"
-                        + " b bigint,\n"
-                        + " c string,\n"
-                        + " d int,\n"
-                        + " primary key(a, c) not enforced\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'changelog-mode' = 'I,UA,UB,D'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary table src2 (\n"
-                        + " a int,\n"
-                        + " b bigint,\n"
-                        + " c string,\n"
-                        + " d int,\n"
-                        + " primary key(a, c) not enforced\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'changelog-mode' = 'I,UA,UB,D'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary table sink1 (\n"
-                        + " a int,\n"
-                        + " b string,\n"
-                        + " c bigint,\n"
-                        + " d bigint\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'sink-insert-only' = 'false'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary table sink2 (\n"
-                        + " a int,\n"
-                        + " b string,\n"
-                        + " c bigint,\n"
-                        + " d string\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'sink-insert-only' = 'false'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary view v1 as\n"
-                        + "select\n"
-                        + " t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n"
-                        + "from (\n"
-                        + " select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n"
-                        + " from src1\n"
-                        + " ) t1\n"
-                        + "join (\n"
-                        + " select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n"
-                        + " from src2\n"
-                        + ") t2\n"
-                        + " on t1.a = t2.d");
-
-        StatementSet stmtSet = tEnv.createStatementSet();
-        stmtSet.addInsertSql(
-                "insert into sink1\n"
-                        + " select a, `day`, sum(b), count(distinct c)\n"
-                        + " from v1\n"
-                        + " group by a, `day`");
-        stmtSet.addInsertSql(
-                "insert into sink2\n"
-                        + " select a, `day`, b, c\n"
-                        + " from v1\n"
-                        + " where b > 100");
-
-        util.doVerifyPlan(
-                stmtSet,
-                new ExplainDetail[] {ExplainDetail.PLAN_ADVICE},
-                false,
-                new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()},
-                () -> UNIT,
-                false,
-                false);
-    }
-
     @Test
     void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() {
         // from NonDeterministicDagTest#testCdcJoinDimWithPkOutputNoPkSinkWithoutPk

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java

Lines changed: 94 additions & 2 deletions
@@ -18,18 +18,25 @@
 
 package org.apache.flink.table.planner.plan.stream.sql;
 
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.PlanKind;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
-import org.apache.flink.table.planner.utils.TableTestUtil;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import scala.Enumeration;
+
+import static scala.runtime.BoxedUnit.UNIT;
+
 /** Tests for multi-join plans. */
 public class MultiJoinTest extends TableTestBase {
 
-    private TableTestUtil util;
+    private StreamTableTestUtil util;
 
     @BeforeEach
     void setup() {

@@ -553,4 +560,89 @@ void testPreservesUpsertKeyFourWayComplex() {
                         + "JOIN AddressPK a"
                         + " ON u.user_id = a.user_id AND a.location IS NOT NULL");
     }
+
+    @Test
+    void testMultiSinkOnMultiJoinedView() {
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table src1 (\n"
+                                + " a int,\n"
+                                + " b bigint,\n"
+                                + " c string,\n"
+                                + " d int,\n"
+                                + " primary key(a, c) not enforced\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'changelog-mode' = 'I,UA,UB,D'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table src2 (\n"
+                                + " a int,\n"
+                                + " b bigint,\n"
+                                + " c string,\n"
+                                + " d int,\n"
+                                + " primary key(a, c) not enforced\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'changelog-mode' = 'I,UA,UB,D'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table sink1 (\n"
+                                + " a int,\n"
+                                + " b string,\n"
+                                + " c bigint,\n"
+                                + " d bigint\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'sink-insert-only' = 'false'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table sink2 (\n"
+                                + " a int,\n"
+                                + " b string,\n"
+                                + " c bigint,\n"
+                                + " d string\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'sink-insert-only' = 'false'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary view v1 as\n"
+                                + "select\n"
+                                + " t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n"
+                                + "from (\n"
+                                + " select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n"
+                                + " from src1\n"
+                                + " ) t1\n"
+                                + "join (\n"
+                                + " select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n"
+                                + " from src2\n"
+                                + ") t2\n"
+                                + " on t1.a = t2.d");
+
+        StatementSet stmtSet = util.tableEnv().createStatementSet();
+        stmtSet.addInsertSql(
+                "insert into sink1\n"
+                        + " select a, `day`, sum(b), count(distinct c)\n"
+                        + " from v1\n"
+                        + " group by a, `day`");
+        stmtSet.addInsertSql(
+                "insert into sink2\n"
+                        + " select a, `day`, b, c\n"
+                        + " from v1\n"
+                        + " where b > 100");
+
+        util.doVerifyPlan(
+                stmtSet,
+                new ExplainDetail[] {ExplainDetail.PLAN_ADVICE},
+                false,
+                new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()},
+                () -> UNIT,
+                false,
+                false);
+    }
 }
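Since the moved test no longer sets TABLE_OPTIMIZER_MULTI_JOIN_ENABLED inside its body, MultiJoinTest presumably enables the option once for the whole class in setup(). Below is a minimal sketch of that pattern, assuming the streamTestUtil(TableConfig) helper from TableTestBase and the imports kept in the diff above; the class name MultiJoinSetupSketch is hypothetical and this is not the actual setup() from the commit.

package org.apache.flink.table.planner.plan.stream.sql;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;

import org.junit.jupiter.api.BeforeEach;

/** Hypothetical sketch, not the real MultiJoinTest setup. */
public class MultiJoinSetupSketch extends TableTestBase {

    private StreamTableTestUtil util;

    @BeforeEach
    void setup() {
        // Assumption: enable the stream multi-join operator once for every test in the
        // class, instead of flipping TABLE_OPTIMIZER_MULTI_JOIN_ENABLED in each test body.
        TableConfig config = TableConfig.getDefault();
        config.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true);
        util = streamTestUtil(config);
    }
}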

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml

Lines changed: 0 additions & 35 deletions
@@ -115,41 +115,6 @@ source node:
 TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta_rename, project=[a, b, c], metadata=[metadata_3]]], fields=[a, b, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])
 
 
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testMultiSinkOnMultiJoinedView">
-    <Resource name="optimized rel plan with advice">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXPR$3])
-+- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3])
-   +- Exchange(distribution=[hash[a, day]])
-      +- Calc(select=[a, day, b0 AS b, c])
-         +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
-            :- Exchange(distribution=[hash[a]])
-            :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
-            :     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a])
-            +- Exchange(distribution=[hash[d]])
-               +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
-
-Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c])
-+- Calc(select=[a, day, b0 AS b, c])
-   +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
-      :- Exchange(distribution=[hash[a]])
-      :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a])
-      +- Exchange(distribution=[hash[d]])
-         +- Calc(select=[b, c, d], where=[>(b, 100)])
-            +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
-
-advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value).
-advice[2]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.
-
-related rel plan:
-Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D])
-+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D])
-
-
 ]]>
     </Resource>
   </TestCase>
