Commit 691b8f9

[release-1.20][FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode (#27013)
In #26433, we removed the EOI marker in the form of Long.MAX_VALUE as the checkpoint id. Since streaming pipelines can continue to checkpoint even after their respective operators have been shut down, it is not safe to use a constant, as this can lead to duplicate commits.

However, in batch pipelines we only have one commit, on job shutdown, so any checkpoint id should suffice in this scenario. Any pending committables should be processed by the CommitterOperator when the operator shuts down. No further checkpoints will take place.

There are various connectors which rely on this behavior. I don't see any drawbacks to keeping this behavior for batch pipelines.
1 parent 6027a7b commit 691b8f9

2 files changed: 28 additions & 1 deletion


flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java

Lines changed: 1 addition & 1 deletion
@@ -148,7 +148,7 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     public void endInput() throws Exception {
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
+            commitAndEmitCheckpoints(Long.MAX_VALUE);
         }
     }
 
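
The effect of changing the bound passed in endInput() can be illustrated with a small standalone sketch. It is not Flink's implementation: PendingCommitSketch, commitUpTo and pendingAtEndOfInput are hypothetical names, the pending committables are modelled as a sorted map keyed by checkpoint id, and the value of lastCompletedCheckpointId is an assumption. It also assumes, as the test added in this commit does, that batch committables arrive tagged with Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a committer that tracks pending committables per checkpoint id. */
class PendingCommitSketch {

    /** "Commits" (here: collects and removes) every committable with checkpoint id <= upToCheckpointId. */
    static List<String> commitUpTo(NavigableMap<Long, List<String>> pending, long upToCheckpointId) {
        List<String> committed = new ArrayList<>();
        // headMap(key, true) is a live view of all entries whose key is <= upToCheckpointId.
        NavigableMap<Long, List<String>> ready = pending.headMap(upToCheckpointId, true);
        for (List<String> committables : ready.values()) {
            committed.addAll(committables);
        }
        // Clearing the view also removes the committed entries from the backing map.
        ready.clear();
        return committed;
    }

    /** Assumed batch scenario: a single committable tagged with Long.MAX_VALUE. */
    static NavigableMap<Long, List<String>> pendingAtEndOfInput() {
        NavigableMap<Long, List<String>> pending = new TreeMap<>();
        pending.put(Long.MAX_VALUE, new ArrayList<>(List.of("batch-committable")));
        return pending;
    }

    public static void main(String[] args) {
        long lastCompletedCheckpointId = 0L; // assumption: no later checkpoint completed in batch mode

        // Bound used before this commit: the committable is left pending.
        System.out.println(commitUpTo(pendingAtEndOfInput(), lastCompletedCheckpointId + 1));

        // Bound used after this commit: every pending committable is covered.
        System.out.println(commitUpTo(pendingAtEndOfInput(), Long.MAX_VALUE));
    }
}
```

Under these assumptions the first call returns an empty list and the second returns the single committable, which is the behavior the new test checks with commitCounter.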

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java

Lines changed: 27 additions & 0 deletions
@@ -276,6 +276,33 @@ void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Ex
         assertThat(testHarness.getOutput()).hasSize(2);
     }
 
+    @Test
+    void testEmitCommittablesBatch() throws Exception {
+        SinkAndCounters sinkAndCounters = sinkWithoutPostCommit();
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CommitterOperatorFactory<>(sinkAndCounters.sink, true, false));
+        testHarness.open();
+
+        // Test that all committables up to Long.MAX_VALUE are committed.
+        long checkpointId = Long.MAX_VALUE;
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, checkpointId, 1, 0, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> committableWithLineage =
+                new CommittableWithLineage<>("1", checkpointId, 1);
+        testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+        testHarness.endInput();
+
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
+        assertThat(testHarness.getOutput()).isEmpty();
+
+        testHarness.close();
+    }
+
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
