Skip to content

Commit b9421a9

Browse files
garyrussellartembilan
authored andcommitted
GH-1929: Fix BatchInterceptor
Resolves #1929 Need to rebuild the record list after interception (when `interceptBeforeTx` is false). Otherwise changes made to the records in the `ConsumerRecords` is lost. Always intercept early when transactions are not used; rebuild list if transactions in use, `interceptBeforeTx` is false, and a batch interceptor is present. **cherry-pick to 2.7.x**
1 parent f744094 commit b9421a9

File tree

2 files changed

+33
-14
lines changed

2 files changed

+33
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -611,23 +611,25 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
611611

612612
private final Duration syncCommitTimeout;
613613

614-
private final RecordInterceptor<K, V> recordInterceptor = !isInterceptBeforeTx()
614+
private final RecordInterceptor<K, V> recordInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
615615
? getRecordInterceptor()
616616
: null;
617617

618-
private final RecordInterceptor<K, V> earlyRecordInterceptor = isInterceptBeforeTx()
619-
? getRecordInterceptor()
620-
: null;
618+
private final RecordInterceptor<K, V> earlyRecordInterceptor =
619+
isInterceptBeforeTx() || this.kafkaTxManager == null
620+
? getRecordInterceptor()
621+
: null;
621622

622623
private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
623624

624-
private final BatchInterceptor<K, V> batchInterceptor = !isInterceptBeforeTx()
625+
private final BatchInterceptor<K, V> batchInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
625626
? getBatchInterceptor()
626627
: null;
627628

628-
private final BatchInterceptor<K, V> earlyBatchInterceptor = isInterceptBeforeTx()
629-
? getBatchInterceptor()
630-
: null;
629+
private final BatchInterceptor<K, V> earlyBatchInterceptor =
630+
isInterceptBeforeTx() || this.kafkaTxManager == null
631+
? getBatchInterceptor()
632+
: null;
631633

632634
private final BatchInterceptor<K, V> commonBatchInterceptor = getBatchInterceptor();
633635

@@ -1961,11 +1963,20 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
19611963
}
19621964

19631965
private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> recordsArg,
1964-
@Nullable List<ConsumerRecord<K, V>> recordList) {
1966+
@Nullable List<ConsumerRecord<K, V>> recordListArg) {
19651967

19661968
ConsumerRecords<K, V> records = recordsArg;
1969+
List<ConsumerRecord<K, V>> recordList = recordListArg;
19671970
if (this.batchInterceptor != null) {
19681971
records = this.batchInterceptor.intercept(recordsArg, this.consumer);
1972+
if (records == null) {
1973+
this.logger.debug(() -> "BatchInterceptor returned null, skipping: "
1974+
+ recordsArg + " with " + recordsArg.count() + " records");
1975+
return;
1976+
}
1977+
else {
1978+
recordList = createRecordList(records);
1979+
}
19691980
}
19701981
if (this.wantsFullRecords) {
19711982
this.batchListener.onMessage(records, // NOSONAR
@@ -2154,7 +2165,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
21542165
if (this.earlyBatchInterceptor != null) {
21552166
next = this.earlyBatchInterceptor.intercept(next, this.consumer);
21562167
if (next == null) {
2157-
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
2168+
this.logger.debug(() -> "BatchInterceptor returned null, skipping: "
21582169
+ nextArg + " with " + nextArg.count() + " records");
21592170
}
21602171
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,11 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc
450450
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
451451
final Consumer consumer = mock(Consumer.class);
452452
TopicPartition tp0 = new TopicPartition("foo", 0);
453-
ConsumerRecord record = new ConsumerRecord("foo", 0, 0L, "bar", "baz");
454-
ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record)));
453+
ConsumerRecord record1 = new ConsumerRecord("foo", 0, 0L, "bar", "baz");
454+
ConsumerRecord record2 = new ConsumerRecord("foo", 0, 1L, null, null);
455+
ConsumerRecords records = batch
456+
? new ConsumerRecords(Collections.singletonMap(tp0, List.of(record1, record2)))
457+
: new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record1)));
455458
ConsumerRecords empty = new ConsumerRecords<>(Collections.emptyMap());
456459
AtomicInteger firstOrSecondPoll = new AtomicInteger();
457460
willAnswer(invocation -> {
@@ -470,11 +473,13 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc
470473
ContainerProperties containerProperties = new ContainerProperties("foo");
471474
containerProperties.setGroupId("grp");
472475
AtomicBoolean first = new AtomicBoolean(true);
476+
AtomicReference<List<ConsumerRecord<String, String>>> received = new AtomicReference<>();
473477
if (batch) {
474-
containerProperties.setMessageListener((BatchMessageListener) recs -> {
478+
containerProperties.setMessageListener((BatchMessageListener<String, String>) recs -> {
475479
if (first.getAndSet(false)) {
476480
throw new RuntimeException("test");
477481
}
482+
received.set(recs);
478483
});
479484
}
480485
else {
@@ -546,7 +551,7 @@ public void failure(ConsumerRecord record, Exception exception, Consumer consume
546551
public ConsumerRecords intercept(ConsumerRecords recs, Consumer consumer) {
547552
order.add("interceptor");
548553
latch.countDown();
549-
return recs;
554+
return new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record1)));
550555
}
551556

552557
@Override
@@ -586,6 +591,9 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
586591
"success");
587592
}
588593
}
594+
if (batch) {
595+
assertThat(received.get()).hasSize(1);
596+
}
589597
}
590598
finally {
591599
container.stop();

0 commit comments

Comments
 (0)