Skip to content

Commit 42010c6

Browse files
Draft candidate
Signed-off-by: chickenchickenlove <[email protected]>
1 parent c7e5e09 commit 42010c6

File tree

4 files changed

+140
-16
lines changed

4 files changed

+140
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Arrays;
2424
import java.util.Collections;
2525
import java.util.HashMap;
26+
import java.util.LinkedHashSet;
2627
import java.util.List;
2728
import java.util.Locale;
2829
import java.util.Map;
@@ -79,6 +80,12 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
7980

8081
private Charset charset = StandardCharsets.UTF_8;
8182

83+
private final List<HeaderMatcher> matchersForListValue = new ArrayList<>();
84+
85+
private final Set<String> cachedHeadersForListValue = new LinkedHashSet<>();
86+
87+
private final Set<String> cachedHeadersForSingleValue = new LinkedHashSet<>();
88+
8289
/**
8390
* Construct a mapper that will match the supplied patterns (outbound) and all headers
8491
* (inbound). For outbound mapping, certain internal framework headers are never
@@ -97,6 +104,18 @@ public AbstractKafkaHeaderMapper(String... patterns) {
97104
* @param patterns the patterns.
98105
*/
99106
protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) {
107+
this(outbound, new ArrayList<>(), patterns);
108+
}
109+
110+
/**
111+
* Construct a mapper that will match the supplied patterns (outbound) and all headers
112+
* (inbound). For outbound mapping, certain internal framework headers are never
113+
* mapped.
114+
* @param outbound true for an outbound mapper.
115+
* @param patternsForListValue the patterns for multiple values at the same key.
116+
* @param patterns the patterns.
117+
*/
118+
protected AbstractKafkaHeaderMapper(boolean outbound, List<String> patternsForListValue, String... patterns) {
100119
Assert.notNull(patterns, "'patterns' must not be null");
101120
this.outbound = outbound;
102121
if (outbound) {
@@ -123,6 +142,11 @@ protected AbstractKafkaHeaderMapper(boolean outbound, String... patterns) {
123142
for (String pattern : patterns) {
124143
this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern));
125144
}
145+
146+
for (String patternForListValue : patternsForListValue) {
147+
this.matchersForListValue.add(new SimplePatternBasedHeaderMatcher(patternForListValue));
148+
}
149+
126150
}
127151

128152
/**
@@ -287,6 +311,34 @@ private String mapRawIn(String header, byte[] value) {
287311
return null;
288312
}
289313

314+
/**
315+
* Check whether the header value should be mapped to multiple values.
316+
* @param headerName the header name.
317+
* @return True for multiple values at the same key.
318+
*/
319+
protected boolean isHeaderForListValue(String headerName) {
320+
if (this.matchersForListValue.isEmpty()) {
321+
return false;
322+
}
323+
324+
if (this.cachedHeadersForSingleValue.contains(headerName)) {
325+
return false;
326+
}
327+
328+
if (this.cachedHeadersForListValue.contains(headerName)) {
329+
return true;
330+
}
331+
332+
for (HeaderMatcher headerMatcher : this.matchersForListValue) {
333+
if (headerMatcher.matchHeader(headerName)) {
334+
this.cachedHeadersForListValue.add(headerName);
335+
return true;
336+
}
337+
}
338+
this.cachedHeadersForSingleValue.add(headerName);
339+
return false;
340+
}
341+
290342
/**
291343
* A matcher for headers.
292344
* @since 2.3

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121
import java.nio.charset.StandardCharsets;
22+
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.Collections;
2425
import java.util.HashMap;
@@ -103,6 +104,23 @@ public DefaultKafkaHeaderMapper() {
103104
this(JacksonUtils.enhancedObjectMapper());
104105
}
105106

107+
/**
108+
* Construct an instance with the default object mapper and default header patterns
109+
* for outbound headers; all inbound headers are mapped. The default pattern list is
110+
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
111+
* {@link KafkaHeaders} are never mapped as headers since they represent data in
112+
* consumer/producer records.
113+
* @see #DefaultKafkaHeaderMapper(ObjectMapper)
114+
* @param patternsForListValue
115+
*/
116+
public DefaultKafkaHeaderMapper(List<String> patternsForListValue) {
117+
this(JacksonUtils.enhancedObjectMapper(),
118+
patternsForListValue,
119+
"!" + MessageHeaders.ID,
120+
"!" + MessageHeaders.TIMESTAMP,
121+
"*");
122+
}
123+
106124
/**
107125
* Construct an instance with the provided object mapper and default header patterns
108126
* for outbound headers; all inbound headers are mapped. The patterns are applied in
@@ -149,11 +167,32 @@ public DefaultKafkaHeaderMapper(String... patterns) {
149167
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
150168
*/
151169
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) {
152-
this(true, objectMapper, patterns);
170+
this(true, objectMapper, new ArrayList<>(), patterns);
171+
}
172+
173+
/**
174+
* Construct an instance with the provided object mapper and the provided header
175+
* patterns for outbound headers; all inbound headers are mapped. The patterns are
176+
* applied in order, stopping on the first match (positive or negative). Patterns are
177+
* negated by preceding them with "!". The patterns will replace the default patterns;
178+
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
179+
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
180+
* represent data in consumer/producer records.
181+
* @param objectMapper the object mapper.
182+
* @param patternsForListValue the patterns for multiple values at the same key.
183+
* @param patterns the patterns.
184+
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
185+
*/
186+
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, List<String> patternsForListValue, String... patterns) {
187+
this(true, objectMapper, patternsForListValue, patterns);
153188
}
154189

155190
private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, String... patterns) {
156-
super(outbound, patterns);
191+
this(outbound, objectMapper, new ArrayList<>(), patterns);
192+
}
193+
194+
private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, List<String> patternsForListValue, String... patterns) {
195+
super(outbound, patternsForListValue, patterns);
157196
Assert.notNull(objectMapper, "'objectMapper' must not be null");
158197
Assert.noNullElements(patterns, "'patterns' must not have null elements");
159198
this.objectMapper = objectMapper;
@@ -340,7 +379,22 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
340379
*/
341380

342381
protected void handleHeader(String headerName, Header header, final Map<String, Object> headers) {
343-
headers.put(headerName, headerValueToAddIn(header));
382+
if (!this.isHeaderForListValue(headerName)) {
383+
headers.put(headerName, headerValueToAddIn(header));
384+
}
385+
else {
386+
Object values = headers.getOrDefault(headerName, new ArrayList<>());
387+
388+
if (values instanceof List) {
389+
@SuppressWarnings("unchecked")
390+
List<Object> castedValues = (List<Object>) values;
391+
castedValues.add(headerValueToAddIn(header));
392+
headers.put(headerName, castedValues);
393+
}
394+
else {
395+
headers.put(headerName, headerValueToAddIn(header));
396+
}
397+
}
344398
}
345399

346400
private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {

spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,9 @@
1717
package org.springframework.kafka.support;
1818

1919
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
2021
import java.util.HashSet;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.Set;
2325

@@ -77,7 +79,11 @@ public SimpleKafkaHeaderMapper(String... patterns) {
7779
}
7880

7981
private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) {
80-
super(outbound, patterns);
82+
this(outbound, new ArrayList<>(), patterns);
83+
}
84+
85+
private SimpleKafkaHeaderMapper(boolean outbound, List<String> patternsForListValue, String... patterns) {
86+
super(outbound, patternsForListValue, patterns);
8187
}
8288

8389
/**
@@ -111,7 +117,21 @@ public void toHeaders(Headers source, Map<String, Object> target) {
111117
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
112118
}
113119
else {
114-
target.put(headerName, headerValueToAddIn(header));
120+
if (!this.isHeaderForListValue(headerName)) {
121+
target.put(headerName, headerValueToAddIn(header));
122+
}
123+
else {
124+
Object values = target.getOrDefault(headerName, new ArrayList<>());
125+
if (values instanceof List) {
126+
@SuppressWarnings("unchecked")
127+
List<Object> castedValues = (List<Object>) values;
128+
castedValues.add(headerValueToAddIn(header));
129+
target.put(headerName, castedValues);
130+
}
131+
else {
132+
target.put(headerName, headerValueToAddIn(header));
133+
}
134+
}
115135
}
116136
}
117137
});

spring-kafka/src/test/java/org/springframework/kafka/support/MultiValueKafkaHeaderMapperTest.java renamed to spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperForMultiValueTest.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@
4444
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
4545
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
4646
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
47-
import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.Config.MultiValueTestListener;
48-
import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.RetryTopicConfigurations.FirstTopicListener;
49-
import org.springframework.kafka.support.MultiValueKafkaHeaderMapperTest.RetryTopicConfigurations.MyCustomDltProcessor;
47+
import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTest.Config.MultiValueTestListener;
48+
import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTest.RetryTopicConfigurations.FirstTopicListener;
49+
import org.springframework.kafka.support.DefaultKafkaHeaderMapperForMultiValueTest.RetryTopicConfigurations.MyCustomDltProcessor;
5050
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5151
import org.springframework.kafka.test.EmbeddedKafkaBroker;
5252
import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -71,10 +71,10 @@
7171
@SpringJUnitConfig
7272
@DirtiesContext
7373
@EmbeddedKafka(partitions = 1, topics = {
74-
MultiValueKafkaHeaderMapperTest.TEST_TOPIC,
75-
MultiValueKafkaHeaderMapperTest.RETRY_TOPIC
74+
DefaultKafkaHeaderMapperForMultiValueTest.TEST_TOPIC,
75+
DefaultKafkaHeaderMapperForMultiValueTest.RETRY_TOPIC
7676
})
77-
class MultiValueKafkaHeaderMapperTest {
77+
class DefaultKafkaHeaderMapperForMultiValueTest {
7878

7979
public final static String TEST_TOPIC = "multi-value.tests";
8080

@@ -211,10 +211,8 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
211211
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
212212
new ConcurrentKafkaListenerContainerFactory<>();
213213

214-
MultiValueKafkaHeaderMapper headerMapper = new MultiValueKafkaHeaderMapper();
215-
216-
// Add Test
217-
headerMapper.addSingleValueHeader(SINGLE_VALUE_HEADER);
214+
// For Test
215+
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(List.of(MULTI_VALUE_HEADER1, MULTI_VALUE_HEADER2));
218216
MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper);
219217

220218
factory.setConsumerFactory(consumerFactory());

0 commit comments

Comments
 (0)