@@ -514,14 +514,14 @@ private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
514514 }
515515
516516 private void enhanceHeaders (Headers kafkaHeaders , ConsumerRecord <?, ?> record , Exception exception ) {
517- maybeAddOriginalHeaders (kafkaHeaders , record );
517+ maybeAddOriginalHeaders (kafkaHeaders , record , exception );
518518 Headers headers = this .headersFunction .apply (record , exception );
519519 if (headers != null ) {
520520 headers .forEach (kafkaHeaders ::add );
521521 }
522522 }
523523
524- private void maybeAddOriginalHeaders (Headers kafkaHeaders , ConsumerRecord <?, ?> record ) {
524+ private void maybeAddOriginalHeaders (Headers kafkaHeaders , ConsumerRecord <?, ?> record , Exception ex ) {
525525 maybeAddHeader (kafkaHeaders , this .headerNames .original .topicHeader ,
526526 record .topic ().getBytes (StandardCharsets .UTF_8 ));
527527 maybeAddHeader (kafkaHeaders , this .headerNames .original .partitionHeader ,
@@ -532,6 +532,13 @@ private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?>
532532 ByteBuffer .allocate (Long .BYTES ).putLong (record .timestamp ()).array ());
533533 maybeAddHeader (kafkaHeaders , this .headerNames .original .timestampTypeHeader ,
534534 record .timestampType ().toString ().getBytes (StandardCharsets .UTF_8 ));
535+ if (ex instanceof ListenerExecutionFailedException ) {
536+ String consumerGroup = ((ListenerExecutionFailedException ) ex ).getGroupId ();
537+ if (consumerGroup != null ) {
538+ maybeAddHeader (kafkaHeaders , this .headerNames .original .consumerGroup ,
539+ consumerGroup .getBytes (StandardCharsets .UTF_8 ));
540+ }
541+ }
535542 }
536543
537544 private void maybeAddHeader (Headers kafkaHeaders , String header , byte [] value ) {
@@ -540,11 +547,14 @@ private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) {
540547 }
541548 }
542549
543- void addExceptionInfoHeaders (Headers kafkaHeaders , Exception exception ,
544- boolean isKey ) {
550+ void addExceptionInfoHeaders (Headers kafkaHeaders , Exception exception , boolean isKey ) {
545551 kafkaHeaders .add (new RecordHeader (isKey ? this .headerNames .exceptionInfo .keyExceptionFqcn
546552 : this .headerNames .exceptionInfo .exceptionFqcn ,
547553 exception .getClass ().getName ().getBytes (StandardCharsets .UTF_8 )));
554+ if (!isKey && exception .getCause () != null ) {
555+ kafkaHeaders .add (new RecordHeader (this .headerNames .exceptionInfo .exceptionCauseFqcn ,
556+ exception .getCause ().getClass ().getName ().getBytes (StandardCharsets .UTF_8 )));
557+ }
548558 String message = exception .getMessage ();
549559 if (message != null ) {
550560 kafkaHeaders .add (new RecordHeader (isKey
@@ -579,9 +589,11 @@ protected HeaderNames getHeaderNames() {
579589 .timestampTypeHeader (KafkaHeaders .DLT_ORIGINAL_TIMESTAMP_TYPE )
580590 .topicHeader (KafkaHeaders .DLT_ORIGINAL_TOPIC )
581591 .partitionHeader (KafkaHeaders .DLT_ORIGINAL_PARTITION )
592+ .consumerGroupHeader (KafkaHeaders .DLT_ORIGINAL_CONSUMER_GROUP )
582593 .exception ()
583594 .keyExceptionFqcn (KafkaHeaders .DLT_KEY_EXCEPTION_FQCN )
584595 .exceptionFqcn (KafkaHeaders .DLT_EXCEPTION_FQCN )
596+ .exceptionCauseFqcn (KafkaHeaders .DLT_EXCEPTION_CAUSE_FQCN )
585597 .keyExceptionMessage (KafkaHeaders .DLT_KEY_EXCEPTION_MESSAGE )
586598 .exceptionMessage (KafkaHeaders .DLT_EXCEPTION_MESSAGE )
587599 .keyExceptionStacktrace (KafkaHeaders .DLT_KEY_EXCEPTION_STACKTRACE )
@@ -605,50 +617,67 @@ public static class HeaderNames {
605617 }
606618
607619 static class Original {
608- private final String offsetHeader ;
609- private final String timestampHeader ;
610- private final String timestampTypeHeader ;
611- private final String topicHeader ;
612- private final String partitionHeader ;
620+
621+ final String offsetHeader ;
622+
623+ final String timestampHeader ;
624+
625+ final String timestampTypeHeader ;
626+
627+ final String topicHeader ;
628+
629+ final String partitionHeader ;
630+
631+ final String consumerGroup ;
613632
614633 Original (String offsetHeader ,
615634 String timestampHeader ,
616635 String timestampTypeHeader ,
617636 String topicHeader ,
618- String partitionHeader ) {
637+ String partitionHeader ,
638+ String consumerGroup ) {
619639 this .offsetHeader = offsetHeader ;
620640 this .timestampHeader = timestampHeader ;
621641 this .timestampTypeHeader = timestampTypeHeader ;
622642 this .topicHeader = topicHeader ;
623643 this .partitionHeader = partitionHeader ;
644+ this .consumerGroup = consumerGroup ;
624645 }
625646 }
626647
627648 static class ExceptionInfo {
628649
629- private final String keyExceptionFqcn ;
630- private final String exceptionFqcn ;
631- private final String keyExceptionMessage ;
632- private final String exceptionMessage ;
633- private final String keyExceptionStacktrace ;
634- private final String exceptionStacktrace ;
650+ final String keyExceptionFqcn ;
651+
652+ final String exceptionFqcn ;
653+
654+ final String exceptionCauseFqcn ;
655+
656+ final String keyExceptionMessage ;
657+
658+ final String exceptionMessage ;
659+
660+ final String keyExceptionStacktrace ;
661+
662+ final String exceptionStacktrace ;
635663
636664 ExceptionInfo (String keyExceptionFqcn ,
637- String exceptionFqcn ,
638- String keyExceptionMessage ,
639- String exceptionMessage ,
640- String keyExceptionStacktrace ,
641- String exceptionStacktrace ) {
665+ String exceptionFqcn ,
666+ String exceptionCauseFqcn ,
667+ String keyExceptionMessage ,
668+ String exceptionMessage ,
669+ String keyExceptionStacktrace ,
670+ String exceptionStacktrace ) {
642671 this .keyExceptionFqcn = keyExceptionFqcn ;
643672 this .exceptionFqcn = exceptionFqcn ;
673+ this .exceptionCauseFqcn = exceptionCauseFqcn ;
644674 this .keyExceptionMessage = keyExceptionMessage ;
645675 this .exceptionMessage = exceptionMessage ;
646676 this .keyExceptionStacktrace = keyExceptionStacktrace ;
647677 this .exceptionStacktrace = exceptionStacktrace ;
648678 }
649679 }
650680
651-
652681 /**
653682 * Provides a convenient API for creating
654683 * {@link DeadLetterPublishingRecoverer.HeaderNames}.
@@ -685,6 +714,8 @@ public class Original {
685714
686715 private String partitionHeader ;
687716
717+ private String consumerGroupHeader ;
718+
688719 /**
689720 * Sets the name of the header that will be used to store the offset
690721 * of the original record.
@@ -745,6 +776,18 @@ public Builder.Original partitionHeader(String partitionHeader) {
745776 return this ;
746777 }
747778
779+ /**
780+ * Sets the name of the header that will be used to store the consumer
781+ * group that failed to consume the original record.
782+ * @param consumerGroupHeader the consumer group header name.
783+ * @return the Original builder instance
784+ * @since 2.8
785+ */
786+ public Builder .Original consumerGroupHeader (String consumerGroupHeader ) {
787+ this .consumerGroupHeader = consumerGroupHeader ;
788+ return this ;
789+ }
790+
748791 /**
749792 * Returns the exception builder.
750793 * @return the exception builder.
@@ -760,16 +803,18 @@ public ExceptionInfo exception() {
760803 * @since 2.7
761804 */
762805 private DeadLetterPublishingRecoverer .HeaderNames .Original build () {
763- Assert .notNull (this .offsetHeader , "offsetHeader header cannot be null" );
764- Assert .notNull (this .timestampHeader , "timestampHeader header cannot be null" );
765- Assert .notNull (this .timestampTypeHeader , "timestampTypeHeader header cannot be null" );
766- Assert .notNull (this .topicHeader , "topicHeader header cannot be null" );
767- Assert .notNull (this .partitionHeader , "partitionHeader header cannot be null" );
806+ Assert .notNull (this .offsetHeader , "offsetHeader cannot be null" );
807+ Assert .notNull (this .timestampHeader , "timestampHeader cannot be null" );
808+ Assert .notNull (this .timestampTypeHeader , "timestampTypeHeader cannot be null" );
809+ Assert .notNull (this .topicHeader , "topicHeader cannot be null" );
810+ Assert .notNull (this .partitionHeader , "partitionHeader cannot be null" );
811+ Assert .notNull (this .consumerGroupHeader , "consumerGroupHeader cannot be null" );
768812 return new DeadLetterPublishingRecoverer .HeaderNames .Original (this .offsetHeader ,
769813 this .timestampHeader ,
770814 this .timestampTypeHeader ,
771815 this .topicHeader ,
772- this .partitionHeader );
816+ this .partitionHeader ,
817+ this .consumerGroupHeader );
773818 }
774819 }
775820
@@ -785,6 +830,8 @@ public class ExceptionInfo {
785830
786831 private String exceptionFqcn ;
787832
833+ private String exceptionCauseFqcn ;
834+
788835 private String keyExceptionMessage ;
789836
790837 private String exceptionMessage ;
@@ -817,6 +864,17 @@ public ExceptionInfo exceptionFqcn(String exceptionFqcn) {
817864 return this ;
818865 }
819866
867+ /**
868+ * Sets the name of the header that will be used to store the exceptionCauseFqcn
869+ * of the original record.
870+ * @param exceptionCauseFqcn the exceptionFqcn header name.
871+ * @return the Exception builder instance
872+ * @since 2.8
873+ */
874+ public ExceptionInfo exceptionCauseFqcn (String exceptionCauseFqcn ) {
875+ this .exceptionCauseFqcn = exceptionCauseFqcn ;
876+ return this ;
877+ }
820878 /**
821879 * Sets the name of the header that will be used to store the keyExceptionMessage
822880 * of the original record.
@@ -873,13 +931,15 @@ public ExceptionInfo exceptionStacktrace(String exceptionStacktrace) {
873931 public DeadLetterPublishingRecoverer .HeaderNames build () {
874932 Assert .notNull (this .keyExceptionFqcn , "keyExceptionFqcn header cannot be null" );
875933 Assert .notNull (this .exceptionFqcn , "exceptionFqcn header cannot be null" );
934+ Assert .notNull (this .exceptionCauseFqcn , "exceptionCauseFqcn header cannot be null" );
876935 Assert .notNull (this .keyExceptionMessage , "keyExceptionMessage header cannot be null" );
877936 Assert .notNull (this .exceptionMessage , "exceptionMessage header cannot be null" );
878937 Assert .notNull (this .keyExceptionStacktrace , "keyExceptionStacktrace header cannot be null" );
879938 Assert .notNull (this .exceptionStacktrace , "exceptionStacktrace header cannot be null" );
880939 return new DeadLetterPublishingRecoverer .HeaderNames (Builder .this .original .build (),
881940 new HeaderNames .ExceptionInfo (this .keyExceptionFqcn ,
882941 this .exceptionFqcn ,
942+ this .exceptionCauseFqcn ,
883943 this .keyExceptionMessage ,
884944 this .exceptionMessage ,
885945 this .keyExceptionStacktrace ,
0 commit comments