Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,61 @@ MessagingMessageConverter converter() {

If using Spring Boot, it will auto configure this converter bean into the auto-configured `KafkaTemplate`; otherwise you should add this converter to the template.

[[multi-value-header]]
== Support multi-value header

Spring for Apache Kafka 4.0 supports multi-value header where same logical header key appears more than once in a Kafka record.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we say Starting with 4.0,. Otherwise it sounds like in 5.0 there won’t be such a support 😜.
Plus no need to mention our project name in every sentence since it is kinda obvious that this doc is about Spring for Apache Kafka.


By default, when multiple headers share the same name, the `HeaderMapper` treats them as a single value and serialises the collection to JSON.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sentence does not reflect reality. The message headers abstraction doesn’t have multi-value headers notion.
You probably talk about a collection header which just serialized to a single Kafka header as JSON.


* **Producer side:** `DefaultKafkaHeaderMapper` writes the JSON bytes, while `SimpleKafkaHeaderMapper` ignore it.
* **Consumer side:** the mapper exposes the header as a single value—the **last occurrence wins**; earlier duplicates are silently discarded.

Preserving each individual header requires explicit registration of patterns that designate the header as multi‑valued.

`DefaultKafkaHeaderMapper#setMultiValueHeaderPatterns(String... patterns)` accepts a list of patterns, which can be either wildcard expressions or exact header names.

[source, java]
----
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();

// Explicit header names
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");

// Wildcard patterns for test-multi-value1, test-multi-value2
mapper.setMultiValueHeaderPatterns("test-multi-*");
----

Any header whose name matches one of the supplied patterns is

* **Producer side:** written as separate Kafka headers, one per element.
* **Consumer side:** reconstructed as a `List<?>` and returned unchanged to the application.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure about this “unchanged” statement.


[NOTE]
====
Regular expressions are *not* supported; only the `*` wildcard is allowed in simple patterns—supporting direct equality and forms such as:

- `xxx*`
- `*xxx`
- `*xxx*`
- `xxx*yyy`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a value extracting it into a list section. Feels like continuation of the previous sentence is enough:

... and forms such as:  `xxx*`, `*xxx`, `*xxx*`, `xxx*yyy`.

====

[IMPORTANT]
====
All elements collected under the same multi‑value header **must be of the same Java type**.
Mixing, for example, `String` and `byte[]` values under a single header key is not supported and will lead to a conversion error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is true only if producer side is our DefaultKafkaHeaderMapper when each item of the collection header is JSON-seriailized. If producer is some other framework, then we cannot claim this, since there won't be any dedicated type for those items, so everything is going come back to the consumer application as List<byte[]>.

====

The mapper is used by multiple components (`KafkaTemplate`, `MessagingMessageConverter`, lister container factories, etc.).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo with a lister word .

Define and expose it as a Spring bean wherever it makes sense for your application:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t see a reason in this paragraph . It is kinda obvious from Spring docs that most components have to be declared as beans.
Plus this sentence is personalized 😜


[source,java]
----
@Bean
public DefaultKafkaHeaderMapper multiValueHeaderMapper() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setMultiValueHeaderPatterns("test-multi-*");
return mapper;
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ Several deprecated items have been removed:

Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].
For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalace Protocol docs].

[[x40-multi-value-header]]
=== Support multi-value header

Spring for Apache Kafka 4.0 supports multi-value header for Kafka Record.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don’t need to repeat version since the whole chapter is about 4.0 news. Better to say what class(es) got a new option and then link to the target chapter.

More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header].
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {

private final List<HeaderMatcher> matchers = new ArrayList<>();

private final List<HeaderMatcher> multiValueHeaderMatchers = new ArrayList<>();

private final Map<String, Boolean> rawMappedHeaders = new HashMap<>();

{
Expand Down Expand Up @@ -191,6 +193,18 @@ public void addRawMappedHeader(String name, boolean toString) {
this.rawMappedHeaders.put(name, toString);
}

/**
* Add patterns for matching multi-value headers under the same key.
* @param patterns the patterns for header.
* @since 4.0
*/
public void setMultiValueHeaderPatterns(String ... patterns) {
this.multiValueHeaderMatchers.addAll(Arrays
.stream(patterns)
.map(SimplePatternBasedHeaderMatcher::new)
.toList());
}

protected boolean matches(String header, Object value) {
if (matches(header)) {
if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL))
Expand Down Expand Up @@ -251,6 +265,40 @@ protected Object headerValueToAddOut(String key, Object value) {
return valueToAdd;
}

/**
* Check whether the header value should be mapped to multiple values.
* @param headerName the header name.
* @return True for multiple values at the same key.
* @since 4.0
*/
protected boolean doesMatchMultiValueHeader(String headerName) {
for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) {
if (headerMatcher.matchHeader(headerName)) {
return true;
}
}
return false;
}

/**
* Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}.
* @param headerName the header name.
* @param header the header instance.
* @param headers the target headers.
* @since 4.0
*/
protected void fromUserHeader(String headerName, Header header, final Map<String, Object> headers) {
if (!doesMatchMultiValueHeader(headerName)) {
headers.put(headerName, headerValueToAddIn(header));
}
else {
@SuppressWarnings("unchecked")
List<Object> headerValues = (List<Object>)
headers.computeIfAbsent(headerName, key -> new ArrayList<>());
headerValues.add(headerValueToAddIn(header));
}
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Nullable
private byte[] mapRawOut(String header, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
* @author Sanghyoek An
*
* @since 1.3
*
Expand Down Expand Up @@ -266,31 +267,17 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
final ObjectMapper headerObjectMapper = getObjectMapper();
headers.forEach((key, rawValue) -> {
if (matches(key, rawValue)) {
Object valueToAdd = headerValueToAddOut(key, rawValue);
if (valueToAdd instanceof byte[]) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
if (!encodeToJson && valueToAdd instanceof String) {
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
}
else {
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
}
jsonHeaders.put(key, className);
if (doesMatchMultiValueHeader(key)) {
if (rawValue instanceof Iterable<?> valuesToMap) {
valuesToMap.forEach(o -> fromHeader(key, o, jsonHeaders, headerObjectMapper, target));
}
catch (Exception e) {
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
else {
fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target);
}
}
else {
fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target);
}
}
});
if (!jsonHeaders.isEmpty()) {
Expand Down Expand Up @@ -324,12 +311,44 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
populateJsonValueHeader(header, requestedType, headers);
}
else {
headers.put(headerName, headerValueToAddIn(header));
fromUserHeader(headerName, header, headers);
}
}
});
}

private void fromHeader(String key, Object rawValue, Map<String, String> jsonHeaders,
ObjectMapper headerObjectMapper, Headers target) {

Object valueToAdd = headerValueToAddOut(key, rawValue);
if (valueToAdd instanceof byte[]) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
final byte[] calculatedValue;
if (!encodeToJson && valueToAdd instanceof String) {
calculatedValue = ((String) valueToAdd).getBytes(getCharset());
}
else {
calculatedValue = headerObjectMapper.writeValueAsBytes(valueToAdd);
}
target.add(new RecordHeader(key, calculatedValue));
jsonHeaders.putIfAbsent(key, className);
}
catch (Exception e) {
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
}
}
}

private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
Class<?> type = Object.class;
boolean trusted = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
* The exceptions are correlation and reply headers for request/reply
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.1.3
*
*/
Expand Down Expand Up @@ -94,27 +95,40 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte
public void fromHeaders(MessageHeaders headers, Headers target) {
headers.forEach((key, value) -> {
if (!NEVER.contains(key)) {
Object valueToAdd = headerValueToAddOut(key, value);
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
if (doesMatchMultiValueHeader(key)) {
if (value instanceof Iterable<?> valuesToMap) {
valuesToMap.forEach(o -> fromHeader(key, o, target));
}
else {
fromHeader(key, value, target);
}
}
else {
fromHeader(key, value, target);
}
}
});
}

@Override
public void toHeaders(Headers source, Map<String, Object> target) {
public void toHeaders(Headers source, Map<String, Object> headers) {
source.forEach(header -> {
String headerName = header.key();
if (matchesForInbound(headerName)) {
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else {
target.put(headerName, headerValueToAddIn(header));
fromUserHeader(headerName, header, headers);
}
}
});
}

private void fromHeader(String key, Object value, Headers target) {
if (headerValueToAddOut(key, value) instanceof byte[] valueToAdd && matches(key, valueToAdd)) {
target.add(new RecordHeader(key, valueToAdd));
}
}

}
Loading