Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -79,6 +80,12 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {

private Charset charset = StandardCharsets.UTF_8;

private final List<HeaderMatcher> matchersForListValue = new ArrayList<>();

private final Set<String> cachedHeadersForListValue = new LinkedHashSet<>();

private final Set<String> cachedHeadersForSingleValue = new LinkedHashSet<>();
Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline comment;

I added two caches for single-value and multi-value to optimize matching performance.
I assume that headers in ConsumerRecord will have lower cardinality.
From this assume, we can save our CPU usage.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is OK, but not perfect . It might be good to compare some performance and make sure it worth to sacrifice memory for these caches.

another optimization could be with eviction policy to avoid the case of “too many headers” and don’t keep too old headers in the cache.

with that in mind , I would suggest to consider caching as a separate bonus task.
It doesn’t feel crucial for the big picture in hands.


/**
* Construct a mapper that will match the supplied patterns (outbound) and all headers
* (inbound). For outbound mapping, certain internal framework headers are never
Expand All @@ -97,6 +104,20 @@ public AbstractKafkaHeaderMapper(String... patterns) {
* @param patterns the patterns.
*/
protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) {
this(outbound, new ArrayList<>(), patterns);
}

/**
* Construct a mapper that will match the supplied patterns (outbound) and all headers
* (inbound). For outbound mapping, certain internal framework headers are never
* mapped. For inbound mapping, Headers that match the pattern specified in
* {@code patternsForListValue} will be appended to the values under the same key.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No blank lines in method Javadocs, please.

* @param outbound true for an outbound mapper.
* @param patternsForListValue the patterns for multiple values at the same key.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of Apache Kafka, it is a multi-value headers. Can we follow that style instead ?
I mean the Kafka requirements first and we should map to its concepts even if internally it is a List for us.
so, my suggestion is to rename this property to something like multiValuePatterns.
Remember, this is an Open Source? So, code is visible and has to be as clear as possible?

Plus, why bother constructor?
This is an optional feature, so has to be just a setter .
My requirements are to expose via constructor, if we cannot live without that property.
This way the setter for new property could be a varargs even if you set into a Set internally .

* @param patterns the patterns.
*/
protected AbstractKafkaHeaderMapper(boolean outbound, List<String> patternsForListValue, String... patterns) {
Assert.notNull(patterns, "'patterns' must not be null");
this.outbound = outbound;
if (outbound) {
Expand All @@ -123,6 +144,11 @@ protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) {
for (String pattern : patterns) {
this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern));
}

for (String patternForListValue : patternsForListValue) {
this.matchersForListValue.add(new SimplePatternBasedHeaderMatcher(patternForListValue));
}

}

/**
Expand Down Expand Up @@ -287,6 +313,34 @@ private String mapRawIn(String header, byte[] value) {
return null;
}

/**
* Check whether the header value should be mapped to multiple values.
* @param headerName the header name.
* @return True for multiple values at the same key.
*/
protected boolean isHeaderForListValue(String headerName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be mapAsMultiValue()?
@sobychacko , what’s your idea for naming here? 🥹

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is to judge whether header is for multi-value. so, return type is boolean and there is no mapping logic.
So, I think it would be better to keep IsXXX or DoesXXX.
WDYT?

if (this.matchersForListValue.isEmpty()) {
return false;
}

if (this.cachedHeadersForSingleValue.contains(headerName)) {
return false;
}

if (this.cachedHeadersForListValue.contains(headerName)) {
return true;
}

for (HeaderMatcher headerMatcher : this.matchersForListValue) {
if (headerMatcher.matchHeader(headerName)) {
this.cachedHeadersForListValue.add(headerName);
return true;
}
}
this.cachedHeadersForSingleValue.add(headerName);
return false;
}

/**
* A matcher for headers.
* @since 2.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -48,6 +49,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
* @author Sanghyeok An
*
* @since 1.3
*
Expand Down Expand Up @@ -102,6 +104,27 @@ public DefaultKafkaHeaderMapper() {
this(JacksonUtils.enhancedObjectMapper());
}

/**
* Construct an instance with the default object mapper and default header patterns
* for outbound headers and default header patterns for inbound multi-value headers;
* all inbound headers are mapped. The default pattern list is
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
* Headers that match the pattern specified in {@code patternsForListValue} will be
* appended to the values under the same key.
*
* @param patternsForListValue the patterns for multiple values at the same key.
* @see #DefaultKafkaHeaderMapper(ObjectMapper)
*/
public DefaultKafkaHeaderMapper(List<String> patternsForListValue) {
this(JacksonUtils.enhancedObjectMapper(),
patternsForListValue,
"!" + MessageHeaders.ID,
"!" + MessageHeaders.TIMESTAMP,
"*");
}

/**
* Construct an instance with the provided object mapper and default header patterns
* for outbound headers; all inbound headers are mapped. The patterns are applied in
Expand Down Expand Up @@ -148,11 +171,32 @@ public DefaultKafkaHeaderMapper(String... patterns) {
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
this(true, objectMapper, patterns);
this(true, objectMapper, new ArrayList<>(), patterns);
}

/**
* Construct an instance with the provided object mapper and the provided header
* patterns for outbound headers; all inbound headers are mapped. The patterns are
* applied in order, stopping on the first match (positive or negative). Patterns are
* negated by preceding them with "!". The patterns will replace the default patterns;
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
* represent data in consumer/producer records.
* @param objectMapper the object mapper.
* @param patternsForListValue the patterns for multiple values at the same key.
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, List<String> patternsForListValue, String... patterns) {
this(true, objectMapper, patternsForListValue, patterns);
}

private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, String... patterns) {
super(outbound, patterns);
this(outbound, objectMapper, new ArrayList<>(), patterns);
}

private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, List<String> patternsForListValue, String... patterns) {
super(outbound, patternsForListValue, patterns);
Assert.notNull(objectMapper, "'objectMapper' must not be null");
Assert.noNullElements(patterns, "'patterns' must not have null elements");
this.objectMapper = objectMapper;
Expand Down Expand Up @@ -324,12 +368,39 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
populateJsonValueHeader(header, requestedType, headers);
}
else {
headers.put(headerName, headerValueToAddIn(header));
handleHeader(headerName, header, headers);
}
}
});
}

/**
* Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}.
* @param headerName the header name.
* @param header the header instance.
* @param headers the target headers.
* @since 4.0.0
*/

protected void handleHeader(String headerName, Header header, final Map<String, Object> headers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mapping from Kafka headers, So, I believe this name is better in this context: fromUserHeader.
Since standard headers have been already mapped before.

if (!this.isHeaderForListValue(headerName)) {
headers.put(headerName, headerValueToAddIn(header));
}
else {
Object values = headers.getOrDefault(headerName, new ArrayList<>());

if (values instanceof List) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fully sure in this logic.
Right now you mean: if there is a header with requested name and it is not a list, then we override its value.
Is it really expected?
Why we just cannot always override withe list since this is exactly what is asked by the respective pattern?
Otherwise I feel like we cannot override since the value is already there.

@SuppressWarnings("unchecked")
List<Object> castedValues = (List<Object>) values;
castedValues.add(headerValueToAddIn(header));
headers.put(headerName, castedValues);
}
else {
headers.put(headerName, headerValueToAddIn(header));
}
}
}

private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
Class<?> type = Object.class;
boolean trusted = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,9 @@
package org.springframework.kafka.support;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -35,6 +37,8 @@
* The exceptions are correlation and reply headers for request/reply
*
* @author Gary Russell
* @author Sanghyeok An
*
* @since 2.1.3
*
*/
Expand All @@ -61,6 +65,25 @@ public SimpleKafkaHeaderMapper() {
"*");
}

/**
* Construct an instance with the default object mapper and default header patterns
* for outbound headers default header patterns for inbound multi-value headers;
* all inbound headers are mapped. The default pattern list is
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
* Headers that match the pattern specified in {@code patternsForListValue} will be
* appended to the values under the same key.
*
* @param patternsForMultiValue the patterns for multiple values at the same key.
*/
public SimpleKafkaHeaderMapper(List<String> patternsForMultiValue) {
this(true, patternsForMultiValue,
"!" + MessageHeaders.ID,
"!" + MessageHeaders.TIMESTAMP,
"*");
}

/**
* Construct an instance with a default object mapper and the provided header patterns
* for outbound headers; all inbound headers are mapped. The patterns are applied in
Expand All @@ -77,7 +100,11 @@ public SimpleKafkaHeaderMapper(String... patterns) {
}

private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) {
super(outbound, patterns);
this(outbound, new ArrayList<>(), patterns);
}

private SimpleKafkaHeaderMapper(boolean outbound, List<String> patternsForListValue, String... patterns) {
super(outbound, patternsForListValue, patterns);
}

/**
Expand Down Expand Up @@ -111,7 +138,21 @@ public void toHeaders(Headers source, Map<String, Object> target) {
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else {
target.put(headerName, headerValueToAddIn(header));
if (!this.isHeaderForListValue(headerName)) {
target.put(headerName, headerValueToAddIn(header));
}
else {
Object values = target.getOrDefault(headerName, new ArrayList<>());
if (values instanceof List) {
@SuppressWarnings("unchecked")
List<Object> castedValues = (List<Object>) values;
castedValues.add(headerValueToAddIn(header));
target.put(headerName, castedValues);
}
else {
target.put(headerName, headerValueToAddIn(header));
}
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* @author Gary Russell
* @author Dariusz Szablinski
* @author Biju Kunjummen
* @author Sanghyeok An
*/
public class MessagingMessageConverter implements RecordMessageConverter {

Expand All @@ -83,6 +84,15 @@ public MessagingMessageConverter() {
this(msg -> msg.getHeaders().get(KafkaHeaders.PARTITION, Integer.class));
}

/**
* Construct an instance that uses given HeaderMapper.
* @param headerMapper the Header mapper.
* @since 4.0.0
*/
public MessagingMessageConverter(KafkaHeaderMapper headerMapper) {
this(msg -> msg.getHeaders().get(KafkaHeaders.PARTITION, Integer.class), headerMapper);
}

/**
* Construct an instance that uses the supplied partition provider function. The
* function can return null to delegate the partition selection to the kafka client.
Expand All @@ -100,6 +110,18 @@ public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partiti
this.partitionProvider = partitionProvider;
}

/**
* Construct an instance that uses the supplied partition provider function and given HeaderMapper.
* @param partitionProvider the provider.
* @param headerMapper the Header mapper.
* @since 4.0.0
*/
public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partitionProvider, KafkaHeaderMapper headerMapper) {
Assert.notNull(partitionProvider, "'partitionProvider' cannot be null");
this.headerMapper = headerMapper;
this.partitionProvider = partitionProvider;
}

/**
* Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
* will try to use a default value. By default set to {@code false}.
Expand Down
Loading