Skip to content

Commit ffa7e80

Browse files
committed
GH-1681: Add "after" methods to RecordInterceptor
Resolves #1681 Also add `BatchInterceptor`. * Fix LogAccessor Usage.
1 parent a6c86cf commit ffa7e80

File tree

12 files changed

+408
-35
lines changed

12 files changed

+408
-35
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
3737
import org.springframework.kafka.listener.AfterRollbackProcessor;
3838
import org.springframework.kafka.listener.BatchErrorHandler;
39+
import org.springframework.kafka.listener.BatchInterceptor;
3940
import org.springframework.kafka.listener.ContainerProperties;
4041
import org.springframework.kafka.listener.ErrorHandler;
4142
import org.springframework.kafka.listener.GenericErrorHandler;
@@ -106,6 +107,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
106107

107108
private RecordInterceptor<K, V> recordInterceptor;
108109

110+
private BatchInterceptor<K, V> batchInterceptor;
111+
109112
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
110113

111114
private ApplicationContext applicationContext;
@@ -309,6 +312,15 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
309312
this.recordInterceptor = recordInterceptor;
310313
}
311314

315+
/**
316+
* Set a batch interceptor to be called before and after calling the listener.
317+
* @param batchInterceptor the interceptor.
318+
* @since 2.6.8
319+
*/
320+
public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
321+
this.batchInterceptor = batchInterceptor;
322+
}
323+
312324
/**
313325
* Set a {@link BatchToRecordAdapter}.
314326
* @param batchToRecordAdapter the adapter.
@@ -407,6 +419,7 @@ else if (this.autoStartup != null) {
407419
instance.setAutoStartup(this.autoStartup);
408420
}
409421
instance.setRecordInterceptor(this.recordInterceptor);
422+
instance.setBatchInterceptor(this.batchInterceptor);
410423
JavaUtils.INSTANCE
411424
.acceptIfNotNull(this.phase, instance::setPhase)
412425
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -95,6 +95,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
9595

9696
private RecordInterceptor<K, V> recordInterceptor;
9797

98+
private BatchInterceptor<K, V> batchInterceptor;
99+
98100
private boolean interceptBeforeTx;
99101

100102
private volatile boolean running = false;
@@ -299,7 +301,7 @@ protected RecordInterceptor<K, V> getRecordInterceptor() {
299301
}
300302

301303
/**
302-
* Set an interceptor to be called before calling the listener.
304+
* Set an interceptor to be called before calling the record listener.
303305
* Does not apply to batch listeners.
304306
* @param recordInterceptor the interceptor.
305307
* @since 2.2.7
@@ -309,6 +311,21 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
309311
this.recordInterceptor = recordInterceptor;
310312
}
311313

314+
protected BatchInterceptor<K, V> getBatchInterceptor() {
315+
return this.batchInterceptor;
316+
}
317+
318+
/**
319+
* Set an interceptor to be called before calling the record listener.
320+
* Does not apply to batch listeners.
321+
* @param batchInterceptor the interceptor.
322+
* @since 2.6.8
323+
* @see #setInterceptBeforeTx(boolean)
324+
*/
325+
public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
326+
this.batchInterceptor = batchInterceptor;
327+
}
328+
312329
protected boolean isInterceptBeforeTx() {
313330
return this.interceptBeforeTx;
314331
}
@@ -318,6 +335,7 @@ protected boolean isInterceptBeforeTx() {
318335
* @param interceptBeforeTx true to intercept before the transaction.
319336
* @since 2.3.4
320337
* @see #setRecordInterceptor(RecordInterceptor)
338+
* @see #setBatchInterceptor(BatchInterceptor)
321339
*/
322340
public void setInterceptBeforeTx(boolean interceptBeforeTx) {
323341
this.interceptBeforeTx = interceptBeforeTx;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecords;
20+
21+
import org.springframework.lang.Nullable;
22+
23+
/**
24+
* An interceptor for batches of records.
25+
*
26+
* @param <K> the key type.
27+
* @param <V> the value type.
28+
*
29+
* @author Gary Russell
30+
* @since 2.6.8
31+
*
32+
*/
33+
@FunctionalInterface
34+
public interface BatchInterceptor<K, V> {
35+
36+
/**
37+
* Perform some action on the records or return a different one. If null is returned
38+
* the records will be skipped. Invoked before the listener.
39+
* @param records the records.
40+
* @return the records or null.
41+
*/
42+
@Nullable
43+
ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records);
44+
45+
/**
46+
* Called after the listener exits normally.
47+
* @param records the records.
48+
*/
49+
default void success(ConsumerRecords<K, V> records) {
50+
}
51+
52+
/**
53+
* Called after the listener throws an exception.
54+
* @param records the records.
55+
* @param exception the exception.
56+
*/
57+
default void failure(ConsumerRecords<K, V> records, Exception exception) {
58+
}
59+
60+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.ArrayList;
20+
import java.util.Arrays;
21+
import java.util.Collection;
22+
23+
import org.apache.kafka.clients.consumer.ConsumerRecords;
24+
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* A {@link BatchInterceptor} that delegates to one or more {@link BatchInterceptor}s in
29+
* order.
30+
*
31+
* @param <K> the key type.
32+
* @param <V> the value type.
33+
*
34+
* @author Gary Russell
35+
* @since 2.6.8
36+
*
37+
*/
38+
public class CompositeBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
39+
40+
private final Collection<BatchInterceptor<K, V>> delegates = new ArrayList<>();
41+
42+
/**
43+
* Construct an instance with the provided delegates.
44+
* @param delegates the delegates.
45+
*/
46+
@SafeVarargs
47+
@SuppressWarnings("varargs")
48+
public CompositeBatchInterceptor(BatchInterceptor<K, V>... delegates) {
49+
Assert.notNull(delegates, "'delegates' cannot be null");
50+
Assert.noNullElements(delegates, "'delegates' cannot have null entries");
51+
this.delegates.addAll(Arrays.asList(delegates));
52+
}
53+
54+
@Override
55+
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records) {
56+
ConsumerRecords<K, V> recordsToIntercept = records;
57+
for (BatchInterceptor<K, V> delegate : this.delegates) {
58+
recordsToIntercept = delegate.intercept(recordsToIntercept);
59+
}
60+
return recordsToIntercept;
61+
}
62+
63+
@Override
64+
public void success(ConsumerRecords<K, V> records) {
65+
this.delegates.forEach(del -> del.success(records));
66+
}
67+
68+
@Override
69+
public void failure(ConsumerRecords<K, V> records, Exception exception) {
70+
this.delegates.forEach(del -> del.failure(records, exception));
71+
}
72+
73+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@
2525
import org.springframework.util.Assert;
2626

2727
/**
28-
* A {@link RecordInterceptor} that delegates to one or more {@link RecordInterceptor} in
28+
* A {@link RecordInterceptor} that delegates to one or more {@link RecordInterceptor}s in
2929
* order.
3030
*
3131
* @param <K> the key type.
@@ -40,6 +40,10 @@ public class CompositeRecordInterceptor<K, V> implements RecordInterceptor<K, V>
4040

4141
private final Collection<RecordInterceptor<K, V>> delegates = new ArrayList<>();
4242

43+
/**
44+
* Construct an instance with the provided delegates.
45+
* @param delegates the delegates.
46+
*/
4347
@SafeVarargs
4448
@SuppressWarnings("varargs")
4549
public CompositeRecordInterceptor(RecordInterceptor<K, V>... delegates) {
@@ -57,4 +61,14 @@ public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
5761
return recordToIntercept;
5862
}
5963

64+
@Override
65+
public void success(ConsumerRecord<K, V> record) {
66+
this.delegates.forEach(del -> del.success(record));
67+
}
68+
69+
@Override
70+
public void failure(ConsumerRecord<K, V> record, Exception exception) {
71+
this.delegates.forEach(del -> del.failure(record, exception));
72+
}
73+
6074
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -191,6 +191,7 @@ protected void doStart() {
191191
container.setGenericErrorHandler(getGenericErrorHandler());
192192
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
193193
container.setRecordInterceptor(getRecordInterceptor());
194+
container.setBatchInterceptor(getBatchInterceptor());
194195
container.setInterceptBeforeTx(isInterceptBeforeTx());
195196
container.setEmergencyStop(() -> {
196197
stop(() -> {

0 commit comments

Comments
 (0)