Skip to content

Commit 61c4ad2

Browse files
Addressing PR review
Signed-off-by: Sanghyeok An <[email protected]>
1 parent 0d9010d commit 61c4ad2

File tree

3 files changed

+47
-11
lines changed

3 files changed

+47
-11
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba
7676

7777
The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
7878
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].
79+
80+
[[x40-add-record-interceptor]]
81+
=== Configure additional `RecordInterceptor`
82+
83+
The `KafkaMessageListenerContainer` and `ConcurrentMessageListenerContainer` support `getRecordInterceptor()`.
84+
If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it.

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
463463
/**
464464
* Get the {@link RecordInterceptor} for modification, if configured.
465465
* @return the {@link RecordInterceptor}, or {@code null} if not configured
466+
* @since 4.0
466467
*/
467468
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
468469
return this.recordInterceptor;

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3842,7 +3842,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
38423842
containerProps.setClientId("clientId");
38433843

38443844
CountDownLatch afterLatch = new CountDownLatch(1);
3845-
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
3845+
RecordInterceptor<Integer, String> recordInterceptor1 = spy(new RecordInterceptor<Integer, String>() {
38463846

38473847
@Override
38483848
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
@@ -3858,25 +3858,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
38583858

38593859
});
38603860

3861+
RecordInterceptor<Integer, String> recordInterceptor2 = spy(new RecordInterceptor<Integer, String>() {
3862+
3863+
@Override
3864+
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
3865+
Consumer<Integer, String> consumer) {
3866+
3867+
return record;
3868+
}
3869+
3870+
@Override
3871+
public void clearThreadState(Consumer<?, ?> consumer) {
3872+
afterLatch.countDown();
3873+
}
3874+
3875+
});
3876+
38613877
KafkaMessageListenerContainer<Integer, String> container =
38623878
new KafkaMessageListenerContainer<>(cf, containerProps);
3863-
container.setRecordInterceptor(recordInterceptor);
3879+
container.setRecordInterceptor(new CompositeRecordInterceptor<>());
3880+
if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor<Integer, String> composite) {
3881+
composite.addRecordInterceptor(recordInterceptor1);
3882+
composite.addRecordInterceptor(recordInterceptor2);
3883+
}
3884+
38643885
container.start();
38653886
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
38663887
assertThat(afterLatch.await(10, TimeUnit.SECONDS)).isTrue();
38673888

3868-
InOrder inOrder = inOrder(recordInterceptor, messageListener, consumer);
3869-
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
3889+
InOrder inOrder = inOrder(recordInterceptor1, recordInterceptor2, messageListener, consumer);
3890+
inOrder.verify(recordInterceptor1).setupThreadState(eq(consumer));
3891+
inOrder.verify(recordInterceptor2).setupThreadState(eq(consumer));
38703892
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
3871-
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
3893+
inOrder.verify(recordInterceptor1).intercept(eq(firstRecord), eq(consumer));
3894+
inOrder.verify(recordInterceptor2).intercept(eq(firstRecord), eq(consumer));
38723895
inOrder.verify(messageListener).onMessage(eq(firstRecord));
3873-
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
3874-
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
3875-
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
3896+
inOrder.verify(recordInterceptor1).success(eq(firstRecord), eq(consumer));
3897+
inOrder.verify(recordInterceptor2).success(eq(firstRecord), eq(consumer));
3898+
inOrder.verify(recordInterceptor1).afterRecord(eq(firstRecord), eq(consumer));
3899+
inOrder.verify(recordInterceptor2).afterRecord(eq(firstRecord), eq(consumer));
3900+
inOrder.verify(recordInterceptor1).intercept(eq(secondRecord), eq(consumer));
3901+
inOrder.verify(recordInterceptor2).intercept(eq(secondRecord), eq(consumer));
38763902
inOrder.verify(messageListener).onMessage(eq(secondRecord));
3877-
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
3878-
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
3879-
inOrder.verify(recordInterceptor).clearThreadState(eq(consumer));
3903+
inOrder.verify(recordInterceptor1).success(eq(secondRecord), eq(consumer));
3904+
inOrder.verify(recordInterceptor2).success(eq(secondRecord), eq(consumer));
3905+
inOrder.verify(recordInterceptor1).afterRecord(eq(secondRecord), eq(consumer));
3906+
inOrder.verify(recordInterceptor2).afterRecord(eq(secondRecord), eq(consumer));
3907+
inOrder.verify(recordInterceptor1).clearThreadState(eq(consumer));
3908+
inOrder.verify(recordInterceptor2).clearThreadState(eq(consumer));
38803909
container.stop();
38813910
}
38823911

0 commit comments

Comments
 (0)