1919import java .io .IOException ;
2020import java .nio .ByteBuffer ;
2121import java .nio .charset .StandardCharsets ;
22+ import java .util .ArrayList ;
2223import java .util .Arrays ;
24+ import java .util .Collection ;
2325import java .util .Collections ;
2426import java .util .HashMap ;
2527import java .util .LinkedHashSet ;
2628import java .util .List ;
2729import java .util .Map ;
2830import java .util .Set ;
31+ import java .util .stream .Collectors ;
2932
3033import org .apache .kafka .common .header .Header ;
3134import org .apache .kafka .common .header .Headers ;
3235import org .apache .kafka .common .header .internals .RecordHeader ;
36+ import org .assertj .core .util .Streams ;
3337
3438import org .springframework .messaging .MessageHeaders ;
3539import org .springframework .util .Assert ;
4852 *
4953 * @author Gary Russell
5054 * @author Artem Bilan
55+ * @author Grzegorz Poznachowski
5156 *
5257 * @since 1.3
53- *
5458 */
5559public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
5660
61+ private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s" ;
62+
5763 private static final String JAVA_LANG_STRING = "java.lang.String" ;
5864
5965 private static final Set <String > TRUSTED_ARRAY_TYPES = Set .of (
@@ -96,6 +102,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
96102 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
97103 * {@link KafkaHeaders} are never mapped as headers since they represent data in
98104 * consumer/producer records.
105+ *
99106 * @see #DefaultKafkaHeaderMapper(ObjectMapper)
100107 */
101108 public DefaultKafkaHeaderMapper () {
@@ -110,6 +117,7 @@ public DefaultKafkaHeaderMapper() {
110117 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
111118 * {@link KafkaHeaders} are never mapped as headers since they represent data in
112119 * consumer/producer records.
120+ *
113121 * @param objectMapper the object mapper.
114122 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
115123 */
@@ -128,6 +136,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
128136 * generally should not map the {@code "id" and "timestamp"} headers. Note:
129137 * most of the headers in {@link KafkaHeaders} are ever mapped as headers since they
130138 * represent data in consumer/producer records.
139+ *
131140 * @param patterns the patterns.
132141 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
133142 */
@@ -143,8 +152,9 @@ public DefaultKafkaHeaderMapper(String... patterns) {
143152 * you generally should not map the {@code "id" and "timestamp"} headers. Note: most
144153 * of the headers in {@link KafkaHeaders} are never mapped as headers since they
145154 * represent data in consumer/producer records.
155+ *
146156 * @param objectMapper the object mapper.
147- * @param patterns the patterns.
157+ * @param patterns the patterns.
148158 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
149159 */
150160 public DefaultKafkaHeaderMapper (ObjectMapper objectMapper , String ... patterns ) {
@@ -160,6 +170,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St
160170
161171 /**
162172 * Create an instance for inbound mapping only with pattern matching.
173+ *
163174 * @param patterns the patterns to match.
164175 * @return the header mapper.
165176 * @since 2.8.8
@@ -170,8 +181,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt
170181
171182 /**
172183 * Create an instance for inbound mapping only with pattern matching.
184+ *
173185 * @param objectMapper the object mapper.
174- * @param patterns the patterns to match.
186+ * @param patterns the patterns to match.
175187 * @return the header mapper.
176188 * @since 2.8.8
177189 */
@@ -181,6 +193,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o
181193
182194 /**
183195 * Return the object mapper.
196+ *
184197 * @return the mapper.
185198 */
186199 protected ObjectMapper getObjectMapper () {
@@ -189,6 +202,7 @@ protected ObjectMapper getObjectMapper() {
189202
190203 /**
191204 * Provide direct access to the trusted packages set for subclasses.
205+ *
192206 * @return the trusted packages.
193207 * @since 2.2
194208 */
@@ -198,6 +212,7 @@ protected Set<String> getTrustedPackages() {
198212
199213 /**
200214 * Provide direct access to the toString() classes by subclasses.
215+ *
201216 * @return the toString() classes.
202217 * @since 2.2
203218 */
@@ -214,6 +229,7 @@ protected boolean isEncodeStrings() {
214229 * raw String value is converted to a byte array using the configured charset. Set to
215230 * true if a consumer of the outbound record is using Spring for Apache Kafka version
216231 * less than 2.3
232+ *
217233 * @param encodeStrings true to encode (default false).
218234 * @since 2.3
219235 */
@@ -234,6 +250,7 @@ public void setEncodeStrings(boolean encodeStrings) {
234250 * If any of the supplied packages is {@code "*"}, all packages are trusted.
235251 * If a class for a non-trusted package is encountered, the header is returned to the
236252 * application with value of type {@link NonTrustedHeaderType}.
253+ *
237254 * @param packagesToTrust the packages to trust.
238255 */
239256 public void addTrustedPackages (String ... packagesToTrust ) {
@@ -253,6 +270,7 @@ public void addTrustedPackages(String... packagesToTrust) {
253270 /**
254271 * Add class names that the outbound mapper should perform toString() operations on
255272 * before mapping.
273+ *
256274 * @param classNames the class names.
257275 * @since 2.2
258276 */
@@ -264,32 +282,17 @@ public void addToStringClasses(String... classNames) {
264282 public void fromHeaders (MessageHeaders headers , Headers target ) {
265283 final Map <String , String > jsonHeaders = new HashMap <>();
266284 final ObjectMapper headerObjectMapper = getObjectMapper ();
267- headers .forEach ((key , rawValue ) -> {
268- if (matches (key , rawValue )) {
269- Object valueToAdd = headerValueToAddOut (key , rawValue );
270- if (valueToAdd instanceof byte []) {
271- target .add (new RecordHeader (key , (byte []) valueToAdd ));
285+ headers .forEach ((key , value ) -> {
286+ if (matches (key , value )) {
287+ if (value instanceof Collection <?> values ) {
288+ int i = 0 ;
289+ for (Object element : values ) {
290+ resolveSingleHeader (key , element , target , jsonHeaders , i );
291+ i ++;
292+ }
272293 }
273294 else {
274- try {
275- String className = valueToAdd .getClass ().getName ();
276- boolean encodeToJson = this .encodeStrings ;
277- if (this .toStringClasses .contains (className )) {
278- valueToAdd = valueToAdd .toString ();
279- className = JAVA_LANG_STRING ;
280- encodeToJson = true ;
281- }
282- if (!encodeToJson && valueToAdd instanceof String ) {
283- target .add (new RecordHeader (key , ((String ) valueToAdd ).getBytes (getCharset ())));
284- }
285- else {
286- target .add (new RecordHeader (key , headerObjectMapper .writeValueAsBytes (valueToAdd )));
287- }
288- jsonHeaders .put (key , className );
289- }
290- catch (Exception e ) {
291- logger .error (e , () -> "Could not map " + key + " with type " + rawValue .getClass ().getName ());
292- }
295+ resolveSingleHeader (key , value , target , jsonHeaders );
293296 }
294297 }
295298 });
@@ -303,30 +306,82 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
303306 }
304307 }
305308
306- @ Override
307- public void toHeaders (Headers source , final Map <String , Object > headers ) {
308- final Map <String , String > jsonTypes = decodeJsonTypes (source );
309- source .forEach (header -> {
310- String headerName = header .key ();
311- if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT ) && matchesForInbound (headerName )) {
312- headers .put (headerName , ByteBuffer .wrap (header .value ()).getInt ());
313- }
314- else if (headerName .equals (KafkaHeaders .LISTENER_INFO ) && matchesForInbound (headerName )) {
315- headers .put (headerName , new String (header .value (), getCharset ()));
316- }
317- else if (!(headerName .equals (JSON_TYPES )) && matchesForInbound (headerName )) {
318- if (jsonTypes .containsKey (headerName )) {
319- String requestedType = jsonTypes .get (headerName );
320- populateJsonValueHeader (header , requestedType , headers );
309+ private void resolveSingleHeader (String headerName , Object value , Headers target , Map <String , String > jsonHeaders ) {
310+ resolveSingleHeader (headerName , value , target , jsonHeaders , null );
311+ }
312+
313+ private void resolveSingleHeader (String headerName , Object value , Headers target , Map <String , String > jsonHeaders , Integer headerIndex ) {
314+ Object valueToAdd = headerValueToAddOut (headerName , value );
315+ if (valueToAdd instanceof byte [] byteArray ) {
316+ target .add (new RecordHeader (headerName , byteArray ));
317+ }
318+ else {
319+ try {
320+ String className = valueToAdd .getClass ().getName ();
321+ boolean encodeToJson = this .encodeStrings ;
322+ if (this .toStringClasses .contains (className )) {
323+ valueToAdd = valueToAdd .toString ();
324+ className = JAVA_LANG_STRING ;
325+ encodeToJson = true ;
326+ }
327+ if (!encodeToJson && valueToAdd instanceof String stringValue ) {
328+ target .add (new RecordHeader (headerName , stringValue .getBytes (getCharset ())));
321329 }
322330 else {
323- headers . put ( headerName , headerValueToAddIn ( header ));
331+ target . add ( new RecordHeader ( headerName , this . objectMapper . writeValueAsBytes ( valueToAdd ) ));
324332 }
333+ jsonHeaders .put (headerIndex == null ?
334+ headerName :
335+ ITERABLE_HEADER_TYPE_PATTERN .formatted (headerName , headerIndex ), className );
325336 }
326- });
337+ catch (Exception e ) {
338+ logger .error (e , () -> "Could not map " + headerName + " with type " + value .getClass ().getName ());
339+ }
340+ }
341+ }
342+
343+ @ Override
344+ public void toHeaders (Headers source , final Map <String , Object > target ) {
345+ final Map <String , String > jsonTypes = decodeJsonTypes (source );
346+
347+ Streams .stream (source )
348+ .collect (Collectors .groupingBy (Header ::key ))
349+ .forEach ((headerName , headers ) -> {
350+ if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT ) && matchesForInbound (headerName )) {
351+ target .put (headerName , ByteBuffer .wrap (headers .get (headers .size () - 1 ).value ()).getInt ());
352+ }
353+ else if (headerName .equals (KafkaHeaders .LISTENER_INFO ) && matchesForInbound (headerName )) {
354+ target .put (headerName , new String (headers .get (headers .size () - 1 ).value (), getCharset ()));
355+ }
356+ else if (!(headerName .equals (JSON_TYPES )) && matchesForInbound (headerName )) {
357+ if (headers .size () == 1 ) {
358+ if (jsonTypes .containsKey (headerName )) {
359+ String requestedType = jsonTypes .get (headerName );
360+ target .put (headerName , resolveJsonValueHeader (headers .get (0 ), requestedType ));
361+ }
362+ else {
363+ target .put (headerName , headerValueToAddIn (headers .get (0 )));
364+ }
365+ }
366+ else {
367+ List <Object > valueList = new ArrayList <>();
368+ for (int i = 0 ; i < headers .size (); i ++) {
369+ var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN .formatted (headerName , i );
370+ if (jsonTypes .containsKey (jsonTypeIterableHeader )) {
371+ String requestedType = jsonTypes .get (jsonTypeIterableHeader );
372+ valueList .add (resolveJsonValueHeader (headers .get (i ), requestedType ));
373+ }
374+ else {
375+ valueList .add (headerValueToAddIn (headers .get (i )));
376+ }
377+ }
378+ target .put (headerName , valueList );
379+ }
380+ }
381+ });
327382 }
328383
329- private void populateJsonValueHeader (Header header , String requestedType , Map < String , Object > headers ) {
384+ private Object resolveJsonValueHeader (Header header , String requestedType ) {
330385 Class <?> type = Object .class ;
331386 boolean trusted = false ;
332387 try {
@@ -339,22 +394,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
339394 logger .error (e , () -> "Could not load class for header: " + header .key ());
340395 }
341396 if (String .class .equals (type ) && (header .value ().length == 0 || header .value ()[0 ] != '"' )) {
342- headers . put ( header . key (), new String (header .value (), getCharset () ));
397+ return new String (header .value (), getCharset ());
343398 }
344399 else {
345400 if (trusted ) {
346401 try {
347- Object value = decodeValue (header , type );
348- headers .put (header .key (), value );
402+ return decodeValue (header , type );
349403 }
350404 catch (IOException e ) {
351405 logger .error (e , () ->
352406 "Could not decode json type: " + requestedType + " for key: " + header .key ());
353- headers . put ( header . key (), header .value () );
407+ return header .value ();
354408 }
355409 }
356410 else {
357- headers . put ( header . key (), new NonTrustedHeaderType (header .value (), requestedType ) );
411+ return new NonTrustedHeaderType (header .value (), requestedType );
358412 }
359413 }
360414 }
0 commit comments