Skip to content

Commit bd911ad

Browse files
garyrussellartembilan
authored andcommitted
GH-1587: Don't Correct TX Offsets After Seek(s)
Resolves #1587 We should not advance the consumer partition if Seek operations have been performed; skip fixing the offsets if that condition is detected. Capture the positions after the poll and check during the fix operation. This is difficult to write a unit test for; tested with a Boot application, observing the correct behavior via DEBUG logs. **cherry-pick to 2.5.x**
1 parent cee9bed commit bd911ad

File tree

1 file changed

+18
-0
lines changed

1 file changed

+18
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
458458

459459
private final Map<TopicPartition, OffsetAndMetadata> lastCommits = new HashMap<>();
460460

461+
private final Map<TopicPartition, Long> savedPositions = new HashMap<>();
462+
461463
private final GenericMessageListener<?> genericListener;
462464

463465
private final ConsumerSeekAware consumerSeekAwareListener;
@@ -1119,6 +1121,7 @@ protected void pollAndInvoke() {
11191121
resumeConsumerIfNeccessary();
11201122
debugRecords(records);
11211123
if (records != null && records.count() > 0) {
1124+
savePositionsIfNeeded(records);
11221125
if (this.containerProperties.getIdleEventInterval() != null) {
11231126
this.lastReceive = System.currentTimeMillis();
11241127
}
@@ -1129,13 +1132,28 @@ protected void pollAndInvoke() {
11291132
}
11301133
}
11311134

1135+
private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
1136+
if (this.fixTxOffsets) {
1137+
this.savedPositions.clear();
1138+
records.partitions().forEach(tp -> this.savedPositions.put(tp, this.consumer.position(tp)));
1139+
}
1140+
}
1141+
11321142
@SuppressWarnings("rawtypes")
11331143
private void fixTxOffsetsIfNeeded() {
11341144
if (this.fixTxOffsets) {
11351145
try {
11361146
Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
11371147
this.lastCommits.forEach((tp, oamd) -> {
11381148
long position = this.consumer.position(tp);
1149+
Long saved = this.savedPositions.get(tp);
1150+
if (saved != null && saved.longValue() != position) {
1151+
this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
1152+
+ "saved: " + this.savedPositions + ", "
1153+
+ "comitted: " + oamd + ", "
1154+
+ "current: " + tp + "@" + position);
1155+
return;
1156+
}
11391157
if (position > oamd.offset()) {
11401158
toFix.put(tp, new OffsetAndMetadata(position));
11411159
}

0 commit comments

Comments
 (0)