Skip to content

Commit 3b4a92f

Browse files
authored
[ISSUE #9564] fix(spring): robust proxy detection for @RocketMQMessageListener;
* fix: detect @RocketMQMessageListener on proxies; warn on @RefreshScope; add v4 test - v4: Use AopProxyUtils.ultimateTargetClass + AnnotationUtils.findAnnotation in RocketMQMessageListenerBeanPostProcessor to robustly detect annotations on proxied beans. Log WARN (RocketMQUtil style) only for likely listener beans (implement RocketMQListener/ RocketMQReplyListener or have onMessage method) when proxied/scoped/@RefreshScope is present. - v5: Apply the same enhancements to the v5 RocketMQMessageListenerBeanPostProcessor. - tests(v4): Add a scoped-proxy listener test in RocketMQMessageListenerBeanPostProcessorTest to ensure registration path is executed for proxied listeners. Refs: apache/rocketmq#9564 * fix: applied codestyle * chore: added License
1 parent dad3304 commit 3b4a92f

File tree

5 files changed

+223
-33
lines changed

5 files changed

+223
-33
lines changed

rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,25 @@
2121
import java.util.Map;
2222
import java.util.function.BiFunction;
2323
import java.util.stream.Collectors;
24+
import org.apache.rocketmq.logging.org.slf4j.Logger;
25+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2426
import org.apache.rocketmq.spring.support.RocketMQMessageListenerContainerRegistrar;
27+
import org.springframework.aop.framework.AopProxyUtils;
28+
import org.springframework.aop.scope.ScopedProxyUtils;
2529
import org.springframework.aop.support.AopUtils;
2630
import org.springframework.beans.BeansException;
2731
import org.springframework.beans.factory.ObjectProvider;
2832
import org.springframework.beans.factory.config.BeanPostProcessor;
2933
import org.springframework.context.SmartLifecycle;
3034
import org.springframework.core.OrderComparator;
3135
import org.springframework.core.annotation.AnnotationUtils;
36+
import org.springframework.util.ClassUtils;
3237

3338
public class RocketMQMessageListenerBeanPostProcessor implements BeanPostProcessor, SmartLifecycle {
3439

35-
private AnnotationEnhancer enhancer;
40+
private static final Logger log = LoggerFactory.getLogger(RocketMQMessageListenerBeanPostProcessor.class);
41+
42+
private final AnnotationEnhancer enhancer;
3643

3744
private final ObjectProvider<RocketMQMessageListenerContainerRegistrar> registrarObjectProvider;
3845

@@ -61,15 +68,76 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
6168

6269
@Override
6370
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
64-
Class<?> targetClass = AopUtils.getTargetClass(bean);
65-
RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
71+
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
72+
73+
RocketMQMessageListener ann = AnnotationUtils.findAnnotation(targetClass, RocketMQMessageListener.class);
74+
if (ann == null) {
75+
ann = AnnotationUtils.findAnnotation(bean.getClass(), RocketMQMessageListener.class);
76+
}
77+
6678
if (ann != null) {
6779
RocketMQMessageListener enhance = enhance(targetClass, ann);
6880
registrarObjectProvider.ifAvailable(registrar -> registrar.registerContainer(beanName, bean, enhance));
81+
return bean;
6982
}
83+
84+
boolean isAopProxy = AopUtils.isAopProxy(bean);
85+
boolean isScopedTarget = ScopedProxyUtils.isScopedTarget(beanName);
86+
boolean hasRefreshScope = hasRefreshScopeAnnotation(bean.getClass(), targetClass);
87+
boolean likelyListener = isLikelyListenerBean(bean.getClass(), targetClass);
88+
89+
if (likelyListener && (isAopProxy || isScopedTarget || hasRefreshScope)) {
90+
log.warn("[RocketMQ] Bean '{}' (class={}) appears to be proxied or annotated with @RefreshScope. "
91+
+ "@RocketMQMessageListener on proxied beans may not be detected or registered. "
92+
+ "Recommended: do NOT put @RefreshScope on listener classes. "
93+
+ "Instead, extract refreshable configs to a separate @RefreshScope bean and inject it. "
94+
+ "See: https://github.com/apache/rocketmq/issues/9564",
95+
beanName, bean.getClass().getName());
96+
}
97+
7098
return bean;
7199
}
72100

101+
private boolean hasRefreshScopeAnnotation(Class<?> proxyClass, Class<?> targetClass) {
102+
try {
103+
if (ClassUtils.isPresent("org.springframework.cloud.context.config.annotation.RefreshScope",
104+
proxyClass.getClassLoader())) {
105+
Class<?> refreshScopeClass = ClassUtils.forName(
106+
"org.springframework.cloud.context.config.annotation.RefreshScope",
107+
proxyClass.getClassLoader());
108+
return AnnotationUtils.findAnnotation(targetClass, (Class)refreshScopeClass) != null
109+
|| AnnotationUtils.findAnnotation(proxyClass, (Class)refreshScopeClass) != null;
110+
}
111+
}
112+
catch (Throwable ignored) {
113+
// ignore
114+
}
115+
return false;
116+
}
117+
118+
private boolean isLikelyListenerBean(Class<?> proxyClass, Class<?> targetClass) {
119+
try {
120+
Class<?> listener = ClassUtils.forName(
121+
"org.apache.rocketmq.spring.core.RocketMQListener", proxyClass.getClassLoader());
122+
if (listener.isAssignableFrom(proxyClass) || listener.isAssignableFrom(targetClass)) {
123+
return true;
124+
}
125+
}
126+
catch (Throwable ignored) {
127+
}
128+
try {
129+
Class<?> replyListener = ClassUtils.forName(
130+
"org.apache.rocketmq.spring.core.RocketMQReplyListener", proxyClass.getClassLoader());
131+
if (replyListener.isAssignableFrom(proxyClass) || replyListener.isAssignableFrom(targetClass)) {
132+
return true;
133+
}
134+
}
135+
catch (Throwable ignored) {
136+
}
137+
138+
return false;
139+
}
140+
73141
@Override
74142
public int getPhase() {
75143
return Integer.MAX_VALUE - 2000;
@@ -88,15 +156,15 @@ public void stop() {
88156

89157
}
90158

91-
public void setRunning(boolean running) {
92-
this.running = running;
93-
}
94-
95159
@Override
96160
public boolean isRunning() {
97161
return running;
98162
}
99163

164+
public void setRunning(boolean running) {
165+
this.running = running;
166+
}
167+
100168
private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) {
101169
if (this.enhancer == null) {
102170
return ann;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,15 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
115
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessorTest.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
2626
import org.springframework.context.annotation.Bean;
2727
import org.springframework.context.annotation.Configuration;
28+
import org.springframework.context.annotation.Scope;
29+
import org.springframework.context.annotation.ScopedProxyMode;
2830

2931
import static org.assertj.core.api.Assertions.assertThat;
3032

@@ -34,26 +36,35 @@ public class RocketMQMessageListenerBeanPostProcessorTest {
3436
private static final String TEST_CLASS_SIMPLE_NAME = "Receiver";
3537

3638
private ApplicationContextRunner runner = new ApplicationContextRunner()
37-
.withConfiguration(AutoConfigurations.of(RocketMQAutoConfiguration.class));
39+
.withConfiguration(AutoConfigurations.of(RocketMQAutoConfiguration.class));
3840

3941
@Test
4042
public void testAnnotationEnhancer() {
4143
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876").
42-
withUserConfiguration(TestAnnotationEnhancerConfig.class, TestReceiverConfig.class).
43-
run((context) -> {
44-
// Started container failed. DefaultRocketMQListenerContainer{consumerGroup='Receiver-Custom-Consumer-Group' **
45-
assertThat(context).getFailure().hasMessageContaining("connect to null failed");
46-
});
44+
withUserConfiguration(TestAnnotationEnhancerConfig.class, TestReceiverConfig.class).
45+
run((context) -> {
46+
// Started container failed. DefaultRocketMQListenerContainer{consumerGroup='Receiver-Custom-Consumer-Group' **
47+
assertThat(context).getFailure().hasMessageContaining("connect to null failed");
48+
});
4749

4850
}
4951

52+
@Test
53+
public void testProxiedListenerAnnotationDetected() {
54+
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876")
55+
.withUserConfiguration(TestProxyConfig.class)
56+
.run((context) -> {
57+
assertThat(context).getFailure().hasMessageContaining("connect to");
58+
});
59+
}
60+
5061
@Configuration
5162
static class TestAnnotationEnhancerConfig {
5263
@Bean
5364
public RocketMQMessageListenerBeanPostProcessor.AnnotationEnhancer consumeContainerEnhancer() {
5465
return (attrs, element) -> {
5566
if (element instanceof Class) {
56-
Class targetClass = (Class) element;
67+
Class targetClass = (Class)element;
5768
String classSimpleName = targetClass.getSimpleName();
5869
if (TEST_CLASS_SIMPLE_NAME.equals(classSimpleName)) {
5970
String consumerGroup = "Receiver-Custom-Consumer-Group";
@@ -81,4 +92,20 @@ public void onMessage(Object message) {
8192

8293
}
8394
}
95+
96+
@Configuration
97+
static class TestProxyConfig {
98+
@Bean
99+
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
100+
public Object proxiedReceiverListener() {
101+
return new ProxiedReceiver();
102+
}
103+
}
104+
105+
@RocketMQMessageListener(consumerGroup = "proxy-group", topic = "test-proxy")
106+
static class ProxiedReceiver implements RocketMQListener<Object> {
107+
@Override
108+
public void onMessage(Object message) {
109+
}
110+
}
84111
}

rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@
1616
*/
1717
package org.apache.rocketmq.client.annotation;
1818

19+
import java.lang.reflect.AnnotatedElement;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.function.BiFunction;
23+
import java.util.stream.Collectors;
1924
import org.apache.rocketmq.client.autoconfigure.ListenerContainerConfiguration;
25+
import org.apache.rocketmq.shaded.org.slf4j.Logger;
26+
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
27+
import org.springframework.aop.framework.AopProxyUtils;
28+
import org.springframework.aop.scope.ScopedProxyUtils;
2029
import org.springframework.aop.support.AopUtils;
2130
import org.springframework.beans.BeansException;
2231
import org.springframework.beans.factory.InitializingBean;
@@ -26,15 +35,12 @@
2635
import org.springframework.context.SmartLifecycle;
2736
import org.springframework.core.OrderComparator;
2837
import org.springframework.core.annotation.AnnotationUtils;
29-
30-
import java.lang.reflect.AnnotatedElement;
31-
import java.util.List;
32-
import java.util.Map;
33-
import java.util.function.BiFunction;
34-
import java.util.stream.Collectors;
38+
import org.springframework.util.ClassUtils;
3539

3640
public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {
3741

42+
private static final Logger log = LoggerFactory.getLogger(RocketMQMessageListenerBeanPostProcessor.class);
43+
3844
private ApplicationContext applicationContext;
3945

4046
private AnnotationEnhancer enhancer;
@@ -50,17 +56,78 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
5056

5157
@Override
5258
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
53-
Class<?> targetClass = AopUtils.getTargetClass(bean);
54-
RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
59+
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
60+
61+
RocketMQMessageListener ann = AnnotationUtils.findAnnotation(targetClass, RocketMQMessageListener.class);
62+
if (ann == null) {
63+
ann = AnnotationUtils.findAnnotation(bean.getClass(), RocketMQMessageListener.class);
64+
}
65+
5566
if (ann != null) {
5667
RocketMQMessageListener enhance = enhance(targetClass, ann);
5768
if (listenerContainerConfiguration != null) {
5869
listenerContainerConfiguration.registerContainer(beanName, bean, enhance);
5970
}
71+
return bean;
72+
}
73+
74+
boolean isAopProxy = AopUtils.isAopProxy(bean);
75+
boolean isScopedTarget = ScopedProxyUtils.isScopedTarget(beanName);
76+
boolean hasRefreshScope = hasRefreshScopeAnnotation(bean.getClass(), targetClass);
77+
boolean likelyListener = isLikelyListenerBean(bean.getClass(), targetClass);
78+
79+
if (likelyListener && (isAopProxy || isScopedTarget || hasRefreshScope)) {
80+
log.warn("[RocketMQ] Bean '{}' (class={}) appears to be proxied or annotated with @RefreshScope. "
81+
+ "@RocketMQMessageListener on proxied beans may not be detected or registered. "
82+
+ "Recommended: do NOT put @RefreshScope on listener classes. "
83+
+ "Instead, extract refreshable configs to a separate @RefreshScope bean and inject it. "
84+
+ "See: https://github.com/apache/rocketmq/issues/9564",
85+
beanName, bean.getClass().getName());
6086
}
87+
6188
return bean;
6289
}
6390

91+
private boolean hasRefreshScopeAnnotation(Class<?> proxyClass, Class<?> targetClass) {
92+
try {
93+
if (ClassUtils.isPresent("org.springframework.cloud.context.config.annotation.RefreshScope",
94+
proxyClass.getClassLoader())) {
95+
Class<?> refreshScopeClass = ClassUtils.forName(
96+
"org.springframework.cloud.context.config.annotation.RefreshScope",
97+
proxyClass.getClassLoader());
98+
return AnnotationUtils.findAnnotation(targetClass, (Class)refreshScopeClass) != null
99+
|| AnnotationUtils.findAnnotation(proxyClass, (Class)refreshScopeClass) != null;
100+
}
101+
}
102+
catch (Throwable ignored) {
103+
// ignore
104+
}
105+
return false;
106+
}
107+
108+
private boolean isLikelyListenerBean(Class<?> proxyClass, Class<?> targetClass) {
109+
try {
110+
Class<?> listener = ClassUtils.forName(
111+
"org.apache.rocketmq.spring.core.RocketMQListener", proxyClass.getClassLoader());
112+
if (listener.isAssignableFrom(proxyClass) || listener.isAssignableFrom(targetClass)) {
113+
return true;
114+
}
115+
}
116+
catch (Throwable ignored) {
117+
}
118+
try {
119+
Class<?> replyListener = ClassUtils.forName(
120+
"org.apache.rocketmq.spring.core.RocketMQReplyListener", proxyClass.getClassLoader());
121+
if (replyListener.isAssignableFrom(proxyClass) || replyListener.isAssignableFrom(targetClass)) {
122+
return true;
123+
}
124+
}
125+
catch (Throwable ignored) {
126+
}
127+
128+
return false;
129+
}
130+
64131
@Override
65132
public int getPhase() {
66133
return Integer.MAX_VALUE - 2000;
@@ -79,16 +146,15 @@ public void stop() {
79146

80147
}
81148

82-
public void setRunning(boolean running) {
83-
this.running = running;
84-
}
85-
86-
87149
@Override
88150
public boolean isRunning() {
89151
return running;
90152
}
91153

154+
public void setRunning(boolean running) {
155+
this.running = running;
156+
}
157+
92158
@Override
93159
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
94160
this.applicationContext = applicationContext;
@@ -103,12 +169,12 @@ public void afterPropertiesSet() throws Exception {
103169
private void buildEnhancer() {
104170
if (this.applicationContext != null) {
105171
Map<String, AnnotationEnhancer> enhancersMap =
106-
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
172+
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
107173
if (enhancersMap.size() > 0) {
108174
List<AnnotationEnhancer> enhancers = enhancersMap.values()
109-
.stream()
110-
.sorted(new OrderComparator())
111-
.collect(Collectors.toList());
175+
.stream()
176+
.sorted(new OrderComparator())
177+
.collect(Collectors.toList());
112178
this.enhancer = (attrs, element) -> {
113179
Map<String, Object> newAttrs = attrs;
114180
for (AnnotationEnhancer enh : enhancers) {
@@ -123,9 +189,10 @@ private void buildEnhancer() {
123189
private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) {
124190
if (this.enhancer == null) {
125191
return ann;
126-
} else {
192+
}
193+
else {
127194
return AnnotationUtils.synthesizeAnnotation(
128-
this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null);
195+
this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null);
129196
}
130197
}
131198

0 commit comments

Comments
 (0)