Skip to content

Commit 44db509

Browse files
authored
Add KafkaTestUtils methods using Duration for timeout (#2467)
* Add KafkaTestUtils methods using Duration for timeout To adjust codebase to modern JVM (and Kafka API itself - e.g. poll(Duration)). Old methods with "long timeout" are deprecated. * Remove deprecated KafkaTestUtils methods using "long timeout" Replaced by method with Duration for timeout.
1 parent 59eb2dc commit 44db509

File tree

2 files changed

+28
-31
lines changed

2 files changed

+28
-31
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -136,30 +136,30 @@ public static Map<String, Object> producerProps(String brokers) {
136136
* @param <V> the value type.
137137
* @return the record.
138138
* @throws IllegalStateException if exactly one record is not received.
139-
* @see #getSingleRecord(Consumer, String, long)
139+
* @see #getSingleRecord(Consumer, String, Duration)
140140
*/
141141
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) {
142-
return getSingleRecord(consumer, topic, 60000); // NOSONAR magic #
142+
return getSingleRecord(consumer, topic, Duration.ofSeconds(60)); // NOSONAR magic #
143143
}
144144

145145
/**
146146
* Poll the consumer, expecting a single record for the specified topic.
147147
* @param consumer the consumer.
148148
* @param topic the topic.
149-
* @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(long)}.
149+
* @param timeout max duration to wait for records; forwarded to {@link Consumer#poll(Duration)}.
150150
* @param <K> the key type.
151151
* @param <V> the value type.
152152
* @return the record.
153153
* @throws IllegalStateException if exactly one record is not received.
154-
* @since 2.0
154+
* @since 2.9.3
155155
*/
156-
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, long timeout) {
157-
long expire = System.currentTimeMillis() + timeout;
156+
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout) {
157+
long expire = System.currentTimeMillis() + timeout.toMillis();
158158
ConsumerRecords<K, V> received;
159159
Iterator<ConsumerRecord<K, V>> iterator;
160-
long remaining = timeout;
160+
long remaining = timeout.toMillis();
161161
do {
162-
received = getRecords(consumer, remaining);
162+
received = getRecords(consumer, Duration.ofMillis(remaining));
163163
iterator = received.records(topic).iterator();
164164
Map<TopicPartition, Long> reset = new HashMap<>();
165165
received.forEach(rec -> {
@@ -197,12 +197,12 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
197197
* @param commit commit offset after polling or not.
198198
* @param timeout the timeout.
199199
* @return the record or null if no record received.
200-
* @since 2.3
200+
* @since 2.9.3
201201
*/
202202
@Nullable
203203
@SuppressWarnings({ "rawtypes", "unchecked" })
204204
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
205-
boolean seekToLast, boolean commit, long timeout) {
205+
boolean seekToLast, boolean commit, Duration timeout) {
206206

207207
Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, "false");
208208
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
@@ -215,7 +215,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
215215
consumer.seek(topicPart, consumer.position(topicPart) - 1);
216216
}
217217
}
218-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(timeout));
218+
ConsumerRecords<?, ?> records = consumer.poll(timeout);
219219
ConsumerRecord<?, ?> record = records.count() == 1 ? records.iterator().next() : null;
220220
if (record != null && commit) {
221221
consumer.commitSync();
@@ -298,44 +298,41 @@ public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, S
298298
* @param <K> the key type.
299299
* @param <V> the value type.
300300
* @return the records.
301-
* @see #getRecords(Consumer, long)
301+
* @see #getRecords(Consumer, Duration)
302302
*/
303303
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
304-
return getRecords(consumer, 60000); // NOSONAR magic #
304+
return getRecords(consumer, Duration.ofSeconds(60)); // NOSONAR magic #
305305
}
306306

307307
/**
308308
* Poll the consumer for records.
309309
* @param consumer the consumer.
310-
* @param timeout max time in milliseconds to wait for records; forwarded to
311-
* {@link Consumer#poll(long)}.
310+
* @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(Duration)}.
312311
* @param <K> the key type.
313312
* @param <V> the value type.
314313
* @return the records.
315314
* @throws IllegalStateException if the poll returns null (since 2.3.4).
316-
* @since 2.0
315+
* @since 2.9.3
317316
*/
318-
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout) {
317+
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout) {
319318
return getRecords(consumer, timeout, -1);
320319
}
321320

322321
/**
323322
* Poll the consumer for records.
324323
* @param consumer the consumer.
325-
* @param timeout max time in milliseconds to wait for records; forwarded to
326-
* {@link Consumer#poll(long)}.
324+
* @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(Duration)}.
327325
* @param <K> the key type.
328326
* @param <V> the value type.
329-
* @param minRecords wait until the timeout or at least this number of receords are
330-
* received.
327+
* @param minRecords wait until the timeout or at least this number of records are received.
331328
* @return the records.
332329
* @throws IllegalStateException if the poll returns null.
333-
* @since 2.4.2
330+
* @since 2.9.3
334331
*/
335-
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout, int minRecords) {
332+
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords) {
336333
logger.debug("Polling...");
337334
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
338-
long remaining = timeout;
335+
long remaining = timeout.toMillis();
339336
int count = 0;
340337
do {
341338
long t1 = System.currentTimeMillis();
@@ -344,8 +341,7 @@ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, l
344341
+ received.partitions().stream()
345342
.flatMap(p -> received.records(p).stream())
346343
// map to same format as send metadata toString()
347-
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
348-
.collect(Collectors.toList()));
344+
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()).toList());
349345
if (received == null) {
350346
throw new IllegalStateException("null received from consumer.poll()");
351347
}

spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121

22+
import java.time.Duration;
2223
import java.util.Map;
2324

2425
import org.apache.kafka.clients.admin.AdminClient;
@@ -72,7 +73,7 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
7273
broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
7374
long t1 = System.currentTimeMillis();
7475
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
75-
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", 2000L));
76+
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2)));
7677
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(2000L);
7778
producer.send(new ProducerRecord<>("singleTopic5", 1, "foo"));
7879
producer.close();
@@ -92,14 +93,14 @@ public void testGetOneRecord(EmbeddedKafkaBroker broker) throws Exception {
9293
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));
9394
producer.close();
9495
ConsumerRecord<?, ?> oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
95-
"singleTopic3", 0, false, true, 10_000L);
96+
"singleTopic3", 0, false, true, Duration.ofSeconds(10));
9697
assertThat(oneRecord.value()).isEqualTo("foo");
9798
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
9899
.isNotNull()
99100
.extracting(omd -> omd.offset())
100101
.isEqualTo(1L);
101102
oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
102-
"singleTopic3", 0, true, true, 10_000L);
103+
"singleTopic3", 0, true, true, Duration.ofSeconds(10));
103104
assertThat(oneRecord.value()).isEqualTo("foo");
104105
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
105106
.isNotNull()
@@ -124,7 +125,7 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
124125
Thread.currentThread().interrupt();
125126
}
126127
}).start();
127-
ConsumerRecords<Integer, String> records = KafkaTestUtils.getRecords(consumer, 10_000L, 2);
128+
ConsumerRecords<Integer, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10), 2);
128129
assertThat(records.count()).isEqualTo(2);
129130
producer.close();
130131
consumer.close();
@@ -138,7 +139,7 @@ public void testGetCurrentOffsetWithAdminClient(EmbeddedKafkaBroker broker) thro
138139
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));
139140

140141
KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "testGetCurrentOffsetWithAdminClient",
141-
"singleTopic3", 0, false, true, 10_000L);
142+
"singleTopic3", 0, false, true, Duration.ofSeconds(10));
142143
assertThat(KafkaTestUtils.getCurrentOffset(adminClient, "testGetCurrentOffsetWithAdminClient", "singleTopic3", 0))
143144
.isNotNull()
144145
.extracting(omd -> omd.offset())

0 commit comments

Comments
 (0)