Commit b0f15f2

tzulitai authored and MartijnVisser committed
[FLINK-33231] [source] Properly evict offsetsToCommit cache on checkpoint complete if no offsets exist
Prior to this fix, if the set of offsets to commit for a given checkpoint was empty (which can happen when no starting offsets have been retrieved from Kafka yet), then on checkpoint completion the cache was not evicted up to that checkpoint. This change makes notifyCheckpointComplete short-circuit in that case: since there are no offsets, the commit attempt is skipped entirely, and the cache is always evicted up to the completed checkpoint.
1 parent 73f761f commit b0f15f2
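
To make the fix concrete, here is a minimal, self-contained sketch of the cache semantics. OffsetsToCommitCacheSketch and its String-keyed offset maps are hypothetical stand-ins for illustration only; the real KafkaSourceReader works with TopicPartition and OffsetAndMetadata, as the diff below shows.

    import java.util.Map;
    import java.util.TreeMap;

    public class OffsetsToCommitCacheSketch {

        // Offsets staged per checkpoint, keyed by checkpoint id; a sorted map
        // so the oldest pending checkpoint is always firstKey().
        private final TreeMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

        void snapshotState(long checkpointId, Map<String, Long> offsets) {
            offsetsToCommit.put(checkpointId, offsets);
        }

        void notifyCheckpointComplete(long checkpointId) {
            Map<String, Long> committed = offsetsToCommit.get(checkpointId);
            if (committed == null) {
                return; // already committed (or never snapshotted)
            }
            if (committed.isEmpty()) {
                // The fix: nothing to commit, but still evict the cache.
                removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
                return;
            }
            // ... commit the offsets to Kafka here, then evict on success ...
            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
        }

        private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
            while (!offsetsToCommit.isEmpty() && offsetsToCommit.firstKey() <= checkpointId) {
                offsetsToCommit.remove(offsetsToCommit.firstKey());
            }
        }

        public static void main(String[] args) {
            OffsetsToCommitCacheSketch reader = new OffsetsToCommitCacheSketch();
            reader.snapshotState(100L, Map.of());
            reader.snapshotState(101L, Map.of());
            reader.snapshotState(102L, Map.of());
            reader.notifyCheckpointComplete(101L);
            // Before the fix, entries 100 and 101 would linger; now only 102 remains.
            System.out.println(reader.offsetsToCommit.keySet()); // prints [102]
        }
    }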

File tree

2 files changed: +35 -8 lines

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java

Lines changed: 14 additions & 7 deletions
@@ -132,9 +132,13 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
         Map<TopicPartition, OffsetAndMetadata> committedPartitions =
                 offsetsToCommit.get(checkpointId);
         if (committedPartitions == null) {
-            LOG.debug(
-                    "Offsets for checkpoint {} either do not exist or have already been committed.",
-                    checkpointId);
+            LOG.debug("Offsets for checkpoint {} have already been committed.", checkpointId);
+            return;
+        }
+
+        if (committedPartitions.isEmpty()) {
+            LOG.debug("There are no offsets to commit for checkpoint {}.", checkpointId);
+            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
             return;
         }

@@ -167,14 +171,17 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
                                     entry ->
                                             committedPartitions.containsKey(
                                                     entry.getKey()));
-                    while (!offsetsToCommit.isEmpty()
-                            && offsetsToCommit.firstKey() <= checkpointId) {
-                        offsetsToCommit.remove(offsetsToCommit.firstKey());
-                    }
+                    removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
                }
            });
    }

+    private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
+        while (!offsetsToCommit.isEmpty() && offsetsToCommit.firstKey() <= checkpointId) {
+            offsetsToCommit.remove(offsetsToCommit.firstKey());
+        }
+    }
+
     @Override
     protected KafkaPartitionSplitState initializedState(KafkaPartitionSplit split) {
         return new KafkaPartitionSplitState(split);
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;

@@ -187,7 +188,26 @@ void testCommitEmptyOffsets() throws Exception {
                 (KafkaSourceReader<Integer>)
                         createReader(Boundedness.CONTINUOUS_UNBOUNDED, groupId)) {
             reader.snapshotState(100L);
-            reader.notifyCheckpointComplete(100L);
+            reader.snapshotState(101L);
+            reader.snapshotState(102L);
+
+            // After each snapshot, a new entry should have been added to the offsets-to-commit
+            // cache for the checkpoint
+            final Map<Long, Map<TopicPartition, OffsetAndMetadata>> expectedOffsetsToCommit =
+                    new HashMap<>();
+            expectedOffsetsToCommit.put(100L, new HashMap<>());
+            expectedOffsetsToCommit.put(101L, new HashMap<>());
+            expectedOffsetsToCommit.put(102L, new HashMap<>());
+            assertThat(reader.getOffsetsToCommit()).isEqualTo(expectedOffsetsToCommit);
+
+            // only notify up to checkpoint 101L; all offsets up to and including 101L should be
+            // evicted from the cache, leaving only 102L
+            reader.notifyCheckpointComplete(101L);
+
+            final Map<Long, Map<TopicPartition, OffsetAndMetadata>>
+                    expectedOffsetsToCommitAfterNotify = new HashMap<>();
+            expectedOffsetsToCommitAfterNotify.put(102L, new HashMap<>());
+            assertThat(reader.getOffsetsToCommit()).isEqualTo(expectedOffsetsToCommitAfterNotify);
         }
         // Verify the committed offsets.
         try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
