Skip to content

Commit 946df1e

Browse files
tanjialiangtzulitai
authored and committed
[FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data
This closes #52.
1 parent 37cbb83 commit 946df1e

File tree

13 files changed

+230
-32
lines changed

13 files changed

+230
-32
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.kafka.source.enumerator.initializer;
20+
21+
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
22+
import org.apache.kafka.common.TopicPartition;
23+
24+
import java.util.Collection;
25+
import java.util.Map;
26+
27+
/**
28+
* An implementation of {@link OffsetsInitializer} to initialize the offsets based on a
29+
* latest-offset.
30+
*
31+
* <p>Package private and should be instantiated via {@link OffsetsInitializer}.
32+
*/
33+
class LatestOffsetsInitializer implements OffsetsInitializer {
34+
private static final long serialVersionUID = 3014700244733286989L;
35+
36+
@Override
37+
public Map<TopicPartition, Long> getPartitionOffsets(
38+
Collection<TopicPartition> partitions,
39+
PartitionOffsetsRetriever partitionOffsetsRetriever) {
40+
return partitionOffsetsRetriever.endOffsets(partitions);
41+
}
42+
43+
@Override
44+
public OffsetResetStrategy getAutoOffsetResetStrategy() {
45+
return OffsetResetStrategy.LATEST;
46+
}
47+
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ static OffsetsInitializer earliest() {
155155
* @return an {@link OffsetsInitializer} which initializes the offsets to the latest offsets.
156156
*/
157157
static OffsetsInitializer latest() {
158-
return new ReaderHandledOffsetsInitializer(
159-
KafkaPartitionSplit.LATEST_OFFSET, OffsetResetStrategy.LATEST);
158+
return new LatestOffsetsInitializer();
160159
}
161160

162161
/**

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
import static org.apache.flink.util.Preconditions.checkState;
3333

3434
/**
35-
* A initializer that initialize the partitions to the earliest / latest / last-committed offsets.
36-
* The offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of
37-
* by the {@code KafkaSourceEnumerator}.
35+
* A initializer that initialize the partitions to the earliest / last-committed offsets. The
36+
* offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of by
37+
* the {@code KafkaSourceEnumerator}.
3838
*
3939
* <p>Package private and should be instantiated via {@link OffsetsInitializer}.
4040
*/
@@ -46,8 +46,7 @@ class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInit
4646
/**
4747
* The only valid value for startingOffset is following. {@link
4848
* KafkaPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link
49-
* KafkaPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link KafkaPartitionSplit#COMMITTED_OFFSET
50-
* COMMITTED_OFFSET}
49+
* KafkaPartitionSplit#COMMITTED_OFFSET COMMITTED_OFFSET}
5150
*/
5251
ReaderHandledOffsetsInitializer(long startingOffset, OffsetResetStrategy offsetResetStrategy) {
5352
this.startingOffset = startingOffset;

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,18 @@
3535
public class KafkaPartitionSplit implements SourceSplit {
3636
public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
3737
// Indicating the split should consume from the latest.
38-
public static final long LATEST_OFFSET = -1;
38+
// @deprecated Only be used for compatibility with the history state, see FLINK-28303
39+
@Deprecated public static final long LATEST_OFFSET = -1;
3940
// Indicating the split should consume from the earliest.
4041
public static final long EARLIEST_OFFSET = -2;
4142
// Indicating the split should consume from the last committed offset.
4243
public static final long COMMITTED_OFFSET = -3;
4344

4445
// Valid special starting offsets
4546
public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
46-
new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET));
47+
new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
4748
public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
48-
new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
49+
new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
4950

5051
private final TopicPartition tp;
5152
private final long startingOffset;
@@ -132,21 +133,17 @@ private static void verifyInitialOffset(
132133
String.format(
133134
"Invalid starting offset %d is specified for partition %s. "
134135
+ "It should either be non-negative or be one of the "
135-
+ "[%d(earliest), %d(latest), %d(committed)].",
136-
startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
136+
+ "[%d(earliest), %d(committed)].",
137+
startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET));
137138
}
138139

139140
if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
140141
throw new FlinkRuntimeException(
141142
String.format(
142143
"Illegal stopping offset %d is specified for partition %s. "
143144
+ "It should either be non-negative or be one of the "
144-
+ "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
145-
stoppingOffset,
146-
tp,
147-
LATEST_OFFSET,
148-
COMMITTED_OFFSET,
149-
NO_STOPPING_OFFSET));
145+
+ "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
146+
stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
150147
}
151148
}
152149
}

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public void testDiscoverPartitionsPeriodically() throws Throwable {
300300
getAllAssignSplits(context, PRE_EXISTING_TOPICS);
301301
assertThat(initialPartitionAssign)
302302
.extracting(KafkaPartitionSplit::getStartingOffset)
303-
.containsOnly(KafkaPartitionSplit.LATEST_OFFSET);
303+
.containsOnly((long) KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
304304
List<KafkaPartitionSplit> newPartitionAssign =
305305
getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_NAME));
306306
assertThat(newPartitionAssign)

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void testLatestOffsetsInitializer() {
8484
assertThat(offsets).hasSameSizeAs(partitions);
8585
assertThat(offsets.keySet()).containsAll(partitions);
8686
for (long offset : offsets.values()) {
87-
assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET);
87+
assertThat(offset).isEqualTo(KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
8888
}
8989
assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
9090
}

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,8 @@ public void testAssignEmptySplit() throws Exception {
246246
final KafkaPartitionSplit emptySplit =
247247
new KafkaPartitionSplit(
248248
new TopicPartition(TOPIC2, 0),
249-
KafkaPartitionSplit.LATEST_OFFSET,
250-
KafkaPartitionSplit.LATEST_OFFSET);
249+
KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION,
250+
KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
251251
final KafkaPartitionSplit emptySplitWithZeroStoppingOffset =
252252
new KafkaPartitionSplit(new TopicPartition(TOPIC3, 0), 0, 0);
253253

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,9 @@ void testAssigningEmptySplits() throws Exception {
397397
// Normal split with NUM_RECORDS_PER_SPLIT records
398398
final KafkaPartitionSplit normalSplit =
399399
new KafkaPartitionSplit(
400-
new TopicPartition(TOPIC, 0), 0, KafkaPartitionSplit.LATEST_OFFSET);
400+
new TopicPartition(TOPIC, 0),
401+
0,
402+
KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
401403
// Empty split with no record
402404
final KafkaPartitionSplit emptySplit =
403405
new KafkaPartitionSplit(

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,7 @@ public void testSerializer() throws IOException {
3636
Long normalOffset = 1L;
3737
TopicPartition topicPartition = new TopicPartition(topic, 1);
3838
List<Long> stoppingOffsets =
39-
Lists.newArrayList(
40-
KafkaPartitionSplit.COMMITTED_OFFSET,
41-
KafkaPartitionSplit.LATEST_OFFSET,
42-
offsetZero,
43-
normalOffset);
39+
Lists.newArrayList(KafkaPartitionSplit.COMMITTED_OFFSET, offsetZero, normalOffset);
4440
KafkaPartitionSplitSerializer splitSerializer = new KafkaPartitionSplitSerializer();
4541
for (Long stoppingOffset : stoppingOffsets) {
4642
KafkaPartitionSplit kafkaPartitionSplit =

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,13 +477,17 @@ public void testBoundedLatestOffset() {
477477
OffsetsInitializer offsetsInitializer =
478478
KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
479479
TopicPartition partition = new TopicPartition(TOPIC, 0);
480+
long endOffsets = 123L;
480481
Map<TopicPartition, Long> partitionOffsets =
481482
offsetsInitializer.getPartitionOffsets(
482483
Collections.singletonList(partition),
483-
MockPartitionOffsetsRetriever.noInteractions());
484+
MockPartitionOffsetsRetriever.latest(
485+
(tps) ->
486+
Collections.singletonMap(
487+
partition, endOffsets)));
484488
assertThat(partitionOffsets)
485489
.containsOnlyKeys(partition)
486-
.containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET);
490+
.containsEntry(partition, endOffsets);
487491
});
488492
}
489493

0 commit comments

Comments
 (0)