1919import java .io .IOException ;
2020import java .nio .ByteBuffer ;
2121import java .nio .charset .StandardCharsets ;
22+ import java .util .ArrayList ;
2223import java .util .Arrays ;
2324import java .util .Collections ;
2425import java .util .HashMap ;
2526import java .util .LinkedHashSet ;
2627import java .util .List ;
2728import java .util .Map ;
2829import java .util .Set ;
30+ import java .util .stream .Collectors ;
2931
3032import org .apache .kafka .common .header .Header ;
3133import org .apache .kafka .common .header .Headers ;
3234import org .apache .kafka .common .header .internals .RecordHeader ;
35+ import org .assertj .core .util .Streams ;
3336
3437import org .springframework .messaging .MessageHeaders ;
3538import org .springframework .util .Assert ;
4952 * @author Gary Russell
5053 * @author Artem Bilan
5154 * @author Soby Chacko
55+ * @author Grzegorz Poznachowski
5256 *
5357 * @since 1.3
54- *
5558 */
5659public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
5760
61+ private static final String ITERABLE_HEADER_TYPE_PATTERN = "%s#%s" ;
62+
5863 private static final String JAVA_LANG_STRING = "java.lang.String" ;
5964
6065 private static final Set <String > TRUSTED_ARRAY_TYPES = Set .of (
@@ -97,6 +102,7 @@ public class DefaultKafkaHeaderMapper extends AbstractKafkaHeaderMapper {
97102 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
98103 * {@link KafkaHeaders} are never mapped as headers since they represent data in
99104 * consumer/producer records.
105+ *
100106 * @see #DefaultKafkaHeaderMapper(ObjectMapper)
101107 */
102108 public DefaultKafkaHeaderMapper () {
@@ -111,6 +117,7 @@ public DefaultKafkaHeaderMapper() {
111117 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
112118 * {@link KafkaHeaders} are never mapped as headers since they represent data in
113119 * consumer/producer records.
120+ *
114121 * @param objectMapper the object mapper.
115122 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
116123 */
@@ -129,6 +136,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {
129136 * generally should not map the {@code "id" and "timestamp"} headers. Note:
130137 * most of the headers in {@link KafkaHeaders} are ever mapped as headers since they
131138 * represent data in consumer/producer records.
139+ *
132140 * @param patterns the patterns.
133141 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
134142 */
@@ -144,8 +152,9 @@ public DefaultKafkaHeaderMapper(String... patterns) {
144152 * you generally should not map the {@code "id" and "timestamp"} headers. Note: most
145153 * of the headers in {@link KafkaHeaders} are never mapped as headers since they
146154 * represent data in consumer/producer records.
155+ *
147156 * @param objectMapper the object mapper.
148- * @param patterns the patterns.
157+ * @param patterns the patterns.
149158 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
150159 */
151160 public DefaultKafkaHeaderMapper (ObjectMapper objectMapper , String ... patterns ) {
@@ -161,6 +170,7 @@ private DefaultKafkaHeaderMapper(boolean outbound, ObjectMapper objectMapper, St
161170
162171 /**
163172 * Create an instance for inbound mapping only with pattern matching.
173+ *
164174 * @param patterns the patterns to match.
165175 * @return the header mapper.
166176 * @since 2.8.8
@@ -171,8 +181,9 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patt
171181
172182 /**
173183 * Create an instance for inbound mapping only with pattern matching.
184+ *
174185 * @param objectMapper the object mapper.
175- * @param patterns the patterns to match.
186+ * @param patterns the patterns to match.
176187 * @return the header mapper.
177188 * @since 2.8.8
178189 */
@@ -182,6 +193,7 @@ public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper o
182193
183194 /**
184195 * Return the object mapper.
196+ *
185197 * @return the mapper.
186198 */
187199 protected ObjectMapper getObjectMapper () {
@@ -190,6 +202,7 @@ protected ObjectMapper getObjectMapper() {
190202
191203 /**
192204 * Provide direct access to the trusted packages set for subclasses.
205+ *
193206 * @return the trusted packages.
194207 * @since 2.2
195208 */
@@ -199,6 +212,7 @@ protected Set<String> getTrustedPackages() {
199212
200213 /**
201214 * Provide direct access to the toString() classes by subclasses.
215+ *
202216 * @return the toString() classes.
203217 * @since 2.2
204218 */
@@ -215,6 +229,7 @@ protected boolean isEncodeStrings() {
215229 * raw String value is converted to a byte array using the configured charset. Set to
216230 * true if a consumer of the outbound record is using Spring for Apache Kafka version
217231 * less than 2.3
232+ *
218233 * @param encodeStrings true to encode (default false).
219234 * @since 2.3
220235 */
@@ -235,6 +250,7 @@ public void setEncodeStrings(boolean encodeStrings) {
235250 * If any of the supplied packages is {@code "*"}, all packages are trusted.
236251 * If a class for a non-trusted package is encountered, the header is returned to the
237252 * application with value of type {@link NonTrustedHeaderType}.
253+ *
238254 * @param packagesToTrust the packages to trust.
239255 */
240256 public void addTrustedPackages (String ... packagesToTrust ) {
@@ -254,6 +270,7 @@ public void addTrustedPackages(String... packagesToTrust) {
254270 /**
255271 * Add class names that the outbound mapper should perform toString() operations on
256272 * before mapping.
273+ *
257274 * @param classNames the class names.
258275 * @since 2.2
259276 */
@@ -265,32 +282,15 @@ public void addToStringClasses(String... classNames) {
265282 public void fromHeaders (MessageHeaders headers , Headers target ) {
266283 final Map <String , String > jsonHeaders = new HashMap <>();
267284 final ObjectMapper headerObjectMapper = getObjectMapper ();
268- headers .forEach ((key , rawValue ) -> {
269- if (matches (key , rawValue )) {
270- Object valueToAdd = headerValueToAddOut (key , rawValue );
271- if (valueToAdd instanceof byte []) {
272- target .add (new RecordHeader (key , (byte []) valueToAdd ));
285+ headers .forEach ((key , value ) -> {
286+ if (matches (key , value )) {
287+ if (value instanceof List <?> values ) {
288+ for (int i = 0 ; i < values .size (); i ++) {
289+ resolveHeader (key , values .get (i ), target , jsonHeaders , i );
290+ }
273291 }
274292 else {
275- try {
276- String className = valueToAdd .getClass ().getName ();
277- boolean encodeToJson = this .encodeStrings ;
278- if (this .toStringClasses .contains (className )) {
279- valueToAdd = valueToAdd .toString ();
280- className = JAVA_LANG_STRING ;
281- encodeToJson = true ;
282- }
283- if (!encodeToJson && valueToAdd instanceof String ) {
284- target .add (new RecordHeader (key , ((String ) valueToAdd ).getBytes (getCharset ())));
285- }
286- else {
287- target .add (new RecordHeader (key , headerObjectMapper .writeValueAsBytes (valueToAdd )));
288- }
289- jsonHeaders .put (key , className );
290- }
291- catch (Exception e ) {
292- logger .error (e , () -> "Could not map " + key + " with type " + rawValue .getClass ().getName ());
293- }
293+ resolveHeader (key , value , target , jsonHeaders , null );
294294 }
295295 }
296296 });
@@ -304,34 +304,84 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
304304 }
305305 }
306306
307- @ Override
308- public void toHeaders (Headers source , final Map <String , Object > headers ) {
309- final Map <String , String > jsonTypes = decodeJsonTypes (source );
310- source .forEach (header -> {
311- String headerName = header .key ();
312- if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT ) && matchesForInbound (headerName )) {
313- headers .put (headerName , ByteBuffer .wrap (header .value ()).getInt ());
314- }
315- else if (headerName .equals (KafkaHeaders .LISTENER_INFO ) && matchesForInbound (headerName )) {
316- headers .put (headerName , new String (header .value (), getCharset ()));
317- }
318- else if (headerName .equals (KafkaUtils .KEY_DESERIALIZER_EXCEPTION_HEADER ) ||
319- headerName .equals (KafkaUtils .VALUE_DESERIALIZER_EXCEPTION_HEADER )) {
320- headers .put (headerName , header );
321- }
322- else if (!(headerName .equals (JSON_TYPES )) && matchesForInbound (headerName )) {
323- if (jsonTypes .containsKey (headerName )) {
324- String requestedType = jsonTypes .get (headerName );
325- populateJsonValueHeader (header , requestedType , headers );
307+ private void resolveHeader (String headerName , Object value , Headers target , Map <String , String > jsonHeaders , Integer headerIndex ) {
308+ Object valueToAdd = headerValueToAddOut (headerName , value );
309+ if (valueToAdd instanceof byte [] byteArray ) {
310+ target .add (new RecordHeader (headerName , byteArray ));
311+ }
312+ else {
313+ try {
314+ String className = valueToAdd .getClass ().getName ();
315+ boolean encodeToJson = this .encodeStrings ;
316+ if (this .toStringClasses .contains (className )) {
317+ valueToAdd = valueToAdd .toString ();
318+ className = JAVA_LANG_STRING ;
319+ encodeToJson = true ;
320+ }
321+ if (!encodeToJson && valueToAdd instanceof String stringValue ) {
322+ target .add (new RecordHeader (headerName , stringValue .getBytes (getCharset ())));
326323 }
327324 else {
328- headers . put ( headerName , headerValueToAddIn ( header ));
325+ target . add ( new RecordHeader ( headerName , this . objectMapper . writeValueAsBytes ( valueToAdd ) ));
329326 }
327+ jsonHeaders .put (headerIndex == null ?
328+ headerName :
329+ ITERABLE_HEADER_TYPE_PATTERN .formatted (headerName , headerIndex ), className );
330330 }
331- });
331+ catch (Exception e ) {
332+ logger .error (e , () -> "Could not map " + headerName + " with type " + value .getClass ().getName ());
333+ }
334+ }
335+ }
336+
337+ @ Override
338+ public void toHeaders (Headers source , final Map <String , Object > target ) {
339+ final Map <String , String > jsonTypes = decodeJsonTypes (source );
340+
341+ Streams .stream (source )
342+ .collect (Collectors .groupingBy (Header ::key ))
343+ .forEach ((headerName , headers ) -> {
344+ Header lastHeader = headers .get (headers .size () - 1 );
345+ if (headerName .equals (KafkaUtils .KEY_DESERIALIZER_EXCEPTION_HEADER ) ||
346+ headerName .equals (KafkaUtils .VALUE_DESERIALIZER_EXCEPTION_HEADER )) {
347+ target .put (headerName , lastHeader );
348+ }
349+ else if (headerName .equals (KafkaHeaders .DELIVERY_ATTEMPT ) && matchesForInbound (headerName )) {
350+ target .put (headerName , ByteBuffer .wrap (lastHeader .value ()).getInt ());
351+ }
352+ else if (headerName .equals (KafkaHeaders .LISTENER_INFO ) && matchesForInbound (headerName )) {
353+ target .put (headerName , new String (lastHeader .value (), getCharset ()));
354+ }
355+ else if (!(headerName .equals (JSON_TYPES )) && matchesForInbound (headerName )) {
356+ if (headers .size () == 1 ) {
357+ if (jsonTypes .containsKey (headerName )) {
358+ String requestedType = jsonTypes .get (headerName );
359+ target .put (headerName , resolveJsonValueHeader (headers .get (0 ), requestedType ));
360+ }
361+ else {
362+ target .put (headerName , headerValueToAddIn (headers .get (0 )));
363+ }
364+ }
365+ else {
366+ List <Object > valueList = new ArrayList <>();
367+ for (int i = 0 ; i < headers .size (); i ++) {
368+ var jsonTypeIterableHeader = ITERABLE_HEADER_TYPE_PATTERN .formatted (headerName , i );
369+ if (jsonTypes .containsKey (jsonTypeIterableHeader )) {
370+ String requestedType = jsonTypes .get (jsonTypeIterableHeader );
371+ valueList .add (resolveJsonValueHeader (headers .get (i ), requestedType ));
372+ }
373+ else {
374+ valueList .add (headerValueToAddIn (headers .get (i )));
375+ }
376+ }
377+ Collections .reverse (valueList );
378+ target .put (headerName , valueList );
379+ }
380+ }
381+ });
332382 }
333383
334- private void populateJsonValueHeader (Header header , String requestedType , Map < String , Object > headers ) {
384+ private Object resolveJsonValueHeader (Header header , String requestedType ) {
335385 Class <?> type = Object .class ;
336386 boolean trusted = false ;
337387 try {
@@ -344,22 +394,21 @@ private void populateJsonValueHeader(Header header, String requestedType, Map<St
344394 logger .error (e , () -> "Could not load class for header: " + header .key ());
345395 }
346396 if (String .class .equals (type ) && (header .value ().length == 0 || header .value ()[0 ] != '"' )) {
347- headers . put ( header . key (), new String (header .value (), getCharset () ));
397+ return new String (header .value (), getCharset ());
348398 }
349399 else {
350400 if (trusted ) {
351401 try {
352- Object value = decodeValue (header , type );
353- headers .put (header .key (), value );
402+ return decodeValue (header , type );
354403 }
355404 catch (IOException e ) {
356405 logger .error (e , () ->
357406 "Could not decode json type: " + requestedType + " for key: " + header .key ());
358- headers . put ( header . key (), header .value () );
407+ return header .value ();
359408 }
360409 }
361410 else {
362- headers . put ( header . key (), new NonTrustedHeaderType (header .value (), requestedType ) );
411+ return new NonTrustedHeaderType (header .value (), requestedType );
363412 }
364413 }
365414 }
0 commit comments