Skip to content

Commit 90b9f62

Browse files
Andrey Dyomingaryrussell
authored andcommitted
GH-1631: fix missing @sendto for cglib proxies
Resolves #1631 GH-1631: add test Fix imports.
1 parent 43c9527 commit 90b9f62

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,9 @@ private void setupReplyTo(InvocableHandlerMethod handler) {
207207
replyTo = extractSendTo(method.toString(), ann);
208208
}
209209
if (ann == null) {
210-
ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
211-
replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
210+
Class<?> beanType = handler.getBeanType();
211+
ann = AnnotationUtils.getAnnotation(beanType, SendTo.class);
212+
replyTo = extractSendTo(beanType.getSimpleName(), ann);
212213
}
213214
if (ann != null && replyTo == null) {
214215
replyTo = AdapterUtils.getDefaultReplyTopicExpression();

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import javax.validation.ValidationException;
5050
import javax.validation.constraints.Max;
5151

52+
import org.aopalliance.intercept.MethodInterceptor;
53+
import org.aopalliance.intercept.MethodInvocation;
5254
import org.apache.commons.logging.Log;
5355
import org.apache.commons.logging.LogFactory;
5456
import org.apache.kafka.clients.admin.NewTopic;
@@ -67,14 +69,21 @@
6769
import org.junit.jupiter.api.Test;
6870
import org.mockito.Mockito;
6971

72+
import org.springframework.aop.framework.ProxyFactory;
73+
import org.springframework.beans.BeansException;
7074
import org.springframework.beans.factory.ObjectProvider;
7175
import org.springframework.beans.factory.annotation.Autowired;
76+
import org.springframework.beans.factory.config.BeanDefinition;
77+
import org.springframework.beans.factory.config.BeanPostProcessor;
7278
import org.springframework.context.annotation.Bean;
7379
import org.springframework.context.annotation.Configuration;
7480
import org.springframework.context.annotation.Primary;
81+
import org.springframework.context.annotation.Role;
7582
import org.springframework.context.event.EventListener;
7683
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
7784
import org.springframework.core.MethodParameter;
85+
import org.springframework.core.Ordered;
86+
import org.springframework.core.annotation.Order;
7887
import org.springframework.core.convert.converter.Converter;
7988
import org.springframework.data.web.JsonPath;
8089
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -169,7 +178,7 @@
169178
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
170179
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
171180
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
172-
"annotated38", "annotated38reply", "annotated39"})
181+
"annotated38", "annotated38reply", "annotated39", "sentto-in", "sendto-out"})
173182
public class EnableKafkaIntegrationTests {
174183

175184
private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";
@@ -1278,6 +1287,13 @@ public IfaceListener<String> ifaceListener() {
12781287
return new IfaceListenerImpl();
12791288
}
12801289

1290+
@Bean
1291+
@Order(Ordered.HIGHEST_PRECEDENCE)
1292+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
1293+
public ProxyListenerPostProcessor proxyListenerPostProcessor() {
1294+
return new ProxyListenerPostProcessor();
1295+
}
1296+
12811297
@Bean
12821298
public MultiListenerBean multiListener() {
12831299
return new MultiListenerBean();
@@ -2012,6 +2028,27 @@ public void registerSeekCallback(ConsumerSeekCallback callback) {
20122028

20132029
}
20142030

2031+
static class ProxyListenerPostProcessor implements BeanPostProcessor {
2032+
2033+
@Override
2034+
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
2035+
if ("multiListenerSendTo".equals(beanName)) {
2036+
ProxyFactory proxyFactory = new ProxyFactory(bean);
2037+
proxyFactory.setProxyTargetClass(true);
2038+
proxyFactory.addAdvice(new MethodInterceptor() {
2039+
@Override
2040+
public Object invoke(MethodInvocation invocation) throws Throwable {
2041+
logger.info(String.format("Proxy listener for %s.$s",
2042+
invocation.getMethod().getDeclaringClass(), invocation.getMethod().getName()));
2043+
return invocation.proceed();
2044+
}
2045+
});
2046+
return proxyFactory.getProxy();
2047+
}
2048+
return bean;
2049+
}
2050+
}
2051+
20152052
public static class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
20162053

20172054
private final CountDownLatch latch1 = new CountDownLatch(10);

0 commit comments

Comments
 (0)