Skip to content

Commit f99fe20

Browse files
realcbbartembilan
authored andcommitted
Fix getHighestOffsetRecords
* Add test case for "fix getHighestOffsetRecords" * Simple code style polishing **Cherry-pick to 1.2.x & 1.1.x** Conflicts: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
1 parent ee63cf3 commit f99fe20

File tree

1 file changed

+5
-9
lines changed

1 file changed

+5
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -982,18 +982,14 @@ private void commitIfNecessary() {
982982
}
983983

984984
private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(List<ConsumerRecord<K, V>> records) {
985-
Map<Integer, ConsumerRecord<K, V>> highestOffsetMap = new HashMap<>();
986-
985+
Map<TopicPartition, ConsumerRecord<K, V>> highestOffsetMap = new HashMap<>();
987986
for (ConsumerRecord<K, V> record : records) {
988-
if (record != null) {
989-
ConsumerRecord<K, V> consumerRecord = highestOffsetMap.get(record.partition());
990-
991-
if (consumerRecord == null || record.offset() > consumerRecord.offset()) {
992-
highestOffsetMap.put(record.partition(), record);
993-
}
987+
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
988+
ConsumerRecord<K, V> consumerRecord = highestOffsetMap.get(topicPartition);
989+
if (consumerRecord == null || record.offset() > consumerRecord.offset()) {
990+
highestOffsetMap.put(topicPartition, record);
994991
}
995992
}
996-
997993
return highestOffsetMap.values();
998994
}
999995

0 commit comments

Comments
 (0)