Skip to content

Commit 34e6860

Browse files
Addressing PR review
Signed-off-by: Sanghyeok An <[email protected]>
1 parent b3f4374 commit 34e6860

File tree

4 files changed

+130
-27
lines changed

4 files changed

+130
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -267,30 +267,18 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
267267
final ObjectMapper headerObjectMapper = getObjectMapper();
268268
headers.forEach((key, rawValue) -> {
269269
if (matches(key, rawValue)) {
270-
Object valueToAdd = headerValueToAddOut(key, rawValue);
271-
if (valueToAdd instanceof byte[]) {
272-
target.add(new RecordHeader(key, (byte[]) valueToAdd));
273-
}
274-
else {
275-
try {
276-
String className = valueToAdd.getClass().getName();
277-
boolean encodeToJson = this.encodeStrings;
278-
if (this.toStringClasses.contains(className)) {
279-
valueToAdd = valueToAdd.toString();
280-
className = JAVA_LANG_STRING;
281-
encodeToJson = true;
282-
}
283-
if (!encodeToJson && valueToAdd instanceof String) {
284-
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
285-
}
286-
else {
287-
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
288-
}
289-
jsonHeaders.put(key, className);
270+
if (doesMatchMultiValueHeader(key)) {
271+
Iterable<?> valuesToMap;
272+
if (rawValue instanceof Iterable<?> iterable) {
273+
valuesToMap = iterable;
290274
}
291-
catch (Exception e) {
292-
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
275+
else {
276+
valuesToMap = List.of(rawValue);
293277
}
278+
valuesToMap.forEach(o -> fromHeader(key, o, jsonHeaders, headerObjectMapper, target));
279+
}
280+
else {
281+
fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target);
294282
}
295283
}
296284
});
@@ -331,6 +319,36 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
331319
});
332320
}
333321

322+
private void fromHeader(String key, Object rawValue, Map<String, String> jsonHeaders,
323+
ObjectMapper headerObjectMapper, Headers target) {
324+
325+
Object valueToAdd = headerValueToAddOut(key, rawValue);
326+
if (valueToAdd instanceof byte[]) {
327+
target.add(new RecordHeader(key, (byte[]) valueToAdd));
328+
}
329+
else {
330+
try {
331+
String className = valueToAdd.getClass().getName();
332+
boolean encodeToJson = this.encodeStrings;
333+
if (this.toStringClasses.contains(className)) {
334+
valueToAdd = valueToAdd.toString();
335+
className = JAVA_LANG_STRING;
336+
encodeToJson = true;
337+
}
338+
if (!encodeToJson && valueToAdd instanceof String) {
339+
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
340+
}
341+
else {
342+
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
343+
}
344+
jsonHeaders.put(key, className);
345+
}
346+
catch (Exception e) {
347+
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
348+
}
349+
}
350+
}
351+
334352
private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
335353
Class<?> type = Object.class;
336354
boolean trusted = false;

spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.nio.ByteBuffer;
2020
import java.util.HashSet;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Set;
2324

@@ -95,9 +96,18 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte
9596
public void fromHeaders(MessageHeaders headers, Headers target) {
9697
headers.forEach((key, value) -> {
9798
if (!NEVER.contains(key)) {
98-
Object valueToAdd = headerValueToAddOut(key, value);
99-
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
100-
target.add(new RecordHeader(key, (byte[]) valueToAdd));
99+
if (doesMatchMultiValueHeader(key)) {
100+
Iterable<?> valuesToMap;
101+
if (value instanceof Iterable<?> iterable) {
102+
valuesToMap = iterable;
103+
}
104+
else {
105+
valuesToMap = List.of(value);
106+
}
107+
valuesToMap.forEach(o -> fromHeader(key, o, target));
108+
}
109+
else {
110+
fromHeader(key, value, target);
101111
}
102112
}
103113
});
@@ -118,4 +128,11 @@ public void toHeaders(Headers source, Map<String, Object> headers) {
118128
});
119129
}
120130

131+
private void fromHeader(String key, Object value, Headers target) {
132+
Object valueToAdd = headerValueToAddOut(key, value);
133+
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
134+
target.add(new RecordHeader(key, (byte[]) valueToAdd));
135+
}
136+
}
137+
121138
}

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ void inboundJson() {
335335
}
336336

337337
@Test
338-
void multiValueHeader() {
338+
void multiValueHeaderToTest() {
339339
// GIVEN
340340
String multiValueHeader1 = "test-multi-value1";
341341
String multiValueHeader2 = "test-multi-value2";
@@ -381,6 +381,40 @@ void multiValueHeader() {
381381
new byte[] { 0, 0, 0, 5 });
382382
}
383383

384+
@Test
385+
void multiValueHeaderFromTest() {
386+
// GIVEN
387+
String multiValueHeader1 = "test-multi-value1";
388+
String multiValueHeader2 = "test-multi-value2";
389+
String singleValueHeader = "test-single-value1";
390+
391+
Message<String> message = MessageBuilder
392+
.withPayload("test-multi-value-header")
393+
.setHeader(multiValueHeader1, List.of(new byte[] { 0, 0, 0, 1 },
394+
new byte[] { 0, 0, 0, 2 }))
395+
.setHeader(multiValueHeader2, List.of(new byte[] { 0, 0, 0, 3 },
396+
new byte[] { 0, 0, 0, 4 }))
397+
.setHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
398+
.build();
399+
400+
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
401+
mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2);
402+
403+
// WHEN
404+
Headers results = new RecordHeaders();
405+
mapper.fromHeaders(message.getHeaders(), results);
406+
407+
// THEN
408+
assertThat(results).contains(
409+
new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 1 }),
410+
new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 2 }),
411+
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 3 }),
412+
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 4 }),
413+
new RecordHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
414+
);
415+
}
416+
417+
384418
@Test
385419
void deserializationExceptionHeadersAreMappedAsNonByteArray() {
386420
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();

spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import org.junit.jupiter.api.Test;
2929

3030
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
31+
import org.springframework.messaging.Message;
3132
import org.springframework.messaging.MessageHeaders;
33+
import org.springframework.messaging.support.MessageBuilder;
3234

3335
import static org.assertj.core.api.Assertions.assertThat;
3436
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
@@ -190,7 +192,7 @@ void inboundMappingWithPatterns() {
190192
}
191193

192194
@Test
193-
void multiValueHeader() {
195+
void multiValueHeaderToTest() {
194196
// GIVEN
195197
String multiValueHeader1 = "test-multi-value1";
196198
String multiValueHeader2 = "test-multi-value2";
@@ -236,4 +238,36 @@ void multiValueHeader() {
236238
new byte[] { 0, 0, 0, 5 });
237239
}
238240

241+
@Test
242+
void multiValueHeaderFromTest() {
243+
// GIVEN
244+
String multiValueHeader1 = "test-multi-value1";
245+
String multiValueHeader2 = "test-multi-value2";
246+
String singleValueHeader = "test-single-value1";
247+
248+
Message<String> message = MessageBuilder
249+
.withPayload("test-multi-value-header")
250+
.setHeader(multiValueHeader1, List.of(new byte[] { 0, 0, 0, 1 },
251+
new byte[] { 0, 0, 0, 2 }))
252+
.setHeader(multiValueHeader2, List.of(new byte[] { 0, 0, 0, 3 },
253+
new byte[] { 0, 0, 0, 4 }))
254+
.setHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
255+
.build();
256+
257+
SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper();
258+
259+
// WHEN
260+
Headers results = new RecordHeaders();
261+
mapper.fromHeaders(message.getHeaders(), results);
262+
263+
// THEN
264+
assertThat(results).contains(
265+
new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 1 }),
266+
new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 2 }),
267+
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 3 }),
268+
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 4 }),
269+
new RecordHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
270+
);
271+
}
272+
239273
}

0 commit comments

Comments
 (0)