-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-3067: Spring Kafka support multiple headers with same key. #3874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
c7e5e09
42010c6
dfdfb8d
865b26a
019efb7
065bedc
6aed15b
462fe25
b3f4374
34e6860
dd24248
13267dc
c9c360e
4a762bd
b5375d4
c945dd5
07a49df
732ae6a
585f356
667f76c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { | |
|
|
||
| private final List<HeaderMatcher> matchers = new ArrayList<>(); | ||
|
|
||
| private final List<HeaderMatcher> headerMatchersForMultiValue = new ArrayList<>(); | ||
|
|
||
| private final Map<String, Boolean> rawMappedHeaders = new HashMap<>(); | ||
|
|
||
| { | ||
|
|
@@ -191,6 +193,16 @@ public void addRawMappedHeader(String name, boolean toString) { | |
| this.rawMappedHeaders.put(name, toString); | ||
| } | ||
|
|
||
| /** | ||
| * Add patterns for matching multi-value headers under the same key. | ||
| * @param patterns the patterns for header. | ||
| */ | ||
| public void addHeaderPatternsForMultiValue(String ... patterns) { | ||
|
||
| for (String pattern : patterns) { | ||
| this.headerMatchersForMultiValue.add(new SimplePatternBasedHeaderMatcher(pattern)); | ||
|
||
| } | ||
| } | ||
|
|
||
| protected boolean matches(String header, Object value) { | ||
| if (matches(header)) { | ||
| if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL)) | ||
|
|
@@ -251,6 +263,20 @@ protected Object headerValueToAddOut(String key, Object value) { | |
| return valueToAdd; | ||
| } | ||
|
|
||
| /** | ||
| * Check whether the header value should be mapped to multiple values. | ||
| * @param headerName the header name. | ||
| * @return True for multiple values at the same key. | ||
| */ | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| protected boolean doesMatchMultiValueHeader(String headerName) { | ||
| for (HeaderMatcher headerMatcher : this.headerMatchersForMultiValue) { | ||
| if (headerMatcher.matchHeader(headerName)) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @SuppressWarnings("NullAway") // Dataflow analysis limitation | ||
| @Nullable | ||
| private byte[] mapRawOut(String header, Object value) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
|
|
@@ -48,6 +49,7 @@ | |
| * @author Gary Russell | ||
| * @author Artem Bilan | ||
| * @author Soby Chacko | ||
| * @author Sanghyoek An | ||
| * | ||
| * @since 1.3 | ||
| * | ||
|
|
@@ -324,12 +326,37 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { | |
| populateJsonValueHeader(header, requestedType, headers); | ||
| } | ||
| else { | ||
| headers.put(headerName, headerValueToAddIn(header)); | ||
| handleHeader(headerName, header, headers); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}. | ||
| * @param headerName the header name. | ||
| * @param header the header instance. | ||
| * @param headers the target headers. | ||
| * @since 4.0.0 | ||
| */ | ||
| protected void handleHeader(String headerName, Header header, final Map<String, Object> headers) { | ||
|
||
| if (!this.doesMatchMultiValueHeader(headerName)) { | ||
|
||
| headers.put(headerName, headerValueToAddIn(header)); | ||
| } | ||
| else { | ||
| Object values = headers.getOrDefault(headerName, new ArrayList<>()); | ||
| if (values instanceof List) { | ||
|
||
| @SuppressWarnings("unchecked") | ||
| List<Object> castedValues = (List<Object>) values; | ||
| castedValues.add(headerValueToAddIn(header)); | ||
| headers.put(headerName, castedValues); | ||
| } | ||
| else { | ||
| headers.put(headerName, headerValueToAddIn(header)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) { | ||
| Class<?> type = Object.class; | ||
| boolean trusted = false; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.