Skip to content

Commit 0c3ff71

Browse files
garyrussellartembilan
authored andcommitted
GH-1171: Custom headers for ReplyingKafkaTemplate
Resolves #1171 Allow the use of custom header names for request/reply semantics. * Try trusty on travis
1 parent 2837fb2 commit 0c3ff71

File tree

5 files changed

+105
-17
lines changed

5 files changed

+105
-17
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
dist: trusty
12
language: java
23
jdk: oraclejdk8
34
install: true

spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.nio.charset.StandardCharsets;
2121
import java.time.Instant;
2222
import java.util.Collection;
23-
import java.util.Iterator;
2423
import java.util.List;
2524
import java.util.UUID;
2625
import java.util.concurrent.ConcurrentHashMap;
@@ -92,6 +91,12 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
9291
private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy =
9392
ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
9493

94+
private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;
95+
96+
private String replyTopicHeaderName = KafkaHeaders.REPLY_TOPIC;
97+
98+
private String replyPartitionHeaderName = KafkaHeaders.REPLY_PARTITION;
99+
95100
private volatile boolean running;
96101

97102
private volatile boolean schedulerInitialized;
@@ -204,6 +209,39 @@ public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationK
204209
this.correlationStrategy = correlationStrategy;
205210
}
206211

212+
/**
213+
* Set a custom header name for the correlation id. Default
214+
* {@link KafkaHeaders#CORRELATION_ID}.
215+
* @param correlationHeaderName the header name.
216+
* @since 2.3
217+
*/
218+
public void setCorrelationHeaderName(String correlationHeaderName) {
219+
Assert.notNull(correlationHeaderName, "'correlationHeaderName' cannot be null");
220+
this.correlationHeaderName = correlationHeaderName;
221+
}
222+
223+
/**
224+
* Set a custom header name for the reply topic. Default
225+
* {@link KafkaHeaders#REPLY_TOPIC}.
226+
* @param replyTopicHeaderName the header name.
227+
* @since 2.3
228+
*/
229+
public void setReplyTopicHeaderName(String replyTopicHeaderName) {
230+
Assert.notNull(replyTopicHeaderName, "'replyTopicHeaderName' cannot be null");
231+
this.replyTopicHeaderName = replyTopicHeaderName;
232+
}
233+
234+
/**
235+
* Set a custom header name for the reply partition. Default
236+
* {@link KafkaHeaders#REPLY_PARTITION}.
237+
* @param replyPartitionHeaderName the reply partition header name.
238+
* @since 2.3
239+
*/
240+
public void setReplyPartitionHeaderName(String replyPartitionHeaderName) {
241+
Assert.notNull(replyPartitionHeaderName, "'replyPartitionHeaderName' cannot be null");
242+
this.replyPartitionHeaderName = replyPartitionHeaderName;
243+
}
244+
207245
@Override
208246
public void afterPropertiesSet() {
209247
if (!this.schedulerSet && !this.schedulerInitialized) {
@@ -249,12 +287,12 @@ public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
249287
Headers headers = record.headers();
250288
boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null;
251289
if (!hasReplyTopic && this.replyTopic != null) {
252-
headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, this.replyTopic));
290+
headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
253291
if (this.replyPartition != null) {
254-
headers.add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, this.replyPartition));
292+
headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
255293
}
256294
}
257-
headers.add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationId.getCorrelationId()));
295+
headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));
258296
this.logger.debug(() -> "Sending: " + record + WITH_CORRELATION_ID + correlationId);
259297
RequestReplyFuture<K, V, R> future = new RequestReplyFuture<>();
260298
this.futures.put(correlationId, future);
@@ -338,18 +376,15 @@ private static <K, V> CorrelationKey defaultCorrelationIdStrategy(
338376
@Override
339377
public void onMessage(List<ConsumerRecord<K, R>> data) {
340378
data.forEach(record -> {
341-
Iterator<Header> iterator = record.headers().iterator();
379+
Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
342380
CorrelationKey correlationId = null;
343-
while (correlationId == null && iterator.hasNext()) {
344-
Header next = iterator.next();
345-
if (next.key().equals(KafkaHeaders.CORRELATION_ID)) {
346-
correlationId = new CorrelationKey(next.value());
347-
}
381+
if (correlationHeader != null) {
382+
correlationId = new CorrelationKey(correlationHeader.value());
348383
}
349384
if (correlationId == null) {
350385
this.logger.error(() -> "No correlationId found in reply: " + record
351386
+ " - to use request/reply semantics, the responding server must return the correlation id "
352-
+ " in the '" + KafkaHeaders.CORRELATION_ID + "' header");
387+
+ " in the '" + this.correlationHeaderName + "' header");
353388
}
354389
else {
355390
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.verify;
2727

28+
import java.nio.ByteBuffer;
2829
import java.time.Duration;
2930
import java.util.ArrayList;
3031
import java.util.Collection;
@@ -121,9 +122,14 @@ public class ReplyingKafkaTemplateTests {
121122

122123
private static final String F_REQUEST = "fRequest";
123124

125+
private static final String G_REPLY = "gReply";
126+
127+
private static final String G_REQUEST = "gRequest";
128+
124129
@ClassRule
125130
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, 5, A_REQUEST, A_REPLY,
126-
B_REQUEST, B_REPLY, C_REQUEST, C_REPLY, D_REQUEST, D_REPLY, E_REQUEST, E_REPLY, F_REQUEST, F_REPLY);
131+
B_REQUEST, B_REPLY, C_REQUEST, C_REPLY, D_REQUEST, D_REPLY, E_REQUEST, E_REPLY, F_REQUEST, F_REPLY,
132+
G_REQUEST, G_REPLY);
127133

128134
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
129135

@@ -152,10 +158,8 @@ public void testGood() throws Exception {
152158
new DefaultKafkaHeaderMapper().toHeaders(consumerRecord.headers(), receivedHeaders);
153159
assertThat(receivedHeaders).containsKey("baz");
154160
assertThat(receivedHeaders).hasSize(2);
155-
assertThat(KafkaTestUtils.getPropertyValue(
156-
this.registry.getListenerContainer(A_REQUEST), "containerProperties.missingTopicsFatal",
157-
Boolean.class))
158-
.isFalse();
161+
assertThat(this.registry.getListenerContainer(A_REQUEST).getContainerProperties().isMissingTopicsFatal())
162+
.isFalse();
159163
}
160164
finally {
161165
template.stop();
@@ -481,6 +485,28 @@ public AggregatingReplyingKafkaTemplate<Integer, String, String> aggregatingTemp
481485
return template;
482486
}
483487

488+
@Test
489+
public void withCustomHeaders() throws Exception {
490+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(new TopicPartitionOffset(G_REPLY, 1));
491+
template.setCorrelationHeaderName("custom.correlation.id");
492+
template.setReplyTopicHeaderName("custom.reply.to");
493+
template.setReplyPartitionHeaderName("custom.reply.partition");
494+
try {
495+
template.setReplyTimeout(30_000);
496+
Headers headers = new RecordHeaders();
497+
ProducerRecord<Integer, String> record = new ProducerRecord<>(G_REQUEST, null, null, null, "foo", headers);
498+
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
499+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
500+
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
501+
assertThat(consumerRecord.value()).isEqualTo("fooWithCustomHeaders");
502+
assertThat(consumerRecord.partition()).isEqualTo(1);
503+
}
504+
finally {
505+
template.stop();
506+
template.destroy();
507+
}
508+
}
509+
484510
@Configuration
485511
@EnableKafka
486512
public static class Config {
@@ -568,6 +594,16 @@ public String dListener2(String in) {
568594
return in.substring(0, 1) + in.substring(1).toUpperCase();
569595
}
570596

597+
@KafkaListener(id = G_REQUEST, topics = G_REQUEST)
598+
public void gListener(Message<String> in) {
599+
String replyTopic = new String(in.getHeaders().get("custom.reply.to", byte[].class));
600+
int replyPart = ByteBuffer.wrap(in.getHeaders().get("custom.reply.partition", byte[].class)).getInt();
601+
ProducerRecord<Integer, String> record = new ProducerRecord<>(replyTopic, replyPart, null,
602+
in.getPayload() + "WithCustomHeaders");
603+
record.headers().add(new RecordHeader("custom.correlation.id",
604+
in.getHeaders().get("custom.correlation.id", byte[].class)));
605+
template().send(record);
606+
}
571607
}
572608

573609
@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)

src/reference/asciidoc/kafka.adoc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ public class KRequestingApplication {
544544

545545
Note that we can use Boot's auto-configured container factory to create the reply container.
546546

547-
The template sets a header called `KafkaHeaders.CORRELATION_ID`, which must be echoed back by the server side.
547+
The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side.
548548

549549
In this case, the following `@KafkaListener` application responds:
550550

@@ -617,6 +617,17 @@ In this case, though, the reply container must not use Kafka's group management
617617
NOTE: The `DefaultKafkaHeaderMapper` requires Jackson to be on the classpath (for the `@KafkaListener`).
618618
If it is not available, the message converter has no header mapper, so you must configure a `MessagingMessageConverter` with a `SimpleKafkaHeaderMapper`, as shown earlier.
619619

620+
By default, 3 headers are used:
621+
622+
* `KafkaHeaders.CORRELATION_ID` - used to correlate the reply to a request
623+
* `KafkaHeaders.REPLY_TOPIC` - used to tell the server where to reply
624+
* `KafkaHeaders.REPLY_PARTITION` - (optional) used to tell the server which partition to reply to
625+
626+
These header names are used by the `@KafkaListener` infrastructure to route the reply.
627+
628+
Starting with version 2.3, you can customize the header names - the template has 3 properties `correlationHeaderName`, `replyTopicHeaderName`, and `replyPartitionHeaderName`.
629+
This is useful if your server is not a Spring application (or does not use the `@KafkaListener`).
630+
620631
[[aggregating-request-reply]]
621632
===== Aggregating Multiple Replies
622633

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,8 @@ This enables the code to access to extra information that was missing in the old
129129

130130
You can now override the default broker list property name in the annotation.
131131
See <<kafka-testing-embeddedkafka-annotation>> for more information.
132+
133+
==== ReplyingKafkaTemplate Changes
134+
135+
You can now customize the header names for correlation, reply topic and reply partition.
136+
See <<replying-template>> for more information.

0 commit comments

Comments
 (0)