Skip to content

Commit 8513d27

Browse files
committed
[FLINK-38460] Add SinkUpsertMaterializerMigrationTest
1 parent a9bae39 commit 8513d27

File tree

4 files changed

+190
-0
lines changed

4 files changed

+190
-0
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.operators.sink;
20+
21+
import org.apache.flink.FlinkVersion;
22+
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
23+
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
24+
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
25+
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
26+
import org.apache.flink.table.data.RowData;
27+
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
28+
import org.apache.flink.table.types.logical.RowType;
29+
import org.apache.flink.test.util.MigrationTest;
30+
import org.apache.flink.types.RowKind;
31+
32+
import org.junit.Test;
33+
import org.junit.runner.RunWith;
34+
import org.junit.runners.Parameterized;
35+
36+
import java.nio.file.Files;
37+
import java.nio.file.Path;
38+
import java.nio.file.Paths;
39+
import java.util.ArrayList;
40+
import java.util.List;
41+
import java.util.Set;
42+
43+
import static org.apache.flink.FlinkVersion.current;
44+
import static org.apache.flink.streaming.util.OperatorSnapshotUtil.getResourceFilename;
45+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.ASSERTOR;
46+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.EQUALISER;
47+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.LOGICAL_TYPES;
48+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.TTL_CONFIG;
49+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY;
50+
import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY_EQUALISER;
51+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
52+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
53+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
54+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
55+
56+
/** Test for {@link SinkUpsertMaterializer} migration. */
57+
@RunWith(Parameterized.class)
58+
public class SinkUpsertMaterializerMigrationTest implements MigrationTest {
59+
60+
private static final String FOLDER_NAME = "sink-upsert-materializer";
61+
62+
@Parameterized.Parameter(0)
63+
@SuppressWarnings({"ClassEscapesDefinedScope", "DefaultAnnotationParam"})
64+
public SinkOperationMode migrateFrom;
65+
66+
@Parameterized.Parameter(1)
67+
@SuppressWarnings("ClassEscapesDefinedScope")
68+
public SinkOperationMode migrateTo;
69+
70+
@Parameterized.Parameters(name = "{0} -> {1}")
71+
public static List<Object[]> parameters() {
72+
List<Object[]> result = new ArrayList<>();
73+
Set<FlinkVersion> versions = FlinkVersion.rangeOf(FlinkVersion.v2_2, FlinkVersion.v2_2);
74+
for (FlinkVersion fromVersion : versions) {
75+
for (SinkUpsertMaterializerStateBackend backend :
76+
SinkUpsertMaterializerStateBackend.values()) {
77+
result.add(
78+
new Object[] {
79+
new SinkOperationMode(fromVersion, backend),
80+
new SinkOperationMode(current(), backend)
81+
});
82+
}
83+
}
84+
return result;
85+
}
86+
87+
@Test
88+
public void testMigration() throws Exception {
89+
String path = getResourceFilename(FOLDER_NAME + "/" + getFileName(migrateFrom));
90+
try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
91+
createHarness(migrateTo, path)) {
92+
testCorrectnessAfterSnapshot(harness);
93+
}
94+
}
95+
96+
private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
97+
SinkOperationMode mode, String snapshotPath) throws Exception {
98+
int[] inputUpsertKey = {UPSERT_KEY};
99+
OneInputStreamOperator<RowData, RowData> materializer =
100+
SinkUpsertMaterializer.create(
101+
TTL_CONFIG,
102+
RowType.of(LOGICAL_TYPES),
103+
EQUALISER,
104+
UPSERT_KEY_EQUALISER,
105+
inputUpsertKey);
106+
KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
107+
SinkUpsertMaterializerTest.createHarness(
108+
materializer, mode.stateBackend, LOGICAL_TYPES);
109+
harness.setup(new RowDataSerializer(LOGICAL_TYPES));
110+
if (snapshotPath != null) {
111+
harness.initializeState(snapshotPath);
112+
}
113+
harness.open();
114+
harness.setStateTtlProcessingTime(1);
115+
return harness;
116+
}
117+
118+
private void testCorrectnessBeforeSnapshot(
119+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
120+
121+
testHarness.processElement(insertRecord(1L, 1, "a1"));
122+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
123+
124+
testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
125+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
126+
127+
testHarness.processElement(insertRecord(3L, 1, "a3"));
128+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
129+
}
130+
131+
private void testCorrectnessAfterSnapshot(
132+
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
133+
testHarness.processElement(deleteRecord(1L, 1, "a111"));
134+
ASSERTOR.shouldEmitNothing(testHarness);
135+
136+
testHarness.processElement(deleteRecord(3L, 1, "a33"));
137+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
138+
139+
testHarness.processElement(insertRecord(4L, 1, "a4"));
140+
ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
141+
142+
testHarness.setStateTtlProcessingTime(1002);
143+
testHarness.processElement(deleteRecord(4L, 1, "a4"));
144+
ASSERTOR.shouldEmitNothing(testHarness);
145+
}
146+
147+
private static String getFileName(SinkOperationMode mode) {
148+
return String.format(
149+
"migration-flink-%s-%s-%s-snapshot", mode.version, mode.stateBackend, "V1");
150+
}
151+
152+
@SnapshotsGenerator
153+
public void writeSnapshot(FlinkVersion version) throws Exception {
154+
for (SinkUpsertMaterializerStateBackend stateBackend :
155+
SinkUpsertMaterializerStateBackend.values()) {
156+
SinkOperationMode mode = new SinkOperationMode(version, stateBackend);
157+
try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
158+
createHarness(mode, null)) {
159+
testCorrectnessBeforeSnapshot(harness);
160+
Path parent = Paths.get("src/test/resources", FOLDER_NAME);
161+
Files.createDirectories(parent);
162+
OperatorSnapshotUtil.writeStateHandle(
163+
harness.snapshot(1L, 1L), parent.resolve(getFileName(mode)).toString());
164+
}
165+
}
166+
}
167+
168+
public static void main(String... s) throws Exception {
169+
// Run this to manually generate snapshot files for migration tests
170+
// set working directory to flink-table/flink-table-runtime/
171+
new SinkUpsertMaterializerMigrationTest().writeSnapshot(current());
172+
}
173+
174+
private static class SinkOperationMode {
175+
private final FlinkVersion version;
176+
private final SinkUpsertMaterializerStateBackend stateBackend;
177+
178+
private SinkOperationMode(
179+
FlinkVersion version, SinkUpsertMaterializerStateBackend stateBackend) {
180+
this.version = version;
181+
this.stateBackend = stateBackend;
182+
}
183+
184+
@Override
185+
public String toString() {
186+
return String.format("flink=%s, state=%s}", version, stateBackend);
187+
}
188+
}
189+
}
2.51 KB
Binary file not shown.
15.7 KB
Binary file not shown.

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,6 +1659,7 @@ under the License.
16591659
<exclude>**/src/test/resources/**/serializer-snapshot</exclude>
16601660
<exclude>**/src/test/resources/**/test-data</exclude>
16611661
<exclude>**/src/test/resources/*-snapshot</exclude>
1662+
<exclude>**/src/test/resources/**/*-snapshot</exclude>
16621663
<exclude>**/src/test/resources/*.snapshot</exclude>
16631664
<exclude>**/src/test/resources/*-savepoint/**</exclude>
16641665
<exclude>**/src/test/resources/*-savepoint-native/**</exclude>

0 commit comments

Comments
 (0)