11/*
2- * Copyright 2018-2022 the original author or authors.
2+ * Copyright 2018-2024 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1717package org .springframework .kafka .support ;
1818
1919import java .nio .ByteBuffer ;
20+ import java .util .Collection ;
2021import java .util .HashSet ;
2122import java .util .Map ;
2223import java .util .Set ;
24+ import java .util .stream .Collectors ;
2325
26+ import org .apache .kafka .common .header .Header ;
2427import org .apache .kafka .common .header .Headers ;
2528import org .apache .kafka .common .header .internals .RecordHeader ;
29+ import org .assertj .core .util .Streams ;
2630
2731import org .springframework .messaging .MessageHeaders ;
2832
3640 *
3741 * @author Gary Russell
3842 * @since 2.1.3
39- *
4043 */
4144public class SimpleKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
4245
@@ -69,6 +72,7 @@ public SimpleKafkaHeaderMapper() {
6972 * generally should not map the {@code "id" and "timestamp"} headers. Note:
7073 * most of the headers in {@link KafkaHeaders} are never mapped as headers since they
7174 * represent data in consumer/producer records.
75+ *
7276 * @param patterns the patterns.
7377 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
7478 */
@@ -82,6 +86,7 @@ private SimpleKafkaHeaderMapper(boolean outbound, String... patterns) {
8286
8387 /**
8488 * Create an instance for inbound mapping only with pattern matching.
89+ *
8590 * @param patterns the patterns to match.
8691 * @return the header mapper.
8792 * @since 2.8.8
@@ -94,27 +99,40 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte
9499 public void fromHeaders (MessageHeaders headers , Headers target ) {
95100 headers .forEach ((key , value ) -> {
96101 if (!NEVER .contains (key )) {
97- Object valueToAdd = headerValueToAddOut (key , value );
98- if (valueToAdd instanceof byte [] && matches (key , valueToAdd )) {
99- target .add (new RecordHeader (key , (byte []) valueToAdd ));
102+ if (value instanceof Collection <?> values ) {
103+ values .forEach (v -> mapIfMatched (target , key , v ));
104+ } else {
105+ mapIfMatched (target , key , value );
100106 }
101107 }
102108 });
103109 }
104110
111+ private void mapIfMatched (Headers target , String key , Object value ) {
112+ Object valueToAdd = headerValueToAddOut (key , value );
113+ if (valueToAdd instanceof byte [] && matches (key , valueToAdd )) {
114+ target .add (new RecordHeader (key , (byte []) valueToAdd ));
115+ }
116+ }
117+
105118 @ Override
106119 public void toHeaders (Headers source , Map <String , Object > target ) {
107- source .forEach (header -> {
108- String headerName = header .key ();
109- if (matchesForInbound (headerName )) {
110- if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT )) {
111- target .put (headerName , ByteBuffer .wrap (header .value ()).getInt ());
112- }
113- else {
114- target .put (headerName , headerValueToAddIn (header ));
115- }
116- }
117- });
120+ Streams .stream (source )
121+ .collect (Collectors .groupingBy (Header ::key ))
122+ .forEach ((headerName , headers ) -> {
123+ if (matchesForInbound (headerName )) {
124+ if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT )) {
125+ target .put (headerName , ByteBuffer .wrap (headers .get (headers .size () - 1 ).value ()).getInt ());
126+ } else {
127+ var values = headers .stream ().map (super ::headerValueToAddIn ).toList ();
128+ if (values .size () == 1 ) {
129+ target .put (headerName , values .get (0 ));
130+ } else {
131+ target .put (headerName , values );
132+ }
133+ }
134+ }
135+ });
118136 }
119137
120138}
0 commit comments