Commit 00241a6
[FLINK-38201][table-planner] SinkUpsertMaterializer should not be inserted for retract sinks
This closes #26879.
1 parent 19f27ea commit 00241a6
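
Background: SinkUpsertMaterializer is a stateful operator the planner can insert in front of a sink to reorder a changelog on the sink's primary key, which is only useful for upsert sinks that consume no UPDATE_BEFORE rows. A retract sink receives explicit UPDATE_BEFORE messages and can undo earlier emissions itself, so materializing for it only adds state and latency. A minimal sketch of the distinction using Flink's public ChangelogMode API (the object name is illustrative, not part of the commit):

import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.RowKind

object RetractVsUpsert extends App {
  val retract = ChangelogMode.all() // I, UB, UA, D
  val upsert = ChangelogMode.upsert() // I, UA, D

  // The planner's new check keys off UPDATE_BEFORE support:
  println(retract.contains(RowKind.UPDATE_BEFORE)) // true  -> skip the materializer
  println(upsert.contains(RowKind.UPDATE_BEFORE)) // false -> materializer may be needed
}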

File tree: 7 files changed, +201 −34 lines

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

Lines changed: 23 additions & 27 deletions

@@ -1052,33 +1052,29 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
     val inputChangelogMode =
       ChangelogPlanUtils.getChangelogMode(sink.getInput.asInstanceOf[StreamPhysicalRel]).get
     val primaryKeys = sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
-    val upsertMaterialize =
-      tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
-        case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
-        case UpsertMaterialize.NONE => false
-        case UpsertMaterialize.AUTO =>
-          val sinkAcceptInsertOnly = sink.tableSink
-            .getChangelogMode(inputChangelogMode)
-            .containsOnly(RowKind.INSERT)
-          val inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT)
-
-          if (!sinkAcceptInsertOnly && !inputInsertOnly && primaryKeys.nonEmpty) {
-            val pks = ImmutableBitSet.of(primaryKeys: _*)
-            val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
-            val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
-            // if input has update and primary key != upsert key (upsert key can be null) we should
-            // enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink
-            // pk(s) contains input changeLogUpsertKeys
-            if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)) {
-              true
-            } else {
-              false
-            }
-          } else {
-            false
-          }
-      }
-    upsertMaterialize
+    val sinkChangelogMode = sink.tableSink.getChangelogMode(inputChangelogMode)
+    val inputIsAppend = inputChangelogMode.containsOnly(RowKind.INSERT)
+    val sinkIsAppend = sinkChangelogMode.containsOnly(RowKind.INSERT)
+    val sinkIsRetract = sinkChangelogMode.contains(RowKind.UPDATE_BEFORE)
+
+    tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
+      case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
+      case UpsertMaterialize.NONE => false
+      case UpsertMaterialize.AUTO =>
+        if (inputIsAppend || sinkIsAppend || sinkIsRetract) {
+          return false
+        }
+        if (primaryKeys.isEmpty) {
+          return false
+        }
+        val pks = ImmutableBitSet.of(primaryKeys: _*)
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+        val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+        // if input has updates and primary key != upsert key (upsert key can be null) we should
+        // enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink
+        // pk(s) contains input changeLogUpsertKeys
+        changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)
+    }
   }
 }
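
Read as a whole, the rewritten block now short-circuits before any key analysis: if the input is append-only, the sink is append-only, or the sink consumes retractions, no materializer is inserted. A distilled, self-contained model of the decision (a sketch; Strategy and the boolean parameters are simplified stand-ins for the planner's actual types and metadata queries):

import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.RowKind

object UpsertMaterializeDecision {
  sealed trait Strategy
  case object Force extends Strategy // mirrors UpsertMaterialize.FORCE
  case object NoneAtAll extends Strategy // mirrors UpsertMaterialize.NONE
  case object Auto extends Strategy // mirrors UpsertMaterialize.AUTO

  // pkCoversUpsertKey stands in for the FlinkRelMetadataQuery upsert-key
  // containment check in the real code.
  def materialize(
      strategy: Strategy,
      inputMode: ChangelogMode,
      sinkMode: ChangelogMode,
      hasPrimaryKey: Boolean,
      pkCoversUpsertKey: Boolean): Boolean = {
    val inputIsAppend = inputMode.containsOnly(RowKind.INSERT)
    val sinkIsAppend = sinkMode.containsOnly(RowKind.INSERT)
    // The fix: a sink that consumes UPDATE_BEFORE can retract by itself.
    val sinkIsRetract = sinkMode.contains(RowKind.UPDATE_BEFORE)
    strategy match {
      case Force => hasPrimaryKey && !sinkIsRetract
      case NoneAtAll => false
      case Auto =>
        if (inputIsAppend || sinkIsAppend || sinkIsRetract || !hasPrimaryKey) false
        else !pkCoversUpsertKey
    }
  }
}

Note that FORCE is narrowed as well: even an explicit table.exec.sink.upsert-materialize = FORCE no longer applies when the sink is a retract sink.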

SinkSemanticTests.java (new file)

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/** Semantic tests for {@link StreamExecSink}. */
+public class SinkSemanticTests extends SemanticTestBase {
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return List.of(
+                SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK,
+                SinkTestPrograms.INSERT_RETRACT_WITH_PK);
+    }
+}
SinkTestPrograms.java (new file)

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** Tests for verifying sink semantics. */
+public class SinkTestPrograms {
+
+    public static final TableTestProgram INSERT_RETRACT_WITHOUT_PK =
+            TableTestProgram.of(
+                            "insert-retract-without-pk",
+                            "The sink accepts retract input. Retract is directly passed through.")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("name STRING", "score INT")
+                                    .addOption("changelog-mode", "I")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 3),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 5),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 6),
+                                            Row.ofKind(RowKind.INSERT, "Charly", 33))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("name STRING", "score BIGINT")
+                                    .addOption("sink-changelog-mode-enforced", "I,UB,UA,D")
+                                    .consumedValues(
+                                            "+I[Alice, 3]",
+                                            "+I[Bob, 5]",
+                                            "-U[Bob, 5]",
+                                            "+U[Bob, 11]",
+                                            "+I[Charly, 33]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT name, SUM(score) FROM source_t GROUP BY name")
+                    .build();
+
+    public static final TableTestProgram INSERT_RETRACT_WITH_PK =
+            TableTestProgram.of(
+                            "insert-retract-with-pk",
+                            "The sink accepts retract input. Although upsert keys (name) and primary keys (UPPER(name))"
+                                    + "don't match, the retract changelog is passed through.")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("name STRING", "score INT")
+                                    .addOption("changelog-mode", "I")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 3),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 5),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 6),
+                                            Row.ofKind(RowKind.INSERT, "Charly", 33))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addOption("sink-changelog-mode-enforced", "I,UB,UA,D")
+                                    .consumedValues(
+                                            "+I[ALICE, 3]",
+                                            "+I[BOB, 5]",
+                                            "-U[BOB, 5]",
+                                            "+U[BOB, 11]",
+                                            "+I[CHARLY, 33]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT UPPER(name), SUM(score) FROM source_t GROUP BY name")
+                    .build();
+}
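
The expected consumedValues in both programs can be derived by replaying the aggregation by hand: the second "Bob" row forces the aggregate to retract the old sum before emitting the new one. A tiny sketch that reproduces the retract trace for SUM(score) GROUP BY name (illustrative only; Flink's GroupAggregate operator emits this changelog internally):

object RetractTrace extends App {
  val input = Seq(("Alice", 3), ("Bob", 5), ("Bob", 6), ("Charly", 33))
  val sums = scala.collection.mutable.Map.empty[String, Int]
  for ((name, score) <- input) {
    sums.get(name) match {
      case None =>
        sums(name) = score
        println(s"+I[$name, $score]") // first row for this key
      case Some(previous) =>
        println(s"-U[$name, $previous]") // retract the previously emitted sum
        sums(name) = previous + score
        println(s"+U[$name, ${previous + score}]") // emit the updated sum
    }
  }
}
// Prints: +I[Alice, 3]  +I[Bob, 5]  -U[Bob, 5]  +U[Bob, 11]  +I[Charly, 33]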

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml

Lines changed: 4 additions & 4 deletions

@@ -753,10 +753,10 @@ LogicalSink(table=[default_catalog.default_database.another_pk_snk], fields=[a,
 </Resource>
 <Resource name="optimized rel plan">
 <![CDATA[
-Sink(table=[default_catalog.default_database.another_pk_snk], fields=[a, b, c], upsertMaterialize=[true], duplicateChanges=[NONE])
-+- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
-   +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], duplicateChanges=[DISALLOW])
-      +- TableSourceScan(table=[[default_catalog, default_database, retract_src]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
+Sink(table=[default_catalog.default_database.another_pk_snk], fields=[a, b, c], duplicateChanges=[NONE])
++- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+   +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], duplicateChanges=[ALLOW])
+      +- TableSourceScan(table=[[default_catalog, default_database, retract_src]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
 ]]>
 </Resource>
 </TestCase>

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml

Lines changed: 3 additions & 3 deletions

@@ -31,7 +31,7 @@ LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3
 </Resource>
 <Resource name="optimized rel plan">
 <![CDATA[
-Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b2, b1], upsertMaterialize=[true])
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b2, b1])
 +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[hash[a1, a2]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a0, a1, a2, a3])

@@ -602,7 +602,7 @@ LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, EX
 </Resource>
 <Resource name="optimized rel plan">
 <![CDATA[
-Sink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1], upsertMaterialize=[true])
+Sink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
 +- Calc(select=[a0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
    +- GroupAggregate(groupBy=[a0, b1], select=[a0, b1, MAX(a1) AS EXPR$1, MAX(a2) AS EXPR$2, MAX(a3) AS EXPR$3, MAX(b0) AS EXPR$4, MAX(b2) AS EXPR$5])
       +- Exchange(distribution=[hash[a0, b1]])

@@ -662,7 +662,7 @@ LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3
 </Resource>
 <Resource name="optimized rel plan">
 <![CDATA[
-Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b2, b1], upsertMaterialize=[true])
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b2, b1])
 +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[hash[a1, a2]])
    :  +- Calc(select=[a0, a1, a2, a3])

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml

Lines changed: 23 additions & 0 deletions

@@ -646,6 +646,29 @@ LogicalSink(table=[default_catalog.default_database.retractSink], fields=[cnt, a
 <Resource name="optimized rel plan">
 <![CDATA[
 Sink(table=[default_catalog.default_database.retractSink], fields=[cnt, a], changelogMode=[NONE])
++- GroupAggregate(groupBy=[cnt], select=[cnt, COUNT_RETRACT(a) AS a], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[cnt]], changelogMode=[I,UB,UA])
+      +- GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
+         +- Exchange(distribution=[hash[a]], changelogMode=[I])
+            +- Calc(select=[a], changelogMode=[I])
+               +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
+]]>
+</Resource>
+</TestCase>
+<TestCase name="testRetractSinkWithPrimaryKey">
+<Resource name="ast">
+<![CDATA[
+LogicalSink(table=[default_catalog.default_database.retractSink], fields=[cnt, a])
++- LogicalAggregate(group=[{0}], a=[COUNT($1)])
+   +- LogicalProject(cnt=[$1], a=[$0])
+      +- LogicalAggregate(group=[{0}], cnt=[COUNT()])
+         +- LogicalProject(a=[$0])
+            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+</Resource>
+<Resource name="optimized rel plan">
+<![CDATA[
+Sink(table=[default_catalog.default_database.retractSink], fields=[cnt, a], changelogMode=[NONE])
 +- GroupAggregate(groupBy=[cnt], select=[cnt, COUNT_RETRACT(a) AS a], changelogMode=[I,UB,UA,D])
    +- Exchange(distribution=[hash[cnt]], changelogMode=[I,UB,UA])
       +- GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
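
The changelogMode annotations in these plans come from ExplainDetail.CHANGELOG_MODE; [I,UB,UA,D] marks a retract changelog, while the Sink shows [NONE] because it is terminal and emits nothing downstream. A sketch for producing such an annotated plan outside the test harness (the table name and query are illustrative):

import org.apache.flink.table.api._

object ExplainChangelog extends App {
  val env = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
  env.executeSql("CREATE TABLE src (a INT) WITH ('connector' = 'datagen')")
  // CHANGELOG_MODE annotates every operator, as in the XML resources above.
  println(env.explainSql("SELECT a, COUNT(*) FROM src GROUP BY a", ExplainDetail.CHANGELOG_MODE))
}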

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala

Lines changed: 24 additions & 0 deletions

@@ -207,6 +207,30 @@ class TableSinkTest extends TableTestBase {
     util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testRetractSinkWithPrimaryKey(): Unit = {
+    util.addTable(s"""
+                     |CREATE TABLE retractSink (
+                     |  `cnt` BIGINT,
+                     |  `a` BIGINT,
+                     |  PRIMARY KEY (a) NOT ENFORCED
+                     |) WITH (
+                     |  'connector' = 'values',
+                     |  'sink-changelog-mode-enforced' = 'I,UB,UA,D'
+                     |)
+                     |""".stripMargin)
+    val dml =
+      """
+        |INSERT INTO retractSink
+        |SELECT cnt, COUNT(a) AS a FROM (
+        |  SELECT a, COUNT(*) AS cnt FROM MyTable GROUP BY a) t
+        |GROUP BY cnt
+      """.stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(dml)
+    util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
+  }
+
   @Test
   def testUpsertSink(): Unit = {
     util.addTable(s"""
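
For contrast, a hypothetical companion test (an assumption, not part of this commit) would target an upsert sink, where the AUTO decision still applies because a sink without UB cannot retract on its own; util, MyTable, and ExplainDetail come from the surrounding TableSinkTest fixture:

@Test
def testUpsertSinkWithDifferingKeys(): Unit = {
  // 'I,UA,D' (no UB): the planner may still insert SinkUpsertMaterializer
  // when the query's upsert key differs from the declared primary key.
  util.addTable(s"""
                   |CREATE TABLE upsertSink (
                   |  `cnt` BIGINT,
                   |  `a` BIGINT,
                   |  PRIMARY KEY (a) NOT ENFORCED
                   |) WITH (
                   |  'connector' = 'values',
                   |  'sink-changelog-mode-enforced' = 'I,UA,D'
                   |)
                   |""".stripMargin)
  val stmtSet = util.tableEnv.createStatementSet()
  stmtSet.addInsertSql(
    """
      |INSERT INTO upsertSink
      |SELECT cnt, COUNT(a) AS a FROM (
      |  SELECT a, COUNT(*) AS cnt FROM MyTable GROUP BY a) t
      |GROUP BY cnt
      |""".stripMargin)
  util.verifyRelPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
}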
