@@ -163,21 +163,7 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
163163
164164 Map <String , Object > rawHeaders = kafkaMessageHeaders .getRawHeaders ();
165165 if (record .headers () != null ) {
166- if (this .headerMapper != null ) {
167- this .headerMapper .toHeaders (record .headers (), rawHeaders );
168- }
169- else {
170- this .logger .debug (() ->
171- "No header mapper is available; Jackson is required for the default mapper; "
172- + "headers (if present) are not mapped but provided raw in "
173- + KafkaHeaders .NATIVE_HEADERS );
174- rawHeaders .put (KafkaHeaders .NATIVE_HEADERS , record .headers ());
175- Header contentType = record .headers ().lastHeader (MessageHeaders .CONTENT_TYPE );
176- if (contentType != null ) {
177- rawHeaders .put (MessageHeaders .CONTENT_TYPE ,
178- new String (contentType .value (), StandardCharsets .UTF_8 ));
179- }
180- }
166+ mapOrAddHeaders (record , rawHeaders );
181167 }
182168 String ttName = record .timestampType () != null ? record .timestampType ().name () : null ;
183169 commonHeaders (acknowledgment , consumer , rawHeaders , record .key (), record .topic (), record .partition (),
@@ -197,6 +183,24 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
197183 return message ;
198184 }
199185
186+ private void mapOrAddHeaders (ConsumerRecord <?, ?> record , Map <String , Object > rawHeaders ) {
187+ if (this .headerMapper != null ) {
188+ this .headerMapper .toHeaders (record .headers (), rawHeaders );
189+ }
190+ else {
191+ this .logger .debug (() ->
192+ "No header mapper is available; Jackson is required for the default mapper; "
193+ + "headers (if present) are not mapped but provided raw in "
194+ + KafkaHeaders .NATIVE_HEADERS );
195+ rawHeaders .put (KafkaHeaders .NATIVE_HEADERS , record .headers ());
196+ Header contentType = record .headers ().lastHeader (MessageHeaders .CONTENT_TYPE );
197+ if (contentType != null ) {
198+ rawHeaders .put (MessageHeaders .CONTENT_TYPE ,
199+ new String (contentType .value (), StandardCharsets .UTF_8 ));
200+ }
201+ }
202+ }
203+
200204 @ SuppressWarnings ({ "unchecked" , "rawtypes" })
201205 @ Override
202206 public ProducerRecord <?, ?> fromMessage (Message <?> messageArg , String defaultTopic ) {
0 commit comments