Skip to content

Commit 54e3b70

Browse files
committed
[FLINK-28303] Allow LATEST_OFFSET marker when restoring from old checkpoints
1 parent 946df1e commit 54e3b70

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ public class KafkaPartitionSplit implements SourceSplit {
4444

4545
// Valid special starting offsets
4646
public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
47-
new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
47+
new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET));
4848
public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
49-
new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
49+
new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
5050

5151
private final TopicPartition tp;
5252
private final long startingOffset;
@@ -133,17 +133,21 @@ private static void verifyInitialOffset(
133133
String.format(
134134
"Invalid starting offset %d is specified for partition %s. "
135135
+ "It should either be non-negative or be one of the "
136-
+ "[%d(earliest), %d(committed)].",
137-
startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET));
136+
+ "[%d(earliest), %d(latest), %d(committed)].",
137+
startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
138138
}
139139

140140
if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
141141
throw new FlinkRuntimeException(
142142
String.format(
143143
"Illegal stopping offset %d is specified for partition %s. "
144144
+ "It should either be non-negative or be one of the "
145-
+ "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
146-
stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
145+
+ "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
146+
stoppingOffset,
147+
tp,
148+
LATEST_OFFSET,
149+
COMMITTED_OFFSET,
150+
NO_STOPPING_OFFSET));
147151
}
148152
}
149153
}

0 commit comments

Comments
 (0)