Skip to content

Commit 59085a0

Browse files
committed
[FLINK-38460] Add SinkUpsertMaterializerMigrationTest
1 parent fa9529b commit 59085a0

File tree

4 files changed

+187
-0
lines changed

4 files changed

+187
-0
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.operators.sink;
20+
21+
import org.apache.flink.FlinkVersion;
22+
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
23+
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
24+
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
25+
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
26+
import org.apache.flink.table.data.RowData;
27+
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
28+
import org.apache.flink.table.types.logical.RowType;
29+
import org.apache.flink.test.util.MigrationTest;
30+
import org.apache.flink.types.RowKind;
31+
32+
import org.junit.Test;
33+
import org.junit.runner.RunWith;
34+
import org.junit.runners.Parameterized;
35+
36+
import java.nio.file.Files;
37+
import java.nio.file.Path;
38+
import java.nio.file.Paths;
39+
import java.util.ArrayList;
40+
import java.util.List;
41+
import java.util.Set;
42+
43+
import static org.apache.flink.FlinkVersion.current;
44+
import static org.apache.flink.streaming.util.OperatorSnapshotUtil.getResourceFilename;
45+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.ASSERTOR;
46+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.EQUALISER;
47+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.LOGICAL_TYPES;
48+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.TTL_CONFIG;
49+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY;
50+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY_EQUALISER;
51+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
52+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
53+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
54+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
55+
56+
/** Test for {@link SinkUpsertMaterializer} migration. */
57+
@RunWith(Parameterized.class)
58+
public class SinkUpsertMaterializerMigrationTest implements MigrationTest {
59+
60+
private static final String FOLDER_NAME = "sink-upsert-materializer";
61+
62+
@Parameterized.Parameter(0)
63+
@SuppressWarnings({"ClassEscapesDefinedScope", "DefaultAnnotationParam"})
64+
public SinkOperationMode migrateFrom;
65+
66+
@Parameterized.Parameter(1)
67+
@SuppressWarnings("ClassEscapesDefinedScope")
68+
public SinkOperationMode migrateTo;
69+
70+
@Parameterized.Parameters(name = "{0} -> {1}")
71+
public static List<Object[]> parameters() {
72+
List<Object[]> result = new ArrayList<>();
73+
Set<FlinkVersion> versions = FlinkVersion.rangeOf(FlinkVersion.v2_2, FlinkVersion.v2_2);
74+
for (FlinkVersion fromVersion : versions) {
75+
for (SumStateBackend backend : SumStateBackend.values()) {
76+
result.add(
77+
new Object[] {
78+
new SinkOperationMode(fromVersion, backend),
79+
new SinkOperationMode(current(), backend)
80+
});
81+
}
82+
}
83+
return result;
84+
}
85+
86+
@Test
87+
public void testMigration() throws Exception {
88+
String path = getResourceFilename(FOLDER_NAME + "/" + getFileName(migrateFrom));
89+
try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
90+
createHarness(migrateTo, path)) {
91+
testCorrectnessAfterSnapshot(harness);
92+
}
93+
}
94+
95+
private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
96+
SinkOperationMode mode, String snapshotPath) throws Exception {
97+
int[] inputUpsertKey = {UPSERT_KEY};
98+
OneInputStreamOperator<RowData, RowData> materializer =
99+
SinkUpsertMaterializer.create(
100+
TTL_CONFIG,
101+
RowType.of(LOGICAL_TYPES),
102+
EQUALISER,
103+
UPSERT_KEY_EQUALISER,
104+
inputUpsertKey);
105+
KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
106+
SinkUpsertMaterializerTest.createHarness(
107+
materializer, mode.stateBackend, LOGICAL_TYPES);
108+
harness.setup(new RowDataSerializer(LOGICAL_TYPES));
109+
if (snapshotPath != null) {
110+
harness.initializeState(snapshotPath);
111+
}
112+
harness.open();
113+
harness.setStateTtlProcessingTime(1);
114+
return harness;
115+
}
116+
117+
private void testCorrectnessBeforeSnapshot(
118+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
119+
120+
testHarness.processElement(insertRecord(1L, 1, "a1"));
121+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
122+
123+
testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
124+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
125+
126+
testHarness.processElement(insertRecord(3L, 1, "a3"));
127+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
128+
}
129+
130+
private void testCorrectnessAfterSnapshot(
131+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
132+
testHarness.processElement(deleteRecord(1L, 1, "a111"));
133+
ASSERTOR.shouldEmitNothing(testHarness);
134+
135+
testHarness.processElement(deleteRecord(3L, 1, "a33"));
136+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
137+
138+
testHarness.processElement(insertRecord(4L, 1, "a4"));
139+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
140+
141+
testHarness.setStateTtlProcessingTime(1002);
142+
testHarness.processElement(deleteRecord(4L, 1, "a4"));
143+
ASSERTOR.shouldEmitNothing(testHarness);
144+
}
145+
146+
private static String getFileName(SinkOperationMode mode) {
147+
return String.format(
148+
"migration-flink-%s-%s-%s-snapshot", mode.version, mode.stateBackend, "V1");
149+
}
150+
151+
@SnapshotsGenerator
152+
public void writeSnapshot(FlinkVersion version) throws Exception {
153+
for (SumStateBackend stateBackend : SumStateBackend.values()) {
154+
SinkOperationMode mode = new SinkOperationMode(version, stateBackend);
155+
try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
156+
createHarness(mode, null)) {
157+
testCorrectnessBeforeSnapshot(harness);
158+
Path parent = Paths.get("src/test/resources", FOLDER_NAME);
159+
Files.createDirectories(parent);
160+
OperatorSnapshotUtil.writeStateHandle(
161+
harness.snapshot(1L, 1L), parent.resolve(getFileName(mode)).toString());
162+
}
163+
}
164+
}
165+
166+
public static void main(String... s) throws Exception {
167+
// Run this to manually generate snapshot files for migration tests
168+
// set working directory to flink-table/flink-table-runtime/
169+
new SinkUpsertMaterializerMigrationTest().writeSnapshot(current());
170+
}
171+
172+
private static class SinkOperationMode {
173+
private final FlinkVersion version;
174+
private final SumStateBackend stateBackend;
175+
176+
private SinkOperationMode(FlinkVersion version, SumStateBackend stateBackend) {
177+
this.version = version;
178+
this.stateBackend = stateBackend;
179+
}
180+
181+
@Override
182+
public String toString() {
183+
return String.format("flink=%s, state=%s}", version, stateBackend);
184+
}
185+
}
186+
}
2.51 KB
Binary file not shown.
15.7 KB
Binary file not shown.

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,6 +1659,7 @@ under the License.
16591659
<exclude>**/src/test/resources/**/serializer-snapshot</exclude>
16601660
<exclude>**/src/test/resources/**/test-data</exclude>
16611661
<exclude>**/src/test/resources/*-snapshot</exclude>
1662+
<exclude>**/src/test/resources/**/*-snapshot</exclude>
16621663
<exclude>**/src/test/resources/*.snapshot</exclude>
16631664
<exclude>**/src/test/resources/*-savepoint/**</exclude>
16641665
<exclude>**/src/test/resources/*-savepoint-native/**</exclude>

0 commit comments

Comments
 (0)