Skip to content

Commit 5a151f6

Browse files
dieggoluisEric Nascimentodwangatt
authored
[FLINK-37747][runtime] Use old subtask count for restored committable objects.
* [FLINK-37747][runtime] Use old subtask count for restored committable objects * trying to adapt test * Attempt to backport SinkV2ITCase from FLINK-37747 * Add more test use cases for SinkV2ITCase * Fix SinkV2ITCase test * Setup TestSinkV2WithPostCommitTopology global committer * Rename test SinkV2ITCase streaming source to be more specific --------- Co-authored-by: Eric Nascimento <[email protected]> Co-authored-by: David Wang <[email protected]>
1 parent 691b8f9 commit 5a151f6

File tree

4 files changed

+227
-28
lines changed

4 files changed

+227
-28
lines changed

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,12 @@ private void commitAndEmit(CheckpointCommittableManager<CommT> committableManage
180180

181181
private void emit(CheckpointCommittableManager<CommT> committableManager) {
182182
int subtaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
183-
int numberOfSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
183+
// Ensure that numberOfSubtasks is in sync with the number of actually emitted
184+
// CommittableSummaries during upscaling recovery (see FLINK-37747).
185+
int numberOfSubtasks =
186+
Math.min(
187+
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
188+
committableManager.getNumberOfSubtasks());
184189
long checkpointId = committableManager.getCheckpointId();
185190
Collection<CommT> committables = committableManager.getSuccessfulCommittables();
186191
output.collect(

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.assertj.core.api.ListAssert;
3232
import org.junit.jupiter.api.Test;
3333
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.CsvSource;
3435
import org.junit.jupiter.params.provider.ValueSource;
3536

3637
import java.util.function.IntSupplier;
@@ -91,7 +92,7 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
9192
SinkAndCounters sinkAndCounters = sinkWithPostCommit();
9293
final OneInputStreamOperatorTestHarness<
9394
CommittableMessage<String>, CommittableMessage<String>>
94-
testHarness = createTestHarness(sinkAndCounters.sink, false, true);
95+
testHarness = createTestHarness(sinkAndCounters.sink, false, true, 1, 1, 0);
9596
testHarness.open();
9697
testHarness.setProcessingTime(0);
9798

@@ -125,11 +126,13 @@ void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
125126
testHarness.close();
126127
}
127128

128-
@Test
129-
void testStateRestore() throws Exception {
129+
@ParameterizedTest
130+
@CsvSource({"1, 10, 9", "2, 1, 0", "2, 2, 1"})
131+
void testStateRestoreWithScaling(
132+
int parallelismBeforeScaling, int parallelismAfterScaling, int subtaskIdAfterRecovery)
133+
throws Exception {
130134

131135
final int originalSubtaskId = 0;
132-
final int subtaskIdAfterRecovery = 9;
133136

134137
final OneInputStreamOperatorTestHarness<
135138
CommittableMessage<String>, CommittableMessage<String>>
@@ -138,8 +141,8 @@ void testStateRestore() throws Exception {
138141
sinkWithPostCommitWithRetry().sink,
139142
false,
140143
true,
141-
1,
142-
1,
144+
parallelismBeforeScaling,
145+
parallelismBeforeScaling,
143146
originalSubtaskId);
144147
testHarness.open();
145148

@@ -148,15 +151,17 @@ void testStateRestore() throws Exception {
148151
long checkpointId = 0L;
149152

150153
final CommittableSummary<String> committableSummary =
151-
new CommittableSummary<>(originalSubtaskId, 1, checkpointId, 1, 1, 0);
154+
new CommittableSummary<>(
155+
originalSubtaskId, parallelismBeforeScaling, checkpointId, 1, 1, 0);
152156
testHarness.processElement(new StreamRecord<>(committableSummary));
153157
final CommittableWithLineage<String> first =
154158
new CommittableWithLineage<>("1", checkpointId, originalSubtaskId);
155159
testHarness.processElement(new StreamRecord<>(first));
156160

157161
// another committable for the same checkpointId but from different subtask.
158162
final CommittableSummary<String> committableSummary2 =
159-
new CommittableSummary<>(originalSubtaskId + 1, 1, checkpointId, 1, 1, 0);
163+
new CommittableSummary<>(
164+
originalSubtaskId + 1, parallelismBeforeScaling, checkpointId, 1, 1, 0);
160165
testHarness.processElement(new StreamRecord<>(committableSummary2));
161166
final CommittableWithLineage<String> second =
162167
new CommittableWithLineage<>("2", checkpointId, originalSubtaskId + 1);
@@ -174,7 +179,12 @@ void testStateRestore() throws Exception {
174179
CommittableMessage<String>, CommittableMessage<String>>
175180
restored =
176181
createTestHarness(
177-
sinkAndCounters.sink, false, true, 10, 10, subtaskIdAfterRecovery);
182+
sinkAndCounters.sink,
183+
false,
184+
true,
185+
parallelismAfterScaling,
186+
parallelismAfterScaling,
187+
subtaskIdAfterRecovery);
178188

179189
restored.initializeState(snapshot);
180190
restored.open();
@@ -188,7 +198,8 @@ void testStateRestore() throws Exception {
188198
.hasSubtaskId(subtaskIdAfterRecovery)
189199
.hasFailedCommittables(0)
190200
.hasOverallCommittables(2)
191-
.hasPendingCommittables(0);
201+
.hasPendingCommittables(0)
202+
.hasNumberOfSubtasks(Math.min(parallelismBeforeScaling, parallelismAfterScaling));
192203

193204
// Expect the same checkpointId that the original snapshot was made with.
194205
records.element(1, as(committableWithLineage()))
@@ -303,17 +314,6 @@ void testEmitCommittablesBatch() throws Exception {
303314
testHarness.close();
304315
}
305316

306-
private OneInputStreamOperatorTestHarness<
307-
CommittableMessage<String>, CommittableMessage<String>>
308-
createTestHarness(
309-
SupportsCommitter<String> sink,
310-
boolean isBatchMode,
311-
boolean isCheckpointingEnabled)
312-
throws Exception {
313-
return new OneInputStreamOperatorTestHarness<>(
314-
new CommitterOperatorFactory<>(sink, isBatchMode, isCheckpointingEnabled));
315-
}
316-
317317
private OneInputStreamOperatorTestHarness<
318318
CommittableMessage<String>, CommittableMessage<String>>
319319
createTestHarness(

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ public TestSinkV2WithPostCommitTopology(
243243
@Override
244244
public void addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
245245
StandardSinkTopologies.addGlobalCommitter(
246-
committables, DefaultCommitter::new, this::getCommittableSerializer);
246+
committables, () -> createCommitter(null), this::getCommittableSerializer);
247247
}
248248
}
249249

0 commit comments

Comments
 (0)