Skip to content

Commit 8e9f9de

Browse files
Andrey Dyomingaryrussell
authored andcommitted
GH-1631: fix missing @sendto for cglib proxies
Resolves #1631 GH-1631: add test Fix imports.
1 parent c52d736 commit 8e9f9de

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,9 @@ private void setupReplyTo(InvocableHandlerMethod handler) {
180180
replyTo = extractSendTo(method.toString(), ann);
181181
}
182182
if (ann == null) {
183-
ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
184-
replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
183+
Class<?> beanType = handler.getBeanType();
184+
ann = AnnotationUtils.getAnnotation(beanType, SendTo.class);
185+
replyTo = extractSendTo(beanType.getSimpleName(), ann);
185186
}
186187
if (ann != null && replyTo == null) {
187188
replyTo = AdapterUtils.getDefaultReplyTopicExpression();

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
import javax.validation.ValidationException;
4848
import javax.validation.constraints.Max;
4949

50+
import org.aopalliance.intercept.MethodInterceptor;
51+
import org.aopalliance.intercept.MethodInvocation;
52+
import org.apache.commons.logging.Log;
53+
import org.apache.commons.logging.LogFactory;
5054
import org.apache.kafka.clients.admin.NewTopic;
5155
import org.apache.kafka.clients.consumer.Consumer;
5256
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -62,14 +66,21 @@
6266
import org.junit.jupiter.api.Test;
6367
import org.mockito.Mockito;
6468

69+
import org.springframework.aop.framework.ProxyFactory;
70+
import org.springframework.beans.BeansException;
6571
import org.springframework.beans.factory.ObjectProvider;
6672
import org.springframework.beans.factory.annotation.Autowired;
73+
import org.springframework.beans.factory.config.BeanDefinition;
74+
import org.springframework.beans.factory.config.BeanPostProcessor;
6775
import org.springframework.context.annotation.Bean;
6876
import org.springframework.context.annotation.Configuration;
6977
import org.springframework.context.annotation.Primary;
78+
import org.springframework.context.annotation.Role;
7079
import org.springframework.context.event.EventListener;
7180
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
7281
import org.springframework.core.MethodParameter;
82+
import org.springframework.core.Ordered;
83+
import org.springframework.core.annotation.Order;
7384
import org.springframework.core.convert.converter.Converter;
7485
import org.springframework.data.web.JsonPath;
7586
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -163,6 +174,8 @@
163174
"annotated38", "annotated38reply", "annotated39"})
164175
public class EnableKafkaIntegrationTests {
165176

177+
private static final Log logger = LogFactory.getLog(EnableKafkaIntegrationTests.class);
178+
166179
private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";
167180

168181
@Autowired
@@ -1218,6 +1231,13 @@ public IfaceListener<String> ifaceListener() {
12181231
return new IfaceListenerImpl();
12191232
}
12201233

1234+
@Bean
1235+
@Order(Ordered.HIGHEST_PRECEDENCE)
1236+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
1237+
public ProxyListenerPostProcessor proxyListenerPostProcessor() {
1238+
return new ProxyListenerPostProcessor();
1239+
}
1240+
12211241
@Bean
12221242
public MultiListenerBean multiListener() {
12231243
return new MultiListenerBean();
@@ -1936,6 +1956,27 @@ public void registerSeekCallback(ConsumerSeekCallback callback) {
19361956

19371957
}
19381958

1959+
static class ProxyListenerPostProcessor implements BeanPostProcessor {
1960+
1961+
@Override
1962+
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
1963+
if ("multiListenerSendTo".equals(beanName)) {
1964+
ProxyFactory proxyFactory = new ProxyFactory(bean);
1965+
proxyFactory.setProxyTargetClass(true);
1966+
proxyFactory.addAdvice(new MethodInterceptor() {
1967+
@Override
1968+
public Object invoke(MethodInvocation invocation) throws Throwable {
1969+
logger.info(String.format("Proxy listener for %s.$s",
1970+
invocation.getMethod().getDeclaringClass(), invocation.getMethod().getName()));
1971+
return invocation.proceed();
1972+
}
1973+
});
1974+
return proxyFactory.getProxy();
1975+
}
1976+
return bean;
1977+
}
1978+
}
1979+
19391980
public static class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
19401981

19411982
private final CountDownLatch latch1 = new CountDownLatch(10);

0 commit comments

Comments
 (0)