Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,24 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t

The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.

Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
Starting with version 4.0, `AbstractKafkaListenerContainerFactory` and `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
If the returned interceptor is an instance of `CompositeRecordInterceptor` or `CompositeBatchInterceptor`, additional `RecordInterceptor` or `BatchInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` or `BatchInterceptor` has already been configured.
The following example shows how to do so:

[source, java]
----
public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Integer, String> containerFactory) {
CompositeRecordInterceptor compositeInterceptor;

RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
RecordInterceptor<Integer, String> previousInterceptor = containerFactory.getRecordInterceptor();
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
compositeInterceptor = interceptor;
} else {
compositeInterceptor = new CompositeRecordInterceptor<>();
container.setRecordInterceptor(compositeInterceptor);
}

if (previousInterceptor != null) {
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
containerFactory.setRecordInterceptor(compositeInterceptor);
if (previousInterceptor != null) {
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
}
}

RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
* @author Stephane Nicoll
* @author Gary Russell
* @author Artem Bilan
* @author Christian Fredriksson
*
* @see AbstractMessageListenerContainer
*/
Expand Down Expand Up @@ -275,6 +276,15 @@ public ContainerProperties getContainerProperties() {
return this.containerProperties;
}

/**
* Get the {@link RecordInterceptor} for modification, if configured.
* @return the {@link RecordInterceptor}, or {@code null} if not configured
* @since 4.0
*/
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
return this.recordInterceptor;
}

/**
* Set an interceptor to be called before calling the listener.
* Only used with record listeners.
Expand All @@ -286,6 +296,15 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

/**
* Get the {@link BatchInterceptor} for modification, if configured.
* @return the {@link BatchInterceptor}, or {@code null} if not configured
* @since 4.0
*/
public @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
return this.batchInterceptor;
}

/**
* Set a batch interceptor to be called before and after calling the listener.
* Only used with batch listeners.
Expand Down