Skip to content

Commit ad7cad4

Browse files
committed
Fix Map Iteration
1 parent 343df6d commit ad7cad4

File tree

2 files changed

+8
-9
lines changed

2 files changed

+8
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2006,15 +2006,12 @@ private void processTimestampSeeks() {
20062006
}
20072007
}
20082008
if (timestampSeeks != null) {
2009-
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
2010-
.offsetsForTimes(timestampSeeks);
2011-
2012-
for (TopicPartition tp : offsetsForTimes.keySet()) {
2013-
OffsetAndTimestamp ot = offsetsForTimes.get(tp);
2009+
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(timestampSeeks);
2010+
offsetsForTimes.forEach((tp, ot) -> {
20142011
if (ot != null) {
20152012
this.consumer.seek(tp, ot.offset());
20162013
}
2017-
}
2014+
});
20182015
}
20192016
}
20202017

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,12 @@ void testAsyncTimestampSeeks() throws InterruptedException {
372372
.collect(Collectors.toMap(tp -> tp, tp -> 200L)));
373373
given(consumer.offsetsForTimes(Collections.singletonMap(tp0, 42L)))
374374
.willReturn(Collections.singletonMap(tp0, new OffsetAndTimestamp(73L, 42L)));
375+
Map<TopicPartition, OffsetAndTimestamp> map = new HashMap<>(assignments.stream()
376+
.collect(Collectors.toMap(tp -> tp,
377+
tp -> new OffsetAndTimestamp(tp.equals(tp0) ? 73L : 92L, 43L))));
378+
map.put(new TopicPartition("foo", 5), null);
375379
given(consumer.offsetsForTimes(any()))
376-
.willReturn(assignments.stream()
377-
.collect(Collectors.toMap(tp -> tp,
378-
tp -> new OffsetAndTimestamp(tp.equals(tp0) ? 73L : 92L, 43L))));
380+
.willReturn(map);
379381
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
380382
.willReturn(consumer);
381383
ContainerProperties containerProperties = new ContainerProperties("foo");

0 commit comments

Comments
 (0)