4747import java .util .Objects ;
4848import java .util .stream .Collectors ;
4949
50-
5150@ Configuration
5251public class ExtConsumerResetConfiguration implements ApplicationContextAware , SmartInitializingSingleton {
5352 private static final Logger log = LoggerFactory .getLogger (ExtConsumerResetConfiguration .class );
@@ -61,7 +60,7 @@ public class ExtConsumerResetConfiguration implements ApplicationContextAware, S
6160 private RocketMQMessageConverter rocketMQMessageConverter ;
6261
6362 public ExtConsumerResetConfiguration (RocketMQMessageConverter rocketMQMessageConverter ,
64- ConfigurableEnvironment environment , RocketMQProperties rocketMQProperties ) {
63+ ConfigurableEnvironment environment , RocketMQProperties rocketMQProperties ) {
6564 this .rocketMQMessageConverter = rocketMQMessageConverter ;
6665 this .environment = environment ;
6766 this .rocketMQProperties = rocketMQProperties ;
@@ -75,9 +74,9 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
7574 @ Override
7675 public void afterSingletonsInstantiated () {
7776 Map <String , Object > beans = this .applicationContext
78- .getBeansWithAnnotation (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration .class )
79- .entrySet ().stream ().filter (entry -> !ScopedProxyUtils .isScopedTarget (entry .getKey ()))
80- .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
77+ .getBeansWithAnnotation (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration .class )
78+ .entrySet ().stream ().filter (entry -> !ScopedProxyUtils .isScopedTarget (entry .getKey ()))
79+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
8180 beans .forEach (this ::registerTemplate );
8281 }
8382
@@ -93,20 +92,25 @@ private void registerTemplate(String beanName, Object bean) {
9392
9493 SimpleConsumerBuilder consumerBuilder = null ;
9594 SimpleConsumer simpleConsumer = null ;
95+ SimpleConsumerInfo simpleConsumerInfo = null ;
96+
9697 try {
97- consumerBuilder = createConsumer (annotation );
98- simpleConsumer = consumerBuilder .build ();
98+ final ClientServiceProvider provider = ClientServiceProvider .loadService ();
99+ SimpleConsumerBuilder simpleConsumerBuilder = provider .newSimpleConsumerBuilder ();
100+ simpleConsumerInfo = createConsumer (annotation , simpleConsumerBuilder );
99101 } catch (Exception e ) {
100102 log .error ("Failed to startup SimpleConsumer for RocketMQTemplate {}" , beanName , e );
101103 }
102104 RocketMQClientTemplate rocketMQTemplate = (RocketMQClientTemplate ) bean ;
103105 rocketMQTemplate .setSimpleConsumerBuilder (consumerBuilder );
104106 rocketMQTemplate .setSimpleConsumer (simpleConsumer );
105107 rocketMQTemplate .setMessageConverter (rocketMQMessageConverter .getMessageConverter ());
106- log .info ("Set real simpleConsumer to : {} {}" , beanName , annotation . value () );
108+ log .info ("Set real simpleConsumer {} to {}" , simpleConsumerInfo , beanName );
107109 }
108110
109- private SimpleConsumerBuilder createConsumer (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ) {
111+ private SimpleConsumerInfo createConsumer (
112+ org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ,
113+ SimpleConsumerBuilder simpleConsumerBuilder ) {
110114 RocketMQProperties .SimpleConsumer simpleConsumer = rocketMQProperties .getSimpleConsumer ();
111115 String consumerGroupName = resolvePlaceholders (annotation .consumerGroup (), simpleConsumer .getConsumerGroup ());
112116 String topicName = resolvePlaceholders (annotation .topic (), simpleConsumer .getTopic ());
@@ -121,10 +125,8 @@ private SimpleConsumerBuilder createConsumer(org.apache.rocketmq.client.annotati
121125 Boolean sslEnabled = simpleConsumer .isSslEnabled ();
122126 Assert .hasText (topicName , "[topic] must not be null" );
123127 ClientConfiguration clientConfiguration = RocketMQUtil .createClientConfiguration (accessKey , secretKey , endPoints , requestTimeout , sslEnabled , namespace );
124- final ClientServiceProvider provider = ClientServiceProvider .loadService ();
125128 FilterExpression filterExpression = RocketMQUtil .createFilterExpression (tag , filterExpressionType );
126129 Duration duration = Duration .ofSeconds (awaitDuration );
127- SimpleConsumerBuilder simpleConsumerBuilder = provider .newSimpleConsumerBuilder ();
128130 simpleConsumerBuilder .setClientConfiguration (clientConfiguration );
129131 if (StringUtils .hasLength (consumerGroupName )) {
130132 simpleConsumerBuilder .setConsumerGroup (consumerGroupName );
@@ -133,7 +135,8 @@ private SimpleConsumerBuilder createConsumer(org.apache.rocketmq.client.annotati
133135 if (Objects .nonNull (filterExpression )) {
134136 simpleConsumerBuilder .setSubscriptionExpressions (Collections .singletonMap (topicName , filterExpression ));
135137 }
136- return simpleConsumerBuilder ;
138+
139+ return new SimpleConsumerInfo (consumerGroupName , topicName , endPoints , namespace , tag , filterExpressionType , requestTimeout , awaitDuration , sslEnabled );
137140 }
138141
139142 private String resolvePlaceholders (String text , String defaultValue ) {
@@ -142,12 +145,59 @@ private String resolvePlaceholders(String text, String defaultValue) {
142145 }
143146
144147 private void validate (org .apache .rocketmq .client .annotation .ExtConsumerResetConfiguration annotation ,
145- GenericApplicationContext genericApplicationContext ) {
148+ GenericApplicationContext genericApplicationContext ) {
146149 if (genericApplicationContext .isBeanNameInUse (annotation .value ())) {
147150 throw new BeanDefinitionValidationException (
148- String .format ("Bean {} has been used in Spring Application Context, " +
149- "please check the @ExtRocketMQConsumerConfiguration" ,
150- annotation .value ()));
151+ String .format ("Bean {} has been used in Spring Application Context, " +
152+ "please check the @ExtRocketMQConsumerConfiguration" ,
153+ annotation .value ()));
154+ }
155+ }
156+
157+ static class SimpleConsumerInfo {
158+ String consumerGroup ;
159+
160+ String topicName ;
161+
162+ String endPoints ;
163+
164+ String namespace ;
165+
166+ String tag ;
167+
168+ String filterExpressionType ;
169+
170+ Duration requestTimeout ;
171+
172+ int awaitDuration ;
173+
174+ Boolean sslEnabled ;
175+
176+ public SimpleConsumerInfo (String consumerGroupName , String topicName , String endPoints , String namespace ,
177+ String tag , String filterExpressionType , Duration requestTimeout , int awaitDuration , Boolean sslEnabled ) {
178+ this .consumerGroup = consumerGroupName ;
179+ this .topicName = topicName ;
180+ this .endPoints = endPoints ;
181+ this .namespace = namespace ;
182+ this .tag = tag ;
183+ this .filterExpressionType = filterExpressionType ;
184+ this .requestTimeout = requestTimeout ;
185+ this .awaitDuration = awaitDuration ;
186+ this .sslEnabled = sslEnabled ;
187+ }
188+
189+ @ Override public String toString () {
190+ return "SimpleConsumerInfo{" +
191+ "consumerGroup='" + consumerGroup + '\'' +
192+ ", topicName='" + topicName + '\'' +
193+ ", endPoints='" + endPoints + '\'' +
194+ ", namespace='" + namespace + '\'' +
195+ ", tag='" + tag + '\'' +
196+ ", filterExpressionType='" + filterExpressionType + '\'' +
197+ ", requestTimeout(seconds)=" + requestTimeout .getSeconds () +
198+ ", awaitDuration=" + awaitDuration +
199+ ", sslEnabled=" + sslEnabled +
200+ '}' ;
151201 }
152202 }
153203}
0 commit comments