Skip to content

Commit bbc876d

Browse files
committed
[FLINK-38460] Add SinkUpsertMaterializerRescalingTest
1 parent 8513d27 commit bbc876d

File tree

1 file changed

+353
-0
lines changed

1 file changed

+353
-0
lines changed
Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.operators.sink;
20+
21+
import org.apache.flink.api.common.state.StateTtlConfig;
22+
import org.apache.flink.core.execution.SavepointFormatType;
23+
import org.apache.flink.runtime.checkpoint.CheckpointType;
24+
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
25+
import org.apache.flink.runtime.checkpoint.SavepointType;
26+
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
27+
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
28+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
29+
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
30+
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
31+
import org.apache.flink.table.data.RowData;
32+
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
33+
import org.apache.flink.table.runtime.generated.HashFunction;
34+
import org.apache.flink.table.runtime.generated.RecordEqualiser;
35+
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
36+
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
37+
import org.apache.flink.table.runtime.util.StateConfigUtil;
38+
import org.apache.flink.table.types.logical.BigIntType;
39+
import org.apache.flink.table.types.logical.IntType;
40+
import org.apache.flink.table.types.logical.LogicalType;
41+
import org.apache.flink.table.types.logical.RowType;
42+
import org.apache.flink.table.types.logical.VarCharType;
43+
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
44+
import org.apache.flink.types.RowKind;
45+
46+
import org.junit.Test;
47+
import org.junit.runner.RunWith;
48+
import org.junit.runners.Parameterized;
49+
import org.junit.runners.Parameterized.Parameter;
50+
51+
import javax.annotation.Nullable;
52+
53+
import java.util.ArrayList;
54+
import java.util.Arrays;
55+
import java.util.List;
56+
import java.util.Objects;
57+
import java.util.function.ToIntFunction;
58+
import java.util.stream.Collectors;
59+
60+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
61+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
62+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
63+
64+
/** Rescaling and migration unit tests for {@link SinkUpsertMaterializer}. */
65+
@RunWith(Parameterized.class)
66+
public class SinkUpsertMaterializerRescalingTest {
67+
68+
/** State backend used by the current parameterized run; one run is created per enum value. */
@Parameter public SinkUpsertMaterializerStateBackend backend;
69+
70+
@Parameterized.Parameters(name = "stateBackend={0}")
71+
public static Object[][] generateTestParameters() {
72+
List<Object[]> result = new ArrayList<>();
73+
for (SinkUpsertMaterializerStateBackend backend :
74+
SinkUpsertMaterializerStateBackend.values()) {
75+
result.add(new Object[] {backend});
76+
}
77+
return result.toArray(new Object[0][]);
78+
}
79+
80+
@Test
81+
public void testScaleUpThenDown() throws Exception {
82+
testRescaleFromToFrom(10, 2, 3, backend, backend);
83+
}
84+
85+
@Test
86+
public void testScaleDownThenUp() throws Exception {
87+
testRescaleFromToFrom(10, 3, 2, backend, backend);
88+
}
89+
90+
@Test
91+
public void testRecovery() throws Exception {
92+
testRescaleFromToFrom(1, 1, 1, backend, backend);
93+
}
94+
95+
@Test
96+
public void testForwardAndBackwardMigration() throws Exception {
97+
testRescaleFromToFrom(7, 3, 3, backend, getOtherBackend(backend));
98+
}
99+
100+
@Test
101+
public void testScaleUpThenDownWithMigration() throws Exception {
102+
testRescaleFromToFrom(7, 1, 5, backend, getOtherBackend(backend));
103+
}
104+
105+
@Test
106+
public void testScaleDownThenUpWithMigration() throws Exception {
107+
testRescaleFromToFrom(
108+
7, 5, 1, backend, getOtherBackend(SinkUpsertMaterializerStateBackend.HEAP));
109+
}
110+
111+
private SinkUpsertMaterializerStateBackend getOtherBackend(
112+
SinkUpsertMaterializerStateBackend backend) {
113+
return backend == SinkUpsertMaterializerStateBackend.HEAP
114+
? SinkUpsertMaterializerStateBackend.ROCKSDB
115+
: SinkUpsertMaterializerStateBackend.HEAP;
116+
}
117+
118+
@SuppressWarnings("unchecked")
119+
private void testRescaleFromToFrom(
120+
final int maxParallelism,
121+
final int fromParallelism,
122+
final int toParallelism,
123+
final SinkUpsertMaterializerStateBackend fromBackend,
124+
final SinkUpsertMaterializerStateBackend toBackend)
125+
throws Exception {
126+
127+
int[] currentParallelismRef = new int[] {fromParallelism};
128+
129+
boolean useSavepoint = fromBackend != toBackend;
130+
131+
OneInputStreamOperator<RowData, RowData>[] materializers =
132+
new OneInputStreamOperator[maxParallelism];
133+
KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses =
134+
new KeyedOneInputStreamOperatorTestHarness[maxParallelism];
135+
136+
final ToIntFunction<StreamRecord<RowData>> combinedHarnesses =
137+
(r) -> {
138+
try {
139+
int subtaskIndex =
140+
KeyGroupRangeAssignment.assignKeyToParallelOperator(
141+
KEY_SELECTOR.getKey(r.getValue()),
142+
maxParallelism,
143+
currentParallelismRef[0]);
144+
145+
harnesses[subtaskIndex].processElement(r);
146+
return subtaskIndex;
147+
} catch (Exception e) {
148+
throw new RuntimeException(e);
149+
}
150+
};
151+
152+
initHarnessesAndMaterializers(
153+
harnesses, materializers, fromBackend, maxParallelism, fromParallelism, null);
154+
155+
int idx = combinedHarnesses.applyAsInt(insertRecord(1L, 1, "a1"));
156+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
157+
158+
idx = combinedHarnesses.applyAsInt(insertRecord(2L, 1, "a2"));
159+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
160+
161+
List<OperatorSubtaskState> subtaskStates =
162+
snapshotHarnesses(harnesses, fromParallelism, 1L, useSavepoint);
163+
164+
currentParallelismRef[0] = toParallelism;
165+
initHarnessesAndMaterializers(
166+
harnesses, materializers, toBackend, maxParallelism, toParallelism, subtaskStates);
167+
168+
idx = combinedHarnesses.applyAsInt(insertRecord(3L, 1, "a3"));
169+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
170+
171+
idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
172+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 4L, 1, "a4"));
173+
174+
subtaskStates = snapshotHarnesses(harnesses, toParallelism, 2L, useSavepoint);
175+
176+
currentParallelismRef[0] = fromParallelism;
177+
initHarnessesAndMaterializers(
178+
harnesses,
179+
materializers,
180+
fromBackend,
181+
maxParallelism,
182+
fromParallelism,
183+
subtaskStates);
184+
185+
idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
186+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
187+
188+
idx = combinedHarnesses.applyAsInt(deleteRecord(2L, 1, "a2"));
189+
ASSERTOR.shouldEmitNothing(harnesses[idx]);
190+
191+
idx = combinedHarnesses.applyAsInt(deleteRecord(3L, 1, "a3"));
192+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
193+
194+
idx = combinedHarnesses.applyAsInt(deleteRecord(1L, 1, "a1"));
195+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
196+
197+
idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
198+
ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
199+
200+
Arrays.stream(harnesses)
201+
.filter(Objects::nonNull)
202+
.forEach(h -> h.setStateTtlProcessingTime(1002));
203+
204+
idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
205+
ASSERTOR.shouldEmitNothing(harnesses[idx]);
206+
207+
Arrays.stream(harnesses)
208+
.filter(Objects::nonNull)
209+
.forEach(
210+
h -> {
211+
try {
212+
h.close();
213+
} catch (Exception e) {
214+
throw new RuntimeException(e);
215+
}
216+
});
217+
}
218+
219+
private void initHarnessesAndMaterializers(
220+
KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
221+
OneInputStreamOperator<RowData, RowData>[] materializers,
222+
SinkUpsertMaterializerStateBackend backend,
223+
int maxParallelism,
224+
int parallelism,
225+
@Nullable List<OperatorSubtaskState> subtaskStates)
226+
throws Exception {
227+
for (int i = 0; i < parallelism; ++i) {
228+
materializers[i] =
229+
SinkUpsertMaterializer.create(
230+
TTL_CONFIG,
231+
RowType.of(LOGICAL_TYPES),
232+
EQUALISER,
233+
UPSERT_KEY_EQUALISER,
234+
null);
235+
harnesses[i] =
236+
new KeyedOneInputStreamOperatorTestHarness<>(
237+
materializers[i],
238+
KEY_SELECTOR,
239+
KEY_SELECTOR.getProducedType(),
240+
maxParallelism,
241+
parallelism,
242+
i);
243+
244+
harnesses[i].setStateBackend(backend.create(false));
245+
246+
if (subtaskStates != null) {
247+
OperatorSubtaskState operatorSubtaskState =
248+
AbstractStreamOperatorTestHarness.repackageState(
249+
subtaskStates.toArray(new OperatorSubtaskState[0]));
250+
251+
harnesses[i].initializeState(
252+
AbstractStreamOperatorTestHarness.repartitionOperatorState(
253+
operatorSubtaskState,
254+
maxParallelism,
255+
subtaskStates.size(),
256+
parallelism,
257+
i));
258+
}
259+
260+
harnesses[i].open();
261+
harnesses[i].setStateTtlProcessingTime(1);
262+
}
263+
}
264+
265+
private List<OperatorSubtaskState> snapshotHarnesses(
266+
KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
267+
int parallelism,
268+
long checkpointId,
269+
boolean useSavepoint) {
270+
return Arrays.stream(harnesses, 0, parallelism)
271+
.map(
272+
h -> {
273+
try {
274+
return h.snapshotWithLocalState(
275+
checkpointId,
276+
0L,
277+
useSavepoint
278+
? SavepointType.savepoint(
279+
SavepointFormatType.CANONICAL)
280+
: CheckpointType.CHECKPOINT)
281+
.getJobManagerOwnedState();
282+
} catch (Exception e) {
283+
throw new RuntimeException(e);
284+
}
285+
})
286+
.collect(Collectors.toList());
287+
}
288+
289+
/** Test equalizer for records. */
290+
protected static class TestRecordEqualiser implements RecordEqualiser, HashFunction {
291+
@Override
292+
public boolean equals(RowData row1, RowData row2) {
293+
return row1.getRowKind() == row2.getRowKind()
294+
&& row1.getLong(0) == row2.getLong(0)
295+
&& row1.getInt(1) == row2.getInt(1)
296+
&& row1.getString(2).equals(row2.getString(2));
297+
}
298+
299+
@Override
300+
public int hashCode(Object data) {
301+
RowData rd = (RowData) data;
302+
return Objects.hash(rd.getRowKind(), rd.getLong(0), rd.getInt(1), rd.getString(2));
303+
}
304+
}
305+
306+
/** Test equalizer for upsert keys. */
307+
protected static class TestUpsertKeyEqualiser implements RecordEqualiser, HashFunction {
308+
@Override
309+
public boolean equals(RowData row1, RowData row2) {
310+
return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
311+
}
312+
313+
@Override
314+
public int hashCode(Object data) {
315+
RowData rd = (RowData) data;
316+
return Objects.hash(rd.getRowKind(), rd.getLong(0));
317+
}
318+
}
319+
320+
private static class MyGeneratedRecordEqualiser extends GeneratedRecordEqualiser {
321+
322+
public MyGeneratedRecordEqualiser() {
323+
super("", "", new Object[0]);
324+
}
325+
326+
@Override
327+
public RecordEqualiser newInstance(ClassLoader classLoader) {
328+
return new TestRecordEqualiser();
329+
}
330+
}
331+
332+
private static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000);
333+
334+
private static final LogicalType[] LOGICAL_TYPES =
335+
new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
336+
337+
private static final RowDataKeySelector KEY_SELECTOR =
338+
HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, LOGICAL_TYPES);
339+
340+
private static final RowDataHarnessAssertor ASSERTOR =
341+
new RowDataHarnessAssertor(LOGICAL_TYPES);
342+
343+
private static final GeneratedRecordEqualiser EQUALISER = new MyGeneratedRecordEqualiser();
344+
345+
private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
346+
new GeneratedRecordEqualiser("", "", new Object[0]) {
347+
348+
@Override
349+
public RecordEqualiser newInstance(ClassLoader classLoader) {
350+
return new TestUpsertKeyEqualiser();
351+
}
352+
};
353+
}

0 commit comments

Comments
 (0)