Skip to content

Commit dca07ac

Browse files
Andrey Dyomingaryrussell
authored andcommitted
GH-1631: fix missing @sendto for cglib proxies
Resolves #1631 GH-1631: add test Fix imports.
1 parent a227594 commit dca07ac

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,9 @@ private void setupReplyTo(InvocableHandlerMethod handler) {
207207
replyTo = extractSendTo(method.toString(), ann);
208208
}
209209
if (ann == null) {
210-
ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
211-
replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
210+
Class<?> beanType = handler.getBeanType();
211+
ann = AnnotationUtils.getAnnotation(beanType, SendTo.class);
212+
replyTo = extractSendTo(beanType.getSimpleName(), ann);
212213
}
213214
if (ann != null && replyTo == null) {
214215
replyTo = AdapterUtils.getDefaultReplyTopicExpression();

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import javax.validation.ValidationException;
5050
import javax.validation.constraints.Max;
5151

52+
import org.aopalliance.intercept.MethodInterceptor;
53+
import org.aopalliance.intercept.MethodInvocation;
5254
import org.apache.commons.logging.Log;
5355
import org.apache.commons.logging.LogFactory;
5456
import org.apache.kafka.clients.admin.NewTopic;
@@ -67,14 +69,21 @@
6769
import org.junit.jupiter.api.Test;
6870
import org.mockito.Mockito;
6971

72+
import org.springframework.aop.framework.ProxyFactory;
73+
import org.springframework.beans.BeansException;
7074
import org.springframework.beans.factory.ObjectProvider;
7175
import org.springframework.beans.factory.annotation.Autowired;
76+
import org.springframework.beans.factory.config.BeanDefinition;
77+
import org.springframework.beans.factory.config.BeanPostProcessor;
7278
import org.springframework.context.annotation.Bean;
7379
import org.springframework.context.annotation.Configuration;
7480
import org.springframework.context.annotation.Primary;
81+
import org.springframework.context.annotation.Role;
7582
import org.springframework.context.event.EventListener;
7683
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
7784
import org.springframework.core.MethodParameter;
85+
import org.springframework.core.Ordered;
86+
import org.springframework.core.annotation.Order;
7887
import org.springframework.core.convert.converter.Converter;
7988
import org.springframework.data.web.JsonPath;
8089
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -168,7 +177,7 @@
168177
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
169178
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
170179
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
171-
"annotated38", "annotated38reply", "annotated39"})
180+
"annotated38", "annotated38reply", "annotated39", "sentto-in", "sendto-out"})
172181
public class EnableKafkaIntegrationTests {
173182

174183
private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";
@@ -1277,6 +1286,13 @@ public IfaceListener<String> ifaceListener() {
12771286
return new IfaceListenerImpl();
12781287
}
12791288

1289+
@Bean
1290+
@Order(Ordered.HIGHEST_PRECEDENCE)
1291+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
1292+
public ProxyListenerPostProcessor proxyListenerPostProcessor() {
1293+
return new ProxyListenerPostProcessor();
1294+
}
1295+
12801296
@Bean
12811297
public MultiListenerBean multiListener() {
12821298
return new MultiListenerBean();
@@ -2001,6 +2017,27 @@ public void registerSeekCallback(ConsumerSeekCallback callback) {
20012017

20022018
}
20032019

2020+
static class ProxyListenerPostProcessor implements BeanPostProcessor {
2021+
2022+
@Override
2023+
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
2024+
if ("multiListenerSendTo".equals(beanName)) {
2025+
ProxyFactory proxyFactory = new ProxyFactory(bean);
2026+
proxyFactory.setProxyTargetClass(true);
2027+
proxyFactory.addAdvice(new MethodInterceptor() {
2028+
@Override
2029+
public Object invoke(MethodInvocation invocation) throws Throwable {
2030+
logger.info(String.format("Proxy listener for %s.$s",
2031+
invocation.getMethod().getDeclaringClass(), invocation.getMethod().getName()));
2032+
return invocation.proceed();
2033+
}
2034+
});
2035+
return proxyFactory.getProxy();
2036+
}
2037+
return bean;
2038+
}
2039+
}
2040+
20042041
public static class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
20052042

20062043
private final CountDownLatch latch1 = new CountDownLatch(10);

0 commit comments

Comments
 (0)