Skip to content

Commit 7a6a385

Browse files
Revert 2 commits.
Signed-off-by: Sanghyeok An <[email protected]>
1 parent 5896ec2 commit 7a6a385

File tree

3 files changed

+45
-57
lines changed

3 files changed

+45
-57
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.util.ArrayList;
2019
import java.util.Arrays;
2120
import java.util.Collection;
2221
import java.util.List;
@@ -115,7 +114,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
115114

116115
private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;
117116

118-
private List<RecordInterceptor<K, V>> recordInterceptors = new ArrayList<>();
117+
private @Nullable RecordInterceptor<K, V> recordInterceptor;
119118

120119
private @Nullable BatchInterceptor<K, V> batchInterceptor;
121120

@@ -461,8 +460,8 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
461460
this.kafkaAdmin = kafkaAdmin;
462461
}
463462

464-
protected List<RecordInterceptor<K, V>> getRecordInterceptors() {
465-
return this.recordInterceptors;
463+
protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
464+
return this.recordInterceptor;
466465
}
467466

468467
/**
@@ -473,9 +472,7 @@ protected List<RecordInterceptor<K, V>> getRecordInterceptors() {
473472
* @see #setInterceptBeforeTx(boolean)
474473
*/
475474
public void setRecordInterceptor(@Nullable RecordInterceptor<K, V> recordInterceptor) {
476-
if (recordInterceptor != null) {
477-
this.recordInterceptors.add(recordInterceptor);
478-
}
475+
this.recordInterceptor = recordInterceptor;
479476
}
480477

481478
protected @Nullable BatchInterceptor<K, V> getBatchInterceptor() {

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
* @author Tomaz Fernandes
6161
* @author Wang Zhiyang
6262
* @author Lokesh Alamuri
63-
* @author Sanghyeok An
6463
*/
6564
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
6665

@@ -283,9 +282,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
283282
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
284283
container.setCommonErrorHandler(getCommonErrorHandler());
285284
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
286-
for (RecordInterceptor<K, V> recordInterceptor : getRecordInterceptors()) {
287-
container.setRecordInterceptor(recordInterceptor);
288-
}
285+
container.setRecordInterceptor(getRecordInterceptor());
289286
container.setBatchInterceptor(getBatchInterceptor());
290287
container.setInterceptBeforeTx(isInterceptBeforeTx());
291288
container.setListenerInfo(getListenerInfo());

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -714,17 +714,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
714714

715715
private final @Nullable Duration syncCommitTimeout;
716716

717-
private final List<RecordInterceptor<K, V>> recordInterceptors =
717+
private final @Nullable RecordInterceptor<K, V> recordInterceptor =
718718
!isInterceptBeforeTx() || this.transactionManager == null
719-
? getRecordInterceptors()
720-
: new ArrayList<>();
719+
? getRecordInterceptor()
720+
: null;
721721

722-
private final List<RecordInterceptor<K, V>> earlyRecordInterceptors =
722+
private final @Nullable RecordInterceptor<K, V> earlyRecordInterceptor =
723723
isInterceptBeforeTx() && this.transactionManager != null
724-
? getRecordInterceptors()
725-
: new ArrayList<>();
724+
? getRecordInterceptor()
725+
: null;
726726

727-
private final List<RecordInterceptor<K, V>> commonRecordInterceptors = getRecordInterceptors();
727+
private final @Nullable RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
728728

729729
private final @Nullable BatchInterceptor<K, V> batchInterceptor =
730730
!isInterceptBeforeTx() || this.transactionManager == null
@@ -738,7 +738,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
738738

739739
private final @Nullable BatchInterceptor<K, V> commonBatchInterceptor = getBatchInterceptor();
740740

741-
private final List<ThreadStateProcessor> pollThreadStateProcessor;
741+
private final @Nullable ThreadStateProcessor pollThreadStateProcessor;
742742

743743
private final ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback();
744744

@@ -1040,20 +1040,9 @@ private void obtainClusterId() {
10401040
}
10411041
}
10421042

1043-
private List<ThreadStateProcessor> setUpPollProcessor(boolean batch) {
1044-
if (batch) {
1045-
if (this.commonBatchInterceptor != null) {
1046-
List<ThreadStateProcessor> threadStateProcessors = new ArrayList<>();
1047-
threadStateProcessors.add(this.commonBatchInterceptor);
1048-
return threadStateProcessors;
1049-
}
1050-
else {
1051-
return new ArrayList<>();
1052-
}
1053-
}
1054-
else {
1055-
return new ArrayList<>(this.commonRecordInterceptors);
1056-
}
1043+
@Nullable
1044+
private ThreadStateProcessor setUpPollProcessor(boolean batch) {
1045+
return batch ? this.commonBatchInterceptor : this.commonRecordInterceptor;
10571046
}
10581047

10591048
@Nullable
@@ -1559,7 +1548,9 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
15591548
}
15601549

15611550
private void clearThreadState() {
1562-
this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.clearThreadState(this.consumer));
1551+
if (this.pollThreadStateProcessor != null) {
1552+
this.pollThreadStateProcessor.clearThreadState(this.consumer);
1553+
}
15631554
}
15641555

15651556
private void checkIdlePartitions() {
@@ -1717,7 +1708,9 @@ private ConsumerRecords<K, V> pollConsumer() {
17171708
}
17181709

17191710
private void beforePoll() {
1720-
this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.setupThreadState(this.consumer));
1711+
if (this.pollThreadStateProcessor != null) {
1712+
this.pollThreadStateProcessor.setupThreadState(this.consumer);
1713+
}
17211714
}
17221715

17231716
private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
@@ -2555,7 +2548,9 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
25552548
this.logger.error(ex, "Transaction rolled back");
25562549
recordAfterRollback(iterator, cRecord, ex);
25572550
}
2558-
this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer));
2551+
if (this.commonRecordInterceptor != null) {
2552+
this.commonRecordInterceptor.afterRecord(cRecord, this.consumer);
2553+
}
25592554
if (this.nackSleepDurationMillis >= 0) {
25602555
handleNack(records, cRecord);
25612556
break;
@@ -2632,7 +2627,9 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
26322627
}
26332628
this.logger.trace(() -> "Processing " + KafkaUtils.format(cRecord));
26342629
doInvokeRecordListener(cRecord, iterator);
2635-
this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer));
2630+
if (this.commonRecordInterceptor != null) {
2631+
this.commonRecordInterceptor.afterRecord(cRecord, this.consumer);
2632+
}
26362633
if (this.nackSleepDurationMillis >= 0) {
26372634
handleNack(records, cRecord);
26382635
break;
@@ -2683,16 +2680,14 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
26832680
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg) {
26842681
internalHeaders(recordArg);
26852682
ConsumerRecord<K, V> cRecord = recordArg;
2686-
2687-
for (RecordInterceptor<K, V> earlyRecordInterceptor : this.earlyRecordInterceptors) {
2688-
cRecord = earlyRecordInterceptor.intercept(cRecord, this.consumer);
2683+
if (this.earlyRecordInterceptor != null) {
2684+
cRecord = this.earlyRecordInterceptor.intercept(cRecord, this.consumer);
26892685
if (cRecord == null) {
26902686
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
2691-
+ KafkaUtils.format(recordArg));
2687+
+ KafkaUtils.format(recordArg));
26922688
ackCurrent(recordArg);
2693-
earlyRecordInterceptor.success(recordArg, this.consumer);
2694-
earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
2695-
break;
2689+
this.earlyRecordInterceptor.success(recordArg, this.consumer);
2690+
this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
26962691
}
26972692
}
26982693
return cRecord;
@@ -2853,17 +2848,19 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V>
28532848
}
28542849

28552850
private void recordInterceptAfter(ConsumerRecord<K, V> records, @Nullable Exception exception) {
2856-
try {
2857-
if (exception == null) {
2858-
this.commonRecordInterceptors.forEach(interceptor -> interceptor.success(records, this.consumer));
2851+
if (this.commonRecordInterceptor != null) {
2852+
try {
2853+
if (exception == null) {
2854+
this.commonRecordInterceptor.success(records, this.consumer);
2855+
}
2856+
else {
2857+
this.commonRecordInterceptor.failure(records, exception, this.consumer);
2858+
}
28592859
}
2860-
else {
2861-
this.commonRecordInterceptors.forEach(interceptor -> interceptor.failure(records, exception, this.consumer));
2860+
catch (Exception e) {
2861+
this.logger.error(e, "RecordInterceptor.success/failure threw an exception");
28622862
}
28632863
}
2864-
catch (Exception e) {
2865-
this.logger.error(e, "RecordInterceptor.success/failure threw an exception");
2866-
}
28672864
}
28682865

28692866
private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
@@ -2891,11 +2888,8 @@ private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
28912888

28922889
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
28932890
ConsumerRecord<K, V> cRecord = recordArg;
2894-
for (RecordInterceptor<K, V> recordInterceptor : this.recordInterceptors) {
2895-
cRecord = recordInterceptor.intercept(cRecord, this.consumer);
2896-
if (cRecord == null) {
2897-
break;
2898-
}
2891+
if (this.recordInterceptor != null) {
2892+
cRecord = this.recordInterceptor.intercept(cRecord, this.consumer);
28992893
}
29002894
if (cRecord == null) {
29012895
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "

0 commit comments

Comments
 (0)