@@ -315,21 +315,35 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
315315 DeserializationException kDeserEx = ListenerUtils .getExceptionFromHeader (record ,
316316 ErrorHandlingDeserializer .KEY_DESERIALIZER_EXCEPTION_HEADER , this .logger );
317317 Headers headers = new RecordHeaders (record .headers ().toArray ());
318- if (kDeserEx != null && !this .retainExceptionHeader ) {
319- headers .remove (ErrorHandlingDeserializer .KEY_DESERIALIZER_EXCEPTION_HEADER );
320- addExceptionInfoHeaders (headers , kDeserEx , true );
321- }
322- if (vDeserEx != null && !this .retainExceptionHeader ) {
323- headers .remove (ErrorHandlingDeserializer .VALUE_DESERIALIZER_EXCEPTION_HEADER );
324- }
325- enhanceHeaders (headers , record , exception ); // NOSONAR headers are never null
318+ addAndEnhanceHeaders (record , exception , vDeserEx , kDeserEx , headers );
326319 ProducerRecord <Object , Object > outRecord = createProducerRecord (record , tp , headers ,
327320 kDeserEx == null ? null : kDeserEx .getData (), vDeserEx == null ? null : vDeserEx .getData ());
328321 KafkaOperations <Object , Object > kafkaTemplate =
329322 (KafkaOperations <Object , Object >) this .templateResolver .apply (outRecord );
330323 sendOrThrow (outRecord , kafkaTemplate );
331324 }
332325
326+ private void addAndEnhanceHeaders (ConsumerRecord <?, ?> record , Exception exception ,
327+ @ Nullable DeserializationException vDeserEx , @ Nullable DeserializationException kDeserEx , Headers headers ) {
328+
329+ if (kDeserEx != null ) {
330+ if (!this .retainExceptionHeader ) {
331+ headers .remove (ErrorHandlingDeserializer .KEY_DESERIALIZER_EXCEPTION_HEADER );
332+ }
333+ addExceptionInfoHeaders (headers , kDeserEx , true );
334+ }
335+ if (vDeserEx != null ) {
336+ if (!this .retainExceptionHeader ) {
337+ headers .remove (ErrorHandlingDeserializer .VALUE_DESERIALIZER_EXCEPTION_HEADER );
338+ }
339+ addExceptionInfoHeaders (headers , vDeserEx , false );
340+ }
341+ if (kDeserEx == null && vDeserEx == null ) {
342+ addExceptionInfoHeaders (headers , exception , false );
343+ }
344+ enhanceHeaders (headers , record , exception ); // NOSONAR headers are never null
345+ }
346+
333347 private void sendOrThrow (ProducerRecord <Object , Object > outRecord ,
334348 @ Nullable KafkaOperations <Object , Object > kafkaTemplate ) {
335349
@@ -501,7 +515,6 @@ private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
501515
502516 private void enhanceHeaders (Headers kafkaHeaders , ConsumerRecord <?, ?> record , Exception exception ) {
503517 maybeAddOriginalHeaders (kafkaHeaders , record );
504- addExceptionInfoHeaders (kafkaHeaders , exception , false );
505518 Headers headers = this .headersFunction .apply (record , exception );
506519 if (headers != null ) {
507520 headers .forEach (kafkaHeaders ::add );
0 commit comments