Skip to content

Commit 72fddc1

Browse files
garyrussellartembilan
authored andcommitted
GH-1569: Add Message Listener Advice Chain
Resolves #1569 Allow provision of around advices, for example to set/reset logging MDC. **cherry-pick to 2.5.x (what's new will need fixing)** # Conflicts: # src/reference/asciidoc/whats-new.adoc
1 parent 24b9e96 commit 72fddc1

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,20 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.Arrays;
2022
import java.util.Collections;
2123
import java.util.HashMap;
24+
import java.util.List;
2225
import java.util.Map;
2326
import java.util.Properties;
2427
import java.util.regex.Pattern;
2528

29+
import org.aopalliance.aop.Advice;
30+
31+
import org.springframework.aop.framework.Advised;
32+
import org.springframework.aop.framework.ProxyFactory;
33+
import org.springframework.aop.support.AopUtils;
2634
import org.springframework.core.task.AsyncListenableTaskExecutor;
2735
import org.springframework.kafka.support.KafkaHeaders;
2836
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -31,6 +39,7 @@
3139
import org.springframework.transaction.PlatformTransactionManager;
3240
import org.springframework.transaction.TransactionDefinition;
3341
import org.springframework.util.Assert;
42+
import org.springframework.util.CollectionUtils;
3443

3544
/**
3645
* Contains runtime properties for a listener container.
@@ -164,6 +173,8 @@ public enum EOSMode {
164173

165174
private final Map<String, String> micrometerTags = new HashMap<>();
166175

176+
private final List<Advice> adviceChain = new ArrayList<>();
177+
167178
/**
168179
* The ack mode to use when auto ack (in the configuration properties) is false.
169180
* <ul>
@@ -280,6 +291,7 @@ public ContainerProperties(TopicPartitionOffset... topicPartitions) {
280291
*/
281292
public void setMessageListener(Object messageListener) {
282293
this.messageListener = messageListener;
294+
adviseListenerIfNeeded();
283295
}
284296

285297
/**
@@ -709,6 +721,45 @@ public void setTransactionDefinition(TransactionDefinition transactionDefinition
709721
this.transactionDefinition = transactionDefinition;
710722
}
711723

724+
/**
725+
* A chain of listener {@link Advice}s.
726+
* @return the adviceChain.
727+
* @since 2.5.6
728+
*/
729+
public Advice[] getAdviceChain() {
730+
return this.adviceChain.toArray(new Advice[0]);
731+
}
732+
733+
/**
734+
* Set a chain of listener {@link Advice}s; must not be null or have null elements.
735+
* @param adviceChain the adviceChain to set.
736+
* @since 2.5.6
737+
*/
738+
public void setAdviceChain(Advice... adviceChain) {
739+
Assert.notNull(adviceChain, "'adviceChain' cannot be null");
740+
Assert.noNullElements(adviceChain, "'adviceChain' cannot have null elements");
741+
this.adviceChain.clear();
742+
this.adviceChain.addAll(Arrays.asList(adviceChain));
743+
if (this.messageListener != null) {
744+
adviseListenerIfNeeded();
745+
}
746+
}
747+
748+
private void adviseListenerIfNeeded() {
749+
if (!CollectionUtils.isEmpty(this.adviceChain)) {
750+
if (AopUtils.isAopProxy(this.messageListener)) {
751+
Advised advised = (Advised) this.messageListener;
752+
this.adviceChain.forEach(advised::removeAdvice);
753+
this.adviceChain.forEach(advised::addAdvice);
754+
}
755+
else {
756+
ProxyFactory pf = new ProxyFactory(this.messageListener);
757+
this.adviceChain.forEach(pf::addAdvice);
758+
this.messageListener = pf.getProxy();
759+
}
760+
}
761+
}
762+
712763
@Override
713764
public String toString() {
714765
return "ContainerProperties ["

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.regex.Pattern;
5858
import java.util.stream.Collectors;
5959

60+
import org.aopalliance.intercept.MethodInterceptor;
6061
import org.apache.commons.logging.LogFactory;
6162
import org.apache.kafka.clients.consumer.CommitFailedException;
6263
import org.apache.kafka.clients.consumer.Consumer;
@@ -596,6 +597,16 @@ public void testRecordAckMock() throws Exception {
596597
containerProps.setGroupId("grp");
597598
containerProps.setAckMode(AckMode.RECORD);
598599
containerProps.setMissingTopicsFatal(false);
600+
List<String> advised = new ArrayList<>();
601+
MethodInterceptor advice1 = invoc -> {
602+
advised.add("one");
603+
return invoc.proceed();
604+
};
605+
MethodInterceptor advice2 = invoc -> {
606+
advised.add("two");
607+
return invoc.proceed();
608+
};
609+
containerProps.setAdviceChain(advice1, advice2);
599610
final CountDownLatch latch = new CountDownLatch(2);
600611
MessageListener<Integer, String> messageListener = spy(
601612
new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes
@@ -632,6 +643,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
632643
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
633644
inOrder.verify(consumer).commitSync(anyMap(), any());
634645
container.stop();
646+
assertThat(advised).containsExactly("one", "two", "one", "two");
635647
}
636648

637649
@SuppressWarnings("unchecked")

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2173,6 +2173,10 @@ To revert to the previous behavior, set the error handler's `resetStateOnRecover
21732173
|1
21742174
|The number of records before committing pending offsets when the `ackMode` is `COUNT` or `COUNT_TIME`.
21752175

2176+
|adviceChain
2177+
|`null`
2178+
|A chain of `Advice` objects (e.g. `MethodInterceptor` around advice) wrapping the message listener, invoked in order.
2179+
21762180
|ackMode
21772181
|BATCH
21782182
|Controls how often offsets are committed - see <<committing-offsets>>.

0 commit comments

Comments
 (0)