Skip to content

Commit c0303d3

Browse files
authored
GH-1861: Add KafkaTemplate.receive()
Resolves #1861 * Pull constant up to interface.
1 parent f7046d7 commit c0303d3

File tree

5 files changed

+103
-0
lines changed

5 files changed

+103
-0
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2457,6 +2457,24 @@ As an aside; previously, containers in each group were added to a bean of type `
24572457
These collections are now deprecated in favor of beans of type `ContainerGroup` with a bean name that is the group name, suffixed with `.group`; in the example above, there would be 2 beans `g1.group` and `g2.group`.
24582458
The `Collection` beans will be removed in a future release.
24592459

2460+
[[kafka-template-receive]]
2461+
===== Using `KafkaTemplate` to Receive
2462+
2463+
This section covers how to use `KafkaTemplate` to receive messages.
2464+
2465+
Starting with version 2.8, the template has two `receive()` methods:
2466+
2467+
====
2468+
[source, jvava]
2469+
----
2470+
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
2471+
2472+
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
2473+
----
2474+
====
2475+
2476+
As you can see, you need to know the partition and offset of the record you need to retrieve; a new `Consumer` is created (and closed) for each operation.
2477+
24602478
[[container-props]]
24612479
==== Listener Container Properties
24622480

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,9 @@ This version requires the 2.8.0 `kafka-clients`.
1414
The listener container can now be configured to accept manual offset commits out of order (usually asynchronously).
1515
The container will defer the commit until the missing offset is acknowledged.
1616
See <<ooo-commits>> for more information.
17+
18+
[[x28-template]]
19+
==== `KafkaTemplate` Changes
20+
21+
You can now receive a single record, given the topic, partition and offset.
22+
See <<kafka-template-receive>> for more information.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.time.Duration;
1920
import java.util.List;
2021
import java.util.Map;
2122

2223
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
24+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2325
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2426
import org.apache.kafka.clients.producer.Producer;
2527
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -53,6 +55,11 @@
5355
*/
5456
public interface KafkaOperations<K, V> {
5557

58+
/**
59+
* Default timeout for {@link #receive(String, int, long)}.
60+
*/
61+
Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(5);
62+
5663
/**
5764
* Send the data to the default topic with no key or partition.
5865
* @param data The data.
@@ -267,6 +274,30 @@ default ProducerFactory<K, V> getProducerFactory() {
267274
throw new UnsupportedOperationException("This implementation does not support this operation");
268275
}
269276

277+
/**
278+
* Receive a single record with the default poll timeout (5 seconds).
279+
* @param topic the topic.
280+
* @param partition the partition.
281+
* @param offset the offset.
282+
* @return the record or null.
283+
* @since 2.8
284+
* @see #DEFAULT_POLL_TIMEOUT
285+
*/
286+
@Nullable
287+
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
288+
289+
/**
290+
* Receive a single record.
291+
* @param topic the topic.
292+
* @param partition the partition.
293+
* @param offset the offset.
294+
* @param pollTimeout the timeout.
295+
* @return the record or null.
296+
* @since 2.8
297+
*/
298+
@Nullable
299+
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
300+
270301
/**
271302
* A callback for executing arbitrary operations on the {@link Producer}.
272303
* @param <K> the key type.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@
1717
package org.springframework.kafka.core;
1818

1919
import java.time.Duration;
20+
import java.util.Collections;
2021
import java.util.HashMap;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Properties;
2325
import java.util.concurrent.ExecutionException;
2426
import java.util.concurrent.Future;
2527

2628
import org.apache.commons.logging.LogFactory;
29+
import org.apache.kafka.clients.consumer.Consumer;
30+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2731
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
32+
import org.apache.kafka.clients.consumer.ConsumerRecord;
33+
import org.apache.kafka.clients.consumer.ConsumerRecords;
2834
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2935
import org.apache.kafka.clients.producer.Callback;
3036
import org.apache.kafka.clients.producer.Producer;
@@ -112,6 +118,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
112118

113119
private boolean converterSet;
114120

121+
private ConsumerFactory<K, V> consumerFactory;
122+
115123
private volatile boolean micrometerEnabled = true;
116124

117125
private volatile MicrometerHolder micrometerHolder;
@@ -347,6 +355,15 @@ protected ProducerFactory<K, V> getProducerFactory(String topic) {
347355
return this.producerFactory;
348356
}
349357

358+
/**
359+
* Set a consumer factory for receive operations.
360+
* @param consumerFactory the consumer factory.
361+
* @since 2.8
362+
*/
363+
public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
364+
this.consumerFactory = consumerFactory;
365+
}
366+
350367
@Override
351368
public void onApplicationEvent(ContextStoppedEvent event) {
352369
if (this.customProducerFactory) {
@@ -541,6 +558,31 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
541558
producerForOffsets().sendOffsetsToTransaction(offsets, groupMetadata);
542559
}
543560

561+
562+
@Override
563+
@Nullable
564+
public ConsumerRecord<K, V> receive(String topic, int partition, long offset) {
565+
return receive(topic, partition, offset, DEFAULT_POLL_TIMEOUT);
566+
}
567+
568+
@Override
569+
@Nullable
570+
public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
571+
Assert.notNull(this.consumerFactory, "A consumerFactory is required");
572+
Properties props = new Properties();
573+
props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
574+
try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
575+
TopicPartition topicPartition = new TopicPartition(topic, partition);
576+
consumer.assign(Collections.singletonList(topicPartition));
577+
consumer.seek(topicPartition, offset);
578+
ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
579+
if (records.count() == 1) {
580+
return records.iterator().next();
581+
}
582+
return null;
583+
}
584+
}
585+
544586
private Producer<K, V> producerForOffsets() {
545587
Producer<K, V> producer = this.producers.get();
546588
if (producer == null) {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,12 @@ void testTemplate() {
171171
assertThat(partitions).isNotNull();
172172
assertThat(partitions).hasSize(2);
173173
assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
174+
template.setConsumerFactory(
175+
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
176+
ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 0, received.offset());
177+
assertThat(receive).has(allOf(keyValue(2, "buz"), partition(0)))
178+
.extracting(rec -> rec.offset())
179+
.isEqualTo(received.offset());
174180
pf.destroy();
175181
}
176182

0 commit comments

Comments
 (0)