
Commit 20397d9

GH-1757: Option for Raw ConsumerRecord Header

Resolves #1757

* Add docs.

Parent commit: 290d11a

6 files changed (+112 additions, -16 deletions)

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 21 additions & 0 deletions

@@ -4663,6 +4663,7 @@ TIP: See <<tip-assign-all-parts>>.
 
 This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka.
 
+[[listener-error-handlers]]
 ===== Listener Error Handlers
 
 Starting with version 2.0, the `@KafkaListener` annotation has a new attribute: `errorHandler`.
@@ -4686,6 +4687,26 @@ You have access to the spring-messaging `Message<?>` object produced by the mess
 The error handler can throw the original or a new exception, which is thrown to the container.
 Anything returned by the error handler is ignored.
 
+Starting with version 2.7, you can set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, which causes the raw `ConsumerRecord` to be added to the converted `Message<?>` in the `KafkaHeaders.RAW_DATA` header.
+This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
+It might be used in a request/reply scenario where you wish to send a failure result to the sender, after some number of retries, once the failed record has been captured in a dead letter topic.
+
+====
+[source, java]
+----
+@Bean
+KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
+    return (msg, ex) -> {
+        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
+            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
+            return "FAILED";
+        }
+        throw ex;
+    };
+}
+----
+====
+
 It has a sub-interface (`ConsumerAwareListenerErrorHandler`) that has access to the consumer object, through the following method:
 
 ====
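For context (not part of this commit), here is a minimal sketch of how the new property might be enabled on the record converter used by a listener container factory. The configuration class, bean method name, and generic types are illustrative assumptions; `setRawRecordHeader(true)` is the setter added by this change.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.converter.MessagingMessageConverter;

@Configuration
public class RawRecordHeaderConfig {

    // Illustrative wiring only: enable the raw ConsumerRecord header on the
    // converter used for record listeners.
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setRawRecordHeader(true); // adds KafkaHeaders.RAW_DATA to each converted Message<?>
        factory.setMessageConverter(converter);
        return factory;
    }
}

With the header populated, an error handler such as the `eh` bean above can hand the original `ConsumerRecord` to a `DeadLetterPublishingRecoverer`.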

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions

@@ -35,6 +35,10 @@ See <<message-listener-container>> for more information.
 You can now validate the payload parameter of `@KafkaHandler` methods (class-level listeners).
 See <<kafka-validation>> for more information.
 
+You can now set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, which causes the raw `ConsumerRecord` to be added to the converted `Message<?>`.
+This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
+See <<listener-error-handlers>> for more information.
+
 [[x27-dlt]]
 ==== `DeadLetterPublishingRecoverer` Changes

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 20 additions & 1 deletion

@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -71,6 +71,8 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
 
     private KafkaHeaderMapper headerMapper;
 
+    private boolean rawRecordHeader;
+
     /**
      * Create an instance that does not convert the record values.
      */
@@ -123,6 +125,16 @@ public RecordMessageConverter getRecordMessageConverter() {
         return this.recordConverter;
     }
 
+    /**
+     * Set to true to add the raw {@code List<ConsumerRecord<?, ?>>} as a header
+     * {@link KafkaHeaders#RAW_DATA}.
+     * @param rawRecordHeader true to add the header.
+     * @since 2.7
+     */
+    public void setRawRecordHeader(boolean rawRecordHeader) {
+        this.rawRecordHeader = rawRecordHeader;
+    }
+
     @Override
     public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
             Consumer<?, ?> consumer, Type type) {
@@ -140,12 +152,16 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
         List<Long> timestamps = new ArrayList<>();
         List<Map<String, Object>> convertedHeaders = new ArrayList<>();
         List<Headers> natives = new ArrayList<>();
+        List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
         if (this.headerMapper != null) {
             rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
         }
         else {
             rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, natives);
         }
+        if (this.rawRecordHeader) {
+            rawHeaders.put(KafkaHeaders.RAW_DATA, raws);
+        }
         commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
                 timestamps);
 
@@ -177,6 +193,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
                 }
                 natives.add(record.headers());
             }
+            if (this.rawRecordHeader) {
+                raws.add(record);
+            }
         }
         return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
     }
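For the batch converter, the header value is the whole `List<ConsumerRecord<?, ?>>` rather than a single record. A hedged sketch (not part of this commit) of a batch listener error handler that reads that list; the class and method names are illustrative, and it assumes `setRawRecordHeader(true)` was called on the `BatchMessagingMessageConverter` in use.

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;

public class BatchRawHeaderExample {

    // Illustrative only: log the records in the failed batch, then rethrow so
    // the container's error handling still runs.
    @SuppressWarnings("unchecked")
    public static KafkaListenerErrorHandler batchLoggingErrorHandler() {
        return (msg, ex) -> {
            List<ConsumerRecord<?, ?>> raws = msg.getHeaders().get(KafkaHeaders.RAW_DATA, List.class);
            if (raws != null) {
                raws.forEach(rec -> System.err.println("Failed batch record "
                        + rec.topic() + "-" + rec.partition() + "@" + rec.offset()));
            }
            throw ex;
        };
    }
}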

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 16 additions & 2 deletions

@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2019 the original author or authors.
+ * Copyright 2016-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -62,6 +62,8 @@ public class MessagingMessageConverter implements RecordMessageConverter {
 
     private KafkaHeaderMapper headerMapper;
 
+    private boolean rawRecordHeader;
+
     public MessagingMessageConverter() {
         if (JacksonPresent.isJackson2Present()) {
             this.headerMapper = new DefaultKafkaHeaderMapper();
@@ -98,6 +100,16 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
         this.headerMapper = headerMapper;
     }
 
+    /**
+     * Set to true to add the raw {@link ConsumerRecord} as a header
+     * {@link KafkaHeaders#RAW_DATA}.
+     * @param rawRecordHeader true to add the header.
+     * @since 2.7
+     */
+    public void setRawRecordHeader(boolean rawRecordHeader) {
+        this.rawRecordHeader = rawRecordHeader;
+    }
+
     @Override
     public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
             Type type) {
@@ -119,7 +131,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
         String ttName = record.timestampType() != null ? record.timestampType().name() : null;
         commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(),
                 record.offset(), ttName, record.timestamp());
-
+        if (this.rawRecordHeader) {
+            rawHeaders.put(KafkaHeaders.RAW_DATA, record);
+        }
         return MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders);
     }
 
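Because the raw record is exposed as an ordinary message header, a listener method can also receive it directly via `@Header`. A brief sketch (not part of this commit), assuming the converter above has `rawRecordHeader` enabled; the topic, group, and class names are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class RawHeaderListener {

    // Illustrative only: the payload is the converted value; the RAW_DATA header
    // carries the original ConsumerRecord when rawRecordHeader is enabled.
    @KafkaListener(topics = "someTopic", groupId = "someGroup")
    public void listen(String payload, @Header(KafkaHeaders.RAW_DATA) ConsumerRecord<?, ?> raw) {
        System.out.println("payload=" + payload + " from offset " + raw.offset());
    }
}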

spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java

Lines changed: 37 additions & 12 deletions

@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2019 the original author or authors.
+ * Copyright 2017-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,13 +44,14 @@
 /**
  * @author Biju Kunjummen
  * @author Artem Bilan
+ * @author Gary Russell
  *
  * @since 1.3
  */
 public class BatchMessageConverterTests {
 
     @Test
-    public void testBatchConverters() {
+    void testBatchConverters() {
         BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
 
         MessageHeaders headers = testGuts(batchMessageConverter);
@@ -61,10 +62,11 @@ public void testBatchConverters() {
         Map<String, Object> map = converted.get(0);
         assertThat(map).hasSize(1);
         assertThat(new String((byte[]) map.get("foo"))).isEqualTo("bar");
+        assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNull();
     }
 
     @Test
-    public void testNoMapper() {
+    void testNoMapper() {
         BatchMessagingMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
         batchMessageConverter.setHeaderMapper(null);
 
@@ -79,16 +81,26 @@ public void testNoMapper() {
         assertThat(new String(next.value())).isEqualTo("bar");
     }
 
+    @Test
+    void raw() {
+        BatchMessagingMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
+        batchMessageConverter.setRawRecordHeader(true);
+        MessageHeaders headers = testGuts(batchMessageConverter);
+        @SuppressWarnings("unchecked")
+        List<Map<String, Object>> converted = (List<Map<String, Object>>) headers
+                .get(KafkaHeaders.BATCH_CONVERTED_HEADERS);
+        assertThat(converted).hasSize(3);
+        Map<String, Object> map = converted.get(0);
+        assertThat(map).hasSize(1);
+        assertThat(new String((byte[]) map.get("foo"))).isEqualTo("bar");
+        @SuppressWarnings("unchecked")
+        List<ConsumerRecord<?, ?>> rawHeader = headers.get(KafkaHeaders.RAW_DATA, List.class);
+        assertThat(rawHeader).extracting(rec -> (String) rec.value())
+                .containsExactly("value1", "value2", "value3");
+    }
+
     private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) {
-        Header header = new RecordHeader("foo", "bar".getBytes());
-        Headers kHeaders = new RecordHeaders(new Header[] { header });
-        List<ConsumerRecord<?, ?>> consumerRecords = new ArrayList<>();
-        consumerRecords.add(new ConsumerRecord<>("topic1", 0, 1, 1487694048607L,
-                TimestampType.CREATE_TIME, 123L, 2, 3, "key1", "value1", kHeaders));
-        consumerRecords.add(new ConsumerRecord<>("topic1", 0, 2, 1487694048608L,
-                TimestampType.CREATE_TIME, 123L, 2, 3, "key2", "value2", kHeaders));
-        consumerRecords.add(new ConsumerRecord<>("topic1", 0, 3, 1487694048609L,
-                TimestampType.CREATE_TIME, 123L, 2, 3, "key3", "value3", kHeaders));
+        List<ConsumerRecord<?, ?>> consumerRecords = recordList();
 
 
         Acknowledgment ack = mock(Acknowledgment.class);
@@ -118,6 +130,19 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) {
         return headers;
     }
 
+    private List<ConsumerRecord<?, ?>> recordList() {
+        Header header = new RecordHeader("foo", "bar".getBytes());
+        Headers kHeaders = new RecordHeaders(new Header[] { header });
+        List<ConsumerRecord<?, ?>> consumerRecords = new ArrayList<>();
+        consumerRecords.add(new ConsumerRecord<>("topic1", 0, 1, 1487694048607L,
+                TimestampType.CREATE_TIME, 123L, 2, 3, "key1", "value1", kHeaders));
+        consumerRecords.add(new ConsumerRecord<>("topic1", 0, 2, 1487694048608L,
+                TimestampType.CREATE_TIME, 123L, 2, 3, "key2", "value2", kHeaders));
+        consumerRecords.add(new ConsumerRecord<>("topic1", 0, 3, 1487694048609L,
+                TimestampType.CREATE_TIME, 123L, 2, 3, "key3", "value3", kHeaders));
+        return consumerRecords;
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void missingHeaders() {

spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java

Lines changed: 14 additions & 1 deletion

@@ -1,5 +1,5 @@
 /*
- * Copyright 2019 the original author or authors.
+ * Copyright 2019-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@ void missingHeaders() {
         assertThat(message.getPayload()).isEqualTo("baz");
         assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo");
         assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar");
+        assertThat(message.getHeaders().get(KafkaHeaders.RAW_DATA)).isNull();
     }
 
     @Test
@@ -53,4 +54,16 @@ void dontMapNullKey() {
         assertThat(message.getHeaders().containsKey(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isFalse();
     }
 
+    @Test
+    void raw() {
+        MessagingMessageConverter converter = new MessagingMessageConverter();
+        converter.setRawRecordHeader(true);
+        ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz");
+        Message<?> message = converter.toMessage(record, null, null, null);
+        assertThat(message.getPayload()).isEqualTo("baz");
+        assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo");
+        assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar");
+        assertThat(message.getHeaders().get(KafkaHeaders.RAW_DATA)).isSameAs(record);
+    }
+
 }
