diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java index a3ecabe715f62..14688e4ee742d 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java @@ -72,7 +72,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s operator.snapshotState(checkpointId, timestamp, options, storage); OperatorSubtaskState state = - new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState(); + OperatorSnapshotFinalizer.create(snapshotInProgress).getJobManagerOwnedState(); operator.notifyCheckpointComplete(checkpointId); return new TaggedOperatorSubtaskState(index, state);
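The call-site change above and the refactor that follows replace a constructor that blocks on snapshot futures with a static `create(...)` factory plus a constructor that only assigns. A minimal sketch of this compute-then-assign pattern, with simplified stand-ins (String plays the role of OperatorSubtaskState; the Finalizer class here is hypothetical, not the Flink one):

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class Finalizer {
    private final String jobManagerOwnedState;
    private final String taskLocalState;

    // The constructor only assigns already-finalized state; it cannot fail halfway
    // through and leave a partially constructed object behind.
    Finalizer(String jobManagerOwnedState, String taskLocalState) {
        this.jobManagerOwnedState = Objects.requireNonNull(jobManagerOwnedState);
        this.taskLocalState = Objects.requireNonNull(taskLocalState);
    }

    // All fallible, blocking work happens before construction.
    static Finalizer create(CompletableFuture<String> primary, CompletableFuture<String> local)
            throws ExecutionException, InterruptedException {
        return new Finalizer(primary.get(), local.get());
    }

    String getJobManagerOwnedState() {
        return jobManagerOwnedState;
    }

    String getTaskLocalState() {
        return taskLocalState;
    }
}
```

A side benefit, used by `testVersionStateGrowth` further down, is that tests can now assemble an `OperatorSnapshotFinalizer` directly from pre-built job-manager-owned and task-local states.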
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java index c308a6cb8aeec..455c2489a741e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java @@ -27,12 +27,11 @@ import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.util.concurrent.FutureUtils; -import javax.annotation.Nonnull; - import java.util.concurrent.ExecutionException; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singletonOrEmpty; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * This class finalizes {@link OperatorSnapshotFutures}. Each object is created with a {@link @@ -47,8 +46,9 @@ public class OperatorSnapshotFinalizer { /** Secondary replica of the operator subtask state for faster, local recovery on TM. */ private final OperatorSubtaskState taskLocalState; - public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFutures) + public static OperatorSnapshotFinalizer create(OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException { + checkNotNull(snapshotFutures); SnapshotResult<KeyedStateHandle> keyedManaged = FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture()); @@ -68,7 +68,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFuture SnapshotResult<StateObjectCollection<ResultSubpartitionStateHandle>> resultSubpartition = snapshotFutures.getResultSubpartitionStateFuture().get(); - jobManagerOwnedState = + OperatorSubtaskState jobManagerOwnedState = OperatorSubtaskState.builder() .setManagedOperatorState( singletonOrEmpty(operatorManaged.getJobManagerOwnedSnapshot())) @@ -83,7 +83,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFuture emptyIfNull(resultSubpartition.getJobManagerOwnedSnapshot())) .build(); - taskLocalState = + OperatorSubtaskState taskLocalState = OperatorSubtaskState.builder() .setManagedOperatorState( singletonOrEmpty(operatorManaged.getTaskLocalSnapshot())) @@ -94,6 +94,14 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFuture .setResultSubpartitionState( emptyIfNull(resultSubpartition.getTaskLocalSnapshot())) .build(); + + return new OperatorSnapshotFinalizer(jobManagerOwnedState, taskLocalState); + } + + public OperatorSnapshotFinalizer( + OperatorSubtaskState jobManagerOwnedState, OperatorSubtaskState taskLocalState) { + this.jobManagerOwnedState = checkNotNull(jobManagerOwnedState); + this.taskLocalState = checkNotNull(taskLocalState); } public OperatorSubtaskState getTaskLocalState() { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java index 2e859d2ab8d2c..ee674fc1c2ad9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java @@ -189,7 +189,7 @@ private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws Exception // finalize the async part of all by executing all snapshot runnables OperatorSnapshotFinalizer finalizedSnapshots = - new OperatorSnapshotFinalizer(snapshotInProgress); + OperatorSnapshotFinalizer.create(snapshotInProgress); jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID( operatorID, finalizedSnapshots.getJobManagerOwnedState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java index a64905098af06..17d6351e3655d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java @@ -93,7 +93,7 @@ void testRunAndExtract() throws Exception { assertThat(f).isNotDone(); } - OperatorSnapshotFinalizer finalizer = new OperatorSnapshotFinalizer(snapshotFutures); + OperatorSnapshotFinalizer finalizer = OperatorSnapshotFinalizer.create(snapshotFutures); for (Future<?> f : snapshotFutures.getAllFutures()) { assertThat(f).isDone();
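The test above checks that `create(...)` drives every pending snapshot future to completion. It relies on the contract of `FutureUtils.runIfNotDoneAndGet`: if the async part of a snapshot was never scheduled, run it on the calling thread, then join the result. A sketch of those semantics (an illustration, not Flink's actual implementation):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class RunIfNotDoneDemo {

    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future)
            throws ExecutionException, InterruptedException {
        if (future == null) {
            return null;
        }
        if (!future.isDone()) {
            // Execute the pending snapshot work synchronously on this thread.
            future.run();
        }
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        RunnableFuture<String> pending = new FutureTask<>(() -> "snapshot-bytes");
        System.out.println(runIfNotDoneAndGet(pending)); // prints "snapshot-bytes"
    }
}
```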
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 31a6019a50a6c..b72f7299f4b8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -761,7 +761,7 @@ public OperatorSnapshotFinalizer snapshotWithLocalState( checkpointStorageAccess.resolveCheckpointStorageLocation( checkpointId, locationReference)); - return new OperatorSnapshotFinalizer(operatorStateResult); + return OperatorSnapshotFinalizer.create(operatorStateResult); } /** diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java index 8ba4b792e2b05..cd5f66f7a8da9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java @@ -31,6 +31,8 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.generated.RecordEqualiser; import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; @@ -222,4 +224,18 @@ private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) { } return equaliser.equals(newRow, oldRow); } + + public static SinkUpsertMaterializer create( + StateTtlConfig ttlConfig, + RowType physicalRowType, + GeneratedRecordEqualiser rowEqualiser, + GeneratedRecordEqualiser upsertKeyEqualiser, + int[] inputUpsertKey) { + return new SinkUpsertMaterializer( + ttlConfig, + InternalSerializers.create(physicalRowType), + rowEqualiser, + upsertKeyEqualiser, + inputUpsertKey); + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java new file mode 100644 index 0000000000000..1ab7ac4ae8468 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.test.util.MigrationTest; +import org.apache.flink.types.RowKind; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.FlinkVersion.current; +import static org.apache.flink.streaming.util.OperatorSnapshotUtil.getResourceFilename; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.ASSERTOR; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.EQUALISER; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.LOGICAL_TYPES; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.TTL_CONFIG; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY_EQUALISER; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; + +/** Test for {@link SinkUpsertMaterializer} migration. 
*/ +@RunWith(Parameterized.class) +public class SinkUpsertMaterializerMigrationTest implements MigrationTest { + + private static final String FOLDER_NAME = "sink-upsert-materializer"; + + @Parameterized.Parameter(0) + @SuppressWarnings({"ClassEscapesDefinedScope", "DefaultAnnotationParam"}) + public SinkOperationMode migrateFrom; + + @Parameterized.Parameter(1) + @SuppressWarnings("ClassEscapesDefinedScope") + public SinkOperationMode migrateTo; + + @Parameterized.Parameters(name = "{0} -> {1}") + public static List<Object[]> parameters() { + List<Object[]> result = new ArrayList<>(); + Set<FlinkVersion> versions = FlinkVersion.rangeOf(FlinkVersion.v2_2, FlinkVersion.v2_2); + for (FlinkVersion fromVersion : versions) { + for (SinkUpsertMaterializerStateBackend backend : + SinkUpsertMaterializerStateBackend.values()) { + result.add( + new Object[] { + new SinkOperationMode(fromVersion, backend), + new SinkOperationMode(current(), backend) + }); + } + } + return result; + } + + @Test + public void testMigration() throws Exception { + String path = getResourceFilename(FOLDER_NAME + "/" + getFileName(migrateFrom)); + try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = + createHarness(migrateTo, path)) { + testCorrectnessAfterSnapshot(harness); + } + } + + private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness( + SinkOperationMode mode, String snapshotPath) throws Exception { + int[] inputUpsertKey = {UPSERT_KEY}; + OneInputStreamOperator<RowData, RowData> materializer = + SinkUpsertMaterializer.create( + TTL_CONFIG, + RowType.of(LOGICAL_TYPES), + EQUALISER, + UPSERT_KEY_EQUALISER, + inputUpsertKey); + KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + SinkUpsertMaterializerTest.createHarness( + materializer, mode.stateBackend, LOGICAL_TYPES); + harness.setup(new RowDataSerializer(LOGICAL_TYPES)); + if (snapshotPath != null) { + harness.initializeState(snapshotPath); + } + harness.open(); + harness.setStateTtlProcessingTime(1); + return harness; + } + + private void testCorrectnessBeforeSnapshot( + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception { + + testHarness.processElement(insertRecord(1L, 1, "a1")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + testHarness.processElement(updateAfterRecord(1L, 1, "a11")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11")); + + testHarness.processElement(insertRecord(3L, 1, "a3")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + } + + private void testCorrectnessAfterSnapshot( + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception { + testHarness.processElement(deleteRecord(1L, 1, "a111")); + ASSERTOR.shouldEmitNothing(testHarness); + + testHarness.processElement(deleteRecord(3L, 1, "a33")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33")); + + testHarness.processElement(insertRecord(4L, 1, "a4")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + + testHarness.setStateTtlProcessingTime(1002); + testHarness.processElement(deleteRecord(4L, 1, "a4")); + ASSERTOR.shouldEmitNothing(testHarness); + } + + private static String getFileName(SinkOperationMode mode) { + return String.format( + "migration-flink-%s-%s-%s-snapshot", mode.version, mode.stateBackend, "V1"); + } + + @SnapshotsGenerator + public void writeSnapshot(FlinkVersion version) throws Exception { + for (SinkUpsertMaterializerStateBackend stateBackend : + SinkUpsertMaterializerStateBackend.values()) { + SinkOperationMode mode = new SinkOperationMode(version, stateBackend);
+ try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = + createHarness(mode, null)) { + testCorrectnessBeforeSnapshot(harness); + Path parent = Paths.get("src/test/resources", FOLDER_NAME); + Files.createDirectories(parent); + OperatorSnapshotUtil.writeStateHandle( + harness.snapshot(1L, 1L), parent.resolve(getFileName(mode)).toString()); + } + } + } + + public static void main(String... s) throws Exception { + // Run this to manually generate snapshot files for migration tests; + // set the working directory to flink-table/flink-table-runtime/ + new SinkUpsertMaterializerMigrationTest().writeSnapshot(current()); + } + + private static class SinkOperationMode { + private final FlinkVersion version; + private final SinkUpsertMaterializerStateBackend stateBackend; + + private SinkOperationMode( + FlinkVersion version, SinkUpsertMaterializerStateBackend stateBackend) { + this.version = version; + this.stateBackend = stateBackend; + } + + @Override + public String toString() { + return String.format("flink=%s, state=%s", version, stateBackend); + } + } +}
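The rescaling test in the next file routes every record to the harness that currently owns its key group and re-resolves that routing after each restore at a new parallelism. A minimal sketch of the key-to-subtask assignment it builds on, using the same `KeyGroupRangeAssignment` helper the test imports:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyRoutingDemo {
    public static void main(String[] args) {
        int maxParallelism = 10;
        int key = 1;
        // key -> key group is fixed by maxParallelism; key group -> subtask depends
        // on the current parallelism, so the owning subtask can change on rescale.
        for (int parallelism : new int[] {2, 3}) {
            int subtask =
                    KeyGroupRangeAssignment.assignKeyToParallelOperator(
                            key, maxParallelism, parallelism);
            System.out.printf("parallelism=%d -> subtask %d%n", parallelism, subtask);
        }
    }
}
```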
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java new file mode 100644 index 0000000000000..9d8990264bce8 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.SavepointType; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.runtime.util.StateConfigUtil; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.types.RowKind; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.function.ToIntFunction; +import java.util.stream.Collectors; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; + +/** Rescaling and migration unit tests for {@link SinkUpsertMaterializer}.
*/ +@RunWith(Parameterized.class) +public class SinkUpsertMaterializerRescalingTest { + + @Parameter public SinkUpsertMaterializerStateBackend backend; + + @Parameterized.Parameters(name = "stateBackend={0}") + public static Object[][] generateTestParameters() { + List<Object[]> result = new ArrayList<>(); + for (SinkUpsertMaterializerStateBackend backend : + SinkUpsertMaterializerStateBackend.values()) { + result.add(new Object[] {backend}); + } + return result.toArray(new Object[0][]); + } + + @Test + public void testScaleUpThenDown() throws Exception { + testRescaleFromToFrom(10, 2, 3, backend, backend); + } + + @Test + public void testScaleDownThenUp() throws Exception { + testRescaleFromToFrom(10, 3, 2, backend, backend); + } + + @Test + public void testRecovery() throws Exception { + testRescaleFromToFrom(1, 1, 1, backend, backend); + } + + @Test + public void testForwardAndBackwardMigration() throws Exception { + testRescaleFromToFrom(7, 3, 3, backend, getOtherBackend(backend)); + } + + @Test + public void testScaleUpThenDownWithMigration() throws Exception { + testRescaleFromToFrom(7, 1, 5, backend, getOtherBackend(backend)); + } + + @Test + public void testScaleDownThenUpWithMigration() throws Exception { + testRescaleFromToFrom(7, 5, 1, backend, getOtherBackend(backend)); + } + + private SinkUpsertMaterializerStateBackend getOtherBackend( + SinkUpsertMaterializerStateBackend backend) { + return backend == SinkUpsertMaterializerStateBackend.HEAP + ? SinkUpsertMaterializerStateBackend.ROCKSDB + : SinkUpsertMaterializerStateBackend.HEAP; + } + + @SuppressWarnings("unchecked") + private void testRescaleFromToFrom( + final int maxParallelism, + final int fromParallelism, + final int toParallelism, + final SinkUpsertMaterializerStateBackend fromBackend, + final SinkUpsertMaterializerStateBackend toBackend) + throws Exception { + + int[] currentParallelismRef = new int[] {fromParallelism}; + + boolean useSavepoint = fromBackend != toBackend; + + OneInputStreamOperator[] materializers = + new OneInputStreamOperator[maxParallelism]; + KeyedOneInputStreamOperatorTestHarness[] harnesses = + new KeyedOneInputStreamOperatorTestHarness[maxParallelism]; + + final ToIntFunction<StreamRecord<RowData>> combinedHarnesses = + (r) -> { + try { + int subtaskIndex = + KeyGroupRangeAssignment.assignKeyToParallelOperator( + KEY_SELECTOR.getKey(r.getValue()), + maxParallelism, + currentParallelismRef[0]); + + harnesses[subtaskIndex].processElement(r); + return subtaskIndex; + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + initHarnessesAndMaterializers( + harnesses, materializers, fromBackend, maxParallelism, fromParallelism, null); + + int idx = combinedHarnesses.applyAsInt(insertRecord(1L, 1, "a1")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + idx = combinedHarnesses.applyAsInt(insertRecord(2L, 1, "a2")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); + + List<OperatorSubtaskState> subtaskStates = + snapshotHarnesses(harnesses, fromParallelism, 1L, useSavepoint); + + currentParallelismRef[0] = toParallelism; + initHarnessesAndMaterializers( + harnesses, materializers, toBackend, maxParallelism, toParallelism, subtaskStates); + + idx = combinedHarnesses.applyAsInt(insertRecord(3L, 1, "a3")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + + idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 4L, 1, "a4")); +
+ subtaskStates = snapshotHarnesses(harnesses, toParallelism, 2L, useSavepoint); + + currentParallelismRef[0] = fromParallelism; + initHarnessesAndMaterializers( + harnesses, + materializers, + fromBackend, + maxParallelism, + fromParallelism, + subtaskStates); + + idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + + idx = combinedHarnesses.applyAsInt(deleteRecord(2L, 1, "a2")); + ASSERTOR.shouldEmitNothing(harnesses[idx]); + + idx = combinedHarnesses.applyAsInt(deleteRecord(3L, 1, "a3")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")); + + idx = combinedHarnesses.applyAsInt(deleteRecord(1L, 1, "a1")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.DELETE, 1L, 1, "a1")); + + idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4")); + ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + + Arrays.stream(harnesses) + .filter(Objects::nonNull) + .forEach(h -> h.setStateTtlProcessingTime(1002)); + + idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4")); + ASSERTOR.shouldEmitNothing(harnesses[idx]); + + Arrays.stream(harnesses) + .filter(Objects::nonNull) + .forEach( + h -> { + try { + h.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + private void initHarnessesAndMaterializers( + KeyedOneInputStreamOperatorTestHarness[] harnesses, + OneInputStreamOperator[] materializers, + SinkUpsertMaterializerStateBackend backend, + int maxParallelism, + int parallelism, + @Nullable List<OperatorSubtaskState> subtaskStates) + throws Exception { + for (int i = 0; i < parallelism; ++i) { + materializers[i] = + SinkUpsertMaterializer.create( + TTL_CONFIG, + RowType.of(LOGICAL_TYPES), + EQUALISER, + UPSERT_KEY_EQUALISER, + null); + harnesses[i] = + new KeyedOneInputStreamOperatorTestHarness<>( + materializers[i], + KEY_SELECTOR, + KEY_SELECTOR.getProducedType(), + maxParallelism, + parallelism, + i); + + harnesses[i].setStateBackend(backend.create(false)); + + if (subtaskStates != null) { + OperatorSubtaskState operatorSubtaskState = + AbstractStreamOperatorTestHarness.repackageState( + subtaskStates.toArray(new OperatorSubtaskState[0])); + + harnesses[i].initializeState( + AbstractStreamOperatorTestHarness.repartitionOperatorState( + operatorSubtaskState, + maxParallelism, + subtaskStates.size(), + parallelism, + i)); + } + + harnesses[i].open(); + harnesses[i].setStateTtlProcessingTime(1); + } + } + + private List<OperatorSubtaskState> snapshotHarnesses( + KeyedOneInputStreamOperatorTestHarness[] harnesses, + int parallelism, + long checkpointId, + boolean useSavepoint) { + return Arrays.stream(harnesses, 0, parallelism) + .map( + h -> { + try { + return h.snapshotWithLocalState( + checkpointId, + 0L, + useSavepoint + ? SavepointType.savepoint( + SavepointFormatType.CANONICAL) + : CheckpointType.CHECKPOINT) + .getJobManagerOwnedState(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } + + /** Test equalizer for records.
*/ + protected static class TestRecordEqualiser implements RecordEqualiser, HashFunction { + @Override + public boolean equals(RowData row1, RowData row2) { + return row1.getRowKind() == row2.getRowKind() + && row1.getLong(0) == row2.getLong(0) + && row1.getInt(1) == row2.getInt(1) + && row1.getString(2).equals(row2.getString(2)); + } + + @Override + public int hashCode(Object data) { + RowData rd = (RowData) data; + return Objects.hash(rd.getRowKind(), rd.getLong(0), rd.getInt(1), rd.getString(2)); + } + } + + /** Test equalizer for upsert keys. */ + protected static class TestUpsertKeyEqualiser implements RecordEqualiser, HashFunction { + @Override + public boolean equals(RowData row1, RowData row2) { + return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0); + } + + @Override + public int hashCode(Object data) { + RowData rd = (RowData) data; + return Objects.hash(rd.getRowKind(), rd.getLong(0)); + } + } + + private static class MyGeneratedRecordEqualiser extends GeneratedRecordEqualiser { + + public MyGeneratedRecordEqualiser() { + super("", "", new Object[0]); + } + + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + } + + private static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000); + + private static final LogicalType[] LOGICAL_TYPES = + new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; + + private static final RowDataKeySelector KEY_SELECTOR = + HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, LOGICAL_TYPES); + + private static final RowDataHarnessAssertor ASSERTOR = + new RowDataHarnessAssertor(LOGICAL_TYPES); + + private static final GeneratedRecordEqualiser EQUALISER = new MyGeneratedRecordEqualiser(); + + private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestUpsertKeyEqualiser(); + } + }; +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java new file mode 100644 index 0000000000000..b675df2893e83 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; + +/** StateBackend to test SinkUpsertMaterializer with. */ +public enum SinkUpsertMaterializerStateBackend { + HEAP { + + public StateBackend create(boolean incrementalIfSupported) { + return new HashMapStateBackend(); + } + }, + ROCKSDB { + + public StateBackend create(boolean incrementalIfSupported) { + return new EmbeddedRocksDBStateBackend(incrementalIfSupported); + } + }; + + public abstract StateBackend create(boolean incrementalIfSupported); +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java index 322ac9e1cebe7..9fc4b1cf88d8f 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java @@ -19,40 +19,73 @@ package org.apache.flink.table.runtime.operators.sink; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.generated.RecordEqualiser; -import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; -import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; import org.apache.flink.table.runtime.util.StateConfigUtil; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.utils.HandwrittenSelectorUtil; import org.apache.flink.types.RowKind; -import org.junit.jupiter.api.Test; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.api.java.tuple.Tuple2.of; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.junit.Assert.assertEquals; /** Test for {@link SinkUpsertMaterializer}. */ -class SinkUpsertMaterializerTest { +@RunWith(Parameterized.class) +public class SinkUpsertMaterializerTest { + + static final int UPSERT_KEY = 0; + + @SuppressWarnings("ClassEscapesDefinedScope") + @Parameter + public SinkUpsertMaterializerStateBackend stateBackend; - private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1000); - private final LogicalType[] types = + @Parameterized.Parameters(name = "stateBackend={0}") + public static Object[][] generateTestParameters() { + List<Object[]> result = new ArrayList<>(); + for (SinkUpsertMaterializerStateBackend backend : + SinkUpsertMaterializerStateBackend.values()) { + result.add(new Object[] {backend}); + } + return result.toArray(new Object[0][]); + } + + static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000); + static final LogicalType[] LOGICAL_TYPES = new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; - private final RowDataSerializer serializer = new RowDataSerializer(types); - private final RowDataKeySelector keySelector = - HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types); - private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); + static final RowDataHarnessAssertor ASSERTOR = new RowDataHarnessAssertor(LOGICAL_TYPES); - private final GeneratedRecordEqualiser equaliser = + static final GeneratedRecordEqualiser EQUALISER = new GeneratedRecordEqualiser("", "", new Object[0]) { @Override @@ -61,7 +94,7 @@ public RecordEqualiser newInstance(ClassLoader classLoader) { } }; - private final GeneratedRecordEqualiser upsertKeyEqualiser = + static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = new GeneratedRecordEqualiser("", "", new Object[0]) { @Override @@ -70,83 +103,236 @@ public RecordEqualiser newInstance(ClassLoader classLoader) { } }; + /** + * If the composite serializer in {@link SinkUpsertMaterializer} works on projected fields, then + * it might use the wrong serializer, e.g. the {@link VarCharType} instead of the {@link + * IntType}. That might cause an {@link ArrayIndexOutOfBoundsException} because the string serializer + * expects the first number to be the length of the string.
+ */ @Test - void test() throws Exception { - SinkUpsertMaterializer materializer = - new SinkUpsertMaterializer( - ttlConfig, serializer, equaliser, upsertKeyEqualiser, null); + public void testUpsertKeySerializerFailure() throws Exception { + LogicalType[] types = new LogicalType[] {new VarCharType(), new IntType()}; + // project the int field, while in the original record it's a string + + OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1); + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer, stateBackend, types)) { + testHarness.open(); + RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); + // -1 is not a valid string length + testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", -1)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", -1)); + + // 999 as a string length is too long + testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 999)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 999)); + } + } + + @Test + public void testUpsertKeySerializerSilentCorruption() throws Exception { + LogicalType[] types = + new LogicalType[] {new VarCharType(), new BigIntType(), new IntType()}; + // project the int field, while in the original record it's a string + + OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1); + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer, stateBackend, types)) { + testHarness.open(); + RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); + + // this might serialize the upsert key as a 32-character string potentially including "97" + testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 32L, 97)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 32L, 97)); + + // but here it might include "98", which would result in no output and a test failure + testHarness.processElement(binaryRecord(RowKind.DELETE, "any string", 32L, 98)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, "any string", 32L, 98)); + } + } + + @Test + public void testUpsertEqualizer() throws Exception { + LogicalType[] types = new LogicalType[] {new IntType(), new BigIntType()}; + + OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1); + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer, stateBackend, types)) { + testHarness.open(); + RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); + + // upsert key is 33, 0 is unused + testHarness.processElement(binaryRecord(RowKind.INSERT, 0, 33L)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 0, 33L)); + + // upsert key 33 - should remove AND clear the state involving the upsert equalizer; + // the equalizer might fail if it's used on un-projected records + testHarness.processElement(binaryRecord(RowKind.DELETE, 1, 33L)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1, 33L)); + } + } + + @Test + public void testNoUpsertKeyFlow() throws Exception { KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( materializer, keySelector, keySelector.getProducedType()); + createHarness(createOperatorWithoutUpsertKey()); testHarness.open(); testHarness.setStateTtlProcessingTime(1); testHarness.processElement(insertRecord(1L, 1, "a1")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
testHarness.processElement(insertRecord(2L, 1, "a2")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); testHarness.processElement(insertRecord(3L, 1, "a3")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); testHarness.processElement(deleteRecord(2L, 1, "a2")); - assertor.shouldEmitNothing(testHarness); + ASSERTOR.shouldEmitNothing(testHarness); testHarness.processElement(deleteRecord(3L, 1, "a3")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")); testHarness.processElement(deleteRecord(1L, 1, "a1")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); testHarness.processElement(insertRecord(4L, 1, "a4")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); testHarness.setStateTtlProcessingTime(1002); testHarness.processElement(deleteRecord(4L, 1, "a4")); - assertor.shouldEmitNothing(testHarness); + ASSERTOR.shouldEmitNothing(testHarness); testHarness.close(); } @Test - void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception { - SinkUpsertMaterializer materializer = - new SinkUpsertMaterializer( - ttlConfig, serializer, equaliser, upsertKeyEqualiser, new int[] {0}); + public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception { + OneInputStreamOperator<RowData, RowData> materializer = + createOperator(LOGICAL_TYPES, UPSERT_KEY); KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( - materializer, keySelector, keySelector.getProducedType()); + createHarness(materializer); testHarness.open(); testHarness.setStateTtlProcessingTime(1); testHarness.processElement(insertRecord(1L, 1, "a1")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); testHarness.processElement(updateAfterRecord(1L, 1, "a11")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11")); testHarness.processElement(insertRecord(3L, 1, "a3")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); testHarness.processElement(deleteRecord(1L, 1, "a111")); - assertor.shouldEmitNothing(testHarness); + ASSERTOR.shouldEmitNothing(testHarness); testHarness.processElement(deleteRecord(3L, 1, "a33")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33")); testHarness.processElement(insertRecord(4L, 1, "a4")); - assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); testHarness.setStateTtlProcessingTime(1002); testHarness.processElement(deleteRecord(4L, 1, "a4")); - assertor.shouldEmitNothing(testHarness); + ASSERTOR.shouldEmitNothing(testHarness); + + testHarness.close(); + } + + @Test + public void testRetractionWithoutUpsertKey()
throws Exception { + testRetractions((int[]) null); + } + + @Test + public void testRetractionWithUpsertKey() throws Exception { + testRetractions(UPSERT_KEY); + } + + public void testRetractions(int... upsertKey) throws Exception { + testThreeElementProcessing( + "retract first - should emit nothing until empty - then delete", + upsertKey, + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(2L, 1, "a2"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3"))); + testThreeElementProcessing( + "retract middle - should emit nothing until empty - then delete", + upsertKey, + of(deleteRecord(2L, 1, "a2"), null), + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3"))); + testThreeElementProcessing( + "retract last - should emit penultimate until empty - then delete", + upsertKey, + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), + of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")), + of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1"))); + testThreeElementProcessing( + "retract in arbitrary order: 1,3,2", + upsertKey, + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), + of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2"))); + testThreeElementProcessing( + "retract in arbitrary order: 2,3,1", + upsertKey, + of(deleteRecord(2L, 1, "a2"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")), + of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1"))); + testThreeElementProcessing( + "retract in arbitrary order: 3,1,2", + upsertKey, + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2"))); + } + + // boilerplate for the common test case of processing that starts with three elements + @SafeVarargs + private void testThreeElementProcessing( + String description, + int[] upsertKey, + Tuple2<StreamRecord<RowData>, RowData>... inputAndOutput) + throws Exception { + @SuppressWarnings("rawtypes") + Tuple2[] merged = new Tuple2[inputAndOutput.length + 3]; + merged[0] = of(insertRecord(1L, 1, "a1"), rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + merged[1] = of(insertRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); + merged[2] = of(insertRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + System.arraycopy(inputAndOutput, 0, merged, 3, inputAndOutput.length); + testElementProcessing(description, upsertKey, merged); + } + + @SafeVarargs + private void testElementProcessing( + String description, + int[] upsertKey, + Tuple2<StreamRecord<RowData>, RowData>...
inputAndOutput) + throws Exception { + OneInputStreamOperator<RowData, RowData> materializer = + createOperator(LOGICAL_TYPES, upsertKey); + KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer); + + testHarness.open(); + + for (Tuple2<StreamRecord<RowData>, RowData> el0 : inputAndOutput) { + testHarness.processElement(el0.f0); + if (el0.f1 == null) { + ASSERTOR.shouldEmitNothing(testHarness); + } else { + ASSERTOR.shouldEmit(testHarness, description, el0.f1); + } + } testHarness.close(); } @@ -162,9 +348,118 @@ public boolean equals(RowData row1, RowData row2) { } private static class TestUpsertKeyEqualiser implements RecordEqualiser { + @Override public boolean equals(RowData row1, RowData row2) { - return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0); + return row1.getRowKind() == row2.getRowKind() + && row1.getLong(UPSERT_KEY) == row2.getLong(UPSERT_KEY); } } + + static KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness( + OneInputStreamOperator<RowData, RowData> materializer, + SinkUpsertMaterializerStateBackend backend, + LogicalType[] types) + throws Exception { + KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + materializer, + HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types), + HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types) + .getProducedType()); + testHarness.setStateBackend(backend.create(true)); + return testHarness; + } + + @Test + public void testEmptyUpsertKey() throws Exception { + testRecovery(createOperator(LOGICAL_TYPES), createOperatorWithoutUpsertKey()); + testRecovery(createOperatorWithoutUpsertKey(), createOperator(LOGICAL_TYPES)); + } + + private void testRecovery( + OneInputStreamOperator<RowData, RowData> from, + OneInputStreamOperator<RowData, RowData> to) + throws Exception { + OperatorSubtaskState snapshot; + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(from)) { + testHarness.open(); + snapshot = testHarness.snapshot(1L, 1L); + } + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(to)) { + testHarness.initializeState(snapshot); + testHarness.open(); + } + } + + private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness( + OneInputStreamOperator<RowData, RowData> m2) throws Exception { + return createHarness(m2, stateBackend, LOGICAL_TYPES); + } + + @Test + public void testVersionStateGrowth() throws Exception { + int dop = 2; + int numIterations = 10; + OperatorSnapshotFinalizer[] snapshots = new OperatorSnapshotFinalizer[dop]; + long[] prevStateSizes = new long[dop]; + for (int i = 0; i < numIterations; i++) { + for (int subtask = 0; subtask < dop; subtask++) { + snapshots[subtask] = initAndSnapshot(snapshots[subtask], i); + long currentStateSize = + snapshots[subtask] + .getJobManagerOwnedState() + .getManagedOperatorState() + .stream() + .mapToLong(StateObject::getStateSize) + .sum(); + if (i > 0) { + assertEquals(prevStateSizes[subtask], currentStateSize); + } + prevStateSizes[subtask] = currentStateSize; + } + List<OperatorStateHandle> union = + Arrays.stream(snapshots) + .flatMap( + s -> + s + .getJobManagerOwnedState() + .getManagedOperatorState() + .stream()) + .collect(Collectors.toList()); + for (int j = 0; j < dop; j++) { + snapshots[j] = + new OperatorSnapshotFinalizer( + snapshots[j].getJobManagerOwnedState().toBuilder() + .setManagedOperatorState(new StateObjectCollection<>(union)) + .build(), + snapshots[j].getTaskLocalState()); + } + } + } + + private OperatorSnapshotFinalizer initAndSnapshot( + OperatorSnapshotFinalizer from, int newCheckpointID) throws Exception { + try
(OneInputStreamOperatorTestHarness<RowData, RowData> harness = + createHarness( + createOperator(LOGICAL_TYPES, UPSERT_KEY), stateBackend, LOGICAL_TYPES)) { + if (from != null) { + harness.initializeState(from.getJobManagerOwnedState()); + } + harness.open(); + return harness.snapshotWithLocalState(newCheckpointID, newCheckpointID); + } + } + + private OneInputStreamOperator<RowData, RowData> createOperatorWithoutUpsertKey() { + return createOperator(LOGICAL_TYPES, (int[]) null); + } + + private OneInputStreamOperator<RowData, RowData> createOperator( + LogicalType[] types, int... upsertKey) { + return SinkUpsertMaterializer.create( + TTL_CONFIG, RowType.of(types), EQUALISER, UPSERT_KEY_EQUALISER, upsertKey); + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java index d7fbef9f16293..b4b074f724913 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java @@ -61,6 +61,14 @@ public void shouldEmit( assertThat(getEmittedRows(harness)).containsExactly(expected); } + /** Assert the test harness should emit records exactly the same as the expected records. */ + public void shouldEmit( + AbstractStreamOperatorTestHarness<RowData> harness, + String description, + RowData... expected) { + assertThat(getEmittedRows(harness)).describedAs(description).containsExactly(expected); + } + /** Assert the test harness should emit all records regardless of the order. */ public void shouldEmitAll( AbstractStreamOperatorTestHarness<RowData> harness, RowData... expected) { diff --git a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot new file mode 100644 index 0000000000000..be4fd71e72aca Binary files /dev/null and b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot differ diff --git a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot new file mode 100644 index 0000000000000..1832c7a50ebf7 Binary files /dev/null and b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot differ diff --git a/pom.xml b/pom.xml index 94cd411a2955d..01e91e996708f 100644 --- a/pom.xml +++ b/pom.xml @@ -1659,6 +1659,7 @@ under the License. **/src/test/resources/**/serializer-snapshot **/src/test/resources/**/test-data **/src/test/resources/*-snapshot + **/src/test/resources/**/*-snapshot **/src/test/resources/*.snapshot **/src/test/resources/*-savepoint/** **/src/test/resources/*-savepoint-native/**
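One more note on the new `SinkUpsertMaterializer.create(...)` factory: it derives the operator's state serializer from the declared physical `RowType` in a single place, which is the invariant the serializer-mismatch tests above guard against. A minimal sketch of that derivation, assuming a flink-table-runtime classpath:

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class CreateFactoryDemo {
    public static void main(String[] args) {
        // The serializer is built from the full physical row type, so the field
        // order and types cannot drift from what the operator actually stores.
        RowType physicalRowType = RowType.of(new VarCharType(), new IntType());
        TypeSerializer<RowData> serializer = InternalSerializers.create(physicalRowType);
        System.out.println(serializer);
    }
}
```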