Skip to content

Commit 0717605

Browse files
committed
Configure interceptors on container factory
This changes allows configuring interceptors on AbstractKafkaListenerContainerFactory in same way as on KafkaMessageListenerContainer, and changes the examle docs to show this instead. Signed-off-by: Christian Fredriksson <[email protected]>
1 parent 2704f85 commit 0717605

File tree

2 files changed

+47
-21
lines changed

2 files changed

+47
-21
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,32 +22,39 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t
2222

2323
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
2424

25-
Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
25+
Starting with version 4.0, `AbstractKafkaListenerContainerFactory` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
2626
If the returned interceptor is an instance of `CompositeRecordInterceptor` or `CompositeBatchInterceptor`, additional `RecordInterceptor` or `BatchInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` or `BatchInterceptor` has already been configured.
27-
The following example shows how to do so:
27+
The following example shows how to do so in a `BeanPostProcessor`:
2828

2929
[source, java]
3030
----
31-
public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
32-
CompositeRecordInterceptor compositeInterceptor;
33-
34-
RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
35-
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
36-
compositeInterceptor = interceptor;
37-
} else {
38-
compositeInterceptor = new CompositeRecordInterceptor<>();
39-
container.setRecordInterceptor(compositeInterceptor);
40-
}
41-
42-
if (previousInterceptor != null) {
43-
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
31+
public class RecordInterceptorBeanPostProcessor implements BeanPostProcessor {
32+
33+
@Override
34+
public Object postProcessAfterInitialization(Object bean, String beanName) {
35+
if (bean instanceof AbstractKafkaListenerContainerFactory containerFactory) {
36+
37+
RecordInterceptor previousInterceptor = containerFactory.getRecordInterceptor();
38+
39+
CompositeRecordInterceptor compositeInterceptor;
40+
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
41+
compositeInterceptor = interceptor;
42+
} else {
43+
compositeInterceptor = new CompositeRecordInterceptor<>();
44+
containerFactory.setRecordInterceptor(compositeInterceptor);
45+
if (previousInterceptor != null) {
46+
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
47+
}
48+
}
49+
50+
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
51+
RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};
52+
53+
compositeInterceptor.addRecordInterceptor(recordInterceptor1);
54+
compositeInterceptor.addRecordInterceptor(recordInterceptor2);
55+
}
56+
return bean;
4457
}
45-
46-
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
47-
RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};
48-
49-
compositeInterceptor.addRecordInterceptor(recordInterceptor1);
50-
compositeInterceptor.addRecordInterceptor(recordInterceptor2);
5158
}
5259
----
5360

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
* @author Stephane Nicoll
6262
* @author Gary Russell
6363
* @author Artem Bilan
64+
* @author Christian Fredriksson
6465
*
6566
* @see AbstractMessageListenerContainer
6667
*/
@@ -275,6 +276,15 @@ public ContainerProperties getContainerProperties() {
275276
return this.containerProperties;
276277
}
277278

279+
/**
280+
* Get the {@link RecordInterceptor} for modification, if configured.
281+
* @return the {@link RecordInterceptor}, or {@code null} if not configured
282+
* @since 4.0
283+
*/
284+
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
285+
return this.recordInterceptor;
286+
}
287+
278288
/**
279289
* Set an interceptor to be called before calling the listener.
280290
* Only used with record listeners.
@@ -286,6 +296,15 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
286296
this.recordInterceptor = recordInterceptor;
287297
}
288298

299+
/**
300+
* Get the {@link BatchInterceptor} for modification, if configured.
301+
* @return the {@link BatchInterceptor}, or {@code null} if not configured
302+
* @since 4.0
303+
*/
304+
public @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
305+
return this.batchInterceptor;
306+
}
307+
289308
/**
290309
* Set a batch interceptor to be called before and after calling the listener.
291310
* Only used with batch listeners.

0 commit comments

Comments
 (0)