@@ -714,17 +714,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
714714
715715 private final @ Nullable Duration syncCommitTimeout ;
716716
717- private final @ Nullable RecordInterceptor <K , V > recordInterceptor =
717+ private final List < RecordInterceptor <K , V >> recordInterceptors =
718718 !isInterceptBeforeTx () || this .transactionManager == null
719- ? getRecordInterceptor ()
720- : null ;
719+ ? getRecordInterceptors ()
720+ : new ArrayList <>() ;
721721
722- private final @ Nullable RecordInterceptor <K , V > earlyRecordInterceptor =
722+ private final List < RecordInterceptor <K , V >> earlyRecordInterceptors =
723723 isInterceptBeforeTx () && this .transactionManager != null
724- ? getRecordInterceptor ()
725- : null ;
724+ ? getRecordInterceptors ()
725+ : new ArrayList <>() ;
726726
727- private final @ Nullable RecordInterceptor <K , V > commonRecordInterceptor = getRecordInterceptor ();
727+ private final List < RecordInterceptor <K , V >> commonRecordInterceptors = getRecordInterceptors ();
728728
729729 private final @ Nullable BatchInterceptor <K , V > batchInterceptor =
730730 !isInterceptBeforeTx () || this .transactionManager == null
@@ -738,7 +738,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
738738
739739 private final @ Nullable BatchInterceptor <K , V > commonBatchInterceptor = getBatchInterceptor ();
740740
741- private final @ Nullable ThreadStateProcessor pollThreadStateProcessor ;
741+ private final List < ThreadStateProcessor > pollThreadStateProcessor ;
742742
743743 private final ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback ();
744744
@@ -1040,9 +1040,20 @@ private void obtainClusterId() {
10401040 }
10411041 }
10421042
1043- @ Nullable
1044- private ThreadStateProcessor setUpPollProcessor (boolean batch ) {
1045- return batch ? this .commonBatchInterceptor : this .commonRecordInterceptor ;
1043+ private List <ThreadStateProcessor > setUpPollProcessor (boolean batch ) {
1044+ if (batch ) {
1045+ if (this .commonBatchInterceptor != null ) {
1046+ List <ThreadStateProcessor > threadStateProcessors = new ArrayList <>();
1047+ threadStateProcessors .add (this .commonBatchInterceptor );
1048+ return threadStateProcessors ;
1049+ }
1050+ else {
1051+ return new ArrayList <>();
1052+ }
1053+ }
1054+ else {
1055+ return new ArrayList <>(this .commonRecordInterceptors );
1056+ }
10461057 }
10471058
10481059 @ Nullable
@@ -1548,9 +1559,7 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
15481559 }
15491560
15501561 private void clearThreadState () {
1551- if (this .pollThreadStateProcessor != null ) {
1552- this .pollThreadStateProcessor .clearThreadState (this .consumer );
1553- }
1562+ this .pollThreadStateProcessor .forEach (threadStateProcessor -> threadStateProcessor .clearThreadState (this .consumer ));
15541563 }
15551564
15561565 private void checkIdlePartitions () {
@@ -1708,9 +1717,7 @@ private ConsumerRecords<K, V> pollConsumer() {
17081717 }
17091718
17101719 private void beforePoll () {
1711- if (this .pollThreadStateProcessor != null ) {
1712- this .pollThreadStateProcessor .setupThreadState (this .consumer );
1713- }
1720+ this .pollThreadStateProcessor .forEach (threadStateProcessor -> threadStateProcessor .setupThreadState (this .consumer ));
17141721 }
17151722
17161723 private synchronized void captureOffsets (ConsumerRecords <K , V > records ) {
@@ -2548,9 +2555,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
25482555 this .logger .error (ex , "Transaction rolled back" );
25492556 recordAfterRollback (iterator , cRecord , ex );
25502557 }
2551- if (this .commonRecordInterceptor != null ) {
2552- this .commonRecordInterceptor .afterRecord (cRecord , this .consumer );
2553- }
2558+ this .commonRecordInterceptors .forEach (interceptor -> interceptor .afterRecord (cRecord , this .consumer ));
25542559 if (this .nackSleepDurationMillis >= 0 ) {
25552560 handleNack (records , cRecord );
25562561 break ;
@@ -2627,9 +2632,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
26272632 }
26282633 this .logger .trace (() -> "Processing " + KafkaUtils .format (cRecord ));
26292634 doInvokeRecordListener (cRecord , iterator );
2630- if (this .commonRecordInterceptor != null ) {
2631- this .commonRecordInterceptor .afterRecord (cRecord , this .consumer );
2632- }
2635+ this .commonRecordInterceptors .forEach (interceptor -> interceptor .afterRecord (cRecord , this .consumer ));
26332636 if (this .nackSleepDurationMillis >= 0 ) {
26342637 handleNack (records , cRecord );
26352638 break ;
@@ -2680,14 +2683,16 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
26802683 private ConsumerRecord <K , V > checkEarlyIntercept (ConsumerRecord <K , V > recordArg ) {
26812684 internalHeaders (recordArg );
26822685 ConsumerRecord <K , V > cRecord = recordArg ;
2683- if (this .earlyRecordInterceptor != null ) {
2684- cRecord = this .earlyRecordInterceptor .intercept (cRecord , this .consumer );
2686+
2687+ for (RecordInterceptor <K , V > earlyRecordInterceptor : this .earlyRecordInterceptors ) {
2688+ cRecord = earlyRecordInterceptor .intercept (cRecord , this .consumer );
26852689 if (cRecord == null ) {
26862690 this .logger .debug (() -> "RecordInterceptor returned null, skipping: "
2687- + KafkaUtils .format (recordArg ));
2691+ + KafkaUtils .format (recordArg ));
26882692 ackCurrent (recordArg );
2689- this .earlyRecordInterceptor .success (recordArg , this .consumer );
2690- this .earlyRecordInterceptor .afterRecord (recordArg , this .consumer );
2693+ earlyRecordInterceptor .success (recordArg , this .consumer );
2694+ earlyRecordInterceptor .afterRecord (recordArg , this .consumer );
2695+ break ;
26912696 }
26922697 }
26932698 return cRecord ;
@@ -2848,13 +2853,13 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V>
28482853 }
28492854
28502855 private void recordInterceptAfter (ConsumerRecord <K , V > records , @ Nullable Exception exception ) {
2851- if (this .commonRecordInterceptor != null ) {
2856+ if (! this .commonRecordInterceptors . isEmpty () ) {
28522857 try {
28532858 if (exception == null ) {
2854- this .commonRecordInterceptor . success (records , this .consumer );
2859+ this .commonRecordInterceptors . forEach ( interceptor -> interceptor . success (records , this .consumer ) );
28552860 }
28562861 else {
2857- this .commonRecordInterceptor . failure (records , exception , this .consumer );
2862+ this .commonRecordInterceptors . forEach ( interceptor -> interceptor . failure (records , exception , this .consumer ) );
28582863 }
28592864 }
28602865 catch (Exception e ) {
@@ -2888,8 +2893,11 @@ private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
28882893
28892894 private void doInvokeOnMessage (final ConsumerRecord <K , V > recordArg ) {
28902895 ConsumerRecord <K , V > cRecord = recordArg ;
2891- if (this .recordInterceptor != null ) {
2892- cRecord = this .recordInterceptor .intercept (cRecord , this .consumer );
2896+ for (RecordInterceptor <K , V > recordInterceptor : this .recordInterceptors ) {
2897+ cRecord = recordInterceptor .intercept (cRecord , this .consumer );
2898+ if (cRecord == null ) {
2899+ break ;
2900+ }
28932901 }
28942902 if (cRecord == null ) {
28952903 this .logger .debug (() -> "RecordInterceptor returned null, skipping: "
0 commit comments