Skip to content

Commit d3f13ef

Browse files
garyrussellartembilan
authored andcommitted
GH-1121: Fix KTU.getSingleRecord()
Fixes #1121 `getSingleRecord()` didn't work if the consumer was subscribed to multiple topics that had records. Change the assert to ensure there is just one message for the requested topic and seek any records that were retrieved for other topics. **cherry-pick to 2.2.x**
1 parent e487ece commit d3f13ef

File tree

2 files changed

+69
-1
lines changed

2 files changed

+69
-1
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.time.Duration;
2222
import java.util.HashMap;
23+
import java.util.Iterator;
2324
import java.util.Map;
2425
import java.util.stream.Collectors;
2526

@@ -30,6 +31,7 @@
3031
import org.apache.kafka.clients.consumer.ConsumerRecord;
3132
import org.apache.kafka.clients.consumer.ConsumerRecords;
3233
import org.apache.kafka.clients.producer.ProducerConfig;
34+
import org.apache.kafka.common.TopicPartition;
3335
import org.apache.kafka.common.serialization.IntegerDeserializer;
3436
import org.apache.kafka.common.serialization.IntegerSerializer;
3537
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -165,7 +167,19 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
165167
*/
166168
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, long timeout) {
167169
ConsumerRecords<K, V> received = getRecords(consumer, timeout);
168-
assertThat(received.count()).as("Incorrect results returned", received.count()).isEqualTo(1);
170+
Iterator<ConsumerRecord<K, V>> iterator = received.records(topic).iterator();
171+
assertThat(iterator.hasNext()).as("No records found for topic").isTrue();
172+
iterator.next();
173+
assertThat(iterator.hasNext()).as("More than one record for topic found").isFalse();
174+
if (received.count() > 1) {
175+
Map<TopicPartition, Long> reset = new HashMap<>();
176+
received.forEach(rec -> {
177+
if (!rec.topic().equals(topic)) {
178+
reset.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), tp -> rec.offset());
179+
}
180+
});
181+
reset.forEach((tp, off) -> consumer.seek(tp, off));
182+
}
169183
return received.records(topic).iterator().next();
170184
}
171185

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.test.utils;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.consumer.ConsumerConfig;
22+
import org.apache.kafka.clients.consumer.KafkaConsumer;
23+
import org.apache.kafka.clients.producer.KafkaProducer;
24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
28+
import org.springframework.kafka.test.context.EmbeddedKafka;
29+
30+
/**
31+
* @author Gary Russell
32+
* @since 2.2.7
33+
*
34+
*/
35+
@EmbeddedKafka(topics = { "singleTopic1", "singleTopic2" })
36+
public class KafkaTestUtilsTests {
37+
38+
@Test
39+
void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
40+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
41+
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
42+
producer.send(new ProducerRecord<>("singleTopic1", 1, "foo"));
43+
producer.send(new ProducerRecord<>("singleTopic2", 1, "foo"));
44+
producer.close();
45+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests", "false", broker);
46+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
47+
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
48+
broker.consumeFromAllEmbeddedTopics(consumer);
49+
KafkaTestUtils.getSingleRecord(consumer, "singleTopic1");
50+
KafkaTestUtils.getSingleRecord(consumer, "singleTopic2");
51+
consumer.close();
52+
}
53+
54+
}

0 commit comments

Comments
 (0)