From c07a902e6cd0cd99a0cd73a842299009703a106c Mon Sep 17 00:00:00 2001
From: Roman Khachatryan
Date: Tue, 30 Sep 2025 00:33:01 +0200
Subject: [PATCH 1/5] [hotfix] Introduce OperatorSnapshotFinalizer factory method

---
 .../flink/state/api/output/SnapshotUtils.java      |  2 +-
 .../operators/OperatorSnapshotFinalizer.java       | 18 +++++++++++++-----
 .../runtime/tasks/AsyncCheckpointRunnable.java     |  2 +-
 .../OperatorSnapshotFinalizerTest.java             |  2 +-
 .../AbstractStreamOperatorTestHarness.java         |  2 +-
 5 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
index a3ecabe715f62..14688e4ee742d 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -72,7 +72,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
                 operator.snapshotState(checkpointId, timestamp, options, storage);
 
         OperatorSubtaskState state =
-                new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
+                OperatorSnapshotFinalizer.create(snapshotInProgress).getJobManagerOwnedState();
 
         operator.notifyCheckpointComplete(checkpointId);
         return new TaggedOperatorSubtaskState(index, state);
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
index c308a6cb8aeec..455c2489a741e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
@@ -27,12 +27,11 @@ import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import javax.annotation.Nonnull;
-
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singletonOrEmpty;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This class finalizes {@link OperatorSnapshotFutures}. Each object is created with a {@link
@@ -47,8 +46,9 @@ public class OperatorSnapshotFinalizer {
 
     /** Secondary replica of the operator subtask state for faster, local recovery on TM. */
     private final OperatorSubtaskState taskLocalState;
 
-    public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFutures)
+    public static OperatorSnapshotFinalizer create(OperatorSnapshotFutures snapshotFutures)
             throws ExecutionException, InterruptedException {
+        checkNotNull(snapshotFutures);
 
         SnapshotResult<KeyedStateHandle> keyedManaged =
                 FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
@@ -68,7 +68,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFuture
         SnapshotResult<StateObjectCollection<ResultSubpartitionStateHandle>> resultSubpartition =
                 snapshotFutures.getResultSubpartitionStateFuture().get();
 
-        jobManagerOwnedState =
+        OperatorSubtaskState jobManagerOwnedState =
                 OperatorSubtaskState.builder()
                         .setManagedOperatorState(
                                 singletonOrEmpty(operatorManaged.getJobManagerOwnedSnapshot()))
@@ -83,7 +83,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFuture
                                 emptyIfNull(resultSubpartition.getJobManagerOwnedSnapshot()))
                         .build();
 
-        taskLocalState =
+        OperatorSubtaskState taskLocalState =
                 OperatorSubtaskState.builder()
                         .setManagedOperatorState(
                                 singletonOrEmpty(operatorManaged.getTaskLocalSnapshot()))
@@ -94,6 +94,14 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFuture
                         .setResultSubpartitionState(
                                 emptyIfNull(resultSubpartition.getTaskLocalSnapshot()))
                         .build();
+
+        return new OperatorSnapshotFinalizer(jobManagerOwnedState, taskLocalState);
+    }
+
+    public OperatorSnapshotFinalizer(
+            OperatorSubtaskState jobManagerOwnedState, OperatorSubtaskState taskLocalState) {
+        this.jobManagerOwnedState = checkNotNull(jobManagerOwnedState);
+        this.taskLocalState = checkNotNull(taskLocalState);
     }
 
     public OperatorSubtaskState getTaskLocalState() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index 2e859d2ab8d2c..ee674fc1c2ad9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -189,7 +189,7 @@ private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws Exception
 
                 // finalize the async part of all by executing all snapshot runnables
                 OperatorSnapshotFinalizer finalizedSnapshots =
-                        new OperatorSnapshotFinalizer(snapshotInProgress);
+                        OperatorSnapshotFinalizer.create(snapshotInProgress);
 
                 jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                         operatorID, finalizedSnapshots.getJobManagerOwnedState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
index a64905098af06..17d6351e3655d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
@@ -93,7 +93,7 @@ void testRunAndExtract() throws Exception {
             assertThat(f).isNotDone();
         }
 
-        OperatorSnapshotFinalizer finalizer = new OperatorSnapshotFinalizer(snapshotFutures);
+        OperatorSnapshotFinalizer finalizer = OperatorSnapshotFinalizer.create(snapshotFutures);
 
         for (Future<?> f : snapshotFutures.getAllFutures()) {
             assertThat(f).isDone();
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 31a6019a50a6c..b72f7299f4b8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -761,7 +761,7 @@ public OperatorSnapshotFinalizer snapshotWithLocalState(
                         checkpointStorageAccess.resolveCheckpointStorageLocation(
                                 checkpointId, locationReference));
 
-        return new OperatorSnapshotFinalizer(operatorStateResult);
+        return OperatorSnapshotFinalizer.create(operatorStateResult);
     }
 
     /**

From c7f843ebb00cc02e54cb1f66fdad870947b41b39 Mon Sep 17 00:00:00 2001
From: Roman Khachatryan
Date: Tue, 30 Sep 2025 00:40:29 +0200
Subject: [PATCH 2/5] [hotfix] Introduce SinkUpsertMaterializer factory method

---
 .../operators/sink/SinkUpsertMaterializer.java | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
index 8ba4b792e2b05..cd5f66f7a8da9 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
@@ -31,6 +31,8 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
@@ -222,4 +224,18 @@ private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
         }
         return equaliser.equals(newRow, oldRow);
     }
+
+    public static SinkUpsertMaterializer create(
+            StateTtlConfig ttlConfig,
+            RowType physicalRowType,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            int[] inputUpsertKey) {
+        return new SinkUpsertMaterializer(
+                ttlConfig,
+                InternalSerializers.create(physicalRowType),
+                rowEqualiser,
+                upsertKeyEqualiser,
+                inputUpsertKey);
+    }
 }

From a9bae395c73f0341506caa3222bef3a23ad8025d Mon Sep 17 00:00:00 2001
From: Roman Khachatryan
Date: Tue, 30 Sep 2025 10:02:06 +0200
Subject: [PATCH 3/5] [FLINK-38460] Refactor SinkUpsertMaterializer tests

---
 .../SinkUpsertMaterializerStateBackend.java   |  41 ++
 .../sink/SinkUpsertMaterializerTest.java      | 375 ++++++++++++++++--
 .../runtime/util/RowDataHarnessAssertor.java  |   8 +
 3 files changed, 384 insertions(+), 40 deletions(-)
 create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java

diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java
new file mode 100644
index 0000000000000..b675df2893e83
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+
+/** StateBackend to test SinkUpsertMaterializer with. */
+public enum SinkUpsertMaterializerStateBackend {
+    HEAP {
+
+        public StateBackend create(boolean incrementalIfSupported) {
+            return new HashMapStateBackend();
+        }
+    },
+    ROCKSDB {
+
+        public StateBackend create(boolean incrementalIfSupported) {
+            return new EmbeddedRocksDBStateBackend(incrementalIfSupported);
+        }
+    };
+
+    public abstract StateBackend create(boolean incrementalIfSupported);
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index 322ac9e1cebe7..9fc4b1cf88d8f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -19,40 +19,73 @@
 package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.utils.HandwrittenSelectorUtil;
 import org.apache.flink.types.RowKind;
 
-import org.junit.jupiter.api.Test;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.java.tuple.Tuple2.of;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.junit.Assert.assertEquals;
 
 /** Test for {@link SinkUpsertMaterializer}. */
-class SinkUpsertMaterializerTest {
+@RunWith(Parameterized.class)
+public class SinkUpsertMaterializerTest {
+
+    static final int UPSERT_KEY = 0;
+
+    @SuppressWarnings("ClassEscapesDefinedScope")
+    @Parameter
+    public SinkUpsertMaterializerStateBackend stateBackend;
 
-    private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1000);
-    private final LogicalType[] types =
+    @Parameterized.Parameters(name = "stateBackend={0}")
+    public static Object[][] generateTestParameters() {
+        List<Object[]> result = new ArrayList<>();
+        for (SinkUpsertMaterializerStateBackend backend :
+                SinkUpsertMaterializerStateBackend.values()) {
+            result.add(new Object[] {backend});
+        }
+        return result.toArray(new Object[0][]);
+    }
+
+    static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000);
+    static final LogicalType[] LOGICAL_TYPES =
             new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
-    private final RowDataSerializer serializer = new RowDataSerializer(types);
-    private final RowDataKeySelector keySelector =
-            HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types);
-    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+    static final RowDataHarnessAssertor ASSERTOR = new RowDataHarnessAssertor(LOGICAL_TYPES);
 
-    private final GeneratedRecordEqualiser equaliser =
+    static final GeneratedRecordEqualiser EQUALISER =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 
                 @Override
@@ -61,7 +94,7 @@ public RecordEqualiser newInstance(ClassLoader classLoader) {
                 }
             };
 
-    private final GeneratedRecordEqualiser upsertKeyEqualiser =
+    static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
            new GeneratedRecordEqualiser("", "", new Object[0]) {
 
                 @Override
@@ -70,83 +103,236 @@ public RecordEqualiser newInstance(ClassLoader classLoader) {
                 }
             };
 
+    /**
+     * If the composite serializer in {@link SinkUpsertMaterializer} works on projected fields then
+     * it might use the wrong serializer, e.g. the {@link VarCharType} instead of the {@link
+     * IntType}. That might cause an {@link ArrayIndexOutOfBoundsException} because the string
+     * serializer expects the first number to be the length of the string.
+     */
     @Test
-    void test() throws Exception {
-        SinkUpsertMaterializer materializer =
-                new SinkUpsertMaterializer(
-                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, null);
+    public void testUpsertKeySerializerFailure() throws Exception {
+        LogicalType[] types = new LogicalType[] {new VarCharType(), new IntType()};
+        // project the int field, while in the original record it's a string
+
+        OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1);
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer, stateBackend, types)) {
+            testHarness.open();
+            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+            // -1 is not a valid string length
+            testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", -1));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", -1));
+
+            // 999 as a string length is too long
+            testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 999));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 999));
+        }
+    }
+
+    @Test
+    public void testUpsertKeySerializerSilentCorruption() throws Exception {
+        LogicalType[] types =
+                new LogicalType[] {new VarCharType(), new BigIntType(), new IntType()};
+        // project the int field, while in the original record it's a string
+
+        OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1);
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer, stateBackend, types)) {
+            testHarness.open();
+            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+
+            // this might serialize the upsert key as a 32-character string, potentially "97"
+            testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 32L, 97));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 32L, 97));
+
+            // but here it might include "98", which would result in no output and a test failure
+            testHarness.processElement(binaryRecord(RowKind.DELETE, "any string", 32L, 98));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, "any string", 32L, 98));
+        }
+    }
+
+    @Test
+    public void testUpsertEqualizer() throws Exception {
+        LogicalType[] types = new LogicalType[] {new IntType(), new BigIntType()};
+
+        OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1);
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer, stateBackend, types)) {
+            testHarness.open();
+            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+
+            // upsert key is 33, 0 is unused
+            testHarness.processElement(binaryRecord(RowKind.INSERT, 0, 33L));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 0, 33L));
+
+            // upsert key 33 - should remove AND clear the state involving the upsert equalizer;
+            // the equalizer might fail if it's used on un-projected records
+            testHarness.processElement(binaryRecord(RowKind.DELETE, 1, 33L));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1, 33L));
+        }
+    }
+
+    @Test
+    public void testNoUpsertKeyFlow() throws Exception {
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
-                new KeyedOneInputStreamOperatorTestHarness<>(
-                        materializer, keySelector, keySelector.getProducedType());
+                createHarness(createOperatorWithoutUpsertKey());
 
         testHarness.open();
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
 
         testHarness.processElement(insertRecord(2L, 1, "a2"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
 
         testHarness.processElement(deleteRecord(2L, 1, "a2"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.close();
     }
 
     @Test
-    void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
-        SinkUpsertMaterializer materializer =
-                new SinkUpsertMaterializer(
-                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, new int[] {0});
+    public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
+        OneInputStreamOperator<RowData, RowData> materializer =
+                createOperator(LOGICAL_TYPES, UPSERT_KEY);
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
-                new KeyedOneInputStreamOperatorTestHarness<>(
-                        materializer, keySelector, keySelector.getProducedType());
+                createHarness(materializer);
 
         testHarness.open();
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
 
         testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a111"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a33"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testRetractionWithoutUpsertKey() throws Exception {
+        testRetractions((int[]) null);
+    }
+
+    @Test
+    public void testRetractionWithUpsertKey() throws Exception {
+        testRetractions(UPSERT_KEY);
+    }
+
+    public void testRetractions(int... upsertKey) throws Exception {
+        testThreeElementProcessing(
+                "retract first - should emit nothing until empty - then delete",
+                upsertKey,
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(2L, 1, "a2"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3")));
+        testThreeElementProcessing(
+                "retract middle - should emit nothing until empty - then delete",
+                upsertKey,
+                of(deleteRecord(2L, 1, "a2"), null),
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3")));
+        testThreeElementProcessing(
+                "retract last - should emit penultimate until empty - then delete",
+                upsertKey,
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")),
+                of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")),
+                of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1")));
+        testThreeElementProcessing(
+                "retract in arbitrary order: 1,3,2",
+                upsertKey,
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")),
+                of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2")));
+        testThreeElementProcessing(
+                "retract in arbitrary order: 2,3,1",
+                upsertKey,
+                of(deleteRecord(2L, 1, "a2"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")),
+                of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1")));
+        testThreeElementProcessing(
+                "retract in arbitrary order: 3,1,2",
+                upsertKey,
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")),
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2")));
+    }
+
+    // boilerplate for the common test case of processing that starts with three elements
+    @SafeVarargs
+    private void testThreeElementProcessing(
+            String description,
+            int[] upsertKey,
+            Tuple2<StreamRecord<RowData>, RowData>... inputAndOutput)
+            throws Exception {
+        @SuppressWarnings("rawtypes")
+        Tuple2[] merged = new Tuple2[inputAndOutput.length + 3];
+        merged[0] = of(insertRecord(1L, 1, "a1"), rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        merged[1] = of(insertRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
+        merged[2] = of(insertRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        System.arraycopy(inputAndOutput, 0, merged, 3, inputAndOutput.length);
+        testElementProcessing(description, upsertKey, merged);
+    }
+
+    @SafeVarargs
+    private void testElementProcessing(
+            String description,
+            int[] upsertKey,
+            Tuple2<StreamRecord<RowData>, RowData>... inputAndOutput)
+            throws Exception {
+        OneInputStreamOperator<RowData, RowData> materializer =
+                createOperator(LOGICAL_TYPES, upsertKey);
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer);
+
+        testHarness.open();
+
+        for (Tuple2<StreamRecord<RowData>, RowData> el0 : inputAndOutput) {
+            testHarness.processElement(el0.f0);
+            if (el0.f1 == null) {
+                ASSERTOR.shouldEmitNothing(testHarness);
+            } else {
+                ASSERTOR.shouldEmit(testHarness, description, el0.f1);
+            }
+        }
 
         testHarness.close();
     }
@@ -162,9 +348,118 @@ public boolean equals(RowData row1, RowData row2) {
         }
     }
 
     private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+
         @Override
         public boolean equals(RowData row1, RowData row2) {
-            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getLong(UPSERT_KEY) == row2.getLong(UPSERT_KEY);
         }
     }
+
+    static KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            OneInputStreamOperator<RowData, RowData> materializer,
+            SinkUpsertMaterializerStateBackend backend,
+            LogicalType[] types)
+            throws Exception {
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        materializer,
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types),
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types)
+                                .getProducedType());
+        testHarness.setStateBackend(backend.create(true));
+        return testHarness;
+    }
+
+    @Test
+    public void testEmptyUpsertKey() throws Exception {
+        testRecovery(createOperator(LOGICAL_TYPES), createOperatorWithoutUpsertKey());
+        testRecovery(createOperatorWithoutUpsertKey(), createOperator(LOGICAL_TYPES));
+    }
+
+    private void testRecovery(
+            OneInputStreamOperator<RowData, RowData> from,
+            OneInputStreamOperator<RowData, RowData> to)
+            throws Exception {
+        OperatorSubtaskState snapshot;
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(from)) {
+            testHarness.open();
+            snapshot = testHarness.snapshot(1L, 1L);
+        }
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(to)) {
+            testHarness.initializeState(snapshot);
+            testHarness.open();
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            OneInputStreamOperator<RowData, RowData> m2) throws Exception {
+        return createHarness(m2, stateBackend, LOGICAL_TYPES);
+    }
+
+    @Test
+    public void testVersionStateGrowth() throws Exception {
+        int dop = 2;
+        int numIterations = 10;
+        OperatorSnapshotFinalizer[] snapshots = new OperatorSnapshotFinalizer[dop];
+        long[] prevStateSizes = new long[dop];
+        for (int i = 0; i < numIterations; i++) {
+            for (int subtask = 0; subtask < dop; subtask++) {
+                snapshots[subtask] = initAndSnapshot(snapshots[subtask], i);
+                long currentStateSize =
+                        snapshots[subtask]
+                                .getJobManagerOwnedState()
+                                .getManagedOperatorState()
+                                .stream()
+                                .mapToLong(StateObject::getStateSize)
+                                .sum();
+                if (i > 0) {
+                    assertEquals(prevStateSizes[subtask], currentStateSize);
+                }
+                prevStateSizes[subtask] = currentStateSize;
+            }
+            List<OperatorStateHandle> union =
+                    Arrays.stream(snapshots)
+                            .flatMap(
+                                    s ->
+                                            s
+                                                    .getJobManagerOwnedState()
+                                                    .getManagedOperatorState()
+                                                    .stream())
+                            .collect(Collectors.toList());
+            for (int j = 0; j < dop; j++) {
+                snapshots[j] =
+                        new OperatorSnapshotFinalizer(
+                                snapshots[j].getJobManagerOwnedState().toBuilder()
+                                        .setManagedOperatorState(new StateObjectCollection<>(union))
+                                        .build(),
+                                snapshots[j].getTaskLocalState());
+            }
+        }
+    }
+
+    private OperatorSnapshotFinalizer initAndSnapshot(
+            OperatorSnapshotFinalizer from, int newCheckpointID) throws Exception {
+        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
+                createHarness(
+                        createOperator(LOGICAL_TYPES, UPSERT_KEY), stateBackend, LOGICAL_TYPES)) {
+            if (from != null) {
+                harness.initializeState(from.getJobManagerOwnedState());
+            }
+            harness.open();
+            return harness.snapshotWithLocalState(newCheckpointID, newCheckpointID);
+        }
+    }
+
+    private OneInputStreamOperator<RowData, RowData> createOperatorWithoutUpsertKey() {
+        return createOperator(LOGICAL_TYPES, (int[]) null);
+    }
+
+    private OneInputStreamOperator<RowData, RowData> createOperator(
+            LogicalType[] types, int... upsertKey) {
+        return SinkUpsertMaterializer.create(
+                TTL_CONFIG, RowType.of(types), EQUALISER, UPSERT_KEY_EQUALISER, upsertKey);
+    }
 }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
index d7fbef9f16293..b4b074f724913 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
@@ -61,6 +61,14 @@ public void shouldEmit(
         assertThat(getEmittedRows(harness)).containsExactly(expected);
     }
 
+    /** Assert that the test harness emits exactly the expected records. */
+    public void shouldEmit(
+            AbstractStreamOperatorTestHarness<RowData> harness,
+            String description,
+            RowData... expected) {
+        assertThat(getEmittedRows(harness)).describedAs(description).containsExactly(expected);
+    }
+
     /** Assert the test harness should emit all records regardless of the order. */
     public void shouldEmitAll(
             AbstractStreamOperatorTestHarness<RowData> harness, RowData... expected) {

From 8513d27d9bd7270b34c6a47d1a04a1a8b3337eeb Mon Sep 17 00:00:00 2001
From: Roman Khachatryan
Date: Tue, 30 Sep 2025 10:34:34 +0200
Subject: [PATCH 4/5] [FLINK-38460] Add SinkUpsertMaterializerMigrationTest

---
 .../SinkUpsertMaterializerMigrationTest.java  | 189 ++++++++++++++++++
 .../migration-flink-2.2-HEAP-V1-snapshot      | Bin 0 -> 2566 bytes
 .../migration-flink-2.2-ROCKSDB-V1-snapshot   | Bin 0 -> 16099 bytes
 pom.xml                                       |   1 +
 4 files changed, 190 insertions(+)
 create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
 create mode 100644 flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot
 create mode 100644 flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot

diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
new file mode 100644
index 0000000000000..1ab7ac4ae8468
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.util.MigrationTest;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.FlinkVersion.current;
+import static org.apache.flink.streaming.util.OperatorSnapshotUtil.getResourceFilename;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.ASSERTOR;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.EQUALISER;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.LOGICAL_TYPES;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.TTL_CONFIG;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY;
+import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY_EQUALISER;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+
+/** Test for {@link SinkUpsertMaterializer} migration. */
+@RunWith(Parameterized.class)
+public class SinkUpsertMaterializerMigrationTest implements MigrationTest {
+
+    private static final String FOLDER_NAME = "sink-upsert-materializer";
+
+    @Parameterized.Parameter(0)
+    @SuppressWarnings({"ClassEscapesDefinedScope", "DefaultAnnotationParam"})
+    public SinkOperationMode migrateFrom;
+
+    @Parameterized.Parameter(1)
+    @SuppressWarnings("ClassEscapesDefinedScope")
+    public SinkOperationMode migrateTo;
+
+    @Parameterized.Parameters(name = "{0} -> {1}")
+    public static List<Object[]> parameters() {
+        List<Object[]> result = new ArrayList<>();
+        Set<FlinkVersion> versions = FlinkVersion.rangeOf(FlinkVersion.v2_2, FlinkVersion.v2_2);
+        for (FlinkVersion fromVersion : versions) {
+            for (SinkUpsertMaterializerStateBackend backend :
+                    SinkUpsertMaterializerStateBackend.values()) {
+                result.add(
+                        new Object[] {
+                            new SinkOperationMode(fromVersion, backend),
+                            new SinkOperationMode(current(), backend)
+                        });
+            }
+        }
+        return result;
+    }
+
+    @Test
+    public void testMigration() throws Exception {
+        String path = getResourceFilename(FOLDER_NAME + "/" + getFileName(migrateFrom));
+        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
+                createHarness(migrateTo, path)) {
+            testCorrectnessAfterSnapshot(harness);
+        }
+    }
+
+    private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(
+            SinkOperationMode mode, String snapshotPath) throws Exception {
+        int[] inputUpsertKey = {UPSERT_KEY};
+        OneInputStreamOperator<RowData, RowData> materializer =
+                SinkUpsertMaterializer.create(
+                        TTL_CONFIG,
+                        RowType.of(LOGICAL_TYPES),
+                        EQUALISER,
+                        UPSERT_KEY_EQUALISER,
+                        inputUpsertKey);
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                SinkUpsertMaterializerTest.createHarness(
+                        materializer, mode.stateBackend, LOGICAL_TYPES);
+        harness.setup(new RowDataSerializer(LOGICAL_TYPES));
+        if (snapshotPath != null) {
+            harness.initializeState(snapshotPath);
+        }
+        harness.open();
+        harness.setStateTtlProcessingTime(1);
+        return harness;
+    }
+
+    private void testCorrectnessBeforeSnapshot(
+            OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
+
+        testHarness.processElement(insertRecord(1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+        testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
+
+        testHarness.processElement(insertRecord(3L, 1, "a3"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+    }
+
+    private void testCorrectnessAfterSnapshot(
+            OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
+        testHarness.processElement(deleteRecord(1L, 1, "a111"));
+        ASSERTOR.shouldEmitNothing(testHarness);
+
+        testHarness.processElement(deleteRecord(3L, 1, "a33"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
+
+        testHarness.processElement(insertRecord(4L, 1, "a4"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+
+        testHarness.setStateTtlProcessingTime(1002);
+        testHarness.processElement(deleteRecord(4L, 1, "a4"));
+        ASSERTOR.shouldEmitNothing(testHarness);
+    }
+
+    private static String getFileName(SinkOperationMode mode) {
+        return String.format(
+                "migration-flink-%s-%s-%s-snapshot", mode.version, mode.stateBackend, "V1");
+    }
+
+    @SnapshotsGenerator
+    public void writeSnapshot(FlinkVersion version) throws Exception {
+        for (SinkUpsertMaterializerStateBackend stateBackend :
+                SinkUpsertMaterializerStateBackend.values()) {
+            SinkOperationMode mode = new SinkOperationMode(version, stateBackend);
+ try (OneInputStreamOperatorTestHarness harness = + createHarness(mode, null)) { + testCorrectnessBeforeSnapshot(harness); + Path parent = Paths.get("src/test/resources", FOLDER_NAME); + Files.createDirectories(parent); + OperatorSnapshotUtil.writeStateHandle( + harness.snapshot(1L, 1L), parent.resolve(getFileName(mode)).toString()); + } + } + } + + public static void main(String... s) throws Exception { + // Run this to manually generate snapshot files for migration tests + // set working directory to flink-table/flink-table-runtime/ + new SinkUpsertMaterializerMigrationTest().writeSnapshot(current()); + } + + private static class SinkOperationMode { + private final FlinkVersion version; + private final SinkUpsertMaterializerStateBackend stateBackend; + + private SinkOperationMode( + FlinkVersion version, SinkUpsertMaterializerStateBackend stateBackend) { + this.version = version; + this.stateBackend = stateBackend; + } + + @Override + public String toString() { + return String.format("flink=%s, state=%s}", version, stateBackend); + } + } +} diff --git a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..be4fd71e72aca49ee5759760529f8273499b40ab GIT binary patch literal 2566 zcmeHJPmj_-6n_O5U89B=K7a>~wuJ=?#8a28yNyT)t(fIPhBB0{LrdCW-Syx>4}KZH zmnT1hi8KAfwp5Z`6He|VZF!G*|7PC%y$1kA*jmDqm?1>p!-@da#b1Fu=PitD^*L%OGl0YIOr*rc3SfG^Euj^%7Fe1S$NeZ#G$mN_*DiIISfbY} zyrvEbq@nle1W{yQ-$BIrjQwhkW4LG)EIi6IbAN>&zu+M7;pGk+Tz4Q5&vq;(NaAv~ZSJmThto<}JtJ*HNUqL#y`FEJpi3kX!VeN#KthEVzNo1aAZ z^mA6x%8@K@x%AxBB~4L)`AMd3DMMv$NPyBMB5NGP4hT?wtDS2n6T?(ZZDO7eG&mTk zz4sd1P=Ipyb7E*iUG3>(<{;;u@T)YP$K z-);%V1nEY2vicjx$uslM3cY%CvFF}|B2s)Gqc5S-&U*UbE4!dV6 zN6$2D6!(O4`)C~F9pIB7KL?>`XfUYnxc1iE@Hb`#R4aY;on{QwuD0E6Mmfug&8XXq zJZIKNOB}F?2B#Xq4t~aJoTFjGg>r^Uh^QO0(F9)b7hm!CwfUlYowL8OXR{|6-jcnj r!MDIysCJb0yzxq@iM3h7VguWeBBEBOF18h`W!0?Kw29i`UiSI}RFVM~ literal 0 HcmV?d00001 diff --git a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..1832c7a50ebf7d4c990e50258d741d220bd2b081 GIT binary patch literal 16099 zcmeHONo*WvcCHe)79~rRWgCVOI1O9K1cerYQ5KX7FZEV3pehe)!sxV}R=?8iTD?lEVcC^_t%G-Fv)*cm zh9k^Iwwm8)``FfMJGR;G8uUaPcfT~RTFs8zZht7lmCR=r&_6y@e?*s)BzZYak= zIH($9!y1aJwN2VzV$kDw4QLZULZ^RHqOWD?}tk3tFlCU z<)swmRle1o?-(zPs%Q9v>W)7d6(`5Bva)w@yz~Cfq4L_Z{~}57{|9rY8e_L=1*1{m z7nEcgkswJHv^4W+MnkHy{KD`iB1#xgm9=l}e7y5eKRVt#-qDXge!8P9oow!ZxPt{Z zlr?^FXpXVebfVaZVL-80J&wI2`mk$QaS%=)(C5sIW>|XVUly5a ze)Sv5)43C_zWRRP4|38EWWr?wPM=yxd}@7;ed)Js!I@8O zxr1xBBAEpP3;56`;BB!bERDJCV-`{iKy#0a;CMo8zS2y;VG_mpK%_=9-({}tX( zHV?NSZ4!DGVPoNI!nv;_a6G>Pd{X2m5Rys5Q5?Gd;6mPT+29PQzIL$rc<1Qp=Jw9T zXmc`ma(bqWHYb5=9~dJMA)^t6KybzQ%&DZ0)f!&)S~ZGdq*LvlS#LY-Zlz_^C<`;p ziqYYw9=@j(c8pTrWoO^#`@L?yG0R0Lk8|nC8Rv z(wb@o-elxg9b@Er)2eH$Z!9kR5UWfVR!eV<#ztr$mV4rb@NoRrG!|!WB({Te=Jv{R zD5A&>d=m0_dcX9isveAE3`A$(EjZInH;%MQy|qzhGFr3I?Q}QFQ4G>zFx6@s<&kl& z+pZPsCJu;bokm2uAh$!pVB;gcGJXhM3}FcL_+8NOp*zRP^)j0Z{B|p3L}P4 zs<@163nahsRJ4t5r`K#&YL4wxTB7DuOrzhfV92&az0-0G>)tD?!yt;YJ&&ID>xYeJ zzuS6tZ|${63Lu+&cyjRQVYBh9)p=6;_**CMKRbKl~m}IHz})k)N{F`Dsfi1`HC8ZBO|W9 z5Fsfr>mbFuQ5A9t=?L}CqDlvU 
z6=VnSF}gV!i&DC~6nM6Pkcr()QaA6bp*wJWlnyKv9Ak=bweb)Dpc|1k#?0JHwqffqFx{XuMm_FVA?c1x#{pr(j>7E)Ju2%)bgmE=+oJhnU zWRCdnpM9gEyt7^Phi! zw+qUhCr^*}o*W!i2%TzuAd_V~X1&wzSK6l2sI;7RqhjB_wwH1=7S?;^u_pj4WuS)$ z%W+VShoUUbjSX<`)gg`)Mq{C-yV~D?W z(y7*{^}F?U18V>`8Nlc;4LzFrmgX2a|v{-R&g%;vfc5BFvLSPHx5F5q8r_ZU8=orFClgv^^P(i*qz%ygC* zp}HOoCo#~~*Wz$OHy~D{iJ6Nc(nvH6sI!-X%NbK)>t~@G3!Ok%t9QEHZlj*EUXXK< z9?SZI9=;Qpkw)@mM;1ChBGiB`8AO!cW8_+4fOU0TRKBo-&L=RJs%N^F3X@F>jSHCd zWTX#B8?wOEQfG&NJP(nE;73Qa5Mc;^&y>rg2RsP-01=^EFrpazw7RWcv(xI5qyRm- zaNrYAZz!w)#yHJrNMYF~)DQ;ERMuuiXkFOHs=6Kbk|Aay;LO2Ikn2v)}`e`a<&_tX8WIQ z?(1kAkP8HeF&de=Ijg8p#PBlLN8CfG((0M@NWQSonuH<3iart}X-=%Fi~%GtJ~@vx zpmdl{Cb!f*cjU(U<1IcLVxe0?CRk6sGCg*YIh-B@{z>2=aqz?zC1A%G1*@k*j9K1n zp$JS`3!{<=xgBE>N+XyJQlQkkcr~F4xv|s;e3fP?(gq{luqeI(bD;#Mp>Bd+M3SqDB|ln zW+(+X)ge9|Uh6tj!rh3VV9rx1IdtZ*pI|P@V@ciH`9~1XF)*c0Vhmv;j3NmDOMHPh zh;yKO?$ap;Iu2m3g&pD2928pheydp{t8pXU0YNVSdzcmHaLbf1r@Va#1GWC|Ps>!%=h~eag-yl$I!K8nHFhDbSCllgg?f53i)d zOah}xEY343WaO|Jz(pZCBpg6D2@~n11Q0W{1Wxe>FgV08u!p1-@g+*vNT3;wjIm@d z=Q#0GKw?ZfE0_r^JdCx2Cx?$W349ZqM760;d>5Dx3!X8~L{gc2Df^{36C!zKWTG3} zF3b+bALw8@Ne0N`^K&yPaJWg0H3P;Ob0u+SrkOb{0Jhugw(7kG8KqQ5VotQ75Ho

;&?=-E@FI|Y&aUZfZUg(4PZmq7v^ zLmcEvd5(k1fmhMi#GN_!kpb~kmZ;nLH{M*jR7SYTzU|T{BaVasL*QFMw_Boy1GC@r;f@z{@^O(U_3zp66CR_j7=GfMM9Tn z!E_djpoWYgNbMMLc+4O=3Mc zF-_xP-;~57RDUWR2Zy}ClOzP37edSwMU54FZdQCP$%Y1cyZM;)+12Fhck9@oLsCbo`rMWa^RaD6}libNcc@WO6TIHJ|) zxB8uKqu<_OTDUZ@vA*#=To+C~5QPYbdL$BBr(3V}dL0Jw0g@;n9T+KyMw%!2tm}^_ zaf&OMC3AB|hv_E3RfKHTi1bc^6v9tw%cP(vaHxv#Z3(NXAQOM!%tq_i>4(KkU3h4`fu(VrGF_y_(es zMGcbO=HZDC1{xycry@Fu>=`09QWwZ_!plNOLWDhn%%@&|#Kdq4r3Pu2NiC>O3E4oj0qWH(Nb>TQDBzs zyScFYt8C;{(tR25HqcqA_M9Eo*b;Y%pI4j4zBKu?vkw8+5 zW;)_LS9plq=q+*V^dqf@OHv9GqJcoA#)Y;8rNy`%TJdmvH0I{JI|advLsvU}?D|iT zWq5etfY5hHAsR;%R$Y=@AhFn|rx>#fAx!Z3@+0M%qD!WohLqDIRKfcw`gW1YpedLh zf%^xc#wc;=vlFb7(x*zx!;=UZu^_w|PnI#7+fJ0IFpMUlQ=X@75yEWIL!o2xanL2D z%n)G(bLR@x3ZAHB!Et6);V7)h)=>s9qHIfw0;MVf`H*@VsR1T={8fn}g+4TJQ1?(H z2k4H*3M!n`$_Z^HjHCLZ-Y6bJOfPh7WFCmRQcCqx^Z>4U7_k9VqP7k~&U>6laBL^# zM;dkdSR+Mmv!`ezQ2UOW*bJO$(VJ@8^}&CPG&COK;;8ptg4kD*mlG0LQ^Dar+zduG z5CTmd?%R-XP(qHzSrLlkEN^0?Cz2=S-0dui@IpotO~&MNnT}RJVe5Dzd+O^KjAphL zBZ;vzxOiiIbKszl++OFDafGJ{%!TZBk(oe$NNxPwp8=GvPpvNPbQk#wwU#%s*-PfA zK9PdrXIGc3pfrQ{&>^5gj)NW>8+B4_6B?w^I~6u;49~}?E~e*WS{sBeHEm!d+&B|l z&TeA=*$f`1P)RO_VNpIZs0jwp9SSEbjch5Ni7-CSicwlyuimE?IyvoS)3VWm1xY?5 z1yj6WVy;_OxAn)YK zF?N<#txs`G9wtlJEj+Lbr$@pFts$lo0gVICwWc#jni$YLH9tslYPPL{81c9p4?keo z+(6Y%kL3F!0W}feMmX=Xk}8o=eiGabl_{AlvM6j~OP15ct31R*g)Iiy**g`M4~=M; zR5SE_2zsW1qOn%(riH%r4@or_ue@N<$BSh?R3@*Uwj#iyBK4U?&s};ZCq*?qLb!O!>du4WZ0kpU z$>6&4!(X1b?)+SLevrWzn)UFov|zaI{3K0A>XBV{ekgsq?);#se%<-uP8a1n{8f(P zQ|}rvr9jU+Rh$E?Q&lzPhAVWrKB{X`y}H z4$95r|7ts^P@i3QgRZ+lXr|6|P_DZ{Gaat$Zjk;}yFtpz_J@auI|s-3qI=(JS)FFH zSE-wZQNh>ZoJ!B`n3a0NGM#Quv^w<$z9YZ(c=KRy7r#4@Uy*OGDt9l8fR-Ve?S^Ps zm2Rzv-}JP)jf&~i@%4GzsW&^e(GYcfX9>A!8~?Vy`8z)-DRi^}dl{p^dz%eUWHTqrFnf3K=PRg+)h62YJT z?6c3m4}y!Jb8B&7psfC5>23PS!2Iul)Z|CxUoyS0u(ZU#E?JiUyrQZrN@*qezRvBn zAGiPP7eBxI{N-{;nGj{Dz6fFf literal 0 HcmV?d00001 diff --git a/pom.xml b/pom.xml index 94cd411a2955d..01e91e996708f 100644 --- a/pom.xml +++ b/pom.xml @@ -1659,6 +1659,7 @@ under the License. **/src/test/resources/**/serializer-snapshot **/src/test/resources/**/test-data **/src/test/resources/*-snapshot + **/src/test/resources/**/*-snapshot **/src/test/resources/*.snapshot **/src/test/resources/*-savepoint/** **/src/test/resources/*-savepoint-native/** From bbc876dd3477dca9dabf71f3cd8e8dbc21cbcf6b Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Tue, 30 Sep 2025 10:44:25 +0200 Subject: [PATCH 5/5] [FLINK-38460] Add SinkUpsertMaterializerRescalingTest --- .../SinkUpsertMaterializerRescalingTest.java | 353 ++++++++++++++++++ 1 file changed, 353 insertions(+) create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java new file mode 100644 index 0000000000000..9d8990264bce8 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.SavepointType;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+
+/** Rescaling and migration unit tests for {@link SinkUpsertMaterializer}. */
+@RunWith(Parameterized.class)
+public class SinkUpsertMaterializerRescalingTest {
+
+    @Parameter public SinkUpsertMaterializerStateBackend backend;
+
+    @Parameterized.Parameters(name = "stateBackend={0}")
+    public static Object[][] generateTestParameters() {
+        List<Object[]> result = new ArrayList<>();
+        for (SinkUpsertMaterializerStateBackend backend :
+                SinkUpsertMaterializerStateBackend.values()) {
+            result.add(new Object[] {backend});
+        }
+        return result.toArray(new Object[0][]);
+    }
+
+    @Test
+    public void testScaleUpThenDown() throws Exception {
+        testRescaleFromToFrom(10, 2, 3, backend, backend);
+    }
+
+    @Test
+    public void testScaleDownThenUp() throws Exception {
+        testRescaleFromToFrom(10, 3, 2, backend, backend);
+    }
+
+    @Test
+    public void testRecovery() throws Exception {
+        testRescaleFromToFrom(1, 1, 1, backend, backend);
+    }
+
+    @Test
+    public void testForwardAndBackwardMigration() throws Exception {
+        testRescaleFromToFrom(7, 3, 3, backend, getOtherBackend(backend));
+    }
+
+    @Test
+    public void testScaleUpThenDownWithMigration() throws Exception {
+        testRescaleFromToFrom(7, 1, 5, backend, getOtherBackend(backend));
+    }
+
+    @Test
+    public void testScaleDownThenUpWithMigration() throws Exception {
+        testRescaleFromToFrom(7, 5, 1, backend, getOtherBackend(backend));
+    }
+
+    private SinkUpsertMaterializerStateBackend getOtherBackend(
+            SinkUpsertMaterializerStateBackend backend) {
+        return backend == SinkUpsertMaterializerStateBackend.HEAP
+                ? SinkUpsertMaterializerStateBackend.ROCKSDB
+                : SinkUpsertMaterializerStateBackend.HEAP;
+    }
+
+    @SuppressWarnings("unchecked")
+    private void testRescaleFromToFrom(
+            final int maxParallelism,
+            final int fromParallelism,
+            final int toParallelism,
+            final SinkUpsertMaterializerStateBackend fromBackend,
+            final SinkUpsertMaterializerStateBackend toBackend)
+            throws Exception {
+
+        int[] currentParallelismRef = new int[] {fromParallelism};
+
+        boolean useSavepoint = fromBackend != toBackend;
+
+        OneInputStreamOperator<RowData, RowData>[] materializers =
+                new OneInputStreamOperator[maxParallelism];
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses =
+                new KeyedOneInputStreamOperatorTestHarness[maxParallelism];
+
+        final ToIntFunction<StreamRecord<RowData>> combinedHarnesses =
+                (r) -> {
+                    try {
+                        int subtaskIndex =
+                                KeyGroupRangeAssignment.assignKeyToParallelOperator(
+                                        KEY_SELECTOR.getKey(r.getValue()),
+                                        maxParallelism,
+                                        currentParallelismRef[0]);
+
+                        harnesses[subtaskIndex].processElement(r);
+                        return subtaskIndex;
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+
+        initHarnessesAndMaterializers(
+                harnesses, materializers, fromBackend, maxParallelism, fromParallelism, null);
+
+        int idx = combinedHarnesses.applyAsInt(insertRecord(1L, 1, "a1"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+        idx = combinedHarnesses.applyAsInt(insertRecord(2L, 1, "a2"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
+
+        List<OperatorSubtaskState> subtaskStates =
+                snapshotHarnesses(harnesses, fromParallelism, 1L, useSavepoint);
+
+        currentParallelismRef[0] = toParallelism;
+        initHarnessesAndMaterializers(
+                harnesses, materializers, toBackend, maxParallelism, toParallelism, subtaskStates);
+
+        idx = combinedHarnesses.applyAsInt(insertRecord(3L, 1, "a3"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+
+        idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 4L, 1, "a4"));
+
+        subtaskStates = snapshotHarnesses(harnesses, toParallelism, 2L, useSavepoint);
+
+        currentParallelismRef[0] = fromParallelism;
+        initHarnessesAndMaterializers(
+                harnesses,
+                materializers,
+                fromBackend,
+                maxParallelism,
+                fromParallelism,
+                subtaskStates);
+
+        idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+
+        idx = combinedHarnesses.applyAsInt(deleteRecord(2L, 1, "a2"));
+        ASSERTOR.shouldEmitNothing(harnesses[idx]);
+
+        idx = combinedHarnesses.applyAsInt(deleteRecord(3L, 1, "a3"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
+
+        idx = combinedHarnesses.applyAsInt(deleteRecord(1L, 1, "a1"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+
+        idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
+        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+
+        Arrays.stream(harnesses)
+                .filter(Objects::nonNull)
+                .forEach(h -> h.setStateTtlProcessingTime(1002));
+
+        idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
+        ASSERTOR.shouldEmitNothing(harnesses[idx]);
+
+        Arrays.stream(harnesses)
+                .filter(Objects::nonNull)
+                .forEach(
+                        h -> {
+                            try {
+                                h.close();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+    }
+
+    private void initHarnessesAndMaterializers(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
+            OneInputStreamOperator<RowData, RowData>[] materializers,
+            SinkUpsertMaterializerStateBackend backend,
+            int maxParallelism,
+            int parallelism,
+            @Nullable List<OperatorSubtaskState> subtaskStates)
+            throws Exception {
+        for (int i = 0; i < parallelism; ++i) {
+            materializers[i] =
+                    SinkUpsertMaterializer.create(
+                            TTL_CONFIG,
+                            RowType.of(LOGICAL_TYPES),
+                            EQUALISER,
+                            UPSERT_KEY_EQUALISER,
+                            null);
+            harnesses[i] =
+                    new KeyedOneInputStreamOperatorTestHarness<>(
+                            materializers[i],
+                            KEY_SELECTOR,
+                            KEY_SELECTOR.getProducedType(),
+                            maxParallelism,
+                            parallelism,
+                            i);
+
+            harnesses[i].setStateBackend(backend.create(false));
+
+            if (subtaskStates != null) {
+                OperatorSubtaskState operatorSubtaskState =
+                        AbstractStreamOperatorTestHarness.repackageState(
+                                subtaskStates.toArray(new OperatorSubtaskState[0]));
+
+                harnesses[i].initializeState(
+                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                                operatorSubtaskState,
+                                maxParallelism,
+                                subtaskStates.size(),
+                                parallelism,
+                                i));
+            }
+
+            harnesses[i].open();
+            harnesses[i].setStateTtlProcessingTime(1);
+        }
+    }
+
+    private List<OperatorSubtaskState> snapshotHarnesses(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
+            int parallelism,
+            long checkpointId,
+            boolean useSavepoint) {
+        return Arrays.stream(harnesses, 0, parallelism)
+                .map(
+                        h -> {
+                            try {
+                                return h.snapshotWithLocalState(
+                                                checkpointId,
+                                                0L,
+                                                useSavepoint
+                                                        ? SavepointType.savepoint(
+                                                                SavepointFormatType.CANONICAL)
+                                                        : CheckpointType.CHECKPOINT)
+                                        .getJobManagerOwnedState();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
+    /** Test equalizer for records. */
+    protected static class TestRecordEqualiser implements RecordEqualiser, HashFunction {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getLong(0) == row2.getLong(0)
+                    && row1.getInt(1) == row2.getInt(1)
+                    && row1.getString(2).equals(row2.getString(2));
+        }
+
+        @Override
+        public int hashCode(Object data) {
+            RowData rd = (RowData) data;
+            return Objects.hash(rd.getRowKind(), rd.getLong(0), rd.getInt(1), rd.getString(2));
+        }
+    }
+
+    /** Test equalizer for upsert keys. */
+    protected static class TestUpsertKeyEqualiser implements RecordEqualiser, HashFunction {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
+        }
+
+        @Override
+        public int hashCode(Object data) {
+            RowData rd = (RowData) data;
+            return Objects.hash(rd.getRowKind(), rd.getLong(0));
+        }
+    }
+
+    private static class MyGeneratedRecordEqualiser extends GeneratedRecordEqualiser {
+
+        public MyGeneratedRecordEqualiser() {
+            super("", "", new Object[0]);
+        }
+
+        @Override
+        public RecordEqualiser newInstance(ClassLoader classLoader) {
+            return new TestRecordEqualiser();
+        }
+    }
+
+    private static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000);
+
+    private static final LogicalType[] LOGICAL_TYPES =
+            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
+
+    private static final RowDataKeySelector KEY_SELECTOR =
+            HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, LOGICAL_TYPES);
+
+    private static final RowDataHarnessAssertor ASSERTOR =
+            new RowDataHarnessAssertor(LOGICAL_TYPES);
+
+    private static final GeneratedRecordEqualiser EQUALISER = new MyGeneratedRecordEqualiser();
+
+    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+}
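
For reference, the two factory methods introduced in patches 1 and 2 compose as follows in a
harness-based test. This is a minimal sketch, not part of the patch series: it assumes the
helpers defined in SinkUpsertMaterializerTest above (createHarness, LOGICAL_TYPES, TTL_CONFIG,
EQUALISER, UPSERT_KEY_EQUALISER) and the record utilities from StreamRecordUtils.

    // Build the operator via the new SinkUpsertMaterializer.create(...) factory;
    // the row serializer is derived from the RowType instead of being passed in.
    OneInputStreamOperator<RowData, RowData> materializer =
            SinkUpsertMaterializer.create(
                    TTL_CONFIG,                 // TTL for the materializer state
                    RowType.of(LOGICAL_TYPES),  // physical row type of the input
                    EQUALISER,                  // equaliser for full rows
                    UPSERT_KEY_EQUALISER,       // equaliser for the projected upsert key
                    new int[] {0});             // upsert-key field indices; null means no upsert key

    try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
            createHarness(materializer, SinkUpsertMaterializerStateBackend.ROCKSDB, LOGICAL_TYPES)) {
        harness.open();
        harness.processElement(insertRecord(1L, 1, "a1"));
        // snapshotWithLocalState(...) returns an OperatorSnapshotFinalizer, which patch 1
        // now builds through OperatorSnapshotFinalizer.create(...) instead of the constructor.
        OperatorSnapshotFinalizer snapshot = harness.snapshotWithLocalState(1L, 1L);
        OperatorSubtaskState jmState = snapshot.getJobManagerOwnedState();
    }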