Skip to content

Commit 07a49df

Browse files
Addressing PR review
Signed-off-by: Sanghyeok An <[email protected]>
1 parent c945dd5 commit 07a49df

File tree

3 files changed

+141
-75
lines changed

3 files changed

+141
-75
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,39 @@ By default, when multiple headers share the same name, the `HeaderMapper` treats
189189
* **Producer side:** `DefaultKafkaHeaderMapper` writes the JSON bytes, while `SimpleKafkaHeaderMapper` ignore it.
190190
* **Consumer side:** the mapper exposes the header as a single value—the **last occurrence wins**; earlier duplicates are silently discarded.
191191

192+
For example, on the producer side:
193+
[source, java]
194+
----
195+
// For Producer Side
196+
Message<String> message = MessageBuilder
197+
.withPayload("test-multi-value-header")
198+
.setHeader("test-multi-value-header", List.of("test-value1", "test-value2"))
199+
.build();
200+
201+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
202+
RecordHeaders mappedHeaders = new RecordHeaders();
203+
headerMapper.fromHeaders(message.getHeaders(), mappedHeaders);
204+
205+
RecordHeader expectedHeader = new RecordHeader("test-multi-value-header", new byte[] {
206+
91, 34, 116, 101, 115, 116, 45, 118, 97, 108, 117, 101, 49, 34, 44, 34, 116, 101, 115, 116, 45,
207+
118, 97, 108, 117, 101, 50, 34, 93});
208+
assertThat(mappedHeaders.headers(multiValueHeader1).iterator().next()).isEqualTo(expectedHeader);
209+
----
210+
211+
For example, on the consumer side:
212+
[source, java]
213+
----
214+
// Consumer Side
215+
RecordHeaders recordHeaders = new RecordHeaders();
216+
recordHeaders.add("test-multi-value1", new byte[] { 0, 0, 0, 1 });
217+
recordHeaders.add("test-multi-value1", new byte[] { 0, 0, 0, 2 });
218+
219+
Map<String, Object> mappedHeaders = new HashMap<>();
220+
headerMapper.toHeaders(recordHeaders, mappedHeaders);
221+
222+
assertThat(headersMapped.get("test-multi-value1")).isEqualTo(new byte[] { 0, 0, 0, 2 });
223+
----
224+
192225
Preserving each individual header requires explicit registration of patterns that designate the header as multi‑valued.
193226

194227
`DefaultKafkaHeaderMapper#setMultiValueHeaderPatterns(String... patterns)` accepts a list of patterns, which can be either wildcard expressions or exact header names.
@@ -224,16 +257,3 @@ Regular expressions are *not* supported; only the `*` wildcard is allowed in sim
224257
All elements collected under the same multi‑value header **must be of the same Java type**.
225258
Mixing, for example, `String` and `byte[]` values under a single header key is not supported and will lead to a conversion error.
226259
====
227-
228-
The mapper is used by multiple components (`KafkaTemplate`, `MessagingMessageConverter`, lister container factories, etc.).
229-
Define and expose it as a Spring bean wherever it makes sense for your application:
230-
231-
[source,java]
232-
----
233-
@Bean
234-
public DefaultKafkaHeaderMapper multiValueHeaderMapper() {
235-
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
236-
mapper.setMultiValueHeaderPatterns("test-multi-*");
237-
return mapper;
238-
}
239-
----

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,5 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba
7474
[[x40-multi-value-header]]
7575
=== Support multi-value header
7676

77-
Spring for Apache Kafka 4.0 supports multi-value header for Kafka Record.
77+
The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` supports multi-value header for Kafka Record.
7878
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header].

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 107 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.stream.StreamSupport;
2728

2829
import org.apache.kafka.common.header.Header;
2930
import org.apache.kafka.common.header.Headers;
@@ -338,86 +339,115 @@ void inboundJson() {
338339
void multiValueHeaderToTest() {
339340
// GIVEN
340341
String multiValueHeader1 = "test-multi-value1";
342+
byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 };
343+
byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 };
344+
byte[] multiValueHeader1Value3 = { 0, 0, 0, 2 };
345+
byte[] multiValueHeader1Value4 = { 0, 0, 0, 3 };
346+
341347
String multiValueHeader2 = "test-multi-value2";
342-
String multiValueWildeCardHeader1 = "test-wildcard-value1";
343-
String multiValueWildeCardHeader2 = "test-wildcard-value2";
348+
byte[] multiValueHeader2Value1 = { 0, 0, 0, 4 };
349+
byte[] multiValueHeader2Value2 = { 0, 0, 0, 5 };
350+
351+
String multiValueWildCardHeader1 = "test-wildcard-value1";
352+
byte[] multiValueWildCardHeader1Value1 = { 0, 0, 0, 6 };
353+
byte[] multiValueWildCardHeader1Value2 = { 0, 0, 0, 7 };
354+
355+
String multiValueWildCardHeader2 = "test-wildcard-value2";
356+
byte[] multiValueWildCardHeader2Value1 = { 0, 0, 0, 8 };
357+
byte[] multiValueWildCardHeader2Value2 = { 0, 0, 0, 9 };
358+
344359
String singleValueHeader = "test-single-value1";
360+
byte[] singleValueHeaderValue = { 0, 0, 0, 6 };
345361

346362
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
347363
mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2, "test-wildcard-*");
348364

349365
Headers rawHeaders = new RecordHeaders();
350-
rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
351-
rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, new byte[] { 0, 0, 0, 1 });
352-
rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, new byte[] { 0, 0, 0, 5 });
353-
rawHeaders.add(singleValueHeader, new byte[] { 0, 0, 0, 6 });
354366

355-
rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 0 });
356-
rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 1 });
357-
rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 2 });
358-
rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 3 });
367+
byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 };
368+
byte[] originalOffsetHeaderValue = { 0, 0, 0, 1 };
369+
byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 };
359370

360-
rawHeaders.add(multiValueHeader2, new byte[] { 0, 0, 0, 4 });
361-
rawHeaders.add(multiValueHeader2, new byte[] { 0, 0, 0, 5 });
371+
rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue);
372+
rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue);
373+
rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues);
374+
rawHeaders.add(singleValueHeader, singleValueHeaderValue);
362375

363-
rawHeaders.add(multiValueWildeCardHeader1, new byte[] { 0, 0, 0, 6 });
364-
rawHeaders.add(multiValueWildeCardHeader1, new byte[] { 0, 0, 0, 7 });
365-
rawHeaders.add(multiValueWildeCardHeader2, new byte[] { 0, 0, 0, 8 });
366-
rawHeaders.add(multiValueWildeCardHeader2, new byte[] { 0, 0, 0, 9 });
376+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value1);
377+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value2);
378+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value3);
379+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value4);
380+
381+
rawHeaders.add(multiValueHeader2, multiValueHeader2Value1);
382+
rawHeaders.add(multiValueHeader2, multiValueHeader2Value2);
383+
384+
rawHeaders.add(multiValueWildCardHeader1, multiValueWildCardHeader1Value1);
385+
rawHeaders.add(multiValueWildCardHeader1, multiValueWildCardHeader1Value2);
386+
rawHeaders.add(multiValueWildCardHeader2, multiValueWildCardHeader2Value1);
387+
rawHeaders.add(multiValueWildCardHeader2, multiValueWildCardHeader2Value2);
367388

368389
// WHEN
369390
Map<String, Object> mappedHeaders = new HashMap<>();
370391
mapper.toHeaders(rawHeaders, mappedHeaders);
371392

372393
// THEN
373394
assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1);
374-
assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(new byte[] { 0, 0, 0, 1 });
375-
assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(new byte[] { 0, 0, 0, 5 });
376-
assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(new byte[] { 0, 0, 0, 6 });
377-
378-
@SuppressWarnings("unchecked")
379-
List<byte[]> multiHeader1Values = (List<byte[]>) mappedHeaders.get(multiValueHeader1);
380-
assertThat(multiHeader1Values).contains(new byte[] { 0, 0, 0, 0 },
381-
new byte[] { 0, 0, 0, 1 },
382-
new byte[] { 0, 0, 0, 2 },
383-
new byte[] { 0, 0, 0, 3 });
384-
385-
@SuppressWarnings("unchecked")
386-
List<byte[]> multiHeader2Values = (List<byte[]>) mappedHeaders.get(multiValueHeader2);
387-
assertThat(multiHeader2Values).contains(new byte[] { 0, 0, 0, 4 },
388-
new byte[] { 0, 0, 0, 5 });
389-
390-
@SuppressWarnings("unchecked")
391-
List<byte[]> multiValueWildeCardHeader1Values = (List<byte[]>) mappedHeaders.get(multiValueWildeCardHeader1);
392-
assertThat(multiValueWildeCardHeader1Values).contains(new byte[] { 0, 0, 0, 6 },
393-
new byte[] { 0, 0, 0, 7 });
394-
395-
@SuppressWarnings("unchecked")
396-
List<byte[]> multiValueWildeCardHeader2Values = (List<byte[]>) mappedHeaders.get(multiValueWildeCardHeader2);
397-
assertThat(multiValueWildeCardHeader2Values).contains(new byte[] { 0, 0, 0, 8 },
398-
new byte[] { 0, 0, 0, 9 });
395+
assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue);
396+
assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues);
397+
assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue);
398+
399+
assertValueOfMultiValueHeader(mappedHeaders, multiValueHeader1,
400+
multiValueHeader1Value1,
401+
multiValueHeader1Value2,
402+
multiValueHeader1Value3,
403+
multiValueHeader1Value4);
404+
405+
assertValueOfMultiValueHeader(mappedHeaders, multiValueHeader2,
406+
multiValueHeader2Value1,
407+
multiValueHeader2Value2);
408+
409+
assertValueOfMultiValueHeader(mappedHeaders, multiValueWildCardHeader1,
410+
multiValueWildCardHeader1Value1,
411+
multiValueWildCardHeader1Value2);
412+
413+
assertValueOfMultiValueHeader(mappedHeaders, multiValueWildCardHeader2,
414+
multiValueWildCardHeader2Value1,
415+
multiValueWildCardHeader2Value2);
399416
}
400417

401418
@Test
402419
void multiValueHeaderFromTest() {
403420
// GIVEN
404421
String multiValueHeader1 = "test-multi-value1";
422+
byte[] multiValueHeader1Value1 = { 0, 0, 0, 1 };
423+
byte[] multiValueHeader1Value2 = { 0, 0, 0, 2 };
424+
405425
String multiValueHeader2 = "test-multi-value2";
426+
byte[] multiValueHeader2Value1 = { 0, 0, 0, 3 };
427+
byte[] multiValueHeader2Value2 = { 0, 0, 0, 4 };
428+
406429
String multiValueHeader3 = "test-other-multi-value1";
430+
byte[] multiValueHeader3Value1 = { 0, 0, 0, 9 };
431+
byte[] multiValueHeader3Value2 = { 0, 0, 0, 10 };
432+
407433
String multiValueHeader4 = "test-prefix-match-multi";
434+
byte[] multiValueHeader4Value1 = { 0, 0, 0, 11 };
435+
byte[] multiValueHeader4Value2 = { 0, 0, 0, 12 };
436+
408437
String singleValueHeader = "test-single-value1";
438+
byte[] singleValueHeaderValue1 = { 0, 0, 0, 5 };
409439

410440
Message<String> message = MessageBuilder
411441
.withPayload("test-multi-value-header")
412-
.setHeader(multiValueHeader1, List.of(new byte[] { 0, 0, 0, 1 },
413-
new byte[] { 0, 0, 0, 2 }))
414-
.setHeader(multiValueHeader2, List.of(new byte[] { 0, 0, 0, 3 },
415-
new byte[] { 0, 0, 0, 4 }))
416-
.setHeader(multiValueHeader3, List.of(new byte[] { 0, 0, 0, 9 },
417-
new byte[] { 0, 0, 0, 10 }))
418-
.setHeader(multiValueHeader4, List.of(new byte[] { 0, 0, 0, 11 },
419-
new byte[] { 0, 0, 0, 12 }))
420-
.setHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
442+
.setHeader(multiValueHeader1, List.of(multiValueHeader1Value1,
443+
multiValueHeader1Value2))
444+
.setHeader(multiValueHeader2, List.of(multiValueHeader2Value1,
445+
multiValueHeader2Value2))
446+
.setHeader(multiValueHeader3, List.of(multiValueHeader3Value1,
447+
multiValueHeader3Value2))
448+
.setHeader(multiValueHeader4, List.of(multiValueHeader4Value1,
449+
multiValueHeader4Value2))
450+
.setHeader(singleValueHeader, singleValueHeaderValue1)
421451
.build();
422452

423453
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
@@ -430,17 +460,20 @@ void multiValueHeaderFromTest() {
430460
mapper.fromHeaders(message.getHeaders(), results);
431461

432462
// THEN
433-
assertThat(results).contains(
434-
new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 1 }),
435-
new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 2 }),
436-
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 3 }),
437-
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 4 }),
438-
new RecordHeader(multiValueHeader3, new byte[] { 0, 0, 0, 9 }),
439-
new RecordHeader(multiValueHeader3, new byte[] { 0, 0, 0, 10 }),
440-
new RecordHeader(multiValueHeader4, new byte[] { 0, 0, 0, 11 }),
441-
new RecordHeader(multiValueHeader4, new byte[] { 0, 0, 0, 12 }),
442-
new RecordHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
443-
);
463+
List<byte[]> multiValueHeader1Values = extractHeaderValues(results, multiValueHeader1);
464+
assertThat(multiValueHeader1Values).containsExactly(multiValueHeader1Value1, multiValueHeader1Value2);
465+
466+
List<byte[]> multiValueHeader2Values = extractHeaderValues(results, multiValueHeader2);
467+
assertThat(multiValueHeader2Values).containsExactly(multiValueHeader2Value1, multiValueHeader2Value2);
468+
469+
List<byte[]> multiValueHeader3Values = extractHeaderValues(results, multiValueHeader3);
470+
assertThat(multiValueHeader3Values).containsExactly(multiValueHeader3Value1, multiValueHeader3Value2);
471+
472+
List<byte[]> multiValueHeader4Values = extractHeaderValues(results, multiValueHeader4);
473+
assertThat(multiValueHeader4Values).containsExactly(multiValueHeader4Value1, multiValueHeader4Value2);
474+
475+
List<byte[]> singleValueHeaderValues = extractHeaderValues(results, singleValueHeader);
476+
assertThat(singleValueHeaderValues).containsExactly(singleValueHeaderValue1);
444477
}
445478

446479
@Test
@@ -489,6 +522,19 @@ void ensureNullHeaderValueHandledGraciously() {
489522
verify(mockHeader, never()).key();
490523
}
491524

525+
private List<byte[]> extractHeaderValues(Headers headers, String headerName) {
526+
return StreamSupport.stream(headers.headers(headerName).spliterator(), false)
527+
.map(Header::value)
528+
.toList();
529+
}
530+
531+
private void assertValueOfMultiValueHeader(Map<String, Object> mappedHeaders,
532+
String headerName, byte[]... expectedByteValues) {
533+
@SuppressWarnings("unchecked")
534+
List<byte[]> multiHeaderValues = (List<byte[]>) mappedHeaders.get(headerName);
535+
Arrays.stream(expectedByteValues).forEach(value -> assertThat(multiHeaderValues).contains(expectedByteValues));
536+
}
537+
492538
public static final class Foo {
493539

494540
private String bar = "bar";

0 commit comments

Comments
 (0)