Skip to content

Commit 996f864

Browse files
garyrussellartembilan
authored andcommitted
GH-811: Configurable Reply Headers
Resolves #811 Support copying and/or modifying reply headers. * Javadoc Polishing
1 parent 2143168 commit 996f864

File tree

9 files changed

+227
-8
lines changed

9 files changed

+227
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry)
172172

173173
/**
174174
* Set the name of the {@link KafkaListenerContainerFactory} to use by default.
175-
* <p>If none is specified, "KafkaListenerContainerFactory" is assumed to be defined.
175+
* <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
176176
* @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
177177
*/
178178
public void setContainerFactoryBeanName(String containerFactoryBeanName) {

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.kafka.listener.ErrorHandler;
3434
import org.springframework.kafka.listener.GenericErrorHandler;
3535
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
36+
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
3637
import org.springframework.kafka.support.TopicPartitionInitialOffset;
3738
import org.springframework.kafka.support.converter.MessageConverter;
3839
import org.springframework.retry.RecoveryCallback;
@@ -84,6 +85,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
8485

8586
private AfterRollbackProcessor<K, V> afterRollbackProcessor;
8687

88+
private ReplyHeadersConfigurer replyHeadersConfigurer;
89+
8790
/**
8891
* Specify a {@link ConsumerFactory} to use.
8992
* @param consumerFactory The consumer factory.
@@ -230,6 +233,15 @@ public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollback
230233
this.afterRollbackProcessor = afterRollbackProcessor;
231234
}
232235

236+
/**
237+
* Set a configurer which will be invoked when creating a reply message.
238+
* @param replyHeadersConfigurer the configurer.
239+
* @since 2.2
240+
*/
241+
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) {
242+
this.replyHeadersConfigurer = replyHeadersConfigurer;
243+
}
244+
233245
/**
234246
* Obtain the properties template for this factory - set properties as needed
235247
* and they will be copied to a final properties instance for the endpoint.
@@ -270,6 +282,9 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
270282
if (this.replyTemplate != null) {
271283
aklEndpoint.setReplyTemplate(this.replyTemplate);
272284
}
285+
if (this.replyHeadersConfigurer != null) {
286+
aklEndpoint.setReplyHeadersConfigurer(this.replyHeadersConfigurer);
287+
}
273288
}
274289

275290
endpoint.setupListenerContainer(instance, this.messageConverter);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
4343
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
4444
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
45+
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
4546
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
4647
import org.springframework.kafka.support.TopicPartitionInitialOffset;
4748
import org.springframework.kafka.support.converter.MessageConverter;
@@ -105,6 +106,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
105106

106107
private Boolean autoStartup;
107108

109+
private ReplyHeadersConfigurer replyHeadersConfigurer;
110+
108111
@Override
109112
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
110113
this.beanFactory = beanFactory;
@@ -374,6 +377,15 @@ public void setAutoStartup(Boolean autoStartup) {
374377
this.autoStartup = autoStartup;
375378
}
376379

380+
/**
381+
* Set a configurer which will be invoked when creating a reply message.
382+
* @param replyHeadersConfigurer the configurer.
383+
* @since 2.2
384+
*/
385+
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) {
386+
this.replyHeadersConfigurer = replyHeadersConfigurer;
387+
}
388+
377389
@Override
378390
public void afterPropertiesSet() {
379391
boolean topicsEmpty = getTopics().isEmpty();
@@ -408,7 +420,11 @@ protected abstract MessagingMessageListenerAdapter<K, V> createMessageListener(M
408420

409421
@SuppressWarnings("unchecked")
410422
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
411-
Object messageListener = createMessageListener(container, messageConverter);
423+
MessagingMessageListenerAdapter<K, V> adapter = createMessageListener(container, messageConverter);
424+
if (this.replyHeadersConfigurer != null) {
425+
adapter.setReplyHeadersConfigurer(this.replyHeadersConfigurer);
426+
}
427+
Object messageListener = adapter;
412428
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
413429
if (this.retryTemplate != null) {
414430
messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Collection;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.stream.Collectors;
2829

2930
import org.apache.commons.logging.Log;
3031
import org.apache.commons.logging.LogFactory;
@@ -54,11 +55,13 @@
5455
import org.springframework.kafka.support.converter.RecordMessageConverter;
5556
import org.springframework.lang.Nullable;
5657
import org.springframework.messaging.Message;
58+
import org.springframework.messaging.MessageHeaders;
5759
import org.springframework.messaging.MessagingException;
5860
import org.springframework.messaging.converter.MessageConversionException;
5961
import org.springframework.messaging.handler.annotation.Payload;
6062
import org.springframework.messaging.support.MessageBuilder;
6163
import org.springframework.util.Assert;
64+
import org.springframework.util.ObjectUtils;
6265
import org.springframework.util.StringUtils;
6366

6467
/**
@@ -108,6 +111,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
108111

109112
private boolean messageReturnType;
110113

114+
private ReplyHeadersConfigurer replyHeadersConfigurer;
115+
111116
public MessagingMessageListenerAdapter(Object bean, Method method) {
112117
this.bean = bean;
113118
this.inferredType = determineInferredType(method);
@@ -214,6 +219,25 @@ protected boolean isMessageList() {
214219
return this.isMessageList;
215220
}
216221

222+
/**
223+
* Return the reply configurer.
224+
* @return the configurer.
225+
* @since 2.2
226+
* @see #setReplyHeadersConfigurer(ReplyHeadersConfigurer)
227+
*/
228+
protected ReplyHeadersConfigurer getReplyHeadersConfigurer() {
229+
return this.replyHeadersConfigurer;
230+
}
231+
232+
/**
233+
* Set a configurer which will be invoked when creating a reply message.
234+
* @param replyHeadersConfigurer the configurer.
235+
* @since 2.2
236+
*/
237+
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) {
238+
this.replyHeadersConfigurer = replyHeadersConfigurer;
239+
}
240+
217241
@Override
218242
public void registerSeekCallback(ConsumerSeekCallback callback) {
219243
if (this.bean instanceof ConsumerSeekAware) {
@@ -384,6 +408,24 @@ else if (result instanceof Message) {
384408
if (sourceIsMessage) {
385409
MessageBuilder<Object> builder = MessageBuilder.withPayload(result)
386410
.setHeader(KafkaHeaders.TOPIC, topic);
411+
if (this.replyHeadersConfigurer != null) {
412+
Map<String, Object> headersToCopy = ((Message<?>) source).getHeaders().entrySet().stream()
413+
.filter(e -> {
414+
String key = e.getKey();
415+
return !key.equals(MessageHeaders.ID) && !key.equals(MessageHeaders.TIMESTAMP)
416+
&& !key.equals(KafkaHeaders.CORRELATION_ID)
417+
&& !key.startsWith(KafkaHeaders.RECEIVED);
418+
})
419+
.filter(e -> this.replyHeadersConfigurer.shouldCopy(e.getKey(), e.getValue()))
420+
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
421+
if (headersToCopy.size() > 0) {
422+
builder.copyHeaders(headersToCopy);
423+
}
424+
headersToCopy = this.replyHeadersConfigurer.additionalHeaders();
425+
if (!ObjectUtils.isEmpty(headersToCopy)) {
426+
builder.copyHeaders(headersToCopy);
427+
}
428+
}
387429
if (correlationId != null) {
388430
builder.setHeader(KafkaHeaders.CORRELATION_ID, correlationId);
389431
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import java.util.Map;
20+
21+
import org.springframework.kafka.support.KafkaHeaders;
22+
import org.springframework.lang.Nullable;
23+
import org.springframework.messaging.MessageHeaders;
24+
25+
/**
26+
* A strategy for configuring which headers, if any, should be set in a reply message.
27+
*
28+
* @author Gary Russell
29+
* @since 2.2
30+
*
31+
*/
32+
@FunctionalInterface
33+
public interface ReplyHeadersConfigurer {
34+
35+
/**
36+
* Return true if the header should be copied to the reply message.
37+
* {@link KafkaHeaders#CORRELATION_ID} will not be offered; it is always copied.
38+
* {@link MessageHeaders#ID} and {@link MessageHeaders#TIMESTAMP} are never copied.
39+
* {@code KafkaHeaders.RECEIVED*} headers are never copied.
40+
* @param headerName the header name.
41+
* @param headerValue the header value.
42+
* @return true to copy.
43+
*/
44+
boolean shouldCopy(String headerName, Object headerValue);
45+
46+
/**
47+
* A map of additional headers to add to the reply message.
48+
* IMPORTANT: Any existing headers with the same name will be replaced by those
49+
* returned by this method.
50+
* @return the headers.
51+
*/
52+
@Nullable
53+
default Map<String, Object> additionalHeaders() {
54+
return null;
55+
}
56+
57+
}

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,15 @@
2626
*/
2727
public abstract class KafkaHeaders {
2828

29-
private static final String PREFIX = "kafka_";
29+
/**
30+
* The prefix for Kafka headers.
31+
*/
32+
public static final String PREFIX = "kafka_";
33+
34+
/**
35+
* The prefix for Kafka headers containing 'received' values.
36+
*/
37+
public static final String RECEIVED = PREFIX + "received";
3038

3139
/**
3240
* The header containing the topic when sending data to Kafka.
@@ -72,17 +80,17 @@ public abstract class KafkaHeaders {
7280
/**
7381
* The header containing the topic from which the message was received.
7482
*/
75-
public static final String RECEIVED_TOPIC = PREFIX + "receivedTopic";
83+
public static final String RECEIVED_TOPIC = RECEIVED + "Topic";
7684

7785
/**
7886
* The header containing the message key for the received message.
7987
*/
80-
public static final String RECEIVED_MESSAGE_KEY = PREFIX + "receivedMessageKey";
88+
public static final String RECEIVED_MESSAGE_KEY = RECEIVED + "MessageKey";
8189

8290
/**
8391
* The header containing the topic partition for the received message.
8492
*/
85-
public static final String RECEIVED_PARTITION_ID = PREFIX + "receivedPartitionId";
93+
public static final String RECEIVED_PARTITION_ID = RECEIVED + "PartitionId";
8694

8795
/**
8896
* The header for holding the {@link org.apache.kafka.common.record.TimestampType type} of timestamp.

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.assertj.core.api.Assertions.fail;
2121

2222
import java.util.Collection;
23+
import java.util.Collections;
24+
import java.util.HashMap;
2325
import java.util.Map;
2426
import java.util.concurrent.CountDownLatch;
2527
import java.util.concurrent.ExecutionException;
@@ -30,7 +32,9 @@
3032
import org.apache.kafka.clients.consumer.ConsumerRecord;
3133
import org.apache.kafka.clients.producer.ProducerRecord;
3234
import org.apache.kafka.common.TopicPartition;
35+
import org.apache.kafka.common.header.Headers;
3336
import org.apache.kafka.common.header.internals.RecordHeader;
37+
import org.apache.kafka.common.header.internals.RecordHeaders;
3438
import org.junit.ClassRule;
3539
import org.junit.Rule;
3640
import org.junit.Test;
@@ -50,6 +54,8 @@
5054
import org.springframework.kafka.core.KafkaTemplate;
5155
import org.springframework.kafka.listener.ContainerProperties;
5256
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
57+
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
58+
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
5359
import org.springframework.kafka.support.KafkaHeaders;
5460
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
5561
import org.springframework.kafka.support.TopicPartitionInitialOffset;
@@ -102,11 +108,17 @@ public void testGood() throws Exception {
102108
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
103109
try {
104110
template.setReplyTimeout(30_000);
105-
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, "foo");
111+
Headers headers = new RecordHeaders();
112+
headers.add("baz", "buz".getBytes());
113+
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, null, null, null, "foo", headers);
106114
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
107115
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
108116
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
109117
assertThat(consumerRecord.value()).isEqualTo("FOO");
118+
Map<String, Object> receivedHeaders = new HashMap<>();
119+
new DefaultKafkaHeaderMapper().toHeaders(consumerRecord.headers(), receivedHeaders);
120+
assertThat(receivedHeaders).containsKey("baz");
121+
assertThat(receivedHeaders).hasSize(2);
110122
}
111123
finally {
112124
template.stop();
@@ -199,12 +211,19 @@ public void testGoodWithSimpleMapper() throws Exception {
199211
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(B_REPLY);
200212
try {
201213
template.setReplyTimeout(30_000);
202-
ProducerRecord<Integer, String> record = new ProducerRecord<>(B_REQUEST, "qux");
214+
Headers headers = new RecordHeaders();
215+
headers.add("baz", "buz".getBytes());
216+
ProducerRecord<Integer, String> record = new ProducerRecord<>(B_REQUEST, null, null, null, "qux", headers);
203217
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, B_REPLY.getBytes()));
204218
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
205219
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
206220
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
207221
assertThat(consumerRecord.value()).isEqualTo("qUX");
222+
Map<String, Object> receivedHeaders = new HashMap<>();
223+
new DefaultKafkaHeaderMapper().toHeaders(consumerRecord.headers(), receivedHeaders);
224+
assertThat(receivedHeaders).containsKey("qux");
225+
assertThat(receivedHeaders).doesNotContainKey("baz");
226+
assertThat(receivedHeaders).hasSize(2);
208227
}
209228
finally {
210229
template.stop();
@@ -290,6 +309,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerCon
290309
new ConcurrentKafkaListenerContainerFactory<>();
291310
factory.setConsumerFactory(cf());
292311
factory.setReplyTemplate(template());
312+
factory.setReplyHeadersConfigurer((k, v) -> k.equals("baz"));
293313
return factory;
294314
}
295315

@@ -302,6 +322,19 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> simpleMapperFact
302322
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
303323
messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
304324
factory.setMessageConverter(messageConverter);
325+
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
326+
327+
@Override
328+
public boolean shouldCopy(String headerName, Object headerValue) {
329+
return false;
330+
}
331+
332+
@Override
333+
public Map<String, Object> additionalHeaders() {
334+
return Collections.singletonMap("qux", "fiz");
335+
}
336+
337+
});
305338
return factory;
306339
}
307340

0 commit comments

Comments
 (0)