Skip to content

Commit 33d0022

Browse files
authored
[FLINK-36630][Connectors/Kafka] Wrap consumer.position in retryOnWakeup (#133)
1 parent 7c112ab commit 33d0022

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
124124
List<TopicPartition> finishedPartitions = new ArrayList<>();
125125
for (TopicPartition tp : consumer.assignment()) {
126126
long stoppingOffset = getStoppingOffset(tp);
127-
long consumerPosition = consumer.position(tp);
127+
long consumerPosition = getConsumerPosition(tp, "retrieving consumer position");
128128
// Stop fetching when the consumer's position reaches the stoppingOffset.
129129
// Control messages may follow the last record; therefore, using the last record's
130130
// offset as a stopping condition could result in indefinite blocking.
@@ -279,6 +279,10 @@ void setConsumerClientRack(Properties consumerProps, String rackId) {
279279
}
280280
}
281281

282+
long getConsumerPosition(TopicPartition tp, String msg) {
283+
return retryOnWakeup(() -> consumer.position(tp), msg);
284+
}
285+
282286
private void parseStartingOffsets(
283287
KafkaPartitionSplit split,
284288
List<TopicPartition> partitionsStartingFromEarliest,
@@ -371,10 +375,9 @@ private void removeEmptySplits() {
371375
List<TopicPartition> emptyPartitions = new ArrayList<>();
372376
// If none of the partitions have any records,
373377
for (TopicPartition tp : consumer.assignment()) {
374-
if (retryOnWakeup(
375-
() -> consumer.position(tp),
376-
"getting starting offset to check if split is empty")
377-
>= getStoppingOffset(tp)) {
378+
long startingOffset =
379+
getConsumerPosition(tp, "getting starting offset to check if split is empty");
380+
if (startingOffset >= getStoppingOffset(tp)) {
378381
emptyPartitions.add(tp);
379382
}
380383
}
@@ -403,9 +406,8 @@ private void maybeLogSplitChangesHandlingResult(
403406
}
404407

405408
long startingOffset =
406-
retryOnWakeup(
407-
() -> consumer.position(split.getTopicPartition()),
408-
"logging starting position");
409+
getConsumerPosition(split.getTopicPartition(), "logging starting position");
410+
409411
long stoppingOffset = getStoppingOffset(split.getTopicPartition());
410412
splitsInfo.add(
411413
String.format(

0 commit comments

Comments
 (0)