Skip to content

Commit 7025a3f

Browse files
authored
[FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27015)
In #26433, we removed the EOI marker in the form of Long.MAX_VALUE as the checkpoint id. Since streaming pipelines can continue to checkpoint even after their respective operators have been shut down, it is not safe to use a constant as this can lead to duplicate commits. However, in batch pipelines we only have one commit on job shutdown. Using any checkpoint id should suffice in this scenario. Any pending committables should be processed by the ComitterOperator when the operator shuts down. No further checkpoints will take place. There are various connectors which rely on this behavior. I don't see any drawbacks from keeping this behavior for batch pipelines.
1 parent 2cddb46 commit 7025a3f

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
151151
public void endInput() throws Exception {
152152
if (!isCheckpointingEnabled || isBatchMode) {
153153
// There will be no final checkpoint, all committables should be committed here
154-
commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
154+
commitAndEmitCheckpoints(Long.MAX_VALUE);
155155
}
156156
}
157157

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@
3636
import org.assertj.core.api.ListAssert;
3737
import org.junit.jupiter.api.Test;
3838
import org.junit.jupiter.params.ParameterizedTest;
39+
import org.junit.jupiter.params.provider.Arguments;
3940
import org.junit.jupiter.params.provider.CsvSource;
41+
import org.junit.jupiter.params.provider.MethodSource;
4042
import org.junit.jupiter.params.provider.ValueSource;
4143

4244
import java.util.Collection;
4345
import java.util.function.IntSupplier;
46+
import java.util.stream.Stream;
4447

4548
import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
4649
import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
@@ -85,9 +88,17 @@ SinkAndCounters sinkWithoutPostCommit() {
8588
() -> committer.successfulCommits);
8689
}
8790

91+
static Stream<Arguments> testParameters() {
92+
return Stream.of(
93+
Arguments.of(true, false),
94+
Arguments.of(true, true),
95+
Arguments.of(false, false),
96+
Arguments.of(false, true));
97+
}
98+
8899
@ParameterizedTest
89-
@ValueSource(booleans = {true, false})
90-
void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
100+
@MethodSource("testParameters")
101+
void testEmitCommittables(boolean withPostCommitTopology, boolean isBatch) throws Exception {
91102
SinkAndCounters sinkAndCounters;
92103
if (withPostCommitTopology) {
93104
// Insert global committer to simulate post commit topology
@@ -99,7 +110,8 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
99110
CommittableMessage<String>, CommittableMessage<String>>
100111
testHarness =
101112
new OneInputStreamOperatorTestHarness<>(
102-
new CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
113+
new CommitterOperatorFactory<>(
114+
sinkAndCounters.sink, isBatch, true));
103115
testHarness.open();
104116

105117
final CommittableSummary<String> committableSummary =
@@ -127,6 +139,33 @@ void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
127139
testHarness.close();
128140
}
129141

142+
@Test
143+
void testEmitCommittablesBatch() throws Exception {
144+
SinkAndCounters sinkAndCounters = sinkWithoutPostCommit();
145+
final OneInputStreamOperatorTestHarness<
146+
CommittableMessage<String>, CommittableMessage<String>>
147+
testHarness =
148+
new OneInputStreamOperatorTestHarness<>(
149+
new CommitterOperatorFactory<>(sinkAndCounters.sink, true, false));
150+
testHarness.open();
151+
152+
// Test that all committables up to Long.MAX_VALUE are committed.
153+
long checkpointId = Long.MAX_VALUE;
154+
final CommittableSummary<String> committableSummary =
155+
new CommittableSummary<>(1, 1, checkpointId, 1, 0);
156+
testHarness.processElement(new StreamRecord<>(committableSummary));
157+
final CommittableWithLineage<String> committableWithLineage =
158+
new CommittableWithLineage<>("1", checkpointId, 1);
159+
testHarness.processElement(new StreamRecord<>(committableWithLineage));
160+
161+
testHarness.endInput();
162+
163+
assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
164+
assertThat(testHarness.getOutput()).isEmpty();
165+
166+
testHarness.close();
167+
}
168+
130169
@Test
131170
void ensureAllCommittablesArrivedBeforeCommitting() throws Exception {
132171
SinkAndCounters sinkAndCounters = sinkWithPostCommit();

0 commit comments

Comments
 (0)