Skip to content

Commit 666fd5c

Browse files
garyrussellartembilan
authored andcommitted
GH-1012 Allow raw Strings in default header mapper
Resolves #1012 Add configuration to map string-valued headers as raw `byte[]` instead of adding to the map of json-mapped headers. * Polishing - PR Comments.
1 parent c56e8e7 commit 666fd5c

File tree

6 files changed

+383
-36
lines changed

6 files changed

+383
-36
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 the original author or authors.
2+
* Copyright 2018-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,16 +16,23 @@
1616

1717
package org.springframework.kafka.support;
1818

19+
import java.nio.charset.Charset;
20+
import java.nio.charset.StandardCharsets;
1921
import java.text.MessageFormat;
2022
import java.util.ArrayList;
2123
import java.util.Arrays;
24+
import java.util.HashMap;
2225
import java.util.List;
26+
import java.util.Map;
2327

2428
import org.apache.commons.logging.Log;
2529
import org.apache.commons.logging.LogFactory;
30+
import org.apache.kafka.common.header.Header;
2631

32+
import org.springframework.lang.Nullable;
2733
import org.springframework.messaging.MessageHeaders;
2834
import org.springframework.util.Assert;
35+
import org.springframework.util.ObjectUtils;
2936
import org.springframework.util.PatternMatchUtils;
3037

3138
/**
@@ -58,13 +65,62 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
5865

5966
protected final List<SimplePatternBasedHeaderMatcher> matchers = new ArrayList<>(NEVER_MAPPED); // NOSONAR
6067

68+
private final Map<String, Boolean> rawMappedtHeaders = new HashMap<>();
69+
70+
private boolean mapAllStringsOut;
71+
72+
private Charset charset = StandardCharsets.UTF_8;
73+
6174
public AbstractKafkaHeaderMapper(String... patterns) {
6275
Assert.notNull(patterns, "'patterns' must not be null");
6376
for (String pattern : patterns) {
6477
this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern));
6578
}
6679
}
6780

81+
/**
82+
* Set to true to map all {@code String} valued outbound headers to {@code byte[]}.
83+
* To map to a {@code String} for inbound, there must be an entry in the rawMappedHeaders map.
84+
* @param mapAllStringsOut true to map all strings.
85+
* @since 2.2.5
86+
* @see #setRawMappedHaeaders(Map)
87+
*/
88+
public void setMapAllStringsOut(boolean mapAllStringsOut) {
89+
this.mapAllStringsOut = mapAllStringsOut;
90+
}
91+
92+
protected Charset getCharset() {
93+
return this.charset;
94+
}
95+
96+
/**
97+
* Set the charset to use when mapping String-valued headers to/from byte[]. Default UTF-8.
98+
* @param charset the charset.
99+
* @since 2.2.5
100+
* @see #setRawMappedHaeaders(Map)
101+
*/
102+
public void setCharset(Charset charset) {
103+
Assert.notNull(charset, "'charset' cannot be null");
104+
this.charset = charset;
105+
}
106+
107+
/**
108+
* Set the headers to not perform any conversion on (except {@code String} to
109+
* {@code byte[]} for outbound). Inbound headers that match will be mapped as
110+
* {@code byte[]} unless the corresponding boolean in the map value is true,
111+
* in which case it will be mapped as a String.
112+
* @param rawMappedHeaders the header names to not convert and
113+
* @since 2.2.5
114+
* @see #setCharset(Charset)
115+
* @see #setMapAllStringsOut(boolean)
116+
*/
117+
public void setRawMappedHaeaders(Map<String, Boolean> rawMappedHeaders) {
118+
if (!ObjectUtils.isEmpty(rawMappedHeaders)) {
119+
this.rawMappedtHeaders.clear();
120+
this.rawMappedtHeaders.putAll(rawMappedHeaders);
121+
}
122+
}
123+
68124
protected boolean matches(String header, Object value) {
69125
if (matches(header)) {
70126
if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL))
@@ -93,6 +149,57 @@ protected boolean matches(String header) {
93149
return false;
94150
}
95151

152+
/**
153+
* Check if the value is a String and convert to byte[], if so configured.
154+
* @param key the header name.
155+
* @param value the headet value.
156+
* @return the value to add.
157+
* @since 2.2.5
158+
*/
159+
protected Object headerValueToAddOut(String key, Object value) {
160+
Object valueToAdd = mapRawOut(key, value);
161+
if (valueToAdd == null) {
162+
valueToAdd = value;
163+
}
164+
return valueToAdd;
165+
}
166+
167+
@Nullable
168+
private byte[] mapRawOut(String header, Object value) {
169+
if (this.mapAllStringsOut || this.rawMappedtHeaders.containsKey(header)) {
170+
if (value instanceof byte[]) {
171+
return (byte[]) value;
172+
}
173+
else if (value instanceof String) {
174+
return ((String) value).getBytes(this.charset);
175+
}
176+
}
177+
return null;
178+
}
179+
180+
/**
181+
* Check if the header value should be mapped to a String, if so configured.
182+
* @param header the header.
183+
* @return the value to add.
184+
*/
185+
protected Object headertValueToAddIn(Header header) {
186+
Object mapped = mapRawIn(header.key(), header.value());
187+
if (mapped == null) {
188+
mapped = header.value();
189+
}
190+
return mapped;
191+
}
192+
193+
@Nullable
194+
private String mapRawIn(String header, byte[] value) {
195+
Boolean asString = this.rawMappedtHeaders.get(header);
196+
if (Boolean.TRUE.equals(asString)) {
197+
return new String(value, this.charset);
198+
}
199+
return null;
200+
}
201+
202+
96203
/**
97204
* A pattern-based header matcher that matches if the specified
98205
* header matches the specified simple pattern.

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2018 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -183,11 +183,11 @@ protected Set<String> getToStringClasses() {
183183
* If any of the supplied packages is {@code "*"}, all packages are trusted.
184184
* If a class for a non-trusted package is encountered, the header is returned to the
185185
* application with value of type {@link NonTrustedHeaderType}.
186-
* @param trustedPackages the packages to trust.
186+
* @param packagesToTrust the packages to trust.
187187
*/
188-
public void addTrustedPackages(String... trustedPackages) {
189-
if (trustedPackages != null) {
190-
for (String whiteList : trustedPackages) {
188+
public void addTrustedPackages(String... packagesToTrust) {
189+
if (packagesToTrust != null) {
190+
for (String whiteList : packagesToTrust) {
191191
if ("*".equals(whiteList)) {
192192
this.trustedPackages.clear();
193193
break;
@@ -213,25 +213,26 @@ public void addToStringClasses(String... classNames) {
213213
public void fromHeaders(MessageHeaders headers, Headers target) {
214214
final Map<String, String> jsonHeaders = new HashMap<>();
215215
final ObjectMapper headerObjectMapper = getObjectMapper();
216-
headers.forEach((k, v) -> {
217-
if (matches(k, v)) {
218-
if (v instanceof byte[]) {
219-
target.add(new RecordHeader(k, (byte[]) v));
216+
headers.forEach((key, val) -> {
217+
if (matches(key, val)) {
218+
Object valueToAdd = headerValueToAddOut(key, val);
219+
if (valueToAdd instanceof byte[]) {
220+
target.add(new RecordHeader(key, (byte[]) valueToAdd));
220221
}
221222
else {
222223
try {
223-
Object value = v;
224-
String className = v.getClass().getName();
224+
Object value = valueToAdd;
225+
String className = valueToAdd.getClass().getName();
225226
if (this.toStringClasses.contains(className)) {
226-
value = v.toString();
227+
value = valueToAdd.toString();
227228
className = "java.lang.String";
228229
}
229-
target.add(new RecordHeader(k, headerObjectMapper.writeValueAsBytes(value)));
230-
jsonHeaders.put(k, className);
230+
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(value)));
231+
jsonHeaders.put(key, className);
231232
}
232-
catch (Exception e) {
233+
catch (@SuppressWarnings("unused") Exception e) {
233234
if (logger.isDebugEnabled()) {
234-
logger.debug("Could not map " + k + " with type " + v.getClass().getName());
235+
logger.debug("Could not map " + key + " with type " + valueToAdd.getClass().getName());
235236
}
236237
}
237238
}
@@ -250,11 +251,11 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
250251
@Override
251252
public void toHeaders(Headers source, final Map<String, Object> headers) {
252253
final Map<String, String> jsonTypes = decodeJsonTypes(source);
253-
source.forEach(h -> {
254-
if (!(h.key().equals(JSON_TYPES))) {
255-
if (jsonTypes != null && jsonTypes.containsKey(h.key())) {
254+
source.forEach(header -> {
255+
if (!(header.key().equals(JSON_TYPES))) {
256+
if (jsonTypes != null && jsonTypes.containsKey(header.key())) {
256257
Class<?> type = Object.class;
257-
String requestedType = jsonTypes.get(h.key());
258+
String requestedType = jsonTypes.get(header.key());
258259
boolean trusted = false;
259260
try {
260261
trusted = trusted(requestedType);
@@ -263,26 +264,26 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
263264
}
264265
}
265266
catch (Exception e) {
266-
logger.error("Could not load class for header: " + h.key(), e);
267+
logger.error("Could not load class for header: " + header.key(), e);
267268
}
268269
if (trusted) {
269270
try {
270-
Object value = decodeValue(h, type);
271-
headers.put(h.key(), value);
271+
Object value = decodeValue(header, type);
272+
headers.put(header.key(), value);
272273
}
273274
catch (IOException e) {
274-
logger.error("Could not decode json type: " + new String(h.value()) + " for key: " + h
275+
logger.error("Could not decode json type: " + new String(header.value()) + " for key: " + header
275276
.key(),
276277
e);
277-
headers.put(h.key(), h.value());
278+
headers.put(header.key(), header.value());
278279
}
279280
}
280281
else {
281-
headers.put(h.key(), new NonTrustedHeaderType(h.value(), requestedType));
282+
headers.put(header.key(), new NonTrustedHeaderType(header.value(), requestedType));
282283
}
283284
}
284285
else {
285-
headers.put(h.key(), h.value());
286+
headers.put(header.key(), headertValueToAddIn(header));
286287
}
287288
}
288289
});
@@ -419,7 +420,7 @@ public String toString() {
419420
return "NonTrustedHeaderType [headerValue=" + new String(this.headerValue, StandardCharsets.UTF_8)
420421
+ ", untrustedType=" + this.untrustedType + "]";
421422
}
422-
catch (Exception e) {
423+
catch (@SuppressWarnings("unused") Exception e) {
423424
return "NonTrustedHeaderType [headerValue=" + Arrays.toString(this.headerValue) + ", untrustedType="
424425
+ this.untrustedType + "]";
425426
}

spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018 the original author or authors.
2+
* Copyright 2018-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,7 +26,8 @@
2626
/**
2727
* A simple header mapper that maps headers directly; for outbound,
2828
* only byte[] headers are mapped; for inbound, headers are mapped
29-
* unchanged, as byte[].
29+
* unchanged, as byte[]. Strings can also be mapped to/from byte.
30+
* See {@link #setRawMappedHaeaders(Map)}.
3031
* Most headers in {@link KafkaHeaders} are not mapped on outbound messages.
3132
* The exceptions are correlation and reply headers for request/reply
3233
*
@@ -64,16 +65,17 @@ public SimpleKafkaHeaderMapper(String... patterns) {
6465

6566
@Override
6667
public void fromHeaders(MessageHeaders headers, Headers target) {
67-
headers.forEach((k, v) -> {
68-
if (v instanceof byte[] && matches(k, v)) {
69-
target.add(new RecordHeader(k, (byte[]) v));
68+
headers.forEach((key, value) -> {
69+
Object valueToAdd = headerValueToAddOut(key, value);
70+
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
71+
target.add(new RecordHeader(key, (byte[]) valueToAdd));
7072
}
7173
});
7274
}
7375

7476
@Override
7577
public void toHeaders(Headers source, Map<String, Object> target) {
76-
source.forEach(header -> target.put(header.key(), header.value()));
78+
source.forEach(header -> target.put(header.key(), headertValueToAddIn(header)));
7779
}
7880

7981
}

0 commit comments

Comments
 (0)